lettuce和hbase中对netty的使用你都了解吗?

技术

之前的两篇文章中咱们有详细地聊过关于lettuce的pipeline以及spring-data-redis对其封装的细节。本篇紧接着上面篇以connectionPoolingProvider为入口,对lettuce基于netty处理IO事件的线程池进行进一步地分析。

lettuce EventLoopGroup初始化

在使用lettuce作为redis连接池时,在上一节中我们知道,lettuce中维护连接有两种使用连接池的方式,目前一种已经废弃,另一种大家正在使用的版本是apache commons pool。咱们来回顾下。

从连接池获取连接的入口

org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection:


          
@Override
          
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
          
        GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
          
            // 不存在就创建连接池
          
            return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
          
                    poolConfig, false);
          
        });
          
        try {
          
            // 从连接池中获取
          
            StatefulConnection<?, ?> connection = pool.borrowObject();
          
            // 放入map中
          
            poolRef.put(connection, pool);
          
            return connectionType.cast(connection);
          
        } catch (Exception e) {
          
            throw new PoolException("Could not get a resource from the pool", e);
          
        }
          
    }
      

上面的代码中获取连接的核心是connectionProvider.getConnection(connectionType),我们先来看一下connectionProvider的分类列表如下:

picture.image

LettucePoolingConnectionProvider只是一个代理,在它的内部维护着具体的创建连接的provider。在我的场景下的这个provider是StandaloneConnectionProvider。

我们来看一下org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider#getConnection(java.lang.Class)方法:


          
@Override
          
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
          
        ------------省略部分--------------
          
        if (StatefulConnection.class.isAssignableFrom(connectionType)) {
          
            return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
          
                    .orElseGet(() -> client.connect(codec)));
          
        }
          
        -------------------------
          
    }
      

在这里是使用RedisClient获取连接的方法,接着一起来看下io.lettuce.core.RedisClient#connect(io.lettuce.core.codec.RedisCodec<K,V>)方法:


          
 public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
          
        checkForRedisURI();
          
        return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
          
    }
      

在这里connectStandaloneAsync方法返回的是一个ConnectionFuture<StatefulRedisConnection<K, V>>对象,是一个维户着我们的redis连接的Future。来看下io.lettuce.core.RedisClient#connectStandaloneAsync方法:


          
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
          
            RedisURI redisURI, Duration timeout) {
          
        -------------------
          
        // 创建DefaultEndpoint对象
          
        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
          
        RedisChannelWriter writer = endpoint;
          
        if (CommandExpiryWriter.isSupported(clientOptions)) {
          
            writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
          
        }
          
        // 创建连接对象
          
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);
          
       // 进行连接
          
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
          
                () -> new CommandHandler(clientOptions, clientResources, endpoint));
          
         ------------------------------------------
          
        return future;
          
    }
      

这里我们来分析下真正去建立连接的过程,主要逻辑在io.lettuce.core.RedisClient#connectStatefulAsync方法中,代码如下:


          
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
          
            RedisCodec<K, V> codec, Endpoint endpoint,
          
            RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
          
        ConnectionBuilder connectionBuilder;
          
        if (redisURI.isSsl()) {
          
       -------------省略部分代码----------------
          
        } else {
          
            // 创建connection builder对象
          
            connectionBuilder = ConnectionBuilder.connectionBuilder();
          
        }
          
        connectionBuilder.connection(connection);
          
        connectionBuilder.clientOptions(clientOptions);
          
        connectionBuilder.clientResources(clientResources);
          
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
          
        // 在这里会创建netty client Bootstrap对象
          
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
          
        // 在这里根据channel type来初始化eventLoopGroup对象
          
        channelType(connectionBuilder, redisURI);
          
       ----------------------省略部分代码--------------------
          
        return sync.thenApply(channelHandler -> (S) connection);
          
    }
      

这里主要需要分析个方法:connectionBuilder方法和channelType方法。

1.io.lettuce.core.AbstractRedisClient#connectionBuilder方法:


          
     protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
          
               RedisURI redisURI) {
          
           // 创建bootstrap对象
          
           Bootstrap redisBootstrap = new Bootstrap();
          
           // 设置高水位线
          
           redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
          
          // 设置低水平线
          
           redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
          
          // buf的allocator
          
           redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
          
           SocketOptions socketOptions = getOptions().getSocketOptions();
          
          // 设置bootstrap连接超时时间
          
           redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
          
                   Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
          
           if (LettuceStrings.isEmpty(redisURI.getSocket())) {
          
               redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
          
               redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
          
           }
          
            // 设置超时
          
           connectionBuilder.timeout(redisURI.getTimeout());
          
           // 设置密码
          
           connectionBuilder.password(redisURI.getPassword());
          
           connectionBuilder.bootstrap(redisBootstrap);
          
           connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
          
           connectionBuilder.socketAddressSupplier(socketAddressSupplier);
          
       }
          

      

这里主要是初始化 netty client的相关对象,并对其中一些参数进行设置。可以看到整个connectionBuilder对象中维护着连接所需要的所有信息。

1.io.lettuce.core.AbstractRedisClient#channelType方法


          
    protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {
          
           LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");
          
           connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint));
          
           if (connectionPoint.getSocket() != null) {
          
               NativeTransports.assertAvailable();
          
               connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
          
           } else {
          
               connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
          
           }
          
       }
      

这个方法的目的也很简单,主要是设置执行netty 网络IO操作的线程池 eventLoopGroup。在真正开始分析eventLoopGroup之前,先来看一下io.lettuce.core.Transports#eventLoopGroupClass方法:


          
     static Class<? extends EventLoopGroup> eventLoopGroupClass() {
          
           if (NativeTransports.isSocketSupported()) {
          
               return NativeTransports.eventLoopGroupClass();
          
           }
          
           return NioEventLoopGroup.class;
          
       }
      

这个方法的主要目的是确定使用哪种类型的NioEventLoopGroup。在io.lettuce.core.Transports.NativeTransports#eventLoopGroupClass方法内部:


          
   static Class<? extends EventLoopGroup> eventLoopGroupClass() {
          
       if (KqueueProvider.isAvailable()) {
          
           return KqueueProvider.eventLoopGroupClass();
          
       }
          
       return EpollProvider.eventLoopGroupClass();
          
   }
      

这个方法会先判断kqueue与epoll provider的可用性,然后来决定eventLoopGroup的class类型是io.netty.channel.kqueue.KQueueEventLoopGroup还是io.netty.channel.epoll.EpollEventLoopGroup。具体判断的逻辑在EpollProvider和KqueueProvider的静态代码块,有兴趣的可以自己去分析一下,这里简单地看下kqueue的:

picture.image

如果既没有epoll也没有kqueue,那么会使用NioEventLoopGroup。

获取EventLoopGroup

此时再回过头来看下io.lettuce.core.AbstractRedisClient#getEventLoopGroup方法代码如下:


          
      private synchronized EventLoopGroup getEventLoopGroup(ConnectionPoint connectionPoint) {
          
           if (connectionPoint.getSocket() == null && !eventLoopGroups.containsKey(Transports.eventLoopGroupClass())) {
          
              // 先进行对应eventLoopGroupClass的初始化
          
               eventLoopGroups.put(Transports.eventLoopGroupClass(),clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass()));
          
           }
          
           if (connectionPoint.getSocket() != null) {
          
               NativeTransports.assertAvailable();
          
               Class<? extends EventLoopGroup> eventLoopGroupClass = NativeTransports.eventLoopGroupClass();
          
               if (!eventLoopGroups.containsKey(NativeTransports.eventLoopGroupClass())) {
          
                   eventLoopGroups
          
                           .put(eventLoopGroupClass, clientResources.eventLoopGroupProvider().allocate(eventLoopGroupClass));
          
               }
          
           }
          
           if (connectionPoint.getSocket() == null) {
          
               // 返回
          
               return eventLoopGroups.get(Transports.eventLoopGroupClass());
          
           }
          
           if (connectionPoint.getSocket() != null) {
          
               NativeTransports.assertAvailable();
          
               return eventLoopGroups.get(NativeTransports.eventLoopGroupClass());
          
           }
          
           throw new IllegalStateException("This should not have happened in a binary decision. Please file a bug.");
          
       }
      

这里我们主要关注clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass())方法,其中clientResources.eventLoopGroupProvider()方法会从clientResources中获取到eventLoopGroupProvider,然后使用这个provider去申请对应class类型的eventLoopGroup。

在DefaultClientResources中的provider类型是DefaultEventLoopGroupProvider,至于为什么这样,下文会有分析。我们来看下io.lettuce.core.resource.DefaultEventLoopGroupProvider#allocate方法:


          
  @Override
          
    public <T extends EventLoopGroup> T allocate(Class<T> type) {
          
        synchronized (this) {
          
            logger.debug("Allocating executor {}", type.getName());
          
            // 将引用放入到外部容器中去,方便计数统计
          
            return addReference(getOrCreate(type));
          
        }
          
    }
          

          
io.lettuce.core.resource.DefaultEventLoopGroupProvider#getOrCreate:
          
private <T extends EventLoopGroup> T getOrCreate(Class<T> type) {
          
 ---------------省略部分代码------------------
          
        if (!eventLoopGroups.containsKey(type)) {
          
            // 如果不存在,则创建eventLoopGroup
          
            eventLoopGroups.put(type, createEventLoopGroup(type, numberOfThreads));
          
        }
          
        return (T) eventLoopGroups.get(type);
          
    }
          

          
io.lettuce.core.resource.DefaultEventLoopGroupProvider#createEventLoopGroup:
          
 public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads) {
          
        logger.debug("Creating executor {}", type.getName());
          
        if (DefaultEventExecutorGroup.class.equals(type)) {
          
            return new DefaultEventExecutorGroup(numberOfThreads, new DefaultThreadFactory("lettuce-eventExecutorLoop", true));
          
        }
          
        if (NioEventLoopGroup.class.equals(type)) {
          
            return new NioEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-nioEventLoop", true));
          
        }
          
        if (EpollProvider.isAvailable() && EpollProvider.isEventLoopGroup(type)) {
          
            return EpollProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-epollEventLoop", true));
          
        }
          
        if (KqueueProvider.isAvailable() && KqueueProvider.isEventLoopGroup(type)) {
          
            return KqueueProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-kqueueEventLoop", true));
          
        }
          
        throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
          
    }
      

这里的getEventLoopGroup方法会生成真正供netty使用的EventLoopGroup,可能是nio、epoll、kqueue中的一种。先判断下当前环境是否支持epoll和kequeue,如果支持则会生成对应的eventLoopGroup,默认使用的是NioEventLoopGroup。没错,这个eventLoopGroup就是netty client进行io操作的eventLoopGroup。

DefaultClientResources 的 DefaultEventExecutorGroup

这里先来看一下clientResources的初始化,org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#lettuceClientResources代码:


          
@Bean(destroyMethod = "shutdown")
          
@ConditionalOnMissingBean(ClientResources.class)
          
public DefaultClientResources lettuceClientResources() {
          
   return DefaultClientResources.create();
          
}
          

          
io.lettuce.core.resource.DefaultClientResources#create:
          
  public static DefaultClientResources create() {
          
        return builder().build();
          
    }
          

          
io.lettuce.core.resource.DefaultClientResources.Builder#build:
          
@Override
          
public DefaultClientResources build() {
          
    return new DefaultClientResources(this);
          
}
          

          
protected DefaultClientResources(Builder builder) {
          
        if (builder.eventLoopGroupProvider == null) {
          
            int ioThreadPoolSize = builder.ioThreadPoolSize;
          
            ----------省略部分代码---------
          
                // 创建DefaultEventLoopGroupProvider
          
            this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize);
          

          
        } else {
          
            this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider;
          
            this.eventLoopGroupProvider = builder.eventLoopGroupProvider;
          
        }
          

          
        if (builder.eventExecutorGroup == null) {
          
          ------------------------------
          
              // 创建eventExecutorGroup
          
            eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class,
          
                    computationThreadPoolSize);
          
            sharedEventExecutor = false;
          
        } else {
          
            sharedEventExecutor = builder.sharedEventExecutor;
          
            eventExecutorGroup = builder.eventExecutorGroup;
          
        }
          

          
        if (builder.timer == null) {
          
            // 创建时间轮timer
          
            timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer"));
          
            sharedTimer = false;
          
        } else {
          
            timer = builder.timer;
          
            sharedTimer = builder.sharedTimer;
          
        }
          
        if (builder.eventBus == null) {
          
            // 创建事件驱动服务总线
          
            eventBus = new DefaultEventBus(Schedulers.fromExecutor(eventExecutorGroup));
          
        } else {
          
            eventBus = builder.eventBus;
          
        }
          
-------------------------------------
      

这里本文主要关注的重点是创建eventExecutorGroup的方法,eventExecutorGroup是EventExecutorGroup类型的,它的主要作用是作为netty channelGroup的EventExecutor。关于EventExecutor参考

https://www.cnblogs.com/lighten/p/8967630.htmlhttps://juejin.im/post/5de7a0dcf265da33bf1ff42c

io.lettuce.core.resource.DefaultEventLoopGroupProvider#createEventLoopGroup方法代码如下:


          
public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads) {
          
        logger.debug("Creating executor {}", type.getName());
          
        if (DefaultEventExecutorGroup.class.equals(type)) {
          
            // 创建默认的eventExecutorGroup
          
            return new DefaultEventExecutorGroup(numberOfThreads, new DefaultThreadFactory("lettuce-eventExecutorLoop", true));
          
        }
          
        if (NioEventLoopGroup.class.equals(type)) {
          
            // 如果是NioEventLoopGroup类型的会创建NioEventLoopGroup
          
            return new NioEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-nioEventLoop", true));
          
        }
          
        if (EpollProvider.isAvailable() && EpollProvider.isEventLoopGroup(type)) {
          
            // 创建epoll类型的eventLoop
          
            return EpollProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-epollEventLoop", true));
          
        }
          
        if (KqueueProvider.isAvailable() && KqueueProvider.isEventLoopGroup(type)) {
          
            // 创建kqueue类型的eventLoop
          
            return KqueueProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-kqueueEventLoop", true));
          
        }
          
        throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
          
    }
      

这个方法的主要作用是根据类型创建相应类型的EventLoopGroup,对于DefaultClientResources来说,它创建的是DefaultEventExecutorGroup,属于EventExecutorGroup类型的,它的主要作用是处理一些回调事件,异步处理,超时处理等,但是它不是用于处理netty io事件的那个eventLoopGroup。

说在后面的话

hbase client中底层rpc也是使用的也是Netty,它的org.apache.hadoop.hbase.ipc.NettyRpcConnection#connect方法部分截图如下:

picture.image

这里也会涉及到一个rpcClient.group的初始化过程,它和lettuce中对netty的使用有什么区别呢?感兴趣的可以去分析一下。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
云原生机器学习系统落地和实践
机器学习在字节跳动有着丰富业务场景:推广搜、CV/NLP/Speech 等。业务规模的不断增大对机器学习系统从用户体验、训练效率、编排调度、资源利用等方面也提出了新的挑战,而 Kubernetes 云原生理念的提出正是为了应对这些挑战。本次分享将主要介绍字节跳动机器学习系统云原生化的落地和实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论