V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
chenfang
V2EX  ›  程序员

RocketMQ 消费写入 MySQL 问题

  •  
  •   chenfang · 346 天前 · 2882 次点击
    这是一个创建于 346 天前的主题,其中的信息可能已经有所发展或是发生改变。

    公司需求是把项目中产生的数据,实时入库到 MySQL,这些数据分别属于不同的 table.

    现在的方案是把产生的数据通过 tomcat -> RocketMQ 的一个 topic 中,然后启用一个 MQ 消费者组合并消息(如果不合并入库次数太多也会慢),然后 batch 入库,这个方案单表没有任何问题.

    如果设计到多个表入库,因为 mq 消费是顺序读取 topic 中的消息,也就导致如果一个表需要入库的数据量大,那么入库时间就会长,这会导致整体消费变慢,从而导致实时表不实时的问题.

    这就是我现在遇到的问题

    1. 入库 MySQL 的时间长,解决时间长的问题(这个我目前想不到解决办法)
    2. 如果解决不了入库时间长的问题,那么就让他不影响其他表入库

    我自己想过解决方案,针对问题 2 的

    方案 1

    从一个消费者组 改为 按表名创建消费者组,但是可用性不高

    按表名创建消费者组,使用过一段时间,因为表比较多,然后在一个 jvm(如果多个程序分别启动不同的消费者组也不好管理)中启动那么多消费者组,首先内存会占用很多(应该是每个消费者组都会缓存一定数量的消息),也就是内存的大小跟实时表的数量是等比上涨的.其次我查了一下说是消费者组数量也不是可以无限加的,目前我们是 110 多张表,也比较多了.考虑到之后还会加表,这个方案试行了一段时间就废掉了.

    或者这个方案有没有优化空间?

    方案 2

    把入库时间长的表单独分一个消费者组,可用性比第一个还低

    经过实践,不晓得哪个时间点,表里数据生产就会增多,所以这样也会导致可能突然就延迟了....很不可控

    然后就是大家有什么思路么?万分感谢!

    28 条回复    2023-04-18 10:02:44 +08:00
    Seulgi
        1
    Seulgi  
       346 天前
    一个队列,消费时按表丢缓存,异步任务定时 10 秒,30 秒自己看能接受的延迟,记得设置缓存大小触发强制写。
    举个例子,table_a ,table_b ,异步定时写任务 10 秒触发一次,表缓存强制 50mb 时触发写数据库操作。注意锁问题
    chenfang
        2
    chenfang  
    OP
       346 天前
    @Seulgi 这就是现在正在跑的版本,也是用了 jvm 的内存做缓存这种机制,还有强制触发写操作,但是还是不成,首先缓存强制触发不可以无限触发,比如同时入一个表的线程最多是 3 个,那么最后还是需要等待入库完成,从 MQ 消费读取消息也会触发等待...
    Seulgi
        3
    Seulgi  
       346 天前   ❤️ 1
    表过多,建议将表名设置为 tag ,按 tag 切分消费组,多部署消费者提高消费速度,实时性较高的表,可以单独一个 tag 一个消费组。
    举例:比如 table_a,table_b,table_c ,table_a 要求实时,部署 4 个 pod 只消费 tag:table_a 的数据,配置定时异步写任务为 1 秒或者为实时写。table_b,table_c 不要求实时,部署 4 个 pod 只消费 tag:table_b|table_c ,配置定时异步写任务为 30 秒
    Seulgi
        4
    Seulgi  
       346 天前
    @chenfang 强制触发有等待,那说明你们的消费速度>写速度。要做的时将消费均摊。也就是 pod 要多部署。
    AS4694lAS4808
        5
    AS4694lAS4808  
       346 天前   ❤️ 1
    有类似的场景,不过是 AWS 云上。
    目前是 API (tomcat) -> Kinesis data stream (RocketMQ) -> Kinesis Firehose (Flink/Fluentd) -> S3 (MinIO)
    -> OpenSearch (ES)
    每日定时把 S3 的数据 Load 到 Redshift (Mysql)里,删除 ES 7 日以上 index

    查询的时候 7 日以下 -> ES
    7 日以上 -> Redshift (MYSQL 分库分表)

    我们业务一开始也是读写一起走库的,但是显然只能支撑小数据量,而且后面查询也多了,就重构了一遍。
    8355
        6
    8355  
       346 天前
    问题 1 为什么不直接同步入库而采用异步入库的形式?就算使用异步入库也不用很多表或者说达到 110 张表都异步去写吧。
    问题 2 现在每天写入量大概是多少?入库 MySQL 的时间长大概是多长 每条 sql 写入多少行?慢写入有多少个索引多少个字段?
    chenfang
        7
    chenfang  
    OP
       346 天前
    @AS4694lAS4808 很多项目都是读数据库表里的表,改成 ES 很难...费时费力估计老板不会同意去搞
    chenfang
        8
    chenfang  
    OP
       346 天前
    @8355
    答案 1 单个消息里存不了太多数据,单次入库的时间加起来,是比批量入库的时间长的,还是跟表数据量太大有关系
    答案 2 最大的表迁移到了 Doris 一次入库 50-80 万条左右,设置的是间隔 50s 一次强制写入,入库时间现在是 40-50s

    慢写入有多少个索引多少个字段? 这个我不晓得..
    8355
        9
    8355  
       346 天前
    不管在方案 1 还是方案 2 都无法满足你的原始需求 项目中产生的数据,实时入库到 MySQL
    当执行 update 操作时 你前台返回操作成功 但后台并不一定能绝对执行成功
    当消息堆积时现开消费者时来不及的,会出现执行增删改操作后查询还是原来的值

    项目复杂度会指数型上升 你需要特别小心的处理缓存数据和刷新的时机 为了这个方案你的代码量起码要翻一倍
    所以你这两个方案都是及其糟糕的

    主力削峰业务进队列 99%的业务应该同步读写 最简单的就是最好的也是最不容易出问题的
    8355
        10
    8355  
       346 天前
    @chenfang #8 那我的理解你的核心最大的问题是在写库时间 而不是为了解决这个问题再上面增加复杂度
    哪怕你的队列消费的再快你的写库时间还是会很长,还会牵扯到刷盘策略问题,失败异常数据全丢问题更大。
    pkoukk
        11
    pkoukk  
       346 天前
    如果非要把数据直接写入 MySQL ,那你这个场景的瓶颈在 MySQL 上啊
    几十万的数据再 mysql 配置再高也得好几秒吧,就算你分 topic 了,其它写入也会被阻塞住
    分库吧,按消息的某个 ID 字段分 HASH ,提高下游处理速度
    liprais
        12
    liprais  
       346 天前
    搞个 flink 完事
    dlmy
        13
    dlmy  
       346 天前
    这个问题的核心是 “MQ 中消息消费速度远大于入库速度,并且需要实时入库到 MySQL”,如果一定要坚持 “实时入库”,那么不管你用什么方式解决,系统的复杂度相对都会变很高,也可能会带来新的问题,如果去掉 “实时” 这两个字,单就入库来说,会有很多解决方案。

    标记一下,蹲大佬的 trade-off 方案
    standchan
        14
    standchan  
       346 天前
    这个实时性有点麻烦啊,一个是很快的消息队列,一个是必须要进行 io 的数据库。要不试试 clickhouse ?但是换数据库要大改更麻烦
    lolizeppelin
        15
    lolizeppelin  
       346 天前
    搞笑啊 所有 mq 只要多个消费者都可能出现写入顺序问题 还要实时
    拍啥脑门写方案呀
    hhjswf
        16
    hhjswf  
       346 天前   ❤️ 2
    谁做的选型啊,mq 本来就是拿来做异步,要求实时不是搞笑?
    fkdog
        17
    fkdog  
       346 天前
    入库的数据除了 insert 是不是也包括 update ?
    如果是 update 是不是需要考虑并行消费顺序不一致脏写问题?
    kafka 开多个 partition 不同表写入不同的 partition ,或者干脆不同表设计不同 topic 不知道能不能满足你的需求。
    没用过 rocket ,不过应该有对应概念。
    Red998
        18
    Red998  
       346 天前
    看业务对实时怎么看待了、技术角度都不是实时、只是延迟时间长短问题。 或者可以使用 binlog 方式去监听然后消费 MQ
    、canal 了解下。
    urnoob
        19
    urnoob  
       346 天前
    OP 需要澄清下 实时 这个需求.是真的实时还是可以接受一定范围内的延迟.
    真实时,那为啥不直接入库,就想常见的 CRUD 那样,何必放个 MQ
    可延迟,那上面已经提供了一部分方案了.
    对于方案 1 的优化
    可采用高低搭配,量大的表部署更多消费者,硬件配置也更好.(OP 方案 2)
    还取决于具体业务.
    比如 tomcat 过来的数据进同一张表,数据之间是否有关联.
    有关联 比如同一个设备位置更新. 但是设备之间无关联,那就按设备唯一标识符区分,确保对于同一设备入库先后顺序.
    如果有关联可合并,那就在写入 MQ 前做一定的合并操作减少总量.
    没关联 那就直(批量)入库.

    很奇怪 OP 的方案 2
    不晓得哪个时间点,表里数据生产就会增多
    数据已经入库 但凡有个创建时间都应该知道什么时候变多,怎么会不知道呢.我觉得是需要具体深入挖掘的.


    另外还有一种优化,就是 tomcat 那作为生产者是可以知道某几个表数据量增长的 它可以
    临时增加消费者
    用 MQ 通知所有消费者,对消费策略做一定更改.不分表的情况下将量多的表做批量写入 或者搭配方案 1,对这部分数据再写入 MQ,然后再按表(Tag)进行消费 这都能减小其他表的写入延迟影响..

    但没有具体业务场景也只能说的泛泛

    如果大到 Mysql 性能跟不上,那就肯定要做扩容了.
    buddyy
        20
    buddyy  
       346 天前
    建议你看一下 MySQL 所在机器的磁盘 IO 情况,是否出现了 IO 饱和的情况。如果 IO 饱和你必须得分库了。
    burymme11
        21
    burymme11  
       346 天前 via Android
    要实时的去写入 mysql ???我猜下是不是有其他地方要实时的去读?
    如果是这样就有别的曲线救国方案。
    burymme11
        22
    burymme11  
       346 天前 via Android
    在 mq 和 mysql 之间加一层
    lopssh
        23
    lopssh  
       346 天前
    没看懂需求,要实时的数据,为什么还走 MQ 呢?
    DinnyXu
        24
    DinnyXu  
       346 天前
    @burymme11 我觉得你说的对,如果不实时读,干嘛要实时写,实时写一般是为了应付统计之类的,50-80w 的数据要进行实时写,对 MySQL 来说有很大的压力。

    如果要实时读,可以根据读的业务来缓存数据。
    DinnyXu
        25
    DinnyXu  
       346 天前
    @lopssh MQ 就是一个中转层,数据没法第一时间入库,只能通过 MQ 中转
    wqhui
        26
    wqhui  
       346 天前
    问题是多表的变更都在一个 topic 上串行处理,不同表的数据也要串行吗?如果只是单表串行,是不是可以分开消费线程跟工作线程,每个表开一个工作线程,消费线程直接把接收到的消息扔给对应表的工作线程写库。不过有个风险就是某一个表的数据特别多,对应工作线程处理不过来缓存被挤爆了,而且因为消费线程接收完就 ack ,实际接收到的消息还在工作线程排队写入,会丢数据。另外你的 mysql 扛的住吗,扛不住就分库
    zcxey2911
        27
    zcxey2911  
       346 天前
    和 mq 就没关系啊,OP 的问题瓶颈是 Mysql ,说白了就是数据量大,写库慢啊

    实时写入 Mysql 数据应该采用 Binlog Load 机制实现。Binlog Load 提供了一种使 Doris 增量同步用户对 Mysql 数据库的数据更新操作的 CDC(Change Data Capture)功能
    raysonlu
        28
    raysonlu  
       345 天前
    尝试解读一下 OP 的需求。不考虑 mysql 写入性能,从 OP 视角来说,项目的问题通过 MQ 对数据库写入进行 batch 入库处理就能解决了,毕竟 OP 看到现在项目是“单表单队列进行 batch 入库”起到优化作用。这种情况下,如果是多表写入想 batch ,能有什么好的方案?
    队列的设计我比较喜欢用现实场景类比:景区入口可以设有 1 ~ 3 个入口队伍。站在门口的保安发现有旅行团来,就临时增设“旅行团通道”;如果了解到 xx 旅行团来了一大批人,临时增设“xx 旅行团专属通道”;如果收到通知景区与 xx 旅行团签订了长期合作协议,应该就可以长期保留一个“xx 旅行团专属 vip 通道”。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   943 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 20:57 · PVG 04:57 · LAX 13:57 · JFK 16:57
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.