- 常用的在disruptor中添加消费者的方法:
1. `Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(`
2. `() -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);`
3. `disruptor.setDefaultExceptionHandler(new MyExceptionHandler());`
4. `disruptor`
5. `.handleEventsWith(new ProcessingEventHandler())`
6. `.then(new ClearingObjectHandler());`
一般调用disruptor的hadleEventsWith方法添加event handler处理消费到的event。
- disruptor中添加event handler的方法
1. `/**`
2. `* 添加多个消费者,每一个EventHandler都是独立的消费者,相互之间没有影响`
3. `* <p>Set up event handlers to handle events from the ring buffer. These handlers will process events`
4. `* as soon as they become available, in parallel.</p>`
5. `*`
6. `* <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must`
7. `* process events before handler <code>B</code>:</p>`
8. `* <pre><code>dw.handleEventsWith(A).then(B);</code></pre>`
9. `*`
10. `* <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>`
11. `*`
12. `* @param handlers the event handlers that will process events.`
13. `* @return a {@link EventHandlerGroup} that can be used to chain dependencies.`
14. `*/`
15. `@SuppressWarnings("varargs")`
16. `@SafeVarargs`
17. `public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)`
18. `{`
19. `return createEventProcessors(new Sequence[0], handlers);`
20. `}`
21.
22. `/**`
23. `* 添加并行消费者,每一个EventProcessorFactory创建一个EventProcessor映射为一个消费者。`
24. `* 这些消费者之间是并行关系,相互之间互不影响`
25. `* <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will`
26. `* automatically start these processors when {@link #start()} is called.</p>`
27. `*`
28. `* <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must`
29. `* process events before handler <code>B</code>:</p>`
30. `* <pre><code>dw.handleEventsWith(A).then(B);</code></pre>`
31. `*`
32. `* <p>Since this is the start of the chain, the processor factories will always be passed an empty <code>Sequence</code>`
33. `* array, so the factory isn't necessary in this case. This method is provided for consistency with`
34. `* {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}`
35. `* which do have barrier sequences to provide.</p>`
36. `*`
37. `* <p>This call is additive, but generally should only be called once when setting up the Disruptor instance</p>`
38. `*`
39. `* @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.`
40. `* @return a {@link EventHandlerGroup} that can be used to chain dependencies.`
41. `*/`
42. `@SafeVarargs`
43. `public final EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories)`
44. `{`
45. `final Sequence[] barrierSequences = new Sequence[0];`
46. `return createEventProcessors(barrierSequences, eventProcessorFactories);`
47. `}`
48.
49. `/**`
50. `* 添加并行消费者,每一个EventProcessor映射为一个消费者。`
51. `* 这些消费者之间是并行关系,相互之间不影响`
52. `*`
53. `* <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will`
54. `* automatically start this processors when {@link #start()} is called.</p>`
55. `*`
56. `* <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must`
57. `* process events before handler <code>B</code>:</p>`
58. `* <pre><code>dw.handleEventsWith(A).then(B);</code></pre>`
59. `*`
60. `* @param processors the event processors that will process events.`
61. `* @return a {@link EventHandlerGroup} that can be used to chain dependencies.`
62. `*/`
63. `public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)`
64. `{`
65. `for (final EventProcessor processor : processors)`
66. `{`
67. `consumerRepository.add(processor);`
68. `}`
69.
70. `final Sequence[] sequences = new Sequence[processors.length];`
71. `for (int i = 0; i < processors.length; i++)`
72. `{`
73. `sequences[i] = processors[i].getSequence();`
74. `}`
75.
76. `ringBuffer.addGatingSequences(sequences);`
77.
78. `return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors));`
79. `}`
80.
81.
82. `/**`
83. `* 添加一个多线程的消费者。该消费者会将event分发到各个WorkHandler。每一个event只会被其中一个WorkHandler处理。具体的要看WorkerPool的处理逻辑`
84. `*`
85. `* Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.`
86. `* Each event will only be processed by one of the work handlers.`
87. `* The Disruptor will automatically start this processors when {@link #start()} is called.`
88. `*`
89. `* @param workHandlers the work handlers that will process events.`
90. `* @return a {@link EventHandlerGroup} that can be used to chain dependencies.`
91. `*/`
92. `@SafeVarargs`
93. `@SuppressWarnings("varargs")`
94. `public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)`
95. `{`
96. `return createWorkerPool(new Sequence[0], workHandlers);`
97. `}`
98.
99. `/**`
100. `* 添加event异常处理器`
101. `* <p>Specify an exception handler to be used for any future event handlers.</p>`
102. `*`
103. `* <p>Note that only event handlers set up after calling this method will use the exception handler.</p>`
104. `*`
105. `* @param exceptionHandler the exception handler to use for any future {@link EventProcessor}.`
106. `* @deprecated This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers.`
107. `*/`
108. `public void handleExceptionsWith(final ExceptionHandler<? super T> exceptionHandler)`
109. `{`
110. `this.exceptionHandler = exceptionHandler;`
111. `}`
其中EventHandler与EventProcessor的关系是,EventProcessor中代理了EventHandler,这一点可以看下BatchEventProcessor的实现:
2.1 EventHandler(EventProcessor也与此类同,com.lmax.disruptor.dsl.Disruptor#createWorkerPool部分会单独用一篇文章讲解)
- 从createEventProcessors(new Sequence0, handlers)方法为入口去看:
1. `/**`
2. `* 创建事件处理器`
3. `*`
4. `* @param barrierSequences 屏障sequences, {@link com.lmax.disruptor.ProcessingSequenceBarrier#dependentSequence}`
5. `* 消费者的消费进度需要慢于它的前驱消费者`
6. `* @param eventHandlers 事件处理方法 每一个EventHandler都会被包装为{@link BatchEventProcessor},是通过BatchEventProcessor来代理EventHandler的工作`
7. `* @return`
8. `*/`
9. `EventHandlerGroup<T> createEventProcessors(`
10. `final Sequence[] barrierSequences,`
11. `final EventHandler<? super T>[] eventHandlers) {`
12. `// 组织消费者之间的关系只能在启动之前`
13. `checkNotStarted();`
14.
15. `// 用一个数组保存添加进来的的消费者的序号`
16. `final Sequence[] processorSequences = new Sequence[eventHandlers.length];`
17. `// 本次添加进来的消费者使用的屏障`
18. `final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);`
19.
20. `// 创建单线程消费者(BatchEventProcessor)`
21. `for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {`
22. `final EventHandler<? super T> eventHandler = eventHandlers[i];`
23. `//用BatchEventProcessor包装eventHandler`
24. `final BatchEventProcessor<T> batchEventProcessor =`
25. `new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);`
26.
27. `if (exceptionHandler != null) {`
28. `batchEventProcessor.setExceptionHandler(exceptionHandler);`
29. `}`
30.
31. `// 添加到消费者仓库中`
32. `consumerRepository.add(batchEventProcessor, eventHandler, barrier);`
33. `processorSequences[i] = batchEventProcessor.getSequence();`
34. `}`
35.
36. `// 更新gatingSequences(生产者只需要关注所有的末端消费者节点的序列)`
37. `updateGatingSequencesForNextInChain(barrierSequences, processorSequences);`
38.
39. `return new EventHandlerGroup<>(this, consumerRepository, processorSequences);`
40. `}`
每个消费者的sequence都会添加进processorSequences列表中去。
- 更新updateGatingSequencesForNextInChain(barrierSequences, processorSequences)方法:
1. `/**`
2. `* 在往消费者链后面添加新的节点时,需要更新gatingSequence`
3. `* @param barrierSequences 往消费者节点链上添加新节点时,之前节点的屏障序列(消费者节点链的开头部分的序列)就可以移除了`
4. `* @param processorSequences 新增加的节点的序列,新增的在消费者链的末端,因此它们的序列就是新增的gatingSequences`
5. `*/`
6. `private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {`
7. `if (processorSequences.length > 0) {`
8. `// 将新增加的消费者节点序列添加到gatingSequences中`
9. `ringBuffer.addGatingSequences(processorSequences);`
10.
11. `//从gatingSequences中移除之前的barrierSequences`
12. `for (final Sequence barrierSequence : barrierSequences) {`
13. `ringBuffer.removeGatingSequence(barrierSequence);`
14. `}`
15.
16. `// 将之前的屏障序列标记为不再是消费者链的最后节点`
17. `consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);`
18. `}`
19. `}`
- 接下来我们看一看ringBuffer.addGatingSequences方法:
1. `/**`
2. `* 添加消费者序列到消费者链末端的消费者序列后面`
3. `* Add the specified gating sequences to this instance of the Disruptor. They will`
4. `* safely and atomically added to the list of gating sequences.`
5. `*`
6. `* @param gatingSequences The sequences to add.`
7. `*/`
8. `public void addGatingSequences(Sequence... gatingSequences)`
9. `{`
10. `sequencer.addGatingSequences(gatingSequences);`
11. `}`
- 然后调用的是sequencer.addGatingSequences方法,这个sequencer我们继续看看ringbuffer初始化时默认使用的MultiProducerSequencer,实际上调用的是com.lmax.disruptor.AbstractSequencer#addGatingSequences:
1. `/**`
2. `* 添加Sequence到gatingSequences 链末端`
3. `* @see Sequencer#addGatingSequences(Sequence...)`
4. `*/`
5. `@Override`
6. `public final void addGatingSequences(Sequence... gatingSequences)`
7. `{`
8. `SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);`
9. `}`
继续看com.lmax.disruptor.SequenceGroups#addSequences: 入参主要有:
- holder:在上面方法中传入的是MultiProducerSequencer,看过上一篇推文的童鞋会知道这个MultiProducerSequencer实例也是生产者申请ringbuffer空间时使用的Sequencer;
- updater: AtomicReferenceFieldUpdater是jdk的一个用于原子更新域的类,在MultiProducerSequencer中定义的原子更新域的属性定义为:,>
1. `/**`
2. `* 原子方式更新 追踪的Sequences`
3. `*/`
4. `private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =`
5. `AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");`
它是通过sequencer的对象引用来原子更新sequencer对象中的gatingSequences属性。
- cursor: 在这里传入的是MultiProducerSequencer,它是实现了Cursored接口的,代表的是生产者移动的光标,也就是生产者的当前进度。
- Sequence... sequencesToAdd 需要添加进来的消费者Sequence列表
- 源码部分如下:
1. `/**`
2. `* 原子方式添加Sequence,并将要添加的Sequence进度设置为生产者最新的值`
3. `* @param holder 域所属的对象`
4. `* @param updater 域原子更新器`
5. `* @param cursor 生产者光标(进度)`
6. `* @param sequencesToAdd 要添加的Sequences`
7. `* @param <T>`
8. `*/`
9. `static <T> void addSequences(`
10. `final T holder,`
11. `final AtomicReferenceFieldUpdater<T, Sequence[]> updater,`
12. `final Cursored cursor,`
13. `final Sequence... sequencesToAdd)`
14. `{`
15. `long cursorSequence;`
16. `Sequence[] updatedSequences;`
17. `Sequence[] currentSequences;`
18.
19. `do`
20. `{`
21. `currentSequences = updater.get(holder);//获取当前sequences`
22. `updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);`
23. `cursorSequence = cursor.getCursor();`
24.
25. `int index = currentSequences.length;`
26. `for (Sequence sequence : sequencesToAdd)//添加sequencesToAdd到updatedSequences列表中去`
27. `{`
28. `sequence.set(cursorSequence);`
29. `updatedSequences[index++] = sequence;`
30. `}`
31. `}`
32. `while (!updater.compareAndSet(holder, currentSequences, updatedSequences));//原子更新currentSequences`
33.
34. `cursorSequence = cursor.getCursor();`
35. `for (Sequence sequence : sequencesToAdd)`
36. `{`
37. `sequence.set(cursorSequence);`
38. `}`
39. `}`
注意:这里更新的其实是sequencer也就是本例中的MultiProducerSequencer的gatingSequences列表。
com.lmax.disruptor.SequenceGroups#removeSequence也和addSequence的操作类似
2.2 disruptor的启动过程
- com.lmax.disruptor.dsl.Disruptor#start方法:
1. `/**`
2. `* 启动Disruptor,其实就是启动消费者为。`
3. `* 为每一个EventProcessor创建一个独立的线程。`
4. `* <p>`
5. `* 这个方法必须在消费者(event processors/event handler)被添加到disruptor中之后调用`
6. `*`
7. `* <p>Starts the event processors and returns the fully configured ring buffer.</p>`
8. `*`
9. `* <p>The ring buffer is set up to prevent overwriting any entry that is yet to`
10. `* be processed by the slowest event processor.</p>`
11. `*`
12. `* <p>This method must only be called once after all event processors have been added.</p>`
13. `*`
14. `* @return the configured ring buffer.`
15. `*/`
16. `public RingBuffer<T> start() {`
17. `checkOnlyStartedOnce();//检查一下保存只启动一次`
18. `for (final ConsumerInfo consumerInfo : consumerRepository) {//遍历消费者仓库,用executor为每个消费者提供一个启动线程`
19. `consumerInfo.start(executor);`
20. `}`
21.
22. `return ringBuffer;`
23. `}`
- consumerRepository是上面添加消费者时存放消费者信息的仓库;
- executor为disruptor初始化时用户提供的执行器,默认使用BasicExecutor,关于为什么默认使用BasicExecutor的部分,在之前的推文中也有介绍过,请不明白的童鞋自行翻阅。
- consumerInfo.start:
1. `@Override`
2. `public void start(final Executor executor)`
3. `{`
4. `executor.execute(eventprocessor);`
5. `}`
可以看到executor.execute传入的是实现了Runnable接口的eventprocessor,也就是每个消费者线程,这里代表的是com.lmax.disruptor.BatchEventProcessor的run方法:
1. `/**`
2. `* 暂停以后交给下一个线程继续执行是线程安全的`
3. `* It is ok to have another thread rerun this method after a halt().`
4. `* @throws IllegalStateException if this object instance is already running in a thread`
5. `*/`
6. `@Override`
7. `public void run()`
8. `{`
9. `// 原子变量,当能从IDLE切换到RUNNING状态时,前一个线程一定退出了run()`
10. `// 具备happens-before原则,是线程安全的`
11. `if (running.compareAndSet(IDLE, RUNNING))`
12. `{`
13. `sequenceBarrier.clearAlert();`
14.
15. `notifyStart();`
16. `try`
17. `{`
18. `if (running.get() == RUNNING)`
19. `{`
20. `processEvents();`
21. `}`
22. `}`
23. `finally`
24. `{`
25. `notifyShutdown();`
26. `// 在退出的时候会恢复到IDLE状态,且是原子变量,具备happens-before原则`
27. `// 由volatile支持`
28. `running.set(IDLE);`
29. `}`
30. `}`
31. `else`
32. `{`
33. `// This is a little bit of guess work. The running state could of changed to HALTED by`
34. `// this point. However, Java does not have compareAndExchange which is the only way`
35. `// to get it exactly correct.`
36. `if (running.get() == RUNNING)`
37. `{`
38. `throw new IllegalStateException("Thread is already running");`
39. `}`
40. `else`
41. `{`
42. `earlyExit();`
43. `}`
44. `}`
45. `}`
上面的sequenceBarrier的来处是:
ringbuffer.newBarrier调用的是这个:
它的构造方法如下:
1. `ProcessingSequenceBarrier(`
2. `final Sequencer sequencer,`
3. `final WaitStrategy waitStrategy,`
4. `final Sequence cursorSequence,`
5. `final Sequence[] dependentSequences)`
6. `{`
7. `this.sequencer = sequencer;`
8. `this.waitStrategy = waitStrategy;`
9. `this.cursorSequence = cursorSequence;`
10.
11. `// 如果没有和我绑定的事件处理器,那么只需要与生产者的进度进行协调`
12. `if (0 == dependentSequences.length)`
13. `{`
14. `dependentSequence = cursorSequence;`
15. `}`
16. `else`
17. `{`
18. `dependentSequence = new FixedSequenceGroup(dependentSequences);`
19. `}`
20. `}`
我们的例子中传入的dependentSequences即为上层方法中的barrierSequences,是一个空的sequence数组,所以只需要与生产者的进度进行协调。
2.3 消费者也即eventProcessor的核心处理逻辑:
1. `private void processEvents()`
2. `{`
3. `T event = null;`
4. `//获取到下一个序列的编号,然后去barrier中申请`
5. `//-1是不需要消费的,第一个要消费的是0`
6. `long nextSequence = sequence.get() + 1L;`
7.
8. `while (true)`
9. `{`
10. `try`
11. `{`
12. `final long availableSequence = sequenceBarrier.waitFor(nextSequence);`
13. `if (batchStartAware != null)`
14. `{`
15. `batchStartAware.onBatchStart(availableSequence - nextSequence + 1);`
16. `}`
17.
18. `while (nextSequence <= availableSequence)`
19. `{`
20. `event = dataProvider.get(nextSequence);`
21. `eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);`
22. `nextSequence++;`
23. `}`
24.
25. `sequence.set(availableSequence);`
26. `}`
27. `catch (final TimeoutException e)`
28. `{`
29. `notifyTimeout(sequence.get());`
30. `}`
31. `catch (final AlertException ex)`
32. `{`
33. `if (running.get() != RUNNING)`
34. `{`
35. `break;`
36. `}`
37. `}`
38. `catch (final Throwable ex)`
39. `{`
40. `exceptionHandler.handleEventException(ex, nextSequence, event);`
41. `sequence.set(nextSequence);`
42. `nextSequence++;`
43. `}`
44. `}`
45. `}`
- sequenceBarrier.waitFor():
1. `@Override`
2. `public long waitFor(final long sequence)`
3. `throws AlertException, InterruptedException, TimeoutException`
4. `{`
5. `checkAlert();`
6.
7. `long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);`
8. `// 目标sequence还未发布,超时了`
9. `if (availableSequence < sequence)`
10. `{`
11. `return availableSequence;`
12. `}`
13. `// 目标sequence已经发布了,这里获取真正的最大序号(和生产者模型有关)`
14. `return sequencer.getHighestPublishedSequence(sequence, availableSequence);`
15. `}`
com.lmax.disruptor.BlockingWaitStrategy#waitFor:
1. `@Override`
2. `public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)`
3. `throws AlertException, InterruptedException`
4. `{`
5. `long availableSequence;`
6.
7. `// 步骤1.确保等待的序号的数据已经发布(协调与生产者之间的关系)`
8. `// double check`
9. `if (cursorSequence.get() < sequence)`
10. `{`
11. `synchronized (mutex)`
12. `{`
13. `// 循环中检测,避免虚假唤醒`
14. `while (cursorSequence.get() < sequence)`
15. `{`
16. `barrier.checkAlert();`
17. `mutex.wait();`
18. `}`
19. `}`
20. `}`
21.
22. `// 步骤2.确保该序号已经被我前面的消费者消费(协调与其他消费者的关系)`
23. `//如果前面的消费者还没有消费,则等待`
24. `//如果前面的消费者已经消费,则返回已经消费到的序列`
25. `while ((availableSequence = dependentSequence.get()) < sequence)`
26. `{`
27. `// 可理解为返回之前检查中断`
28. `barrier.checkAlert();`
29. `ThreadHints.onSpinWait();`
30. `}`
31.
32. `return availableSequence;`
33. `}`
com.lmax.disruptor.MultiProducerSequencer#getHighestPublishedSequence:
1. `/**`
2. `* 查询 nextSequence-availableSequence 区间段之间连续发布的最大序号。多生产者模式下sequence可能是不连续的。`
3. `* 多生产者模式下{@link Sequencer#next(int)} next是预分配的,因此可能部分数据还未被填充。`
4. `*`
5. `* @param lowerBound 期望消费的最小序号,前面的一定都已经填充并被当前消费者消费`
6. `* @param availableSequence The sequence to scan to.从已发布的最大序号`
7. `* 多生产者模式下,已发布的数据可能是不连续的,因此不能直接该序号进行消费。`
8. `* 必须顺序的消费,不能跳跃`
9. `* @return`
10. `*/`
11. `@Override`
12. `public long getHighestPublishedSequence(long lowerBound, long availableSequence) {`
13. `for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {`
14. `// 这里中断了,不是连续发布的,需要剪断`
15. `if (!isAvailable(sequence)) {`
16. `return sequence - 1;`
17. `}`
18. `}`
19.
20. `return availableSequence;`
21. `}`
当前消费者执行部分:
1. `while (nextSequence <= availableSequence)`
2. `{`
3. `event = dataProvider.get(nextSequence);`
4. `eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);`
5. `nextSequence++;`
6. `}`
7.
8. `sequence.set(availableSequence);`
当nextSequence小于前面消费者消费到的最新序列的时候则执行消费操作,回调用户传入的eventHandler的onEvent方法,并更新当前eventProcessor对应的sequence的序列号。
2.4 示例
1. `public static void main(String[] args) throws Exception {`
2. `// The factory for the event`
3. `LongEventFactory factory = new LongEventFactory();`
4.
5. `// Specify the size of the ring buffer, must be power of 2.`
6. `int bufferSize = 1024;`
7.
8. `// Construct the Disruptor`
9. `Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);`
10.
11. `// Connect the handler`
12. `disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler());`
13.
14. `// Start the Disruptor, starts all threads running`
15. `disruptor.start();`
16.
17. `// Get the ring buffer from the Disruptor to be used for publishing.`
18. `RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();`
19.
20. `LongEventProducer producer = new LongEventProducer(ringBuffer);`
21.
22. `ByteBuffer bb = ByteBuffer.allocate(8);`
23. `for (long l = 0; l <= 5; l++) {`
24. `bb.putLong(0, l);`
25. `producer.onData(bb);`
26. `Thread.sleep(1000);`
27. `}`
28. `}`
29.
30. `public class LongEventHandler implements EventHandler<LongEvent> {`
31. `public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {`
32. `System.out.println(Thread.currentThread().getName() + " Event: " + event);`
33. `}`
34. `}`
输出结果为:
2.5 总结
-
每个eventProcessor是一个消费者,示例执行的结果也符合disruptor推文第一篇中介绍的disruptor事件多播的特性。eventHandler最终会被包装成eventProcessor来处理。
-
到这里整个消费者的处理流程都已经处理完成,接下来将会用一篇文章介绍下WorkerPool模式。