MQ的数据一致性,如何保证?

向量数据库大模型关系型数据库

picture.image

苏三的免费八股文网站:

www.susan.net.cn

大家好,我是苏三,又跟大家见面了。

前言

上个月,我们有个电商系统出了个灵异事件:用户支付成功了,但订单状态死活不改成“已发货”。

折腾了半天才定位到问题:订单服务的MQ消息,像人间蒸发一样消失了。

这个Bug让我明白:(MQ)消息队列的数据一致性设计,绝对能排进分布式系统三大噩梦之一!

今天这篇文章跟大家一起聊聊,MQ如何保证数据一致性?希望对你会有所帮助。

1 数据一致性问题的原因

这些年在Kafka、RabbitMQ、RocketMQ踩过的坑,总结成四类致命原因:

  1. 生产者悲剧 :消息成功进Broker,却没写入磁盘就断电。
  2. 消费者悲剧 :消息消费成功,但业务执行失败。
  3. 轮盘赌局 :网络抖动导致消息重复投递。
  4. 数据孤岛 :数据库和消息状态割裂(下完单没发券)

这些情况,都会导致MQ产生数据不一致的问题。

那么,如何解决这些问题呢?

2 消息不丢的方案

我们首先需要解决消息丢失的问题。

2.1 事务消息的两阶段提交

以RocketMQ的事务消息为例,工作原理就像双11的预售定金伪代码如下:


      
      
          

        
 // 发送事务消息核心代码
 
        
   

 
        TransactionMQProducer producer = 
        
 new
 
         TransactionMQProducer(
        
 "group"
 
        );
        
   

 
        producer.setTransactionListener(
        
 new
 
         TransactionListener() {
        
   

 
            
        
 // 执行本地事务(比如扣库存)
 
        
   

 
            
        
 
 public
 
  LocalTransactionState 
 
 executeLocalTransaction
 
 
 (Message msg, Object arg)
 
  
 
        {
        
   

 
                
        
 return
 
         doBiz() ? LocalTransactionState.COMMIT : LocalTransactionState.ROLLBACK;
        
   

 
            }
        
   

 
        
   

 
            
        
 // Broker回调检查本地事务状态
 
        
   

 
            
        
 
 public
 
  LocalTransactionState 
 
 checkLocalTransaction
 
 
 (MessageExt msg)
 
  
 
        {
        
   

 
                
        
 return
 
         checkDB(msg.getTransactionId()) ? COMMIT : ROLLBACK;
        
   

 
            }
        
   

 
        });
        
   

 
      
    

真实场景中,别忘了在 checkLocalTransaction 里做好妥协查询(查流水表或分布式事务日志)。

去年在物流系统救火,就遇到过事务超时的坑——本地事务成功了,但因网络问题没收到Commit,导致Broker不断回查。

2.2 持久化配置

RabbitMQ的坑都在配置表里:

| 配置项 | 例子 | 作用 | | --- | --- | --- | | 队列持久化 | durable=true | 队列元数据不丢 | | 消息持久化 | deliveryMode=2 | 消息存入磁盘 | | Lazy Queue | x-queue-mode=lazy | 消息直接写盘不读取进内存 | | Confirm机制 | publisher-confirm-type | 生产者确认消息投递成功 |

RabbitMQ本地存储+备份交换机双重保护代码如下:


      
      
          

        channel.queueDeclare(
        
 "order\_queue"
 
        , 
        
 true
 
        , 
        
 false
 
        , 
        
 false
 
        , 
        
   

 
            
        
 new
 
         HashMap<String, Object>(){{
        
   

 
                put(
        "x-dead-letter-exchange"
        , 
        "dlx\_exchange"
        ); 
        // 死信交换机
        
   

 
            }});
        
   

 
      
    

去年双十一订单系统就靠这个组合拳硬刚流量峰值:主队列消息积压触发阈值时,自动转移消息到备份队列给应急服务处理。

2.3 副本配置

| 消息队列 | 保命绝招 | | --- | --- | | Kafka | acks=all + 副本数≥3 | | RocketMQ | 同步刷盘 + 主从同步策略 | | Pulsar | BookKeeper多副本存储 |

上周帮一个金融系统迁移到Kafka,为了数据安全启用了最高配置。

server.properties配置如下:


      
      
          

        
 acks=all
 
        
   

 
        
 min.insync.replicas=2
 
        
   

 
        
 unclean.leader.election.enable=false
 
        
   

 
      
    

结果发现吞吐量只剩原来的三分之一,但客户说“钱比速度重要”——这一行哪有银弹,全是取舍。

不同的业务场景,情况不一样。

3 应对重复消费的方案

接下来,需要解决消息的重复消费问题。

3.1 唯一ID

订单系统的架构课代表代码:


      
      
          

        
 // 雪花算法生成全局唯一ID
 
        
   

 
        Snowflake snowflake = 
        
 new
 
         Snowflake(datacenterId, machineId);
        
   

 
        String bizId = 
        
 "ORDER\_"
 
         + snowflake.nextId();
        
   

 
        
   

 
        
 // 查重逻辑(Redis原子操作)
 
        
   

 
        String key = 
        
 "msg:"
 
         + bizId;
        
   

 
        
 if
 
        (redis.setnx(key, 
        
 "1"
 
        )) {
        
   

 
            redis.expire(key, 
        
 72
 
         * 
        
 3600
 
        );
        
   

 
            processMsg();
        
   

 
        }
        
   

 
      
    

先使用雪花算法生成全局唯一ID,然后使用Redis的setnx命令加分布式锁,来保证请求的唯一性。

某次促销活动因Redis集群抖动,导致重复扣款。

后来改用:本地布隆过滤器+分布式Redis 双校验,总算解决这个世纪难题。

3.2 幂等设计

针对不同业务场景的三种对策:

| 场景 | 代码示例 | 关键点 | | --- | --- | --- | | 强一致性 | SELECT FOR UPDATE先查后更新 | 数据库行锁 | | 最终一致性 | 版本号控制(类似CAS) | 乐观锁重试3次 | | 补偿型事务 | 设计反向操作(如退款、库存回滚) | 操作日志必须落库 |

去年重构用户积分系统时,就靠着这个三板斧把错误率从0.1%降到了0.001%:

积分变更幂等示例如下:


      
      
          

        
 
 public
 
  
 
 void
 
  
 
 addPoints
 
 
 (String userId, String orderId, Long points)
 
  
 
        {
        
   

 
            
        
 if
 
         (pointLogDao.exists(orderId)) 
        
 return
 
        ;
        
   

 
            
        
   

 
            User user = userDao.selectForUpdate(userId); 
        
 // 悲观锁
 
        
   

 
            user.setPoints(user.getPoints() + points);
        
   

 
            userDao.update(user);
        
   

 
            pointLogDao.insert(
        
 new
 
         PointLog(orderId)); 
        
 // 幂等日志
 
        
   

 
        }
        
   

 
      
    

这里使用了数据库行锁实现的幂等性。

3.3 死信队列

RabbitMQ的终极保命配置如下:


      
      
          

        
 // 消费者设置手动ACK
 
        
   

 
        channel.basicConsume(queue, 
        
 false
 
        , deliverCallback, cancelCallback);
        
   

 
        
   

 
        
 // 达到重试上限后进入死信队列
 
        
   

 
        
 
 public
 
  
 
 void
 
  
 
 process
 
 
 (Message msg)
 
  
 
        {
        
   

 
            
        
 try
 
         {
        
   

 
                doBiz();
        
   

 
                channel.basicAck(deliveryTag);
        
   

 
            } 
        
 catch
 
        (Exception e) {
        
   

 
                
        
 if
 
        (retryCount < 
        
 3
 
        ) {
        
   

 
                    channel.basicNack(deliveryTag, 
        
 false
 
        , 
        
 true
 
        );
        
   

 
                } 
        
 else
 
         {
        
   

 
                    channel.basicNack(deliveryTag, 
        
 false
 
        , 
        
 false
 
        ); 
        
 // 进入DLX
 
        
   

 
                }
        
   

 
            }
        
   

 
        }
        
   

 
      
    

消费者端手动ACK消息。

在消费者端消费消息时,如果消费失败次数,达到重试上限后进入死信队列。

这个方案救了社交系统的推送服务——通过DLX收集全部异常消息,凌晨用补偿Job重跑。

4 系统架构设计

接下来,从系统架构设计的角度,聊聊MQ要如何保证数据一致性?

4.1 生产者端

对于实效性要求不太高的业务场景,可以使用:本地事务表+定时任务扫描的补偿方案。

流程图如下:picture.image

4.2 消费者端

消费者端为了防止消息风暴,要设置合理的并发消费线程数。

流程图如下:picture.image

4.3 终极方案

对于实时性要求比较高的业务场景,可以使用 事务消息+本地事件表 的黄金组合.

流程图如下:picture.image

5 血泪经验十条

  1. 消息必加唯一业务ID (别用MQ自带的ID)
  2. 消费逻辑一定要幂等 (重复消费是必然事件)
  3. 数据库事务和消息发送必须二选一 (或者用事务消息)
  4. 消费者线程数不要超过分区数*2 (Kafka的教训)
  5. 死信队列必须加监控报警 (别等客服找你)
  6. 测试环境一定要模拟网络抖动 (chaos engineering)
  7. 消息体要兼容版本号 (血的教训警告)
  8. 不要用消息队列做业务主流程 (它只配当辅助)
  9. 消费者offset定时存库 (防止重平衡丢消息)
  10. 业务指标和MQ监控要联动 (比如订单量和消息量的波动要同步)

总结

(MQ)消息队列像金融系统的SWIFT结算网络,看似简单实则处处杀机。

真正的高手不仅要会调参,更要设计出能兼容 可靠性性能 的架构。

记住,分布式系统的数据一致性不是银弹,而是通过层层防御达成的动态平衡。

就像当年我在做资金结算系统时,老板说的那句震耳发聩的话: “宁可慢十秒,不可错一分”

最后欢迎

加入苏三的星球

,你将获得:AI开发项目课程、苏三AI项目、

商城微服务实战、秒杀系统实战

商城系统实战、秒杀系统实战、代码生成工具、系统设计、性能优化、技术选型、底层原理、Spring源码解读、工作经验分享、痛点问题

、面试八股文

等多个优质专栏。

还有1V1答疑、修改简历、职业规划、送书活动、技术交流。

扫描下方二维码,即可加入星球:

picture.image

目前星球已经更新了 5200+ 篇优质内容,还在持续爆肝中.....

星球已经被 官方推荐了3次 ,收到了小伙伴们的一致好评。戳我加入学习,已有 1600+ 小伙伴加入学习。

picture.image

苏三的免费八股文网站:

www.susan.net.cn

picture.image

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
字节跳动云原生降本增效实践
本次分享主要介绍字节跳动如何利用云原生技术不断提升资源利用效率,降低基础设施成本;并重点分享字节跳动云原生团队在构建超大规模云原生系统过程中遇到的问题和相关解决方案,以及过程中回馈社区和客户的一系列开源项目和产品。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论