netty源码分析之EventLoop中的线程FastThreadLocalThread和队列

技术

每个NioEventLoop有着自己的任务队列(taskQueue=mpscQueue和延迟队列PriorityQueue)和自己的处理线程(FastThreadLocalThread),同时也维护着自己的Selector。如果是bossGroup,在ServerBootstrap初始化时会去Selector上注册ServerSocketChannel,同时在pipeline中添加ServerBootstrapAcceptor。io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法中会在(readyOps & (SelectionKey.OPREAD | SelectionKey.OPACCEPT)) != 0 || readyOps == 0成立时进入unsafe.read(),此时也就是表明有连接进入,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法中会调用doReadMessages(readBuf)方法,在io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages方法中会通过NioSocketChannel nioSocketChannel = new NioSocketChannel(this, ch);buf.add(nioSocketChannel);的方式将NioSocketChannel放入List类型的readBuf中。此时回到io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法中,会遍历readBuf列表,调用pipeline.fireChannelRead(readBuf.get(i))方法,这样以来就会触发ServerBootstrapAcceptor中的channelRead方法,在ServerBootstrapAcceptor的channelRead方法中调用childGroup.register(child)方法,这个childGroup就是创建ServerBootstrap时传入的workerGroup,这个child就是NioSocketChannel类型的对象。在register之后,每个NioEventLoop线程都会在维护自身的task队列(普通任务队列与定时任务)的同时,在它的run方法中还会不停地执行select,在doRegister方法中会调用pipeline.fireChannelActive();方法,在方法里会在pipeline中进行传播,调用io.netty.channel.DefaultChannelPipeline.HeadContext#read方法,继而调用unsafe.beginRead()方法,在io.netty.channel.nio.AbstractNioChannel#doBeginRead方法中会进行selectionKey.interestOps(interestOps | readInterestOp)方法的调用。关于接入流程可以参考:https://www.cnblogs.com/ZhuChangwu/p/11210437.html,这些处理的主要是网络IO事件,它的任务事件是放入队列中来进行处理的。

io.netty.channel.nio.NioEventLoop

类继承关系:

picture.image

它继承自SingleThreadEventLoop,它的超类是SingleThreadEventExecutor。而在下面你会发现NioEventLoopGroup中维护着多个SingleThreadEventExecutor。先来看下NioEventLoop和SingleThreadEventLoop、SingleThreadEventExecutor的代码。

SingleThreadEventExecutor

因为这个Executor的主要作用是维护其中的FastThreadLocalThread的生命周期,我们来依照这条线进行分析:

  • 线程创建:

      1. `protected SingleThreadEventExecutor(`
2. `EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {`
3. `----------------省略部分代码--------------------`
4. `this.parent = parent;`
5. `this.addTaskWakesUp = addTaskWakesUp;`
6. `// 创建线程`
7. `thread = threadFactory.newThread(new Runnable() {`
8. `@Override`
9. `public void run() {`
10. `boolean success = false;`
11. `updateLastExecutionTime();`
12. `try {`
13. `// 回调executor的run方法`
14. `SingleThreadEventExecutor.this.run();`
15. `success = true;`
16. `} catch (Throwable t) {`
17. `logger.warn("Unexpected exception from an event executor: ", t);`
18. `} finally {`
19. `for (;;) {`
20. `int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);`


    

使用threadFactory来创建线程,创建的是FastThreadLocalThread,这个在下文中会详细分析。在创建的线程的run方法中会回调SingleThreadEventExecutor的run方法。

  • 线程状态:

      1. `private static final int ST_NOT_STARTED = 1;`
2. `private static final int ST_STARTED = 2;`
3. `private static final int ST_SHUTTING_DOWN = 3;`
4. `private static final int ST_SHUTDOWN = 4;`
5. `private static final int ST_TERMINATED = 5;`
6. 
7. `private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;`
8. `static {`
9. `AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater = PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");`
10. `if (updater == null) {`
11. `updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");`
12. `}`
13. `STATE_UPDATER = updater;`
14. `}`


    

通过一个AtomicIntegerFieldUpdater变量来维护着线程的状态变化。

  • 线程唤醒,为什么要有线程唤醒呢,我们来看下这个SingleThreadEventExecutor的实现类NioEventLoop中对run方法的实现:

      1. `@Override`
2. `protected void run() {`
3. `for (;;) {`
4. `boolean oldWakenUp = wakenUp.getAndSet(false);`
5. `try {`
6. `if (hasTasks()) {`
7. `selectNow();`
8. `} else {`
9. `// wakeup是要在调用selector.wakeup()之前来校检来减少wake-up发生,这是因为selector.wakeup()是一个昂贵的操作`
10. `select(oldWakenUp);`
11. `------------省略部分------`
12. `if (wakenUp.get()) {`
13. `selector.wakeup();`
14. `}`
15. `}`
16. `cancelledKeys = 0;`
17. `needsToSelectAgain = false;`
18. `final int ioRatio = this.ioRatio;`
19. `if (ioRatio == 100) {`
20. `processSelectedKeys();`
21. `runAllTasks();`
22. `} else {`
23. `final long ioStartTime = System.nanoTime();`
24. `processSelectedKeys();`
25. `final long ioTime = System.nanoTime() - ioStartTime;`
26. `runAllTasks(ioTime * (100 - ioRatio) / ioRatio);`
27. `}`
28. `if (isShuttingDown()) {`
29. `closeAll();`
30. `if (confirmShutdown()) {`
31. `break;`
32. `}`
33. `}`


    

NIO中的Selector封装了底层的系统调用,其中wakeup用于唤醒阻塞在select方法上的线程,它的实现很简单,在linux上就是创建一 个管道并加入poll的fd集合,wakeup就是往管道里写一个字节,那么阻塞的poll方法有数据可读就立即返回。

这里有必要再提一下ioRatio,这个参数提供了一个粗略的机制,用来大致控制处理IO相关(socket 读,链接,关闭,挂起等)和非IO相关任务的时间分配比.非IO任务是,由于使用Executor接口,例如Executor#execute(..),而在EventLoopGroup队列中的Runnable对象.参数值越小,越多的时间将消耗在非IO任务上.当前,100将禁止所有超时时间(详见源码runAllTasks(long timeoutNanos))并运行所有等待着的非IO任务.详情参考官方issue:https://github.com/netty/netty/issues/6058。

控制wakeup的属性为:


      1. `private
  
 final
  
 boolean
  addTaskWakesUp
 ;`


    

这里关注下io.netty.util.concurrent.SingleThreadEventExecutor#execute方法:


      1. `@Override`
2. `public void execute(Runnable task) {`
3. `----------省略部分-------`
4. `boolean inEventLoop = inEventLoop();`
5. `if (inEventLoop) {//当前处理线程为EventLoop绑定线程时,放入队列`
6. `addTask(task);`
7. `} else {`
8. `startThread();// 启动新的eventLoop线程`
9. `addTask(task);//添加任务入队`
10. `if (isShutdown() && removeTask(task)) {`
11. `reject();`
12. `}`
13. `}`
14. `// addTaskWakesUp为true就代表只有在触发addTask(Runnable)方法时才会唤醒executor线程,默认为false`
15. `if (!addTaskWakesUp && wakesUpForTask(task)) {`
16. `wakeup(inEventLoop);`
17. `}`
18. `}`
19. `// io.netty.channel.SingleThreadEventLoop#wakesUpForTask:`
20. `@Override`
21. `protected boolean wakesUpForTask(Runnable task) {`
22. `return !(task instanceof NonWakeupRunnable);`
23. `}`
24. `// 添加wakeup任务`
25. `protected void wakeup(boolean inEventLoop) {`
26. `if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {`
27. `taskQueue.add(WAKEUP_TASK);`
28. `}`
29. `}`


    

该方法在之前的netty源码分析中详细地分析过,主要用于查看netty的IO线程的状态,当前处理线程为EventLoop绑定线程时,放入队列,否则启动新的EventLoop线程并将任务入队,并在线程处于shutdown状态时将任务出列并执行拒绝策略。如果上面添加的不是NonWakeupRunnable类型的task,并且当前执行线程不是EventLoop线程或者当前线程的状态为shutdown状态时,添加一个WAKEUPTASK,会在io.netty.util.concurrent.SingleThreadEventExecutor#takeTask方法从队列中取task时唤醒阻塞线程。

  • 关闭线程,在SingleThreadEventExecutor中有一个threadLock属性:

      1. `private
  
 final
  
 Semaphore
  threadLock 
 =
  
 new
  
 Semaphore
 (
 0
 );`


    

它的主要调用位于:


      1. `@Override`
2. `public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {`
3. `------------省略部分代码--------------`
4. `if (threadLock.tryAcquire(timeout, unit)) {`
5. `threadLock.release();`
6. `}`
7. `return isTerminated();`
8. `}`


    

threadLock是一个初始值为0的信号量。一个初值为0的信号量,当线程请求锁时只会阻塞,这有什么用呢?awaitTermination()方法揭晓答案,用来使其他线程阻塞等待原生线程关闭 。

那么EventLoop线程的作用又是什么呢?

处理IO事件

对于boss NioEventLoop也就是reactor线程来说,轮询到的是基本上是连接事件(OP_ACCEPT):

源码调用链:


      1. `1. io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)`
2. `public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {`
3. `super.group(parentGroup);`
4. `----------`
5. `this.childGroup = childGroup;}`
6. `2. 上面将parentGroup传入了super的group方法io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup):`
7. `public B group(EventLoopGroup group) {`
8. `-----------------------`
9. `this.group = group;`
10. `return (B) this;`
11. `}`
12. `传给了AbstractBootstrap的group属性。在AbstractBootstrap内部的io.netty.bootstrap.AbstractBootstrap#bind():`
13. `public ChannelFuture bind() {`
14. `-----------------------`
15. `return doBind(localAddress);`
16. `}`
17. `doBind方法:`
18. `private ChannelFuture doBind(final SocketAddress localAddress) {`
19. `final ChannelFuture regFuture = initAndRegister();`
20. `final Channel channel = regFuture.channel();`
21. `---------------`
22. `io.netty.bootstrap.AbstractBootstrap#initAndRegister方法:`
23. `final ChannelFuture initAndRegister() {`
24. `final Channel channel = channelFactory().newChannel();`
25. `try {`
26. `init(channel);`
27. `------------------------------`
28. `}`
29. `ChannelFuture regFuture = group().register(channel);//这里是parentGroup`


    

这里的group调用的是初始时传入的parentGroup,紧接着进入io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)方法,该方法会根据传入的Channel为ServerSocketChannel和SocketChannel来决定注册不同的事件到Selector上,这里主要是accept事件。执行register方法时会从boosGroup的线程组中使用EventExecutorChooser选择出一个NioEventLoop来进行register操作,所以一般boosGroup中的线程数量都是一个。详细分析参考之前的关于netty源码分析的公众号文章。这里还有一点需要注意的是io.netty.bootstrap.ServerBootstrap#init方法:


      1. `@Override`
2. `void init(Channel channel) throws Exception {`
3. `-------------------------------`
4. `p.addLast(new ChannelInitializer<Channel>() {`
5. `@Override`
6. `public void initChannel(Channel ch) throws Exception {`
7. `ch.pipeline().addLast(new ServerBootstrapAcceptor(`
8. `currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));`
9. `}`
10. `});`
11. `}`


    

这里的ServerBootstrapAcceptor就是worker NioEventLoop工作的关键点了。

对于worker NioEventLoop来说,轮询到的基本上是IO读写事件(以OP_READ为例):

这里简要地过一遍它的源码流程:


      1. `//io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)`
2. `public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {`
3. `super.group(parentGroup);`
4. `----------`
5. `this.childGroup = childGroup;}`
6. `// 这里赋值给了childGroup属性。`
7. `//接着看io.netty.bootstrap.ServerBootstrap#init方法,上面已经列出,主要是将childGroup传给了ServerBootstrapAcceptor的childGroup属性。我们来看下具体作用,在io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead方法:`
8. `public void channelRead(ChannelHandlerContext ctx, Object msg) {`
9. `final Channel child = (Channel) msg;`
10. `child.pipeline().addLast(childHandler);`
11. `--------------------------------------------------------`
12. `try {`
13. `childGroup.register(child).addListener(new ChannelFutureListener() {`
14. `-----------------------------------------`
15. `});`
16. `在childGroup.register(child)中child对应的就是每个SocketChannel`


    

ServerBootstrapAcceptor就是大名鼎鼎的reactor模型中的acceptor,这里childGroup.register(child)中child对应的就是每个SocketChannel,执行register方法时会从workerGroup的线程组中使用EventExecutorChooser选择出一个NioEventLoop来进行register操作,主要执行Selector的事件注册,这里主要是读写事件。关于EventExecutorChooser和register的介绍之前的文章中有过详细分析,这里不再赘述。

任务处理

处理用户产生的普通任务:

NioEventLoop中的Queue taskQueue被用来承载用户产生的普通Task。任务入列方法io.netty.util.concurrent.SingleThreadEventExecutor#addTask:


      1. `protected void addTask(Runnable task) {`
2. `if (task == null) {`
3. `throw new NullPointerException("task");`
4. `}`
5. `if (isShutdown()) {`
6. `reject();`
7. `}`
8. `taskQueue.add(task);`
9. `}`


    

taskQueue的创建是io.netty.channel.nio.NioEventLoop#newTaskQueue方法:


      1. `@Override`
2. `protected Queue<Runnable> newTaskQueue() {`
3. `// This event loop never calls takeTask()`
4. `return PlatformDependent.newMpscQueue();`
5. `}`


    

使用的是mpsc队列,也就是多生产者单消费者队列。

taskQueue被实现为netty的mpscQueue,即多生产者单消费者队列。netty使用该队列将外部用户线程产生的Task聚集,并在reactor线程内部用单线程的方式串行执行队列中的Task。

当用户在非IO线程调用Channel的各种方法执行Channel相关的操作时,比如channel.write()、channel.flush()等,netty会将相关操作封装成一个Task并放入taskQueue中,保证相关操作在IO线程中串行执行。

处理用户产生的定时任务:

关于定时任务就需要看下io.netty.util.concurrent.SingleThreadEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)方法代码:


      1. `@Override`
2. `public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {`
3. `----------省略部分代码---------------------`
4. `return schedule(new ScheduledFutureTask<Void>(`
5. `this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));`
6. `}`


    

ScheduledFutureTask中传入的队列为delayedTaskQueue:


      1. `final
  
 Queue
 <
 ScheduledFutureTask
 <?>>
  delayedTaskQueue 
 =
  
 new
  
 PriorityQueue
 <
 ScheduledFutureTask
 <?>>();`


    

NioEventLoop中的Queue> delayedTaskQueue = new PriorityQueue被用来承载用户产生的定时Task。它是一个优先队列。

当用户在非IO线程需要产生定时操作时,netty将用户的定时操作封装成ScheduledFutureTask,即一个netty内部的定时Task,并将定时Task放入delayedTaskQueue中等待对应Channel的IO线程串行执行。

为了解决多线程并发写入delayedTaskQueue的问题,netty将添加ScheduledFutureTask到delayedTaskQueue中的操作封装成普通Task,放入taskQueue中,通过NioEventLoop的IO线程对delayedTaskQueue进行单线程写操作。

处理任务队列的逻辑:

  1. 将已到期的定时Task从delayedTaskQueue中转移到taskQueue中
  2. 计算本次循环执行的截止时间
  3. 循环执行taskQueue中的任务,每隔64个任务检查一下是否已过截止时间,直到taskQueue中任务全部执行完或者超过执行截止时间。

io.netty.util.concurrent.SingleThreadEventExecutor#takeTask方法:

picture.image

在io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromDelayedQueue方法内部进行任务迁移的操作:


      1. `private void fetchFromDelayedQueue() {`
2. `long nanoTime = 0L;`
3. `for (;;) {`
4. `ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();`
5. `if (delayedTask == null) {`
6. `break;`
7. `}`
8. 
9. `if (nanoTime == 0L) {`
10. `nanoTime = ScheduledFutureTask.nanoTime();`
11. `}`
12. 
13. `if (delayedTask.deadlineNanos() <= nanoTime) {`
14. `delayedTaskQueue.remove();// 从delayedTaskQueue中移除`
15. `taskQueue.add(delayedTask);// 添加到任务队列中`
16. `} else {`
17. `break;`
18. `}`
19. `}`
20. `}`


    

io.netty.channel.nio.NioEventLoopGroup

NioEventLoopGroup中主要维护一组EventLoop,EventLoop实现了Executor接口,里面维护着executor线程和方法。

NioEventLoopGroup的类继承关系:

picture.image

MultithreadEventLoopGroup

在这个类的静态代码块中对EventLoopGroup的默认线程数进行了初始化:


      1. `static {`
2. `DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(`
3. `"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));`
4. `}`


    

并且对ThreadFactory进行的初始化:


      1. `@Override`
2. `protected ThreadFactory newDefaultThreadFactory() {`
3. `return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);`
4. `}`


    

io.netty.util.concurrent.DefaultThreadFactory#DefaultThreadFactory(java.lang.Class, int):


      1. `public DefaultThreadFactory(String poolName, boolean daemon, int priority) {`
2. `if (poolName == null) {`
3. `throw new NullPointerException("poolName");`
4. `}`
5. `if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {`
6. `throw new IllegalArgumentException(`
7. `"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");`
8. `}`
9. 
10. `prefix = poolName + '-' + poolId.incrementAndGet() + '-';`
11. `this.daemon = daemon;`
12. `this.priority = priority;`
13. `}`


    

这里我们主要关注一下它的newThread方法:


      1. `@Override`
2. `public Thread newThread(Runnable r) {`
3. `Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());`
4. `--------------------------`
5. `}`
6. 
7. `protected Thread newThread(Runnable r, String name) {`
8. `return new FastThreadLocalThread(r, name);`
9. `}`


    

这里创建的线程为FastThreadLocalThread,接着顺便来分析下FastThreadLocalThread,先来看下它与普通线程不一样的属性和方法:


      1. `private InternalThreadLocalMap threadLocalMap;`
2. `/**`
3. `* Returns the internal data structure that keeps the thread-local variables bound to this thread.`
4. `* Note that this method is for internal use only, and thus is subject to change at any time.`
5. `*/`
6. `public final InternalThreadLocalMap threadLocalMap() {`
7. `return threadLocalMap;`
8. `}`
9. 
10. `/**`
11. `* Sets the internal data structure that keeps the thread-local variables bound to this thread.`
12. `* Note that this method is for internal use only, and thus is subject to change at any time.`
13. `*/`
14. `public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {`
15. `this.threadLocalMap = threadLocalMap;`
16. `}`


    

JDK 中自带的 ThreadLocal 在线程池使用环境中,有内存泄漏的风险,很明显,Netty 为了避免这个 bug,重新进行了封装。它主要用于与io.netty.util.concurrent.FastThreadLocal合用,就如同Thread与ThreadLocal合用一样(关于ThreadLocal、InheritThreadLocal和TransmitableThreadLocal的源码之前有一系列的文章分别分析过,需要详细了解的请翻阅历史文章)。我们知道解决hash冲突的办法主要有以下几种:

  1. 开放定址法(线性探测再散列,二次探测再散列,伪随机探测再散列)
  2. 再次哈希法(rehash)
  3. 链地址法
  4. 建立一个公共溢出区

Java中hashmap的解决办法就是采用的链地址法。这里我们补充一下hashmap中的知识点:

  • JDK1.8 HashMap的优化:一方面引入红黑树解决过长链表效率低的问题;另一方面重写resize方法,移除了alternative hashing相关方法,避免重新计算键的hash。
  • HashMap的线程不安全体现在:多线程同时put添加元素会丢失元素,多线程同时扩容会造成死循环。

关于FastThreadLocal,我们简要来分析几点:

缓冲行填充

io.netty.util.internal.InternalThreadLocalMap中有几个long类型的属性:


      1. `// Cache line padding (must be public)`
2. `// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.`
3. `public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;`


    

对cpu缓存行进行填充,防止因为伪共享导致的缓存失效问题。

fastGet和slowGet方法

io.netty.util.internal.InternalThreadLocalMap#get:


      1. `public static InternalThreadLocalMap get() {`
2. `Thread thread = Thread.currentThread();`
3. `if (thread instanceof FastThreadLocalThread) {`
4. `return fastGet((FastThreadLocalThread) thread);`
5. `} else {`
6. `return slowGet();`
7. `}`
8. `}`


    

会根据当前线程类型来决定走fastGet方法还是slowGet方法.

io.netty.util.internal.InternalThreadLocalMap#fastGet:


      1. `private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {`
2. `InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();`
3. `if (threadLocalMap == null) {`
4. `thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());`
5. `}`
6. `return threadLocalMap;`
7. `}`
8. 
9. `// 构造方法:`
10. `private InternalThreadLocalMap() {`
11. `super(newIndexedVariableTable());`
12. `}`
13. `private static Object[] newIndexedVariableTable() {`
14. `Object[] array = new Object[32];`
15. `Arrays.fill(array, UNSET);`
16. `return array;`
17. `}`
18. 
19. `// super构造方法:`
20. `UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {`
21. `this.indexedVariables = indexedVariables;`
22. `}`


    

在fastGet方法中针对的是FastThreadLocalThread线程,也就是netty的内部线程(与EventLoop关联使用的线程,用的是io.netty.util.internal.InternalThreadLocalMap。,这个 Map 内部维护的是一个数组,和 JDK 不同,JDK 维护的是一个使用线性探测法的 Map,可见,从底层数据结构上,JDK ThreadLocalMap就已经输了,他们的读取速度相差很大,特别是当数据量很大的时候,Netty 的数据结构速度依然不变,而 JDK ThreadLocalMap由于使用线性探测法,速度会相应的下降。

io.netty.util.internal.InternalThreadLocalMap#slowGet:


      1. `private static InternalThreadLocalMap slowGet() {`
2. `ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;`
3. `if (slowThreadLocalMap == null) {`
4. `UnpaddedInternalThreadLocalMap.slowThreadLocalMap =`
5. `slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();`
6. `}`
7. 
8. `InternalThreadLocalMap ret = slowThreadLocalMap.get();`
9. `if (ret == null) {`
10. `ret = new InternalThreadLocalMap();`
11. `slowThreadLocalMap.set(ret);`
12. `}`
13. `return ret;`
14. `}`


    
  • 这个方法针对的是普通线程,非FastThreadLocalThread线程。它使用的是ThreadLocal变量,在ThreadLocal变量内部存放的是InternalThreadLocalMap。在之前的文章中有详细分析过ThreadLocal,它的内部是基于ThreadLocalMap实现的,ThreadLocalMap内部Entry是一个WeakReference类型(弱引用级别比软引用更低。当对象根节点可及、无强引用和软引用、有弱引用指向对象时,若发生GC,该对象将直接被回收)的hashMap的结构。上上面提到过,hashmap是线性探测法的 Map。
  • 这个方法首先使用 JDK 的 ThreadLocal 获取一个 Netty 的 InternalThreadLocalMap,如果没有就创建一个,并将这个 InternalThreadLocalMap 设置到 JDK 的 ThreadLocal 中,然后返回这个 InternalThreadLocalMap。从这里可以看出,为了提高性能,Netty 还是避免使用了JDK 的 threadLocalMap,他的方式是曲线救国:在JDK 的 threadLocal 中设置 Netty 的 InternalThreadLocalMap ,然后,这个 InternalThreadLocalMap 中设置 Netty 的 FastThreadLcoal。

io.netty.util.concurrent.FastThreadLocal#set与get方法


      1. `/**`
2. `* Set the value for the current thread.`
3. `*/`
4. `public final void set(V value) {`
5. `if (value != InternalThreadLocalMap.UNSET) {`
6. `set(InternalThreadLocalMap.get(), value);`
7. `} else {`
8. `remove();`
9. `}`
10. `}`
11. 
12. `public final void set(InternalThreadLocalMap threadLocalMap, V value) {`
13. `if (value != InternalThreadLocalMap.UNSET) {`
14. `// 设置变量`
15. `if (threadLocalMap.setIndexedVariable(index, value)) {`
16. `addToVariablesToRemove(threadLocalMap, this);`
17. `}`
18. `} else {`
19. `remove(threadLocalMap);`
20. `}`
21. `}`
22. 
23. `//index是在FastThreadLocal的构造方法中初始化的:`
24. `public FastThreadLocal() {`
25. `index = InternalThreadLocalMap.nextVariableIndex();`
26. `}`
27. 
28. `//io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex:`
29. `public static int nextVariableIndex() {`
30. `// 通过AtomicInteger维护`
31. `int index = nextIndex.getAndIncrement();`
32. `if (index < 0) {`
33. `nextIndex.decrementAndGet();`
34. `throw new IllegalStateException("too many thread-local indexed variables");`
35. `}`
36. `return index;`
37. `}`


    

它的每个变量值在set进去后可以根据index快速定位到指定index在数组中的值,看下get方法就很清晰了:


      1. `/**`
2. `* Returns the current value for the current thread`
3. `*/`
4. `public final V get() {`
5. `return get(InternalThreadLocalMap.get());`
6. `}`
7. 
8. `/**`
9. `* Returns the current value for the specified thread local map.`
10. `* The specified thread local map must be for the current thread.`
11. `*/`
12. `@SuppressWarnings("unchecked")`
13. `public final V get(InternalThreadLocalMap threadLocalMap) {`
14. `Object v = threadLocalMap.indexedVariable(index);`
15. `if (v != InternalThreadLocalMap.UNSET) {`
16. `return (V) v;`
17. `}`
18. 
19. `return initialize(threadLocalMap);`
20. `}`
21. 
22. `// io.netty.util.internal.InternalThreadLocalMap#indexedVariable`
23. `public Object indexedVariable(int index) {`
24. `Object[] lookup = indexedVariables;`
25. `return index < lookup.length? lookup[index] : UNSET;`
26. `}`


    

它能够根据index的值快速定位到数组中的元素,而它的索引是通过AtomicInteger来维护的.

拓容


      1. `private void expandIndexedVariableTableAndSet(int index, Object value) {`
2. `Object[] oldArray = indexedVariables;`
3. `final int oldCapacity = oldArray.length;`
4. `int newCapacity = index;`
5. `newCapacity |= newCapacity >>> 1;`
6. `newCapacity |= newCapacity >>> 2;`
7. `newCapacity |= newCapacity >>> 4;`
8. `newCapacity |= newCapacity >>> 8;`
9. `newCapacity |= newCapacity >>> 16;`
10. `newCapacity ++;`
11. 
12. `Object[] newArray = Arrays.copyOf(oldArray, newCapacity);`
13. `Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);`
14. `newArray[index] = value;`
15. `indexedVariables = newArray;`
16. `}`


    

这里和hashMap的扩容对比着看就很好理解了,这段代码的作用就是按原来的容量扩容2倍。并且保证结果是2的幂次方。这里 Netty 的做法和 HashMap 一样,按照原来的容量扩容到最近的 2 的幂次方大小,比如原来32,就扩容到64,然后,将原来数组的内容填充到新数组中,剩余的填充 空对象,然后将新数组赋值给成员变量 indexedVariables。完成了一次扩容。

从上面几点可以看出FastThreadLocalThread与FastThreadLocal合并时的主要特点是快,更多细节请参考:https://www.jianshu.com/p/3fc2fbac4bb7和https://www.jianshu.com/p/6adfa89ed06e。而DefaultThreadFactory创建的线程都是FastThreadLocalThread类型的.

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
在火山引擎云搜索服务上构建混合搜索的设计与实现
本次演讲将重点介绍字节跳动在混合搜索领域的探索,并探讨如何在多模态数据场景下进行海量数据搜索。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论