For more technical discussions and job opportunities, follow the ByteDance Data Platform WeChat official account and reply [1] to join the official discussion group.
This article introduces the SplitCoordinator, the component responsible for creating and managing Splits.
SourceSplitCoordinator
The core purpose of a big-data processing framework is to break large-scale data into multiple reasonably sized Splits, and the SplitCoordinator is the component that creates and manages these Splits.
The SourceSplitCoordinator interface
public interface SourceSplitCoordinator<SplitT extends SourceSplit, StateT> extends Serializable, AutoCloseable {

  void start();

  void addReader(int subtaskId);

  void addSplitsBack(List<SplitT> splits, int subtaskId);

  void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

  default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
  }

  StateT snapshotState() throws Exception;

  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  void close();

  interface Context<SplitT extends SourceSplit, StateT> {

    boolean isRestored();

    /**
     * Return the restored state to the split coordinator, used for exactly-once recovery.
     */
    StateT getRestoreState();

    /**
     * Return the total parallelism of the source readers.
     */
    int totalParallelism();

    /**
     * When a source reader starts, it registers itself to the coordinator.
     */
    Set<Integer> registeredReaders();

    /**
     * Assign splits to a reader.
     */
    void assignSplit(int subtaskId, List<SplitT> splits);

    /**
     * Mainly used in the bounded case; signals that no more splits will be sent to the source reader.
     */
    void signalNoMoreSplits(int subtask);

    /**
     * Use this method when the split coordinator needs to send an event to a source reader,
     * e.g. sending a Pause event to the source reader as in CDC 2.0.
     */
    void sendEventToSourceReader(int subtaskId, SourceEvent event);

    /**
     * Schedule the callable and handler to run repeatedly; often used in unbounded mode.
     */
    <T> void runAsync(Callable<T> callable,
                      BiConsumer<T, Throwable> handler,
                      int initialDelay,
                      long interval);

    /**
     * Run the callable and handler only once; often used in bounded mode.
     */
    <T> void runAsyncOnce(Callable<T> callable,
                          BiConsumer<T, Throwable> handler);
  }
}
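Before diving into each method, the overall shape of a custom coordinator can be summarized with the minimal sketch below. It is only an illustrative skeleton under assumed names (MySourceSplitCoordinator, MySplit and MyState are placeholders, not BitSail classes); the following sections show how real connectors such as ClickHouse and RocketMQ implement each method.

// Illustrative skeleton only: MySourceSplitCoordinator, MySplit and MyState are placeholder names.
public class MySourceSplitCoordinator implements SourceSplitCoordinator<MySplit, MyState> {

  private final Context<MySplit, MyState> context;
  // Splits waiting to be handed out, grouped by target reader index.
  private final Map<Integer, Set<MySplit>> pendingAssignment = new ConcurrentHashMap<>();

  public MySourceSplitCoordinator(Context<MySplit, MyState> context) {
    this.context = context;
    // If context.isRestored() is true, rebuild internal state from context.getRestoreState() here.
  }

  @Override
  public void start() {
    // Discover splits from the data source and fill pendingAssignment,
    // either once (bounded) or periodically via context.runAsync (unbounded).
  }

  @Override
  public void addReader(int subtaskId) {
    assignPendingSplits();
  }

  @Override
  public void addSplitsBack(List<MySplit> splits, int subtaskId) {
    // Put returned splits back into the plan, then try to assign them again.
    assignPendingSplits();
  }

  @Override
  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    // Optional: react to explicit split requests from readers.
  }

  @Override
  public MyState snapshotState() {
    return new MyState();
  }

  @Override
  public void close() {
    // Release connections opened for split discovery.
  }

  private void assignPendingSplits() {
    for (Integer readerIndex : pendingAssignment.keySet()) {
      if (context.registeredReaders().contains(readerIndex)) {
        context.assignSplit(readerIndex, new ArrayList<>(pendingAssignment.remove(readerIndex)));
        // Only appropriate in bounded mode; unbounded sources keep sending splits.
        context.signalNoMoreSplits(readerIndex);
      }
    }
  }
}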
Constructor
In the constructor, developers typically read configuration and create the containers used to store split information.
Take the constructor of ClickhouseSourceSplitCoordinator as an example:
public ClickhouseSourceSplitCoordinator(SourceSplitCoordinator.Context<ClickhouseSourceSplit, EmptyState> context,
                                        BitSailConfiguration jobConf) {
  this.context = context;
  this.jobConf = jobConf;
  this.splitAssignmentPlan = Maps.newConcurrentMap();
}
When a custom State is defined, the state stored in the SourceSplitCoordinator.Context at checkpoint time needs to be saved and restored.
Take RocketMQSourceSplitCoordinator as an example:
public RocketMQSourceSplitCoordinator(
    SourceSplitCoordinator.Context<RocketMQSplit, RocketMQState> context,
    BitSailConfiguration jobConfiguration,
    Boundedness boundedness) {
  this.context = context;
  this.jobConfiguration = jobConfiguration;
  this.boundedness = boundedness;
  this.discoveryInternal = jobConfiguration.get(RocketMQSourceOptions.DISCOVERY_INTERNAL);
  this.pendingRocketMQSplitAssignment = Maps.newConcurrentMap();

  this.discoveredPartitions = new HashSet<>();
  if (context.isRestored()) {
    RocketMQState restoreState = context.getRestoreState();
    assignedPartitions = restoreState.getAssignedWithSplits();
    discoveredPartitions.addAll(assignedPartitions.keySet());
  } else {
    assignedPartitions = Maps.newHashMap();
  }

  prepareConsumerProperties();
}
start method
This is where the split metadata required by the data source is extracted. If the connector abstracts a Split Assigner class, it is usually initialized here; if a wrapped split-assign function is used instead, the pending splits are initialized here.
Stream-batch unified scenario
Take RocketMQSourceSplitCoordinator as an example:
private void prepareRocketMQConsumer() {
  try {
    consumer = RocketMQUtils.prepareRocketMQConsumer(jobConfiguration,
        String.format(COORDINATOR_INSTANCE_NAME_TEMPLATE,
            cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}

@Override
public void start() {
  prepareRocketMQConsumer();
  splitAssigner = new FairRocketMQSplitAssigner(jobConfiguration, assignedPartitions);
  if (discoveryInternal > 0) {
    context.runAsync(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged,
        0,
        discoveryInternal
    );
  } else {
    context.runAsyncOnce(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged
    );
  }
}
Batch scenario
Take ClickhouseSourceSplitCoordinator as an example:
public void start() {
  List<ClickhouseSourceSplit> splitList;
  try {
    SimpleDivideSplitConstructor constructor = new SimpleDivideSplitConstructor(jobConf);
    splitList = constructor.construct();
  } catch (IOException e) {
    ClickhouseSourceSplit split = new ClickhouseSourceSplit(0);
    split.setReadTable(true);
    splitList = Collections.singletonList(split);
    LOG.error("Failed to construct splits, will directly read the table.", e);
  }

  int readerNum = context.totalParallelism();
  LOG.info("Found {} readers and {} splits.", readerNum, splitList.size());
  if (readerNum > splitList.size()) {
    LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size());
  }

  for (ClickhouseSourceSplit split : splitList) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
  }
}
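The ReaderSelector.getReaderIndex call used above (and again in addSplitsBack later) is not shown in this article. Since it only receives the reader count, one plausible implementation is a simple round-robin counter like the sketch below; the actual class in BitSail may differ.

// Hypothetical round-robin selector: cycles through reader indices so splits spread evenly across readers.
public class ReaderSelector {
  private static final AtomicLong CURSOR = new AtomicLong();

  public static int getReaderIndex(int totalReaderNum) {
    return (int) (CURSOR.getAndIncrement() % totalReaderNum);
  }
}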
Assigner
The Assigner hands the prepared splits to the Readers. During development we usually keep the SourceSplitCoordinator focused on communicating with the Readers and encapsulate the actual split-distribution logic in an Assigner, which can be either a wrapped split-assign function or a standalone Split Assigner class.
Assign function example
Take ClickhouseSourceSplitCoordinator as an example:
The tryAssignSplitsToReader function assigns the splits stored in splitAssignmentPlan to the corresponding Readers.
private void tryAssignSplitsToReader() {
  Map<Integer, List<ClickhouseSourceSplit>> splitsToAssign = new HashMap<>();

  for (Integer readerIndex : splitAssignmentPlan.keySet()) {
    if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) && context.registeredReaders().contains(readerIndex)) {
      splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex)));
    }
  }

  for (Integer readerIndex : splitsToAssign.keySet()) {
    LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex,
        splitsToAssign.get(readerIndex).stream().map(ClickhouseSourceSplit::uniqSplitId).collect(Collectors.toList()));
    splitAssignmentPlan.remove(readerIndex);
    context.assignSplit(readerIndex, splitsToAssign.get(readerIndex));
    context.signalNoMoreSplits(readerIndex);
    LOG.info("Finish assigning splits reader {}", readerIndex);
  }
}
Assigner class example
Take RocketMQSourceSplitCoordinator as an example:
public class FairRocketMQSplitAssigner implements SplitAssigner<MessageQueue> {

  private BitSailConfiguration readerConfiguration;

  private AtomicInteger atomicInteger;

  public Map<MessageQueue, String> rocketMQSplitIncrementMapping;

  public FairRocketMQSplitAssigner(BitSailConfiguration readerConfiguration,
                                   Map<MessageQueue, String> rocketMQSplitIncrementMapping) {
    this.readerConfiguration = readerConfiguration;
    this.rocketMQSplitIncrementMapping = rocketMQSplitIncrementMapping;
    this.atomicInteger = new AtomicInteger(CollectionUtils
        .size(rocketMQSplitIncrementMapping.keySet()));
  }

  @Override
  public String assignSplitId(MessageQueue messageQueue) {
    if (!rocketMQSplitIncrementMapping.containsKey(messageQueue)) {
      rocketMQSplitIncrementMapping.put(messageQueue, String.valueOf(atomicInteger.getAndIncrement()));
    }
    return rocketMQSplitIncrementMapping.get(messageQueue);
  }

  @Override
  public int assignToReader(String splitId, int totalParallelism) {
    return splitId.hashCode() % totalParallelism;
  }
}
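As a hypothetical usage sketch (readerConfiguration is assumed to be an already-built BitSailConfiguration), the assigner first gives each MessageQueue a stable split id and then maps that id to a reader index:

// Hypothetical usage; readerConfiguration is assumed to exist already.
FairRocketMQSplitAssigner assigner =
    new FairRocketMQSplitAssigner(readerConfiguration, new HashMap<>());

MessageQueue queue = new MessageQueue("demo_topic", "broker-a", 0);
String splitId = assigner.assignSplitId(queue);        // "0" for the first queue seen
int readerIndex = assigner.assignToReader(splitId, 4); // the same splitId always maps to the same reader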
addReader method
Invokes the Assigner to assign splits to the newly registered Reader.
Batch scenario example
Take ClickhouseSourceSplitCoordinator as an example:
public void addReader(int subtaskId) {
  LOG.info("Found reader {}", subtaskId);
  tryAssignSplitsToReader();
}
Stream-batch unified scenario example
Take RocketMQSourceSplitCoordinator as an example:
private void notifyReaderAssignmentResult() {
  Map<Integer, List<RocketMQSplit>> tmpRocketMQSplitAssignments = new HashMap<>();

  for (Integer pendingAssignmentReader : pendingRocketMQSplitAssignment.keySet()) {
    if (CollectionUtils.isNotEmpty(pendingRocketMQSplitAssignment.get(pendingAssignmentReader))
        && context.registeredReaders().contains(pendingAssignmentReader)) {
      tmpRocketMQSplitAssignments.put(pendingAssignmentReader, Lists.newArrayList(pendingRocketMQSplitAssignment.get(pendingAssignmentReader)));
    }
  }

  for (Integer pendingAssignmentReader : tmpRocketMQSplitAssignments.keySet()) {
    LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));

    context.assignSplit(pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));

    Set<RocketMQSplit> removes = pendingRocketMQSplitAssignment.remove(pendingAssignmentReader);
    removes.forEach(removeSplit -> {
      assignedPartitions.put(removeSplit.getMessageQueue(), removeSplit.getSplitId());
    });

    LOG.info("Assigned splits to reader {}", pendingAssignmentReader);

    if (Boundedness.BOUNDEDNESS == boundedness) {
      LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader);
      context.signalNoMoreSplits(pendingAssignmentReader);
    }
  }
}

@Override
public void addReader(int subtaskId) {
  LOG.info(
      "Adding reader {} to RocketMQ Split Coordinator for consumer group {}.",
      subtaskId,
      consumerGroup);
  notifyReaderAssignmentResult();
}
addSplitsBack method
Reassigns splits that some Readers did not finish processing. The reassignment strategy is up to the developer; a common one is hash-and-modulo: every Split in the returned list is reassigned and then handed to a (possibly different) Reader.
Batch scenario example
Take ClickhouseSourceSplitCoordinator as an example:
ReaderSelector picks a reader index for each returned Split, redistributing them across the Readers.
The tryAssignSplitsToReader method then assigns the redistributed Splits to the Readers via the Assigner.
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);

  int readerNum = context.totalParallelism();
  for (ClickhouseSourceSplit split : splits) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
  }

  tryAssignSplitsToReader();
}
Stream-batch unified scenario example
Take RocketMQSourceSplitCoordinator as an example:
addSplitChangeToPendingAssignment redistributes the split list using a hash-and-modulo strategy.
notifyReaderAssignmentResult then assigns the redistributed Splits to the Readers via the Assigner.
private synchronized void addSplitChangeToPendingAssignment(Set<RocketMQSplit> newRocketMQSplits) {
  int numReader = context.totalParallelism();
  for (RocketMQSplit split : newRocketMQSplits) {
    int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader);
    pendingRocketMQSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>())
        .add(split);
  }
  LOG.debug("RocketMQ splits {} finished assignment.", newRocketMQSplits);
}

@Override
public void addSplitsBack(List<RocketMQSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  addSplitChangeToPendingAssignment(new HashSet<>(splits));
  notifyReaderAssignmentResult();
}
snapshotState method
Stores a snapshot of the split-processing state, which is used by the constructor when the job is restored.
public RocketMQState snapshotState() throws Exception {
  return new RocketMQState(assignedPartitions);
}
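The RocketMQState class itself is not shown in this article. Judging from how it is used in the constructor and in snapshotState, it is essentially a serializable holder of the MessageQueue-to-splitId mapping, roughly like the sketch below; field and package details may differ in BitSail.

// A rough sketch consistent with how RocketMQState is used above; the real class may differ in detail.
public class RocketMQState implements Serializable {

  private final Map<MessageQueue, String> assignedWithSplits;

  public RocketMQState(Map<MessageQueue, String> assignedWithSplits) {
    this.assignedWithSplits = assignedWithSplits;
  }

  public Map<MessageQueue, String> getAssignedWithSplits() {
    return assignedWithSplits;
  }
}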
close method
Closes any connections that were opened to read metadata from the data source during split discovery.
public void close() {
  if (consumer != null) {
    consumer.shutdown();
  }
}
About BitSail:
⭐️ Star us on GitHub: https://github.com/bytedance/bitsail
Report issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: bitsail+subscribe@googlegroups.com
Join the BitSail community