ES 作为一个分布式搜索引擎,从扩展能力和搜索特性上而言无出其右,然而它有自身的弱势存在,其作为近实时存储系统,由于其分片和复制的设计原理,也使其在数据延迟和一致性方面都是无法和 OLTP(Online Transaction Processing)系统相媲美的。
也正因如此,通常它的数据都来源于其他存储系统同步而来,做二次过滤和分析的。这就引入了一个关键节点,即 ES 数据的同步写入方式,本文介绍的则是 MySQL 同步 ES 方式。
将 MySQL 数据写入 ES,首先想到的一定是消费 Binlog 直连 ES 写入,这种方式简单明了,然而如果稍微考量维度多一点,就会发现该方式的一些弊端。因此还有另外一个方式,即 【RocketMQ + Flink Consumer + ES Bulk】集成生态 , 我们将 从同步延迟、消费特性,ES 写入性能、系统容灾能力四个方面评估这两种接入方式 ,希望给到大家灵感并选择适合业务的同步方式。
ES 基础写入原理
ES 写入属于追加式写入,先形成特定大小的 Segment,然后定时 Merge 小数据段为大数据段以减少内存碎片,提升查询效率的过程。一个 Index 由 N 个 Shard 及其副本构成,存储了同一种 Type 类型的 Documents,由 Mapping 定义了其索引方式,每一个 Shard 由 N 个 Segment 组成,每个 Shard 都是一个全功能且完整的 Lucene 索引,它是 ES 的最小处理单元;Segment 是 ES 最小的数据处理单位,每个 Segment 都是一个独立的倒排索引。
ES 写入其实是不断将数据写入到同一个 Segment(内存),然后触发 Refresh 刷新,将 Segment 刷新到 OS Cache(默认 1s),此时数据就可以查询到了,OS Cache 会由操作系统触发 Flush 操作持久化到磁盘。
引发思考:ES 是如何保证数据不丢失的呢?追加式写入的优劣点是什么?追加式写入是如何处理数据更新问题的?MySQL 是属于哪种写入方式呢?本文重点不在此处,大家可以另行查阅文章。
ES 基本概念
ES 写入过程
ES 直连写入
采用 ES 直连写入的优点是因为路径短,依赖组件少,加上 Dsyncer(异构存储转换系统)通常已经提供了完善的限流重试机制,所以消费延迟和消费的数据完整性都是可以保证的。
缺点:
- 不易于接入多机房容灾部署,目前 ES 容灾机房都属于独立部署,独立读写模式,所以如果采用该方式,则难以同时对多机房写入分别做管控,达不到容灾效果。Binlog-->Dsyncer 通常一个 MySQL Table 对应一个转换任务,如果为了写多机房起多个重复的转换任务,则显得有些愚笨。
- 如果自身业务场景有对同一条记录并发写场景,但写不一定全部来源于 Binlog 的情况下,那全局考虑直写 ES 则更容易遇到写入冲突问题,因为缺乏有序队列的保障。
通过 Flink 搭建 ES 集成系统
Flink 搭建 ES 集成系统,则指的是所有的 ES 写入都由 Flink 任务完成,Flink 监听 RocketMQ 实时数据流,既保证了数据的分区有序性,又充分利用了 ES 的批量写入能力,ES 的批量写入能力比单条写入性能高出多倍。同时由于 Flink 本身的容错性,即使在异常场景下,也能保证数据的最终一致性。
优点 :
- 通过 MQ 可以更快捷的接入多机房 ES 集群,写入解耦,三机房分别起消费者写入数据, 彼此独立 ,当出现单机房故障时,只要有可用机房,直接处理读流量切流即可, 容灾方案简单清晰 ;
- 网络抖动等问题会导致 ES 暂时性写入失败时,不影响其他集群写入的情况下,RocketMQ 会暂存消息,Flink 会保存消费快照,不断重试直至成功, 更好的保障了数据最终一致性 ;
- 多数据源写入能保证 全局分区一致性 。
缺点 :
- 依赖了更多组件,会增加全链路数据同步延迟,而 ES 默认的 Refresh 频率是每秒一次,经测试该链路正常情况下数据延迟都是秒级的,不是完全不可接受;
- 依赖了更多组件,对基础组件的稳定性有更高的要求,RocketMQ 异常,或者 Flink 任务异常都会导致同步链路出现问题,增加一定的业务异常风险。
在这里需要注意的一个问题是有人可能会考虑接入多机房 ES 集群,是怎么保证多机房同时成功的、以及怎么保证写入成功后就可以查询得到?目前这两点暂时无法做到,因为多个机房都是独立写入的,互不影响,且 ES 集群属于弱数据一致性集群,无法保证写入成功立刻就能查到。
搭建并运行一个 ES Flink 消费程序的必备条件 :
- Flink 运行环境 :首先需要有 Flink 任务的运行环境,通常企业级的 Flink 任务会作为一个 YARN 作业在分布式系统中被调度并分配资源执行,但同时 Flink 也可作为单机进程,亦或搭建一个独立集群运行。
- ES 消息格式 :需要约定一种 ES 消息传输格式和序列化方式,一套范式解决所有同步场景,目前流行的序列化方式是 pb 格式或 json 格式,目前我们都是推荐使用 pb 格式的,数据格式 Schema 定义:
syntax = "proto3";
message ESIndexInfo {
string Name = 1; // 文档要写入索引的名称或别名
}
enum ESOPType { // 文档写入操作类型
DELETE = 0; // 删除文档
INDEX = 1; // 创建新文档或更新老文档,只能全量更新 (替换老文档)
UPDATE = 2; // 更新老文档,支持部分更新 (合并老文档)
UPSERT = 3; // 创建新文档或更新老文档,支持部分更新 (合并老文档)
CREATE = 4; // 创建新文档,存在时报错丢弃
}
message ESDocAction {
ESIndexInfo IndexInfo = 1; // 索引信息 (必需)
ESOPType OPType = 2; // 操作类型 (必需)
string ID = 3; // 文档 ID (可选)
string Doc = 4; // 文档内容 (JSON 格式, 删除操作时不需要)
int64 Version = 5; // 文档版本 (可选, 大于 0 且操作为 index/create/delete 有效)
string Routing = 6; // 文档路由 (可选, 非空有效)
string Script = 7; // 文档脚本 (JSON 格式, 操作类型为 update/upsert 有效,但和 Doc 不能同时存在)
}
- Flink 任务必要配置 :监听的 RocketMQ Topic 信息,写 ES 集群信息;
- Flink 执行函数 :Flink 处理流式消息有流式 SQL 和自定义应用程序两种方式,流式 SQL 约束于本身的一些限制,比如不支持同一个 MQ 有多个索引消息,而自定义编程更加灵活,比如添加各种打点,日志,错误码处理等,推荐该方式;
- Flink 资源配置 :JobManager 资源配置,TaskManager 资源配置等等;
- Flink 自定义参数配置 :可以自定义一些与应用程序紧密相关的动态配置,方便动态调节 Flink 消费能力,比如:
对比建议
经过以上介绍如果业务在都可接受秒级延迟的条件下,使用 RocketMQ+Flink 的方式能够 更好的实现有序性和容灾能力 ,Flink 在 流式任务处理能力上也远优 FaaS ,但是 直连方式 明显链路更加简洁,架构更加轻量,系统集成和维护成本较低 ,所以还是需要依照业务特性选择最适合的才是最好的。
欢迎使用上述解决方案相关产品: 火山引擎云搜索服务、消息队列 RocketMQ 版、流式计算 Flink 版。