试想一下,dubbo的consumer与dubbo的provider端之间是通过一个长连接来进行通信的,但是dubbo的consumer还要处理很多线程的业务操作,会有很多线程的请求需要通过这个长连接来进行处理,那么它是怎么做到的呢?
- Client
client端的类结构图为:
com.alibaba.dubbo.remoting.exchange.ExchangeClient的结构:
可见ExchangeClient有很多实现类,现在我们着重分析dubbo默认使用的类com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient:
1. `/**`
2. `* DefaultMessageClient`
3. `*`
4. `* @author william.liangf`
5. `* @author chao.liuc`
6. `*/`
7. `public class HeaderExchangeClient implements ExchangeClient {`
8.
9. `private static final Logger logger = LoggerFactory.getLogger( HeaderExchangeClient.class );`
10. `//处理心跳定时线程`
11. `private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));`
12.
13. `// 心跳定时器`
14. `private ScheduledFuture<?> heatbeatTimer;`
15.
16. `// 心跳超时,毫秒。缺省0,不会执行心跳。`
17. `private int heartbeat;`
18.
19. `private int heartbeatTimeout;`
20.
21. `//装饰器模式,真正的client,可能是nettyClient、minaClient等`
22. `//@see com.alibaba.dubbo.remoting.Transporters#connect(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.ChannelHandler...)`
23. `private final Client client;`
24. `//通道`
25. `private final ExchangeChannel channel;`
26.
27. `public ResponseFuture request(Object request) throws RemotingException {`
28. `return channel.request(request);`
29. `}`
- Channel
Channel的类实现关系如下:
这里主要来看下ExchangeChannel,这里对应的实现是com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel:
1. `/**`
2. `* ExchangeReceiver`
3. `*`
4. `* @author william.liangf`
5. `*/`
6. `final class HeaderExchangeChannel implements ExchangeChannel {`
7.
8. `private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);`
9.
10. `private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";`
11. `//装饰器模式,这个Channel是真正发送数据用到的Channel,可能是nettyChannel也可能是minaChannel`
12. `private final Channel channel;`
13.
14. `private volatile boolean closed = false;`
15.
16. `public void send(Object message) throws RemotingException {`
17. `send(message, getUrl().getParameter(Constants.SENT_KEY, false));`
18. `}`
19.
20. `public void send(Object message, boolean sent) throws RemotingException {`
21. `if (closed) {`
22. `throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");`
23. `}`
24. `if (message instanceof Request`
25. `|| message instanceof Response`
26. `|| message instanceof String) {`
27. `channel.send(message, sent);`
28. `} else {`
29. `Request request = new Request();`
30. `request.setVersion("2.0.0");`
31. `request.setTwoWay(false);`
32. `request.setData(message);`
33. `channel.send(request, sent);`
34. `}`
35. `}`
36.
37. `public ResponseFuture request(Object request) throws RemotingException {`
38. `return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));`
39. `}`
40.
41. `public ResponseFuture request(Object request, int timeout) throws RemotingException {`
42. `if (closed) {`
43. `throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");`
44. `}`
45. `// create request.`
46. `Request req = new Request();`
47. `req.setVersion("2.0.0");`
48. `req.setTwoWay(true);`
49. `req.setData(request);`
50. `DefaultFuture future = new DefaultFuture(channel, req, timeout);`
51. `try{`
52. `channel.send(req);`
53. `}catch (RemotingException e) {`
54. `future.cancel();`
55. `throw e;`
56. `}`
57. `return future;`
58. `}`
- 可以看到这里用的是装饰器模式,另外方法调用通过外层的HeaderExchangeClient的request方法调到channel的request方法,然后调用的是channel的send方法。
- ResponseFuture的类继承图为:
- 这个Future是一个包装,为了进行非阻塞调用,它的真正结果是通过提前设置的回调方法异步获取或者手动调用get方法来同步获取的。它的get方法如下:
1. `//维护requestid与Channel的map`
2. `private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();`
3. `//维护requestId与ResponseFuture之间关系的map`
4. `private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();`
5.
6. `public DefaultFuture(Channel channel, Request request, int timeout){`
7. `this.channel = channel;`
8. `this.request = request;`
9. `this.id = request.getId();`
10. `this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);`
11. `// put into waiting map.`
12. `FUTURES.put(id, this);`
13. `CHANNELS.put(id, channel);`
14. `}`
15. `private final Lock lock = new ReentrantLock();`
16.
17. `private final Condition done = lock.newCondition();`
18.
19. `private volatile Response response;`
20.
21. `private volatile ResponseCallback callback;`
22.
23. `public Object get() throws RemotingException {`
24. `return get(timeout);`
25. `}`
26.
27. `public Object get(int timeout) throws RemotingException {`
28. `if (timeout <= 0) {`
29. `timeout = Constants.DEFAULT_TIMEOUT;`
30. `}`
31. `if (! isDone()) {`
32. `long start = System.currentTimeMillis();`
33. `lock.lock();`
34. `try {`
35. `while (! isDone()) {//这里是为了防止虚假唤醒,关于虚假唤醒的部分,有不明白的可以翻一下之前的推文`
36. `done.await(timeout, TimeUnit.MILLISECONDS);//当前线程进入等待状态,释放锁的占有权,等待被唤醒`
37. `if (isDone() || System.currentTimeMillis() - start > timeout) {`
38. `break;`
39. `}`
40. `}`
41. `} catch (InterruptedException e) {`
42. `throw new RuntimeException(e);`
43. `} finally {`
44. `lock.unlock();`
45. `}`
46. `if (! isDone()) {`
47. `throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));`
48. `}`
49. `}`
50. `return returnFromResponse();`
51. `}`
52.
53. `public boolean isDone() {`
54. `return response != null;`
55. `}`
- done.await(timeout, TimeUnit.MILLISECONDS);在response为null时会持续等待。
- 在创建DefaultFuture时会向两个静态的map:FUTURES和CHANNELS中以requestId为key,以DefaultFuture和Channel为value存入map中。
- 最终该线程会阻塞在这个对象的下面这个方法调用上
这个Condition被唤醒的地方,就是我们接下来要看的HeaderExchangeHandler。
- ChannelHandler代码分析
1. `/**`
2. `* ChannelHandler. (API, Prototype, ThreadSafe)`
3. `*`
4. `* @see com.alibaba.dubbo.remoting.Transporter#bind(com.alibaba.dubbo.common.URL, ChannelHandler)`
5. `* @see com.alibaba.dubbo.remoting.Transporter#connect(com.alibaba.dubbo.common.URL, ChannelHandler)`
6. `* @author qian.lei`
7. `* @author william.liangf`
8. `*/`
9. `@SPI`
10. `public interface ChannelHandler {`
11.
12. `/**`
13. `* on channel connected.`
14. `*`
15. `* @param channel channel.`
16. `*/`
17. `void connected(Channel channel) throws RemotingException;`
18.
19. `/**`
20. `* on channel disconnected.`
21. `*`
22. `* @param channel channel.`
23. `*/`
24. `void disconnected(Channel channel) throws RemotingException;`
25.
26. `/**`
27. `* on message sent.`
28. `*`
29. `* @param channel channel.`
30. `* @param message message.`
31. `*/`
32. `void sent(Channel channel, Object message) throws RemotingException;`
33.
34. `/**`
35. `* on message received.`
36. `*`
37. `* @param channel channel.`
38. `* @param message message.`
39. `*/`
40. `void received(Channel channel, Object message) throws RemotingException;`
41.
42. `/**`
43. `* on exception caught.`
44. `*`
45. `* @param channel channel.`
46. `* @param exception exception.`
47. `*/`
48. `void caught(Channel channel, Throwable exception) throws RemotingException;`
49.
50. `}`
是一个spi拓展接口,(关于spi的部分,之后会有专门的文章介绍),是一个信息处理器,是将consumer和provider端的逻辑都包括在内的,在consumer端主要处理response的逻辑,它的部分类继承图为:
下面我们着重分析下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler:
1. `/**`
2. `* ExchangeReceiver`
3. `*`
4. `* @author william.liangf`
5. `* @author chao.liuc`
6. `*/`
7. `public class HeaderExchangeHandler implements ChannelHandlerDelegate {`
8.
9. `protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);`
10.
11. `public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;`
12.
13. `public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;`
14.
15. `private final ExchangeHandler handler;`
16.
17. `public void received(Channel channel, Object message) throws RemotingException {`
18. `channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());`
19. `ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);`
20. `try {`
21. `if (message instanceof Request) {`
22. `// handle request.`
23. `Request request = (Request) message;`
24. `if (request.isEvent()) {`
25. `handlerEvent(channel, request);`
26. `} else {`
27. `if (request.isTwoWay()) {`
28. `Response response = handleRequest(exchangeChannel, request);`
29. `channel.send(response);`
30. `} else {`
31. `handler.received(exchangeChannel, request.getData());`
32. `}`
33. `}`
34. `} else if (message instanceof Response) {`
35. `//处理响应信息`
36. `handleResponse(channel, (Response) message);`
37. `} else if (message instanceof String) {`
38. `--------`
39. `} else {`
40. `handler.received(exchangeChannel, message);`
41. `}`
42. `} finally {`
43. `HeaderExchangeChannel.removeChannelIfDisconnected(channel);`
44. `}`
45. `}`
46.
47. `static void handleResponse(Channel channel, Response response) throws RemotingException {`
48. `if (response != null && !response.isHeartbeat()) {`
49. `DefaultFuture.received(channel, response);`
50. `}`
51. `}`
- 一看很明显的一点是使用了装饰器模式,对传入的ExchangeHandler进行了一层装饰,添加了些东西,然后进行方法调用.
- handleResponse方法中进行的操作是DefaultFuture.received(channel, response);这个DefaultFuture就是上面Channel中返回的DefaultFuture对象。
- com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received:
1. `private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();`
2.
3. `private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();`
4.
5. `public static void received(Channel channel, Response response) {`
6. `try {`
7. `DefaultFuture future = FUTURES.remove(response.getId());`
8. `if (future != null) {`
9. `future.doReceived(response);`
10. `} else {`
11. `logger.warn("The timeout response finally returned at "`
12. `+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))`
13. `+ ", response " + response`
14. `+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()`
15. `+ " -> " + channel.getRemoteAddress()));`
16. `}`
17. `} finally {`
18. `CHANNELS.remove(response.getId());`
19. `}`
20. `}`
这里根据response.getId()与对应的request的id相同,删除FUTURES这个map中对应的DefaultFuture,并保证删除CHANNELS对应的kv信息。
- future.doReceived方法:
1. `private void doReceived(Response res) {`
2. `lock.lock();//获取锁`
3. `try {`
4. `response = res;`
5. `if (done != null) {`
6. `done.signal();//唤醒,如果传入了回调方法,这里是没有实际唤醒的,真正调用的是下面的回调方法`
7. `}`
8. `} finally {`
9. `lock.unlock();`
10. `}`
11. `if (callback != null) {`
12. `invokeCallback(callback);//如果事先设定的回调方法不为null,则执行回调,就是上面说的非阻塞拿到DefaultFuture结果的方式。`
13. `}`
14. `}`
done是在lock上处于等待状态的condition,这时候lock处于被释放的状态,所以doReceived可以在这时候获取到锁的占有权并尝试唤醒done,这时也需要在上面防止虚假唤醒的操作。
- 总结
-
dubbo的Client、Channel、Handler都使用了装饰器模式,真正工作的是传入的对象,外层对象是对传入的对象的工作进行了一定的装饰或增强。
-
Client包裹着Channel(NettyChannel,这个NettyChannel中也包括传入的Client的引用)和传入的真正的Client,如NettyClient和MinaClient。这里以NettyClient为例,NettyClient中包裹着netty原生的channel,这个channel是长连接的那个channel,也是最终真正工作的那个。
-
consumer端多线程的请求进入Client后会先调用request方法,非阻塞地返回DefaultFuture对象,然后从future对象中获取响应结果,获取结果的方式有两种,一种是通过get方法阻塞获取,还有一种是通过传入回调方法,在响应的时候进行回调。
-
DefaultFuture中维护着Map CHANNELS和Map FUTURES,这两个static map都以requestId作为key,其中每个response中也维护着和它对应的request相同的id,在响应时是通过这个id来寻找client端返回的那个DefaultFuture然后进行响应信息的获取。,>,>
-
这个相当于用DefaultFuture中的两个静态map维护着等待响应的请求信息,然后一个长连接作为worker来处理(在handler中进行),每有一个响应过来,静态map中对应的kv被移除,get方法阻塞的部分被唤醒。这样就完成了一个长连接,多个并发请求都能正常工作的效果。
-
这一节请参考下之前的关于http2的推文协助理解这么设计的思路——Request-Response通讯模式的优化(share connection、pipline、asynchrous)
