MySQL同步ES的6种方案!

MySQLNoSQL数据库

大家好,我是苏三,又跟大家见面了。

引言

在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。

然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。

这篇文章跟大家一起聊聊MySQL同步ES的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。

最近建了一些工作内推群,各大城市都有,欢迎各位HR和找工作的小伙伴进群交流,群里目前已经收集了不少的工作内推岗位。加苏三的微信:li_su223,备注:所在城市,即可进群。

picture.image

方案一:同步双写

场景 :适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。

在业务代码中同时写入MySQL与ES。

代码如下:

  
@Transactional    
public void createOrder(Order order) {    
    // 写入MySQL    
    orderMapper.insert(order);    
    // 同步写入ES    
    IndexRequest request = new IndexRequest("orders")    
        .id(order.getId())    
        .source(JSON.toJSONString(order), XContentType.JSON);    
    client.index(request, RequestOptions.DEFAULT);    
}  

痛点

  1. 硬编码侵入 :所有涉及写操作的地方均需添加ES写入逻辑。
  2. 性能瓶颈 :双写操作导致事务时间延长,TPS下降30%以上。
  3. 数据一致性风险 :若ES写入失败,需引入补偿机制(如本地事务表+定时重试)。

方案二:异步双写

场景 :电商订单状态更新后需同步至ES供客服系统检索。

我们可以使用MQ进行解耦。

架构图如下

picture.image

代码示例如下

  
// 生产者端    
public void updateProduct(Product product) {    
    productMapper.update(product);    
    kafkaTemplate.send("product-update", product.getId());    
}    
  
// 消费者端    
@KafkaListener(topics = "product-update")    
public void syncToEs(String productId) {    
    Product product = productMapper.selectById(productId);    
    esClient.index(product);    
}  

优势

  • 吞吐量提升:通过MQ削峰填谷,可承载万级QPS。
  • 故障隔离:ES宕机不影响主业务链路。

缺陷

  • 消息堆积 :突发流量可能导致消费延迟(需监控Lag值)。
  • 顺序性问题 :需通过分区键保证同一数据的顺序消费。

方案三:Logstash定时拉取

场景 :用户行为日志的T+1分析场景。

该方案低侵入但高延迟。

配置示例如下

  
input {  
jdbc{  
    jdbc\_driver=>"com.mysql.jdbc.Driver"  
    jdbc\_url=>"jdbc:mysql://localhost:3306/log\_db"  
    schedule=>"*/5 * * * *"# 每5分钟执行    
    statement=>"SELECT * FROM user\_log WHERE update\_time > :sql\_last\_value"  
}  
}  
output{  
elasticsearch{  
    hosts=>["es-host:9200"]  
    index=>"user\_logs"  
}  
}  

适用性分析

  • 优点 :零代码改造,适合历史数据迁移。
  • 致命伤
  • 分钟级延迟(无法满足实时搜索)
  • 全表扫描压力大(需优化增量字段索引)

方案四:Canal监听Binlog

场景 :社交平台动态实时搜索(如微博热搜更新)。
技术栈 :Canal + RocketMQ + ES

该方案高实时,并且低侵入。

架构流程如下

picture.image

关键配置

  
# canal.properties    
canal.instance.master.address=127.0.0.1:3306    
canal.mq.topic=canal.es.sync  

避坑指南

  1. 数据漂移 :需处理DDL变更(通过Schema Registry管理映射)。
  2. 幂等消费 :通过 \_id 唯一键避免重复写入。

方案五:DataX批量同步

场景 :将历史订单数据从分库分表MySQL迁移至ES。

该方案是大数据迁移的首选。

配置文件如下

  
{    
"job":{  
    "content":[{  
      "reader":{  
        "name":"mysqlreader",  
        "parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"}  
      },  
      "writer":{  
        "name":"elasticsearchwriter",  
        "parameter":{"endpoint":"http://es-host:9200","index":"orders"}  
      }  
    }]  
}  
}  

性能调优

  • 调整 channel 数提升并发(建议与分片数对齐)
  • 启用 limit 分批查询避免OOM

方案六:Flink流处理

场景 :商品价格变更时,需关联用户画像计算实时推荐评分。

该方案适合于复杂的ETL场景。

代码片段如下

  
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    
env.addSource(new CanalSource())    
   .map(record -> parseToPriceEvent(record))    
   .keyBy(event -> event.getProductId())    
   .connect(userProfileBroadcastStream)    
   .process(new PriceRecommendationProcess())    
   .addSink(new ElasticsearchSink());  

优势

  • 状态管理 :精准处理乱序事件(Watermark机制)
  • 维表关联 :通过Broadcast State实现实时画像关联

总结:

对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?

下面用一张表格做对比:

| 方案 | 实时性 | 侵入性 | 复杂度 | 适用阶段 | | --- | --- | --- | --- | --- | | 同步双写 | 秒级 | 高 | 低 | 小型单体项目 | | MQ异步 | 秒级 | 中 | 中 | 中型分布式系统 | | Logstash | 分钟级 | 无 | 低 | 离线分析 | | Canal | 毫秒级 | 无 | 高 | 高并发生产环境 | | DataX | 小时级 | 无 | 中 | 历史数据迁移 | | Flink | 毫秒级 | 低 | 极高 | 实时数仓 |

苏三的建议

  1. 若团队无运维中间件能力 → 选择Logstash或同步双写

  2. 需秒级延迟且允许改造 → MQ异步 + 本地事务表

  3. 追求极致实时且资源充足 → Canal + Flink双保险

最后欢迎加入苏三的星球,你将获得:AI开发项目课程、苏三AI项目、商城微服务实战、秒杀系统实战、商城系统实战、秒杀系统实战、代码生成工具、系统设计、性能优化、技术选型、底层原理、Spring源码解读、工作经验分享、痛点问题、面试八股文等多个优质专栏。

还有1V1答疑、修改简历、职业规划、送书活动、技术交流。

扫描下方二维码,即可加入星球:

picture.image

目前星球已经更新了5200+篇优质内容,还在持续爆肝中.....

星球已经被官方推荐了3次,收到了小伙伴们的一致好评。戳我加入学习,已有1700+小伙伴加入学习。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
云原生数据库 veDB 核心技术剖析与展望
veDB 是一款分布式数据库,采用了云原生计算存储分离架构。本次演讲将为大家介绍火山引擎这款云原生数据库的核心技术原理,并对未来进行展望。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论