大彬大约 3 分钟

一、背景 基于RokcetMQ可以实现异步处理、流量削锋、业务解耦,通常是依赖RocketMQ的发布订阅模式。今天分享RocketMQ的新特性,基于Request/Reply模式来实现RPC的功能。该模式是在v4.6作为RocketMQ新特性引入,但在在v4.7.1上才比较完善。

二、设计思路 从整个数据流的角度上来说,发布/订阅模式中生产者和消费者之间是异步处理的,生产者负责把消息投递到RocketMQ上,而消费者负责处理数据,如果把生产者当做上游系统,消费者是下游系统,那么上下游系统之间是没有任何的状态交流的。而我们知道,RPC上下游系统之间是需要状态交互的,简单来说,要想实现RPC功能,在整个数据链路上,原先上下游系统之间是异步交互的模式,首先需要把异步模式换成同步模式。

异步模式:

把异步模式换成同步模式,需要在生产者发送消息到MQ之后,保持原有的状态,比如可以用一个Map集合去统一维护,等到消费者处理完数据返回响应后,再从Map集合中拿到对应的请求进行处理。其中涉及到怎么去标识一个请求,这里可以用UUID或者雪花id去标记。

同步模式:

RocketMQ整体的处理思路跟上面是类似的,DefaultMQProducerImpl#request负责RPC消息的下发,而DefaultMQPushConsumer中负责消息的消费。具体用法可以看RocketMQ源码example中的RPC部分。

三、结构定义 RocketMQ中是依赖于Message的Properties来区分不同的请求,在调用DefaultMQProducerImpl#request进行消息下发之间会先给消息设置不同的属性,通过属性来保证上下游之间的处理是同一个请求。

设置的属性有:

CORRELATION_ID:消息的标识Id,这里对应是一个UUID REPLY_TO_CLIENT:消息下发的客户端Id TTL:消息下发的超时时间,单位ms

其实就类似于HTTP请求中的头部内容一样。

之后还会校验一下消息中对应Topic的一个合法性。

四、消息下发 RocketMQ将下发的客户端封装成RequestResponseFuture,包含客户端Id,请求超时时间,同时根据客户端Id维护在ConcurrentHashMap,调用DefaultMQProducerImpl#sendDefaultImpl下发消息,根据下发消息的回调函数确认消息下发的状态。

消息下发后会调用waitResponse,waitResponse调用CountDownLatch进入阻塞状态,等待消息消费之后的响应。

CountDownLatch中的计数器是1,说明每个请求都会独立隔离阻塞。

五、消息响应 当服务端(消费者)收到消息处理完返回响应时,会调用ReplyMessageProcessor#pushReplyMessage封装响应的内容,处理响应的头部信息和返回body的参数,最终封装成一个PUSH_REPLY_MESSAGE_TO_CLIENT的请求命令发给客户端。

客户端(生产者)收到请求后,会调用ClientRemotingProcessor#processRequest,判断是PUSH_REPLY_MESSAGE_TO_CLIENT命令会调用receiveReplyMessage,将接收到的数据封装成新的消息,接着调用响应处理的处理器。

ClientRemotingProcessor#processReplyMessage中主要做的从消息中获取消息的Id,从ConcurrentHashMap中定位到具体的请求,将返回消息封装到RequestResponseFuture中,同时CountDownLatch的计数值减1,此时线程阻塞状态被释放,之后便将消息响应给到客户端。

六、总结 所以整体上看来,RocketMQ的Request/Reply模式,其实是利用客户端线程阻塞来换取请求异步变同步以及RocketMQ的回调机制从而间接的实现了RPC效果,但是相比直接RPC调用,数据的链路更长,性能肯定是会有损耗,但是请求会持久化,所以给了重复下发提供了可能。

版权声明:本文为CSDN博主「林风自在」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/lveex/article/details/122514893open in new window

Loading...