A线程异步调用B线程,整个过程链路是怎样的?比如dubbo调用netty的过程代码是怎样的?比如发送异步MQ消息的代码如何实现?
一、一图总结线程交互逻辑:
二、调用方A线程的视角
A线程调用B线程之前,A线程把自己存入一个全局的ConcurrentHashMap中,并把Map的key作为参数传给线程B,然后A线程调用Thread的wait()方法进入WAITTING状态,,等到B线程处理完后利用A的Key从Map中取出对应的线程,进行唤醒notify()
三、接收放B线程的视角
B线程本身也是一个异步处理,当接收到A的请求后,把A的参数存入处理队列,然后通知A接收成功(A接收到成功的消息后调用Thread的wait()方法进入WAITTING状);B的处理多线程按规则处理队列里面的任务,完成相应的任务后从全局的ConcurrentHashMap中取出A的线程,调用A的notify(),然后A开始运行后告诉B自己成功了,则B完成一次任务
四、线程状态机(供参考用)
五、具体解析
5.1 发送同步转异步
在网络通信框架,都会面临一个很经典的问题,上游业务调用外部方法是同步的,但是网络请求会设计成异步,当服务端处理完成以后告诉客户端,这个过程如何实现? 举例:业务调用本地的netty客户端发起请求以后阻塞,netty客户端异步发送请求给服务端,当服务端处理完成以后告诉客户端,当客户端收到响应以后,唤醒之前阻塞的业务方调用。这个过程如何实现?
而rocketMq 在这个地方的实现上代码是比较简洁的。(dubbo调用netty的过程也超级简直,有兴趣可以了解)代码如下:
代码片段一,同步发送消息的入口:
作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture); //------------①
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); //------②
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
下面看下②处里面具体的代码:
代码片段二:
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
可以理解为responseFuture中有一个responseCommand对象,它放置的就是真正的响应结果,但是它只有在countDownLatch.countdown()以后,才不会阻塞在上面的this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS) 这里,所以可以想到,还有另外一个线程在收到服务端响应后会调用countDownLatch.countdown,继续看下文另外一个线程怎么在接收到服务端响应后来处理。
下面看下当客户端收到服务端响应是否会这样呢?代码如下:
代码片段三,收到服务端响应入口:
作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque); //---①
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd); //----②
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
代码片段四:
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown(); //----------③
}
5.2. 回调采用支持独立线程池
作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();//------------1
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();//------2
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
上面的这段代码是消息发送成功后触发业务方回调的方法,
设计上采用:ExecutorService executor = this.getCallbackExecutor(); 这里采用线程池异步处理,可以防止回
调方法的不确定性(超时等)阻碍消息接收的主线程。