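In the previous two articles we discussed in detail lettuce's pipeline and how spring-data-redis wraps it. This article picks up where the last one left off: taking the pooling connection provider (LettucePoolingConnectionProvider) as the entry point, it takes a closer look at the thread pools that lettuce, built on netty, uses to handle IO events.
As we saw in the previous article, when lettuce is used with Redis connection pooling there are two ways of maintaining pooled connections. One is now deprecated; the other, which is what people use today, is built on apache commons pool. Let's recap.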
Entry point for getting a connection from the pool
org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection:
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
        // Create the pool if it does not exist yet
        return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                poolConfig, false);
    });
    try {
        // Borrow a connection from the pool
        StatefulConnection<?, ?> connection = pool.borrowObject();
        // Remember which pool this connection came from
        poolRef.put(connection, pool);
        return connectionType.cast(connection);
    } catch (Exception e) {
        throw new PoolException("Could not get a resource from the pool", e);
    }
}
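The core of obtaining a connection in the code above is connectionProvider.getConnection(connectionType), so let's first look at which kind of connectionProvider is involved.
LettucePoolingConnectionProvider is only a proxy; internally it holds the provider that actually creates connections. In my scenario that provider is StandaloneConnectionProvider.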
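The pooling pattern in getConnection (one pool per connection type, created lazily, plus a reverse map from connection to pool) can be sketched with the JDK alone. This is a minimal illustration, not the spring-data-redis implementation: FakeConnection, SimplePool, PoolingProviderSketch and the release method are all hypothetical names introduced here, and the pool has no size limits or validation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a pooled connection; not a lettuce type.
final class FakeConnection {
    final String kind;

    FakeConnection(String kind) {
        this.kind = kind;
    }
}

// Deliberately tiny pool: reuse an idle object if one exists, otherwise create one.
final class SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T borrow() {
        return idle.isEmpty() ? factory.get() : idle.pop();
    }

    synchronized void giveBack(T object) {
        idle.push(object);
    }
}

final class PoolingProviderSketch {
    // One pool per connection kind, created on first use (mirrors pools.computeIfAbsent).
    private final Map<String, SimplePool<FakeConnection>> pools = new ConcurrentHashMap<>();
    // Reverse lookup from a borrowed connection to its pool (mirrors poolRef).
    private final Map<FakeConnection, SimplePool<FakeConnection>> poolRef = new ConcurrentHashMap<>();

    FakeConnection getConnection(String kind) {
        SimplePool<FakeConnection> pool = pools.computeIfAbsent(kind,
                k -> new SimplePool<FakeConnection>(() -> new FakeConnection(k)));
        FakeConnection connection = pool.borrow();
        poolRef.put(connection, pool);
        return connection;
    }

    // Assumption: the reverse map exists so a connection can be handed back to the right pool.
    void release(FakeConnection connection) {
        SimplePool<FakeConnection> pool = poolRef.remove(connection);
        if (pool == null) {
            throw new IllegalStateException("Connection was not borrowed from this provider");
        }
        pool.giveBack(connection);
    }
}
```

Borrowing, releasing, and borrowing again with the same kind hands back the same FakeConnection instance, which is the reuse that pooling is meant to provide.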
Let's look at the org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider#getConnection(java.lang.Class) method:
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
    // ------------ part omitted ------------
    if (StatefulConnection.class.isAssignableFrom(connectionType)) {
        return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                .orElseGet(() -> client.connect(codec)));
    }
    // ------------ part omitted ------------
}
Here the connection is obtained through RedisClient. Next, let's look at io.lettuce.core.RedisClient#connect(io.lettuce.core.codec.RedisCodec<K,V>):
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
    checkForRedisURI();
    return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
}
Here connectStandaloneAsync returns a ConnectionFuture<StatefulRedisConnection<K, V>>, a Future that holds our Redis connection. Let's look at io.lettuce.core.RedisClient#connectStandaloneAsync:
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {
    // ------------ part omitted ------------
    // Create the DefaultEndpoint
    DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
    RedisChannelWriter writer = endpoint;
    if (CommandExpiryWriter.isSupported(clientOptions)) {
        writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
    }
    // Create the connection object
    StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);
    // Connect
    ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
            () -> new CommandHandler(clientOptions, clientResources, endpoint));
    // ------------ part omitted ------------
    return future;
}
Now let's analyze how the connection is actually established. The main logic is in io.lettuce.core.RedisClient#connectStatefulAsync:
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
        RedisCodec<K, V> codec, Endpoint endpoint,
        RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        // ------------ part omitted ------------
    } else {
        // Create the ConnectionBuilder
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }
    connectionBuilder.connection(connection);
    connectionBuilder.clientOptions(clientOptions);
    connectionBuilder.clientResources(clientResources);
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
    // The netty client Bootstrap object is created here
    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
    // The eventLoopGroup is initialized here according to the channel type
    channelType(connectionBuilder, redisURI);
    // ------------ part omitted ------------
    return sync.thenApply(channelHandler -> (S) connection);
}
Two methods need a closer look here: connectionBuilder and channelType.
1. io.lettuce.core.AbstractRedisClient#connectionBuilder:
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
        RedisURI redisURI) {
    // Create the Bootstrap
    Bootstrap redisBootstrap = new Bootstrap();
    // Set the write-buffer high water mark
    redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
    // Set the write-buffer low water mark
    redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
    // ByteBuf allocator
    redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
    SocketOptions socketOptions = getOptions().getSocketOptions();
    // Set the Bootstrap connect timeout
    redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
            Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
    if (LettuceStrings.isEmpty(redisURI.getSocket())) {
        redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
        redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
    }
    // Set the timeout
    connectionBuilder.timeout(redisURI.getTimeout());
    // Set the password
    connectionBuilder.password(redisURI.getPassword());
    connectionBuilder.bootstrap(redisBootstrap);
    connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
    connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
This mainly initializes the netty client objects and sets some of their parameters. As you can see, the connectionBuilder object holds all the information needed to establish the connection.
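The two water-mark options set on the Bootstrap are easier to reason about with a small model. The sketch below is not netty code; WaterMarkSketch and its methods are hypothetical names. It assumes the usual semantics: the channel becomes unwritable once the pending bytes exceed the high mark, and becomes writable again only after they drop below the low mark.

```java
// Hypothetical model of a write buffer guarded by high/low water marks; not netty's implementation.
final class WaterMarkSketch {
    private final int low;
    private final int high;
    private long pendingBytes;
    private boolean writable = true;

    WaterMarkSketch(int low, int high) {
        if (low < 0 || high < low) {
            throw new IllegalArgumentException("expected 0 <= low <= high");
        }
        this.low = low;
        this.high = high;
    }

    // Called when data is queued for writing.
    void enqueue(int bytes) {
        pendingBytes += bytes;
        if (pendingBytes > high) {
            writable = false; // back-pressure signal: the producer should stop writing
        }
    }

    // Called when queued data has actually been flushed to the socket.
    void flushed(int bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (pendingBytes < low) {
            writable = true; // enough has drained, the producer may resume
        }
    }

    boolean isWritable() {
        return writable;
    }
}
```

With the values from the code above (low 8 * 1024, high 32 * 1024), queuing 33 KB flips the model to unwritable, and it stays that way until the backlog has drained to under 8 KB. The gap between the two marks is what keeps the writable flag from flapping on every small write.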
2. io.lettuce.core.AbstractRedisClient#channelType:
protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {
    LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");
    connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint));
    if (connectionPoint.getSocket() != null) {
        NativeTransports.assertAvailable();
        connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
    } else {
        connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
    }
}
The purpose of this method is simple: it sets the eventLoopGroup, the thread pool that performs netty's network IO. Before analyzing the eventLoopGroup itself, let's look at io.lettuce.core.Transports#eventLoopGroupClass:
static Class<? extends EventLoopGroup> eventLoopGroupClass() {
    if (NativeTransports.isSocketSupported()) {
        return NativeTransports.eventLoopGroupClass();
    }
    return NioEventLoopGroup.class;
}
The main purpose of this method is to decide which type of EventLoopGroup to use. Inside io.lettuce.core.Transports.NativeTransports#eventLoopGroupClass:
static Class<? extends EventLoopGroup> eventLoopGroupClass() {
    if (KqueueProvider.isAvailable()) {
        return KqueueProvider.eventLoopGroupClass();
    }
    return EpollProvider.eventLoopGroupClass();
}
This method checks whether the kqueue provider is available and otherwise falls back to epoll, which decides whether the eventLoopGroup class is io.netty.channel.kqueue.KQueueEventLoopGroup or io.netty.channel.epoll.EpollEventLoopGroup. The actual availability checks live in the static initializer blocks of EpollProvider and KqueueProvider; if you are interested, you can analyze them yourself.
If neither epoll nor kqueue is available, NioEventLoopGroup is used.
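The selection order described above (kqueue first, then epoll, then NIO) fits in a few lines. This is a JDK-only sketch with hypothetical names (TransportSelectionSketch, isPresent, chooseEventLoopGroup). The class-presence probe only approximates the providers' checks: it is an assumption about the first half of such a check, since a native transport also needs its native library to load, which netty exposes through Epoll.isAvailable() and KQueue.isAvailable().

```java
// Hypothetical sketch of the transport selection order; not lettuce's Transports class.
final class TransportSelectionSketch {

    // Rough probe: can the named class be found on the classpath at all?
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, TransportSelectionSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Mirrors the order in the two eventLoopGroupClass methods: kqueue, then epoll, then NIO.
    static String chooseEventLoopGroup(boolean kqueueAvailable, boolean epollAvailable) {
        if (kqueueAvailable || epollAvailable) {
            return kqueueAvailable ? "KQueueEventLoopGroup" : "EpollEventLoopGroup";
        }
        return "NioEventLoopGroup";
    }

    public static void main(String[] args) {
        boolean kqueue = isPresent("io.netty.channel.kqueue.KQueue");
        boolean epoll = isPresent("io.netty.channel.epoll.Epoll");
        System.out.println(chooseEventLoopGroup(kqueue, epoll));
    }
}
```

On a classpath without netty's native transport artifacts, both probes return false and the sketch falls through to the NIO name, matching the default described in the text.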
Now let's go back to io.lettuce.core.AbstractRedisClient#getEventLoopGroup:
private synchronized EventLoopGroup getEventLoopGroup(ConnectionPoint connectionPoint) {
    if (connectionPoint.getSocket() == null && !eventLoopGroups.containsKey(Transports.eventLoopGroupClass())) {
        // First initialize the group for the matching eventLoopGroupClass
        eventLoopGroups.put(Transports.eventLoopGroupClass(),
                clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass()));
    }
    if (connectionPoint.getSocket() != null) {
        NativeTransports.assertAvailable();
        Class<? extends EventLoopGroup> eventLoopGroupClass = NativeTransports.eventLoopGroupClass();
        if (!eventLoopGroups.containsKey(NativeTransports.eventLoopGroupClass())) {
            eventLoopGroups
                    .put(eventLoopGroupClass, clientResources.eventLoopGroupProvider().allocate(eventLoopGroupClass));
        }
    }
    if (connectionPoint.getSocket() == null) {
        // Return the cached group
        return eventLoopGroups.get(Transports.eventLoopGroupClass());
    }
    if (connectionPoint.getSocket() != null) {
        NativeTransports.assertAvailable();
        return eventLoopGroups.get(NativeTransports.eventLoopGroupClass());
    }
    throw new IllegalStateException("This should not have happened in a binary decision. Please file a bug.");
}
The part to focus on here is clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass()): clientResources.eventLoopGroupProvider() fetches the eventLoopGroupProvider from clientResources, and that provider is then asked to allocate an eventLoopGroup of the given class.
In DefaultClientResources the provider is a DefaultEventLoopGroupProvider; why that is so is analyzed further below. Let's look at io.lettuce.core.resource.DefaultEventLoopGroupProvider#allocate:
@Override
public <T extends EventLoopGroup> T allocate(Class<T> type) {
    synchronized (this) {
        logger.debug("Allocating executor {}", type.getName());
        // Register the reference in an outer container so that usages can be counted
        return addReference(getOrCreate(type));
    }
}
io.lettuce.core.resource.DefaultEventLoopGroupProvider#getOrCreate:
private <T extends EventLoopGroup> T getOrCreate(Class<T> type) {
    // ------------ part omitted ------------
    if (!eventLoopGroups.containsKey(type)) {
        // Create the eventLoopGroup if it does not exist yet
        eventLoopGroups.put(type, createEventLoopGroup(type, numberOfThreads));
    }
    return (T) eventLoopGroups.get(type);
}
io.lettuce.core.resource.DefaultEventLoopGroupProvider#createEventLoopGroup:
public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads) {
    logger.debug("Creating executor {}", type.getName());
    if (DefaultEventExecutorGroup.class.equals(type)) {
        return new DefaultEventExecutorGroup(numberOfThreads, new DefaultThreadFactory("lettuce-eventExecutorLoop", true));
    }
    if (NioEventLoopGroup.class.equals(type)) {
        return new NioEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-nioEventLoop", true));
    }
    if (EpollProvider.isAvailable() && EpollProvider.isEventLoopGroup(type)) {
        return EpollProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-epollEventLoop", true));
    }
    if (KqueueProvider.isAvailable() && KqueueProvider.isEventLoopGroup(type)) {
        return KqueueProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-kqueueEventLoop", true));
    }
    throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
}
The getEventLoopGroup call chain above produces the EventLoopGroup that netty actually uses, which may be nio, epoll, or kqueue. It first checks whether the current environment supports epoll or kqueue and, if so, creates the corresponding eventLoopGroup; otherwise NioEventLoopGroup is the default. And yes, this eventLoopGroup is the one on which the netty client performs its IO operations.
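The allocate / getOrCreate pair boils down to "create once per type, hand out the same instance, and count references". Below is a minimal JDK-only sketch of that idea, using ExecutorService in place of netty's EventLoopGroup. RefCountedGroupsSketch and its release method are hypothetical; in particular, shutting the group down when the count reaches zero is my assumption about what the counting is for, not something the excerpts above show.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical reference-counted cache of thread pools keyed by a type name.
final class RefCountedGroupsSketch {
    private final Map<String, ExecutorService> groups = new HashMap<>();
    private final Map<ExecutorService, Integer> refCounts = new HashMap<>();
    private final int numberOfThreads;

    RefCountedGroupsSketch(int numberOfThreads) {
        this.numberOfThreads = numberOfThreads;
    }

    // Create the group on first request, then keep returning the same instance.
    synchronized ExecutorService allocate(String type) {
        ExecutorService group = groups.computeIfAbsent(type,
                t -> Executors.newFixedThreadPool(numberOfThreads));
        refCounts.merge(group, 1, Integer::sum);
        return group;
    }

    // Assumption: the last release shuts the group down and forgets it.
    synchronized void release(ExecutorService group) {
        Integer count = refCounts.get(group);
        if (count == null) {
            return; // not allocated by this provider
        }
        if (count > 1) {
            refCounts.put(group, count - 1);
            return;
        }
        refCounts.remove(group);
        groups.values().remove(group);
        group.shutdown();
    }
}
```

Two allocate calls for the same type return the same pool, and the pool stays alive until both callers have released it. That sharing is why several Redis connections created from one client can run on one set of IO threads.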
Let's first look at how clientResources is initialized, in org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#lettuceClientResources:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(ClientResources.class)
public DefaultClientResources lettuceClientResources() {
    return DefaultClientResources.create();
}
io.lettuce.core.resource.DefaultClientResources#create:
public static DefaultClientResources create() {
    return builder().build();
}
io.lettuce.core.resource.DefaultClientResources.Builder#build:
@Override
public DefaultClientResources build() {
    return new DefaultClientResources(this);
}
protected DefaultClientResources(Builder builder) {
    if (builder.eventLoopGroupProvider == null) {
        int ioThreadPoolSize = builder.ioThreadPoolSize;
        // ------------ part omitted ------------
        // Create the DefaultEventLoopGroupProvider
        this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize);
    } else {
        this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider;
        this.eventLoopGroupProvider = builder.eventLoopGroupProvider;
    }
    if (builder.eventExecutorGroup == null) {
        // ------------ part omitted ------------
        // Create the eventExecutorGroup
        eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class,
                computationThreadPoolSize);
        sharedEventExecutor = false;
    } else {
        sharedEventExecutor = builder.sharedEventExecutor;
        eventExecutorGroup = builder.eventExecutorGroup;
    }
    if (builder.timer == null) {
        // Create the hashed wheel timer
        timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer"));
        sharedTimer = false;
    } else {
        timer = builder.timer;
        sharedTimer = builder.sharedTimer;
    }
    if (builder.eventBus == null) {
        // Create the event bus
        eventBus = new DefaultEventBus(Schedulers.fromExecutor(eventExecutorGroup));
    } else {
        eventBus = builder.eventBus;
    }
    // ------------ part omitted ------------
What this article cares about most here is how eventExecutorGroup is created. It is of type EventExecutorGroup, and its main role is to serve as the EventExecutor for netty's channelGroup. For more on EventExecutor, see:
• https://www.cnblogs.com/lighten/p/8967630.html
• https://juejin.im/post/5de7a0dcf265da33bf1ff42c
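The DefaultClientResources constructor repeats one pattern for the executor group and the timer: if the builder supplied an instance, use it and record whether it is shared; otherwise create a default and mark it as not shared. A plausible reason for tracking this (an assumption on my part, since the shutdown code is not shown above) is that only resources the object created itself should be closed on shutdown. A JDK-only sketch with the hypothetical name ResourcesSketch, which for simplicity treats every caller-supplied executor as shared:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder that distinguishes owned resources from caller-supplied (shared) ones.
final class ResourcesSketch {
    private final ExecutorService executor;
    private final boolean sharedExecutor;

    // Pass null to let the sketch create, and therefore own, its executor.
    ResourcesSketch(ExecutorService provided, int threads) {
        if (provided == null) {
            this.executor = Executors.newFixedThreadPool(threads);
            this.sharedExecutor = false;
        } else {
            // Simplification: anything supplied from outside is treated as shared.
            this.executor = provided;
            this.sharedExecutor = true;
        }
    }

    ExecutorService executor() {
        return executor;
    }

    // Only shut down what this object created; shared resources belong to the caller.
    void shutdown() {
        if (!sharedExecutor) {
            executor.shutdown();
        }
    }
}
```

The same reasoning matters when several clients are given one resources object: whoever created the thread pools is the one responsible for closing them.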
The code of io.lettuce.core.resource.DefaultEventLoopGroupProvider#createEventLoopGroup is as follows:
public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads) {
    logger.debug("Creating executor {}", type.getName());
    if (DefaultEventExecutorGroup.class.equals(type)) {
        // Create the default eventExecutorGroup
        return new DefaultEventExecutorGroup(numberOfThreads, new DefaultThreadFactory("lettuce-eventExecutorLoop", true));
    }
    if (NioEventLoopGroup.class.equals(type)) {
        // For the NioEventLoopGroup type, create a NioEventLoopGroup
        return new NioEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-nioEventLoop", true));
    }
    if (EpollProvider.isAvailable() && EpollProvider.isEventLoopGroup(type)) {
        // Create an epoll event loop group
        return EpollProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-epollEventLoop", true));
    }
    if (KqueueProvider.isAvailable() && KqueueProvider.isEventLoopGroup(type)) {
        // Create a kqueue event loop group
        return KqueueProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-kqueueEventLoop", true));
    }
    throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
}
The main job of this method is to create a group of the requested type. For DefaultClientResources it creates a DefaultEventExecutorGroup, which is an EventExecutorGroup. Its main role is to handle callbacks, asynchronous processing, timeout handling and the like; it is not the eventLoopGroup that handles netty IO events.
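The split between IO threads and a separate computation group can be illustrated with two plain JDK executors. This is only a sketch of the separation, not of how lettuce dispatches commands or callbacks; TwoPoolsSketch and the thread-name prefixes are hypothetical. The "IO" pool produces a value and the follow-up work is explicitly moved to the "computation" pool, so slow callback code cannot block the thread that services the socket.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demonstration of keeping callback work off the IO threads.
final class TwoPoolsSketch {

    // Daemon thread factory with a recognizable name prefix, similar in spirit to "lettuce-nioEventLoop".
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Returns the name of the thread that did the "IO" step and of the thread that ran the callback.
    static String[] readThenCallback() {
        ExecutorService io = Executors.newFixedThreadPool(1, named("sketch-io"));
        ExecutorService computation = Executors.newFixedThreadPool(1, named("sketch-computation"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    .thenApplyAsync(ioThread -> new String[] {ioThread, Thread.currentThread().getName()},
                            computation)
                    .join();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = readThenCallback();
        System.out.println("io step on " + names[0] + ", callback on " + names[1]);
    }
}
```

Because thenApplyAsync is given an explicit executor, the second stage always runs on the computation pool regardless of which thread completed the first stage.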
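The hbase client also uses Netty for its underlying RPC; see its org.apache.hadoop.hbase.ipc.NettyRpcConnection#connect method.
That code also involves initializing an rpcClient.group. How does it differ from the way lettuce uses netty? If you are interested, it is worth analyzing.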