近年来,基于云原生架构的新一代消息队列和流处理引擎 Apache Pulsar 在大数据领域发挥着愈发重要的作用,其应用场景和客户案例也在不断地丰富与扩充。
火山引擎是字节跳动的企业服务品牌,主要面向 To B 业务场景。火山引擎中 Stateless 云原生开源大数据平台 E-MapReduce(简称 EMR)为用户提供了云上的端到端的大数据解决方案。与此同时,Apache Pulsar 的一个十分重要的特性也是云原生。先进的存算分离的架构使其非常适合在云化的环境中部署、运维,而 Topic 数据的存储方式也使其扩容操作大为简化,不需要数据的 rebalance 过程。于是,将 Pulsar 集成到火山引擎 EMR 的生态系统中便是一件水到渠成且极具价值的事情。
本文介绍火山引擎 EMR 中 Apache Pulsar 的集成情况和应用场景,按照如下结构来编排:
- 业务背景
- 详解Apache Pulsar 在EMR的集成方案
- Apache Pulsar 典型应用场景、问题与解法
- 火山引擎 EMR 集成 Pulsar 的未来规划
火山引擎是字节跳动旗下的云服务平台,将字节跳动快速发展过程中积累的增长方法、技术能力和工具开放给外部企业,提供云基础、视频与内容分发、数智平台 VeDI、人工智能、开发与运维等服务,帮助企业在数字化升级中实现持续增长。
火山引擎 EMR 是火山引擎数据中台产品体系的基座。数据中台是火山引擎中的一类重要产品,服务于用户的大数据体系,支撑用户构建端到端的数据链路。火山引擎数据中台产品体系如下图所示。
数据中台的大数据生产、服务体系,数据来源于交易系统、日志、IoT、消息、文件等,通过数据集成进入到数据湖中,然后经过数据开发、治理过程,进入到专题集市,最后通过数据分析平台提供给数据的最终用户,包括 BI 报表、离线分析、实时分析、即席查询、数据挖掘等。以上是用户搭建大数据体系的一条完整的数据链路。在这条数据链路上的各个环节都有火山引擎数据中台的产品来对接。火山引擎 EMR 产品在数据中台整个的产品体系全景图中,处于基座的位置(如上图中黄色框所示),对于用户构建端到端的数据链路起着重要的支撑作用。火山引擎 EMR 基于火山引擎的 IaaS 能力,提供底层基础的大数据体系的计算引擎和存储引擎,并向上对接数据开发治理工具 DataLeap。
如果用一句话来定义火山引擎 EMR 这个云产品,那就是“Stateless 云原生开源大数据平台”。用户可以在 EMR 产品中创建自己的集群,并使用 EMR 集群中配置好的服务,进行大数据的计算与存储。
这里重点分析一下火山引擎 EMR 产品定义中的几个关键词。云原生、开源、大数据平台这些概念相信都是读者们耳熟能详的。
云原生是指云上资源的池化、用户的弹性按需使用、资源的成本摊薄和利用率提升等。开源大数据平台则是 EMR 这类云产品的共有定义。接下来重点讲一下 Stateless 这个概念。
Stateless 指的是“无状态”。在 EMR 中创建的用户集群的“状态”指的是什么呢?以有状态场景下的 Hadoop 集群类型为例,集群的状态包括用户的 HDFS 中的数据(属于用户的核心数据资产)、Hive Metastore 中的元数据、Ranger 中的权限配置、各个服务的日志、历史作业执行统计信息、集群的配置信息等等。这些状态信息都是存储在用户集群内部的,是用户集群的一部分。在这样的情形下,用户的集群是一个有状态的(Stateful)集群。在 EMR 的场景下,状态信息无处不在,集群内部包含大量状态信息并不稀奇,且这些状态信息的量级较重。
然而,用户集群富含状态信息,会给用户带来额外的一些成本和困扰。例如,如果用户想升级自己的集群版本,或者对自己的集群做一些其他的运维操作(例如服务的启停、执行定制化的运维脚本等),就会有一些顾虑:用户的数据、元数据、配置等信息都在集群内部,在执行集群升级或运维操作的时候,会不会对集群内部的状态信息造成影响。事实上,如果状态信息内置在用户集群内部,用户在对集群进行运维操作的时候,是需要做仔细的评估的,确保运维操作不会对集群内部的状态信息产生预期外的影响。这会给用户对集群的运维操作带来额外的顾虑和成本。
从上面的讨论不难看出有状态的集群会给客户带来一系列痛点问题,而火山引擎的 Stateless 的 EMR 集群则针对以上问题,为用户提供了解决方案。如果我们把集群的数据、元数据、配置、历史作业信息等状态通过一些方案放置在用户集群的外部,而在用户集群的内部不再持有状态信息,这样用户的集群就是一个无状态的集群,此时用户如果需要对集群执行升级或者其他运维操作,就不会有“集群状态数据受影响”相关的顾虑了,减少了运维的风险与成本。
在 Stateless 集群的场景下,用户甚至可以选择按需去持有集群,即:需要使用计算资源的时候,创建一个集群;不需要使用计算资源的时候,将集群释放。例如如果用户的数据生产 ETL 作业集中在凌晨执行,那么可以在当日的数据生产任务执行前将集群创建出来,然后用这个集群执行一系列的 ETL 作业,而在所有作业都成功执行完成后,再把这个集群释放掉。而到第二天凌晨,新一轮的数据生产作业执行之前,再创建出一个集群,待数据生产完成后再释放集群。如此循环往复。这样用户可以只为集群真正被使用的那段时间付费,而在不需要使用集群的时段,用户不需要持有集群,不存在用户持有的资源闲置的问题,用户也就不需要为闲置资源付费。这样可以给用户带来极大的成本优化,并提升云上资源的利用率。Stateless 的EMR 集群为这样的使用方式提供了可能。
上面介绍了火山引擎 EMR 的核心定义。针对火山引擎 EMR 的核心功能,进一步展开讲一下,就是提供了企业级的大数据生态组件,例如:Hadoop、Spark、Flink、Hive、Presto、Kafka、ClickHouse、Hudi、Iceberg 等,100% 开源兼容,快速构建企业级大数据平台,降低运维⻔槛。
火山引擎 EMR 的核心特性包括以下几点:
- 开源兼容 & 开放环境:大数据组件来自开源社区,与开源版本兼容。EMR 提供半托管的环境。EMR 托管在火山引擎的基础设施之上,通过管控面将用户在控制台上的操作传递到用户集群内部。但是这个意义上的托管并不是“全托管”,而是“半托管”——用户有足够的自主性、灵活性,可以登录到自己集群的节点的命令行环境中,执行灵活的运维操作,如脚本执行、软件安装与部署等,以满足用户的个性化需求。也就是说,“半托管”一方面可以通过云托管、白屏化来解决用户实际运维中的痛点问题,降低用户的运维成本,另一方面又不失灵活性,用户可以自主控制自己集群内的节点,有极大的自由度。
- Stateless 云原生湖仓:Stateless 的概念在上文已有详述。火山引擎 EMR 通过存算分离把集群内部的数据外置到云存储中,如火山引擎对象存储 TOS,不再依赖用户集群内部的 HDFS。此外,通过外置 Hive Metastore、Public History Server、作业管理、配置中心等产品和技术方案,进一步把集群内部的状态信息外置。另外,通过弹性伸缩,支持用户在云上合理地调配资源,实现资源利用的最大化和成本的节约。Stateless 的架构也使得弹性伸缩的扩缩容过程更加轻量化,运维成本和风险得以降低。另外,火山引擎 EMR 也支持 Lakehouse(湖仓)这一近年来兴起的数据开发理念。
- 引擎企业级优化:可以分两方面来看。一方面是火山引擎 EMR 针对开源的大数据组件在功能和性能上做了一些增强,后续也会将一些增强回馈社区。另一方面是给引擎增加了一些企业级的特性,例如权限相关的功能。
- 云上便捷运维:复用了云上 EMR 的通用的管控底座能力,各个类型的集群的创建等操作复用 EMR 的公共管控底座。支持按量付费和包年包月的计费模式。支持集群的按需创建和释放。支持集群内服务的操作、参数配置、监控、报警、日志等运维能力。用户在购买 EMR 后可以直接在控制台对接使用这些功能,开箱即用,十分方便。用户可以把大量的运维操作交给云,或者借助云上提供的能力大大降低用户的运维成本。很多原本需要通过命令行和运维流程操作的运维动作,在火山引擎 EMR 中可以通过控制台界面白屏操作。这样用户可以专注于自身的业务逻辑、增长逻辑,而把大数据平台的构建和运维交给云平台。这也是云上的 EMR 产品能够给用户提供的核心价值之一。
下图为火山引擎 EMR 的功能架构图。
火山引擎 EMR 建构在火山引擎的基础设施底座上,由火山引擎提供云服务器、公网 IP、云存储、VPC 等基础设施。在基础设施底座上,建构出数据存储引擎(如 HDFS、CloudFS、表格式等)、数据调度引擎(如 YARN 等)、各种面向不同场景的大数据计算、存储组件以及贯穿整个 EMR 服务端到端的管控面。EMR 向上可以对接火山引擎的大数据研发治理套件 DataLeap,支持用户构建数据仓库,赋能百行百业,助力企业决策,帮助业务成长,体现数据价值。
从 EMR-1.3.0 版本开始,火山引擎 EMR 支持 Pulsar 集群类型的创建。下面我们来具体看一下火山引擎 EMR 集成 Apache Pulsar 的情况。
本节内容重点讨论 Apache Pulsar 集成火山引擎 EMR 的原因和方案。
火山引擎 EMR 是一个云上的大数据平台,覆盖大数据开发领域各个场景,包括离线计算、实时计算以及存储、数据调度、工具链等。
除此之外,还有一类组件不可或缺的,即消息队列,至少有两类不同的场景依赖消息队列:
- 第一个场景是数据摄入(Data Ingestion),即从业务系统(也就是整个 大数据 体系的外部)把源头数据接入到大数据体系中,涉及到一个数据从业务系统向大数据体系传输的过程。
-
- 以客户端埋点日志为例,埋点日志被上报到消息队列,该消息队列为大数据链路的第一站。从该消息队列开始,数据会继续向下游的离线 Hive 表或者实时数仓的下游消息队列流动。在此场景下,作为整个大数据体系的源头,消息队列连通业务系统和数据仓库,将大数据体系外面的数据上报到消息队列后,消息队列作为一个沟通的纽带,消息会流向下游的数据仓库的各层存储中,进入大数据体系内部。
- 不光是埋点日志信息,用户的业务数据库的信息,也可以通过把数据库 binlog 上报到消息队列,由计算任务消费消息队列中的 binlog 并把数据写入下游表,实现业务数据库的数据向数仓的同步,在数仓中重建出业务库的副本。
- 此外,像监控、日志类型的数据也可以上报到消息队列,再通过消息队列将对应的数据传导到大数据体系的内部。
- 第二个典型应用场景是 实时数仓 。
-
- 数据接入到数据仓库后,可以继续通过 ETL 过程构建离线表,也可以构建实时数据链路,使用实时处理逻辑将数据写到下游的消息队列中,而这个消息队列可以再进入下一级的实时处理逻辑,或做 mapping,或做聚合,进入到下一级的消息队列中。
- 以上消息队列相当于实时数仓的实时表,存放 ODS、DWD、DWS、ADS 等层级的实时数仓数据。在这里,是使用消息队列作为实时数仓各层数据的存储。
- 在最终数据应用的时候,根据应用场景的实际需要和查询特点,可以将实时数仓消息队列中的数据导出到像 Redis 这样的 K-V 存储中,或者像 StarRocks、Doris、ClickHouse 这样的 OLAP 引擎中。
- 实时数仓的数据链路的中间层依赖消息队列的,因为实时数据的处理主要是流处理,而消息队列的存储与计算模式与流处理的模式是天然契合的。
从上面的讨论可以看出,消息队列至少在数据接入和实时数仓中间层两个大数据体系的场景中扮演着不可或缺的作用,因此是大数据体系离不开的一类组件。所以火山引擎 EMR 将消息队列集成进来也就成为了一件水到渠成的很自然的事情了。
而在消息队列领域中,近年来发展迅速、表现优异、备受关注的一个佼佼者便是 Apache Pulsar。以上是我们选择将 Apache Pulsar 集成到火山引擎 EMR 的原动力之一。
当然除了这一点之外,还有以下的一些其他的原因。让我们来看一下 Apache Pulsar 的基本情况,以及一些核心的特性和优势。正是这些特性和优势,促成了我们将 Apache Pulsar 集成到火山引擎 EMR 中,并相信这样做会给用户带来很大的价值。
Apache Pulsar 是一个开源的基于发布 / 订阅模式的分布式、云原生、多租户的高性能消息与流平台,提供消息队列和计算服务,解决服务器间的消息传输与队列问题。
Pulsar 具有很多令人瞩目的特性和优势,下面选取了其中的一部分,主要是与把 Pulsar 集成到 EMR 最相关的一些关键要素。正是这些关键要素,使得我们相信把 Pulsar 集成到火山引擎 EMR 中确定会给用户带来很大的价值。这些关键要素列举如下:
- 弹性:支持用户无感知的动态扩缩容,提供更好的弹性,为用户节省硬件成本,更好地契合了云上产品的特征。这是云上产品的基础特性,也是一个产品想要上云所需要具备的特性,能够给客户带来上云的实际价值。
- 云原生:采用先进的云原生架构,将有状态的存储与无状态的计算分离在不同的架构层级中,非常适合在云化的基础设施中部署、使用和运维。这个也是被大家常常提到的 Pulsar 的核心特性,无论是基于 Kubernetes 部署,还是通过 Bare metal / ECS 部署,都可以利用到存算分离的架构特点,更好地利用云上资源池化、弹性的特点,实现更好的云原生。
- 易扩容:存算分离以及数据的分散存储的架构特点极大减少了用户对计算或存储能力进行扩容时的成本与风险,用户可以对计算或存储节点分别扩容,特别是在扩容的时候不需要做繁重的数据迁移、rebalance,对系统的可用性、稳定性、可运维性和运维成本优化大有裨益。这也是大家津津乐道的 Pulsar 的一个非常令人瞩目的优秀特征。
- 与用户既有系统(如 Kafka)兼容:通过 KoP (Kafka on Pulsar),提供与 Kafka 的在使用层面上的兼容性,便于用户直接复用已有的基于 Kafka 的代码体验 Pulsar 的特性。这一点也是非常重要的,能够带来很大的用户价值。Kafka 也是非常流行且在业内被广泛使用的一个消息队列组件,用户可能也会有很多基于 Kafka 开发的业务代码。如果用户希望把这些业务代码在 Pulsar 上面进行试用与体验,那么如果 Pulsar 与用户既有的一些系统(如 Kafka)兼容,就可以零成本或者低成本地把既有的业务代码放到 Pulsar 上来体验,更易于用户去体验 Pulsar 的各种令人瞩目的特性和功能。这一点对用户的价值很大。假设 Pulsar 没有提供与 Kafka 协议的兼容性,那么如果用户想体验 Pulsar,把既有的一些代码放到 Pulsar 上面试用、体验,可能需要对既有业务代码做一些修改、适配和迁移,这些工作也是有成本的,且迁移工作能够给用户在业务层面带来的价值有限,只是相当于在技术实现层面把代码进行了系统之间的迁移和适配,但是会给用户带来一些痛点和运维成本。所以如果能够做到和用户既有系统的兼容,可以帮用户省去一些很繁重的迁移工作,会带来很大的用户价值。
基于以上这几点, Pulsar 可以很好地为客户提供价值、增值,这也促成Pulsar 集成到火山引擎 EMR 中。
下面针对上文中提到的 Pulsar 的云原生架构和易扩容的特性,再展开讲一下技术细节。
Pulsar 的云原生架构,如下图所示:
具体来讲,有以下几点要素:
- 计算和存储分离,消息数据存储在 BookKeeper 的 Bookie 中,由 Broker 提供服务。
- Broker 节点和 Bookie 节点可分别运维、扩缩容。
- 支持数据 offload 到云上的对象存储。
此外,Pulsar Client 与 Pulsar Broker 进行对接。ZooKeeper 节点与 Broker、Bookie 交互,处理元数据以及分布式系统中的协调。
Pulsar 的另一个重要特性是易扩容。Pulsar Topic 数据的存储模式使得节点扩容时不需要 rebalance。这个的原因是 Pulsar 采用了 Topic - Ledger - Fragment - Entry 的多级结构来存储 Topic 的消息数据。如下图所示:
一个 Topic 下会有多个 Ledger,一个 Ledger 下面会有一个或多个 Fragment,每一个 Fragment 下面会有多条消息(多个 Entry)。每个 Fragment 的实际数据的存储位置是在一组 Bookie 上面,不同的 Fragment 对应的 Bookie 的集合都是不一样的。这样的一个结构使得每一个 Topic 的消息天然分布在不同的 Bookie 节点中,而不同的 Fragment 的数据存储在不同的 Bookie 集合中。
如果用户扩容一个新的 Bookie 节点,只需要把 Topic 的新的 Ledger / Fragment 的数据写入新 Bookie。旧 Bookie 的数据不用 rebalance。Pulsar 中的 Topic 和具体的存储节点并没有耦合、绑定。假设一个 Topic 的数据绑定在某一个固定的存储节点上,那么如果单纯地扩容存储节点,且如果 Topic 的数量不变,那么新的存储节点是不会有 Topic 的数据写进去的。为了让新扩容出来的存储节点能够被利用到,能够被写入 Topic 的数据,就需要更改一部分 Topic 与存储节点的绑定关系,这样就涉及到了数据的搬迁,即 rebalance。
而 Pulsar 不存在这个问题,因为 Pulsar 天然就是一个 Topic 的数据分散在不同的 Bookie 节点中存储,所以在新扩容出一个 Bookie 节点后,一个 Topic 中的新的数据是可以写入到新的 Bookie 节点中的,新的 Bookie 节点也不用担心没有数据写进去。而 Topic 中的一些历史存量数据仍然存放在原来的地方,不用做存量数据的搬迁、rebalance。
这样的话,对于用户来说,在扩容时的运维成本、风险和复杂性都大大降低了。这是 Pulsar 给客户提供的核心价值之一。
相比于其他消息队列组件,Pulsar 也提供了一些差异化价值。下面这张表对比了 Pulsar 与 Kafka 的部分特性。
综上所述,基于以上的一些情况,促成了我们把 Pulsar 集成到火山引擎 EMR 中。这样做可以给用户、Pulsar 和火山引擎 EMR 三方都带来收益,是一个“多赢”的局面。
- 给用户带来价值:
-
- 将 Pulsar 的众多令人瞩目的特性更便捷地提供给用户,在火山引擎 EMR 中一键创建 Pulsar 集群后“开箱即用”。
- 方便用户在云原生环境下扩容消息队列,复用云上 EMR 的管控能力,降低大数据体系的使用和运维成本。
- 方便用户将 Pulsar 与火山引擎生态的其他的一些服务(例如 DataLeap 大数据开发、治理)融合起来,构建大数据端到端的全链路。
- 给 Pulsar 带来价值
-
- 直接为 Pulsar 集群提供扩展性和弹性,按需付费。
- 快速、系统化对接服务的配置、启停、扩容等操作。
- 将 Pulsar 融入到火山引擎 EMR 生态中,与大数据生态系统中的其他组件更方便地交互。
- Pulsar 集群与其他类型的 EMR 集群(如 Hadoop、Flink)位于同一个 VPC 内,网络互通,减少网络打通的成本。
- 复用 EMR 通用的管控能力。
- 与火山引擎丰富的产品线融合,例如大数据研发治理套件 DataLeap。
- 为火山引擎 EMR 带来价值。
-
- 提供云原生、运维成本低的大数据基础组件。EMR 中需要集成消息队列组件,而 Pulsar 是其中的佼佼者。
- 扩充火山引擎 EMR 的场景和整体生态的端到端能力,增强实时流数据处理能力,构成用户数据链路中的重要一环。
接下来的几张截图展示了火山引擎 EMR 中创建和使用 Pulsar 集群类型的场景。
从 EMR-1.3.0 版本起,用户可以创建类型为 Pulsar 的集群:
包含 BookKeeper、Pulsar、ZooKeeper 服务,用户可以白屏化运维,例如服务的启停、服务的基本信息查看等:
用户可以在控制台对 Pulsar 的参数进行配置:
用户可以在控制台查看 Pulsar 运行时的监控数据、服务日志和操作日志:
在本节的最后,主要介绍 Pulsar 集成到火山引擎 EMR 的方案。主要步骤如下:
- 镜像制作与手动拉起:将 Pulsar 安装包集成进 EMR 镜像,建立一个既有类型的 EMR 集群,手动部署 / 运行 ZooKeeper, BookKeeper, Pulsar (Broker)。
- 自动化部署代码编写:将手动部署的逻辑转化为集群内的 Agent 调用的自动化部署代码,并考虑异常情况处理。
- 管控服务端:管控服务端配置元数据,以在控制台增加 Pulsar 集群类型相关内容,并驱动管控通用底座调用上一步编写好的自动化部署代码。
- 参数:Pulsar 参数支持用户可配置 / 系统动态生成。
- 监控、告警、日志的对接。
下图为系统整体的控制流。管控服务端会和用户集群内部的 Agent 交互,把管控的操作命令下发到集群中去,在集群中执行具体的运维操作。如集群、服务的启停、参数的配置等。
在集成 Pulsar 的整个过程中,也遇到过一些问题。这些问题最终都通过排查以及查阅社区资料等做法得以解决。以下面这个问题为例:
Pulsar Broker 在自动化启动时报错:
ERROR org.apache.pulsar.broker.PulsarService - Failed to start Pulsar service:
org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /counters/producer-name
- 问题排查:通过查阅社区资料,社区已经遇到过并已解决该问题。在多个 Pulsar Broker 同时启动的时候会出现这个问题。
- 短期解决方案:Pulsar Broker 启动时增加重试机制。
- 长期解决方案:目前 Pulsar 社区针对此问题的修复已合入,后续考虑升级 EMR 集成的 Pulsar 版本。
-
- 上面我们对火山引擎 EMR 集成 Apache Pulsar 的情况进行了概要介绍。下面我们来看一下火山引擎 EMR 中的 Pulsar 的一些典型应用场景。
本节将简要介绍火山引擎 EMR 集成 Apache Pulsar 的两个典型应用场景:实时数仓与批流一体。
Pulsar 和火山引擎 EMR 中的其他一些组件可以相互配合,共同完成场景问题的解决,发挥价值、发挥作用。
3.1 实时数仓
首先看一个典型的、简化的实时数仓场景:给定业务库中全量商品的订单表,统计截止到当前的各个商品的订单总量。
这里面有两点需要注意:
- 订单表中有订单状态,在统计订单量的时候需要过滤掉无效订单。
- 订单状态随时可能发生变化。
上面两点给实时数仓的开发带来了很大的复杂性。源头的业务库中的数据可变,在实时流处理的时候需要考虑到这种变化,并对实时计算结果进行调整。
输入输出样例如下图所示:
上图左边是业务库中订单粒度的原始表,我们期望聚合成右边的以商品为粒度的商品总订单数的统计表。
另外,为了不影响线上业务,不允许直接查询线上业务库得到结果,需要以业务库为数据源建立数据仓库来支持数据分析需求。
当然,有很多成熟的方案可以解决这个问题。例如经典的 Lambda 架构,其核心思想是分为离线和实时两条链路:离线链路计算历史数据,实时链路计算当日数据。最后把历史数据和当日数据 merge 起来。如下图所示:
Lambda 架构是比较成熟的方案,但也存在一些问题,如下:
- 同时维护离线、实时链路,链路复杂,资源消耗大,维护成本高。
- 对于部分订单状态发生变化的情况,难以很好处理。例如历史订单在当日(今日)发生了失效,状态从有效变为了无效,这时处理起来会有一些复杂性,需要考虑对离线历史数据的实时调整。
- 离线计算和实时计算结果需要 merge,需要精确把握时间点,离线和实时的计算结果的时间范围需要做到不重、不漏。
- 对于需要从多个源表获取数据,且多个源表的字段值有可能发生变化的情况,则更为复杂。这里限于篇幅,不展开讲了。感兴趣的读者可以构造一些情况来推演一下相关的处理逻辑,会发现里面确实会有许多复杂的情况,涉及到流 join、数据的消费顺序等。可以梳理一下其中遇到的问题。
除了 Lambda 架构,还有另一个方案基于 upsert 离线表(如 Hudi 表)的计算。其核心思想是在 Hudi 表中近实时同步业务库中的数据(通过消费 binlog 数据),在 Hudi 表(相当于一个订单粒度的近实时表)的基础上,每隔一段时间(如 15 分钟)按照离线链路聚合数据的方式全量计算一次聚合结果,并将生成的结果同步到 OLAP 引擎中供查询。聚合计算的源头 Hudi 表是近实时更新的,聚合计算过程是近实时触发的,因此 OLAP 引擎中的结果表的时效性也是近实时的。这个方案的数据处理链路如下图所示:
这个方案的一个好处是,复用离线数据开发的逻辑到 Hudi 表的近实时全量计算逻辑中,以较低的成本来实现近实时的统计分析,但也会有一些问题,列举如下:
- 需要较高频率的离线全量计算,消耗计算资源。
- 对离线存储资源仍有消耗。
- 不是纯实时(秒级)更新,而是一个近实时的过程。
针对以上实时数仓的场景, Pulsar 具备解决方案。具体来说,线上业务库的订单表输出 binlog 到 Pulsar 消息队列中。这个消息队列有全量的数据,其中冷数据可以 offload 到对象存储中。接下来可以使用 Pulsar SQL 每 15 分钟针对 Pulsar 中的全量数据计算一次聚合结果,并将计算结果写入 OLAP 引擎中供查询。这个方案类似于上面提到的 Hudi 方案,不同之处在于利用了 Pulsar SQL,相当于可以直接去查询消息队列中存储的数据。
整个计算链路如下图所示:
好处是:
- 可以利用 Pulsar 的分级存储特性,将冷数据写入对象存储。
- Pulsar 消息队列的存储,既可以作为中间数据的存储,也可以作为离线 ODS 层数据的存储,节省存储资源,链路简化。Pulsar 的分级存储和 Pulsar SQL 等特性使得直接在消息队列存储中做计算成为可能,进而简化数据处理链路。
通过上面的讨论,我们看到了在火山引擎 EMR 中,可以将其中的一些大数据组件和 Pulsar 结合起来使用,解决实时数仓开发中的一些问题。
3.2 批流一体
埋点日志数据存在实时处理和离线处理的需求:
- 离线链路:用于天级报表、离线训练数据等场景。
- 实时链路:用于实时分析、推荐等场景。
一个经典方案,类似于上文提到的 Lambda 架构,需要维护离线和实时两套数据链路,如下图所示:
这样的方案在实施上比较成熟,但是占用资源较多,维护成本较高。
而基于 Pulsar 也可以有一类方案,聚焦在实时链路。埋点日志数据上报到 Pulsar 中,用实时任务去写下游的 DWD 和 DWS 层(到 Pulsar 中)。整个 Pulsar 的实时链路也支持数据 offload 到对象存储。数据也可以直接写到 OLAP 层。如果有离线数据计算的需求,可以用 Pulsar SQL 直接对接 Pulsar 中存储的数据。整个数据链路如下图所示:
- 基于 Pulsar 的分级存储和 Pulsar SQL 等特性,可以直接把 Pulsar 中的数据作为离线链路的 ODS 层。
- Pulsar 的下游可以直接对接实时处理逻辑。
-
- 若基于 Pulsar 中的原始日志数据,建立实时数仓,实时计算 ODS 层数据生成 DWD 层数据到 Pulsar topic 中,则 Pulsar topic 中的 DWD 层数据可以同时直接用于后续的离线计算和实时处理。
- DWS 层同理。
以上列举了实时数仓和批流一体中的一些典型场景和可能遇到的问题,以及使用火山引擎 EMR 中的 Pulsar 和其他组件的可能的解决思路。在本文的下一节,我们将简要介绍一下火山引擎 EMR 集成 Apache Pulsar 的未来规划。
目前火山引擎 EMR 已将 Apache Pulsar 集成进来,用户可以在火山引擎 EMR 中创建、使用、运维 Pulsar 集群。关于这部分工作未来的规划,主要分为以下几部分。
首先,我们会进一步探索云原生方向,在云原生的背景下把火山引擎 EMR 与 Pulsar 集成地更好,例如与 Kubernetes、火山引擎对象存储的结合等。
与此同时,我们也希望在当前的 Pulsar 的集成工作的基础上,对 Pulsar 引擎本身有更多的贡献,参与到社区开发中,为 Pulsar 贡献功能和代码。
当然,我们也会持续把火山引擎 EMR 上的 Pulsar 做得更好用,包括但不限于以下几点:
- 增加高可用模式,3 个 Master 节点且独立部署 ZooKeeper。
- 更多周边组件的集成。
- 更加顺滑的端到端使用体验和最佳实践。
- 与火山引擎 EMR 的其他服务,以及火山引擎其他产品更好的集成,例如 EMR Flink 集群类型、大数据研发治理套件 DataLeap 等。
- 参数、性能调优等。
本文介绍了火山引擎 EMR,以及我们将 Apache Pulsar 集成到火山引擎 EMR 的原因和方法,同时介绍了 Pulsar 的一些令人瞩目的优秀特性,并讨论了实时数仓和批流一体的一些典型场景、其中可能存在的痛点问题以及使用火山引擎 EMR 中的 Pulsar 结合其他组件的可能的解决方案。最后我们还展望了火山引擎 EMR 集成 Apache Pulsar 这部分工作的一些未来规划。我们会持续努力,提供更好的云上大数据产品与服务,将火山引擎 EMR 的大数据生态与 Pulsar 的卓越能力更好地结合起来并相互赋能,创造更大的价值,覆盖更多的业务场景,更好地服务用户。