优秀的编程知识分享平台

网站首页 > 技术文章 正文

JAVA线程之间如何通信?(java线程同步的方法)

nanyue 2024-09-01 20:39:57 技术文章 17 ℃

#挑战30天在头条写日记#

A线程异步调用B线程,整个过程链路是怎样的?比如dubbo调用netty的过程代码是怎样的?比如发送异步MQ消息的代码如何实现?

一、一图总结线程交互逻辑:

二、调用方A线程的视角

A线程调用B线程之前,A线程把自己存入一个全局的ConcurrentHashMap中,并把Map的key作为参数传给线程B,然后A线程调用Thread的wait()方法进入WAITTING状态,,等到B线程处理完后利用A的Key从Map中取出对应的线程,进行唤醒notify()

三、接收放B线程的视角

B线程本身也是一个异步处理,当接收到A的请求后,把A的参数存入处理队列,然后通知A接收成功(A接收到成功的消息后调用Thread的wait()方法进入WAITTING状);B的处理多线程按规则处理队列里面的任务,完成相应的任务后从全局的ConcurrentHashMap中取出A的线程,调用A的notify(),然后A开始运行后告诉B自己成功了,则B完成一次任务

四、线程状态机(供参考用)

五、具体解析

5.1 发送同步转异步

在网络通信框架,都会面临一个很经典的问题,上游业务调用外部方法是同步的,但是网络请求会设计成异步,当服务端处理完成以后告诉客户端,这个过程如何实现? 举例:业务调用本地的netty客户端发起请求以后阻塞,netty客户端异步发送请求给服务端,当服务端处理完成以后告诉客户端,当客户端收到响应以后,唤醒之前阻塞的业务方调用。这个过程如何实现?

而rocketMq 在这个地方的实现上代码是比较简洁的。(dubbo调用netty的过程也超级简直,有兴趣可以了解)代码如下:

代码片段一,同步发送消息的入口:

作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture); //------------①
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); //------②
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
  • 上面代码①处,相当于每次请求的时候,先生成了一个响应对象ResponseFuture(里面的响应内容是空的),然后将其放置responseTable(ConcurrentHashMap)中,并且有个唯一标识opaque,这个东西会传到服务端,后面服务端再带过来。
  • 上面代码②出,相当于要从响应里面拿结果,拿不到就会阻塞到超时。
  • 下面看下②处里面具体的代码:

    代码片段二:

    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.responseCommand;
       }

    可以理解为responseFuture中有一个responseCommand对象,它放置的就是真正的响应结果,但是它只有在countDownLatch.countdown()以后,才不会阻塞在上面的this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS) 这里,所以可以想到,还有另外一个线程在收到服务端响应后会调用countDownLatch.countdown,继续看下文另外一个线程怎么在接收到服务端响应后来处理。

    下面看下当客户端收到服务端响应是否会这样呢?代码如下:

    代码片段三,收到服务端响应入口:

    作者:随风
    链接:https://www.zhihu.com/question/623539711/answer/3224195091
    来源:知乎
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
            final int opaque = cmd.getOpaque();
            final ResponseFuture responseFuture = responseTable.get(opaque); //---①
            if (responseFuture != null) {
                responseFuture.setResponseCommand(cmd);
    
                responseTable.remove(opaque);
    
                if (responseFuture.getInvokeCallback() != null) {
                    executeInvokeCallback(responseFuture);
                } else {
                    responseFuture.putResponse(cmd);  //----②
                    responseFuture.release();
                }
            } else {
                log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                log.warn(cmd.toString());
            }
        }

    代码片段四:

    public void putResponse(final RemotingCommand responseCommand) {
            this.responseCommand = responseCommand;
            this.countDownLatch.countDown(); //----------③
        }
  • 上面的① 就是客户端拿到响应后,根据全文唯一的标识opaque拿到之前代码片段一放concurrenthashMap中的响应对象
  • 上面的② 就是要把结果给放在我们响应对象里面
  • 上面的③ 就是在放结果的时候调用countDownLatch.countDown(),那么上文代码片段二,阻塞的线程得以正常执行
  • 5.2. 回调采用支持独立线程池

    作者:随风
    链接:https://www.zhihu.com/question/623539711/answer/3224195091
    来源:知乎
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    
    private void executeInvokeCallback(final ResponseFuture responseFuture) {
            boolean runInThisThread = false;
            ExecutorService executor = this.getCallbackExecutor();//------------1
            if (executor != null) {
                try {
                    executor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                responseFuture.executeInvokeCallback();//------2
                            } catch (Throwable e) {
                                log.warn("execute callback in executor exception, and callback throw", e);
                            } finally {
                                responseFuture.release();
                            }
                        }
                    });
                } catch (Exception e) {
                    runInThisThread = true;
                    log.warn("execute callback in executor exception, maybe executor busy", e);
                }
            } else {
                runInThisThread = true;
            }
    
            if (runInThisThread) {
                try {
                    responseFuture.executeInvokeCallback();
                } catch (Throwable e) {
                    log.warn("executeInvokeCallback Exception", e);
                } finally {
                    responseFuture.release();
                }
            }
        }

    上面的这段代码是消息发送成功后触发业务方回调的方法,

    设计上采用:ExecutorService executor = this.getCallbackExecutor(); 这里采用线程池异步处理,可以防止回

    调方法的不确定性(超时等)阻碍消息接收的主线程。

    Tags:

    最近发表
    标签列表