BitSail是字节跳动自研的数据集成产品,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下全域数据集成解决方案 。 本系列聚焦BitSail Connector开发模块,为大家带来详细全面的开发方法与场景示例,本篇将主要介绍Source接口部分。
文 | 浩宇 来自字节跳动数据平台BitSail团队
持续关注,本开发详解将分为四篇呈现。
● 开发详解系列一:Source(本篇)
● 开发详解系列二:SourceSplitCoordinator
● 开发详解系列三:SourceReader
● 开发详解系列四:Sink、Writer
本文将主要介绍Source接口部分:
● Source: 参与数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。
● SourceSplit: 数据读取分片,大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的Split并行处理。
● State: 作业状态快照,当开启checkpoint之后,会保存当前执行状态。
一、Source
数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
以RocketMQSource为例:Source方法需要实现Source和ParallelismComputable接口。
1.1 Source接口
public interface Source<T, SplitT extends SourceSplit, StateT extends Serializable>
extends Serializable, TypeInfoConverterFactory {
/**
* Run in client side for source initialize;
*/
void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException;
/**
* Indicate the Source type.
*/
Boundedness getSourceBoundedness();
/**
* Create Source Reader.
*/
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext);
/**
* Create split coordinator.
*/
SourceSplitCoordinator<SplitT, StateT> createSplitCoordinator(SourceSplitCoordinator.Context<SplitT, StateT> coordinatorContext);
/**
* Get Split serializer for the framework,{@link SplitT}should implement from {@link Serializable}
*/
default BinarySerializer<SplitT> getSplitSerializer() {
return new SimpleBinarySerializer<>();
}
/**
* Get State serializer for the framework, {@link StateT}should implement from {@link Serializable}
*/
default BinarySerializer<StateT> getSplitCoordinatorCheckpointSerializer() {
return new SimpleBinarySerializer<>();
}
/**
* Create type info converter for the source, default value {@link BitSailTypeInfoConverter}
*/
default TypeInfoConverter createTypeInfoConverter() {
return new BitSailTypeInfoConverter();
}
/**
* Get Source' name.
*/
String getReaderName();
}
/ configure方法 /
主要去做一些客户端的配置的分发和提取,可以操作运行时环境ExecutionEnviron的配置和readerConfiguration的配置。
示例:
@Override
public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) {
this.readerConfiguration = readerConfiguration;
this.commonConfiguration = execution.getCommonConfiguration();
}
/ getSourceBoundedness方法 /
设置作业的处理方式,是采用流式处理方法、批式处理方法,或者是流批一体的处理方式,在流批一体的场景中,我们需要根据作业的不同类型设置不同的处理方式。
具体对应关系如下:
| Job Type | Boundedness | | batch | Boundedness.BOUNDEDNESS | | stream | Boundedness.UNBOUNDEDNESS |
流批一体场景示例:
@Override
public Boundedness getSourceBoundedness() {
return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?
Boundedness.BOUNDEDNESS :
Boundedness.UNBOUNDEDNESS;
}
批式场景示例:
public Boundedness getSourceBoundedness() {
return Boundedness.BOUNDEDNESS;
}
/ createTypeInfoConverter方法 /
用于指定Source连接器的类型转换器;我们知道大多数的外部数据系统都存在着自己的类型定义,它们的定义与BitSail的类型定义不会完全一致;为了简化类型定义的转换,我们支持了通过配置文件来映射两者之间的关系,进而来简化配置文件的开发。
在行为上表现为对任务描述Json文件中
reader
部分的
columns
的解析,对于
columns
中不同字段的type会根据上面描述文件从
Clic
khouseReaderOptions.COLUMNS
字段中解析到
readerContext.getTypeInfos()
中。
实现:
●
BitSailT
ypeInfoConverte
默认的 TypeInfoConverter ,直接对 ReaderOptions.COLUMNS 字段进行字符串的直接解析, COLUMNS 字段中是什么类型, TypeInfoConverter 中就是什么类型。
●
FileMappingTypeInfoConverter
会在BitSail类型系统转换时去绑定 {readername}-type-converter.yaml 文件,做数据库字段类型和BitSail类型的映射。 ReaderOptions.COLUMNS 字段在通过这个映射文件转换后才会映射到 TypeInfoConverter 中。
示例:
1. FileMappingTypeInfoConverter
通过JDBC方式连接的数据库,包括MySql、Oracle、SqlServer、Kudu、ClickHouse等。
这里数据源的特点是以 java.sql.ResultSet 的接口形式返回获取的数据,对于这类数据库,我们往往将 TypeInfoConverter 对象设计为 FileMappingTypeInfoConverter ,这个对象会在BitSail类型系统转换时去绑定 {readername}-type-converter.yaml 文件,做数据库字段类型和BitSail类型的映射。
@Override
public TypeInfoConverter createTypeInfoConverter() {
return new FileMappingTypeInfoConverter(getReaderName());
}
对于 {readername}-type-converter.yaml 文件的解析,以 clickhouse-type-converter.yaml 为例。
# Clickhouse Type to BitSail Type
engine.type.to.bitsail.type.converter:
- source.type: int32
target.type: int
- source.type: float64
target.type: double
- source.type: string
target.type: string
- source.type: date
target.type: date.date
- source.type: null
target.type: void
# BitSail Type to Clickhouse Type
bitsail.type.to.engine.type.converter:
- source.type: int
target.type: int32
- source.type: double
target.type: float64
- source.type: date.date
target.type: date
- source.type: string
target.type: string
这个文件起到的作用是进行job描述json文件中 reader 部分的 columns 的解析,对于 columns 中不同字段的type会根据上面描述文件从 ClickhouseReaderOptions.COLUMNS 字段中解析到 readerContext.getTypeInfos() 中。
"reader": {
"class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource",
"jdbc_url": "jdbc:clickhouse://localhost:8123",
"db_name": "default",
"table_name": "test_ch_table",
"split_field": "id",
"split_config": "{\"name\": \"id\", \"lower_bound\": 0, \"upper_bound\": \"10000\", \"split_num\": 3}",
"sql_filter": "( id % 2 == 0 )",
"columns": [
{
"name": "id",
"type": "int64"
},
{
"name": "int_type",
"type": "int32"
},
{
"name": "double_type",
"type": "float64"
},
{
"name": "string_type",
"type": "string"
},
{
"name": "p_date",
"type": "date"
}
]
},
这种方式不仅仅适用于数据库,也适用于所有需要在类型转换中需要引擎侧和BitSail侧进行类型映射的场景。
2. BitSailTypeInfoConverter
通常采用默认的方式进行类型转换,直接对 ReaderOptions.COLUMNS 字段进行字符串的直接解析。
@Override
public TypeInfoConverter createTypeInfoConverter() {
return new BitSailTypeInfoConverter();
}
以Hadoop为例:
"reader": {
"class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
"path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test.json",
"content_type":"json",
"reader_parallelism_num": 1,
"columns": [
{
"name":"id",
"type": "int"
},
{
"name": "string_type",
"type": "string"
},
{
"name": "map_string_string",
"type": "map<string,string>"
},
{
"name": "array_string",
"type": "list<string>"
}
]
}
/ createSourceReader方法 /
书写具体的数据读取逻辑,负责数据读取的组件,在接收到Split后会对其进行数据读取,然后将数据传输给下一个算子。
具体传入构造SourceReader的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在createJobGraph的时候出错。
示例:
public SourceReader<Row, RocketMQSplit> createReader(SourceReader.Context readerContext) {
return new RocketMQSourceReader(
readerConfiguration,
readerContext,
getSourceBoundedness());
}
/ createSplitCoordinator方法 /
书写具体的数据分片、分片分配逻辑,SplitCoordinator承担了去创建、管理Split的角色。
具体传入构造SplitCoordinator的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在createJobGraph的时候出错。
示例:
public SourceSplitCoordinator<RocketMQSplit, RocketMQState> createSplitCoordinator(SourceSplitCoordinator
.Context<RocketMQSplit, RocketMQState> coordinatorContext) {
return new RocketMQSourceSplitCoordinator(
coordinatorContext,
readerConfiguration,
getSourceBoundedness());
}
1.2 ParallelismComputable接口
public interface ParallelismComputable extends Serializable {
/**
* give a parallelism advice for reader/writer based on configurations and upstream parallelism advice
*
* @param commonConf common configuration
* @param selfConf reader/writer configuration
* @param upstreamAdvice parallelism advice from upstream (when an operator has no upstream in DAG, its upstream is
* global parallelism)
* @return parallelism advice for the reader/writer
*/
ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf,
BitSailConfiguration selfConf,
ParallelismAdvice upstreamAdvice) throws Exception;
}
/ getParallelismAdvice方法 /
用于指定下游reader的并行数目。一般有以下的方式:
可以选 selfConf.get(ClickhouseReaderOptions.READER_PARALLELISM_NUM) 来指定并行度。也可以自定义自己的并行度划分逻辑。
示例:
比如在RocketMQ中,我们可以定义每1个reader可以处理至多4个队列 DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD = 4
通过这种自定义的方式获取对应的并行度。
public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConfiguration,
BitSailConfiguration rocketmqConfiguration,
ParallelismAdvice upstreamAdvice) throws Exception {
String cluster = rocketmqConfiguration.get(RocketMQSourceOptions.CLUSTER);
String topic = rocketmqConfiguration.get(RocketMQSourceOptions.TOPIC);
String consumerGroup = rocketmqConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
DefaultLitePullConsumer consumer = RocketMQUtils.prepareRocketMQConsumer(rocketmqConfiguration, String.format(SOURCE_INSTANCE_NAME_TEMPLATE,
cluster,
topic,
consumerGroup,
UUID.randomUUID()
));
try {
consumer.start();
Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues(topic);
int adviceParallelism = Math.max(CollectionUtils.size(messageQueues) / DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD, 1);
return ParallelismAdvice.builder()
.adviceParallelism(adviceParallelism)
.enforceDownStreamChain(true)
.build();
} finally {
consumer.shutdown();
}
}
}
二、 SourceSplit
数据源的数据分片格式,需要我们实现SourceSplit接口。
2.1 SourceSplit接口
要求我们实现一个实现一个获取splitId的方法。
public interface SourceSplit extends Serializable {
String uniqSplitId();
}
对于具体切片的格式,开发者可以按照自己的需求进行自定义。
/ JDBC类存储 /
一般会通过主键,来对数据进行最大、最小值的划分;对于无主键类则通常会将其认定为一个split,不再进行拆分,所以split中的参数包括主键的最大最小值,以及一个布尔类型的 readTable
。
如果无主键类或是不进行主键的切分则整张表会视为一个split,此时 readTable 为 true ,如果按主键最大最小值进行切分,则设置为 false 。
以ClickhouseSourceSplit为例:
@Setter
public class ClickhouseSourceSplit implements SourceSplit {
public static final String SOURCE_SPLIT_PREFIX = "clickhouse_source_split_";
private static final String BETWEEN_CLAUSE = "( `%s` BETWEEN ? AND ? )";
private final String splitId;
/**
* Read whole table or range [lower, upper]
*/
private boolean readTable;
private Long lower;
private Long upper;
public ClickhouseSourceSplit(int splitId) {
this.splitId = SOURCE_SPLIT_PREFIX + splitId;
}
@Override
public String uniqSplitId() {
return splitId;
}
public void decorateStatement(PreparedStatement statement) {
try {
if (readTable) {
lower = Long.MIN_VALUE;
upper = Long.MAX_VALUE;
}
statement.setObject(1, lower);
statement.setObject(2, upper);
} catch (SQLException e) {
throw BitSailException.asBitSailException(CommonErrorCode.RUNTIME_ERROR, "Failed to decorate statement with split " + this, e.getCause());
}
}
public static String getRangeClause(String splitField) {
return StringUtils.isEmpty(splitField) ? null : String.format(BETWEEN_CLAUSE, splitField);
}
@Override
public String toString() {
return String.format(
"{\"split_id\":\"%s\", \"lower\":%s, \"upper\":%s, \"readTable\":%s}",
splitId, lower, upper, readTable);
}
}
/ 消息队列 /
一般按照消息队列中topic注册的partitions的数量进行split的划分,切片中主要应包含消费的起点和终点以及消费的队列。
以RocketMQSplit为例:
@Builder
@Getter
public class RocketMQSplit implements SourceSplit {
private MessageQueue messageQueue;
@Setter
private long startOffset;
private long endOffset;
private String splitId;
@Override
public String uniqSplitId() {
return splitId;
}
@Override
public String toString() {
return "RocketMQSplit{" +
"messageQueue=" + messageQueue +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
'}';
}
}
/ 文件系统 /
一般会按照文件作为最小粒度进行划分,同时有些格式也支持将单个文件拆分为多个子Splits。文件系统split中需要包装所需的文件切片。
以FtpSourceSplit为例:
public class FtpSourceSplit implements SourceSplit {
public static final String FTP_SOURCE_SPLIT_PREFIX = "ftp_source_split_";
private final String splitId;
@Setter
private String path;
@Setter
private long fileSize;
public FtpSourceSplit(int splitId) {
this.splitId = FTP_SOURCE_SPLIT_PREFIX + splitId;
}
@Override
public String uniqSplitId() {
return splitId;
}
@Override
public boolean equals(Object obj) {
return (obj instanceof FtpSourceSplit) && (splitId.equals(((FtpSourceSplit) obj).splitId));
}
}
特别的,在Hadoop文件系统中,我们也可以利用对 org.apache.hadoop.mapred.InputSplit 类的包装来自定义我们的Split。
public class HadoopSourceSplit implements SourceSplit {
private static final long serialVersionUID = 1L;
private final Class<? extends InputSplit> splitType;
private transient InputSplit hadoopInputSplit;
private byte[] hadoopInputSplitByteArray;
public HadoopSourceSplit(InputSplit inputSplit) {
if (inputSplit == null) {
throw new NullPointerException("Hadoop input split must not be null");
}
this.splitType = inputSplit.getClass();
this.hadoopInputSplit = inputSplit;
}
public InputSplit getHadoopInputSplit() {
return this.hadoopInputSplit;
}
public void initInputSplit(JobConf jobConf) {
if (this.hadoopInputSplit != null) {
return;
}
checkNotNull(hadoopInputSplitByteArray);
try {
this.hadoopInputSplit = (InputSplit) WritableFactories.newInstance(splitType);
if (this.hadoopInputSplit instanceof Configurable) {
((Configurable) this.hadoopInputSplit).setConf(jobConf);
} else if (this.hadoopInputSplit instanceof JobConfigurable) {
((JobConfigurable) this.hadoopInputSplit).configure(jobConf);
}
if (hadoopInputSplitByteArray != null) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(hadoopInputSplitByteArray))) {
this.hadoopInputSplit.readFields(objectInputStream);
}
this.hadoopInputSplitByteArray = null;
}
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
}
}
private void writeObject(ObjectOutputStream out) throws IOException {
if (hadoopInputSplit != null) {
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
) {
this.hadoopInputSplit.write(objectOutputStream);
objectOutputStream.flush();
this.hadoopInputSplitByteArray = byteArrayOutputStream.toByteArray();
}
}
out.defaultWriteObject();
}
@Override
public String uniqSplitId() {
return hadoopInputSplit.toString();
}
}
三、State
在需要做checkpoint的场景下,通常我们会通过Map来保留当前的执行状态。
/ 流批一体场景 /
在流批一体场景中,我们需要保存状态以便从异常中断的流式作业恢复。
以RocketMQState为例:
public class RocketMQState implements Serializable {
private final Map<MessageQueue, String> assignedWithSplitIds;
public RocketMQState(Map<MessageQueue, String> assignedWithSplitIds) {
this.assignedWithSplitIds = assignedWithSplitIds;
}
public Map<MessageQueue, String> getAssignedWithSplits() {
return assignedWithSplitIds;
}
}
/ 批式场景 /
对于批式场景,我们可以使用EmptyState不存储状态,如果需要状态存储,和流批一体场景采用相似的设计方案。
public class EmptyState implements Serializable {
public static EmptyState fromBytes() {
return new EmptyState();
}
}
产品介绍
字节跳动开源数据集成引擎BitSail
BitSail是字节跳动自研的数据集成引擎,于2022年10月26日正式开源。
BitSail 支持20多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下的全域数据集成解决方案,目前服务于字节内部几乎所有业务线,包括抖音、今日头条等大家耳熟能详的应用,同时也支撑了火山引擎多个客户的数据集成需求。 后台回复数字“12”了解更多信息。
了解更多Bi tSail信息 ⬇⬇
⭐️ Star不迷路(BitSail代码仓库):
https://github.com/bytedance/bitsail
💡提交问题和建议 :
https://github.com/bytedance/bitsail/issues
💡贡献代码:
https://github.com/bytedance/bitsail/pulls
💡BitSail官网:
https://bytedance.github.io/bitsail/zh/
💡订阅邮件列表: