• 【MQ】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?


    前景回顾

    上一节我们学习了如何实现基于 netty 客服端和服务端的启动。

    【mq】从零开始实现 mq-01-生产者、消费者启动

    【mq】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?

    那么客户端如何调用服务端呢?

    我们本节就来一起实现一下。

    02.png

    消费者实现

    启动类的调整

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(workerGroup, bossGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                            .addLast(new MqConsumerHandler(invokeService));
                }
            })
            // 这个参数影响的是还没有被accept 取出的连接
            .option(ChannelOption.SO_BACKLOG, 128)
            // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    

    这里我们通过指定分隔符解决 netty 粘包问题。

    解决 netty 粘包问题

    MqConsumerHandler 处理类

    MqConsumerHandler 的实现如下,添加对应的业务处理逻辑。

    package com.github.houbb.mq.consumer.handler;
    
    /**
     * @author binbin.hou
     * @since 1.0.0
     */
    public class MqConsumerHandler extends SimpleChannelInboundHandler {
    
        private static final Log log = LogFactory.getLog(MqConsumerHandler.class);
    
        /**
         * 调用管理类
         * @since 1.0.0
         */
        private final IInvokeService invokeService;
    
        public MqConsumerHandler(IInvokeService invokeService) {
            this.invokeService = invokeService;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
    
            RpcMessageDto rpcMessageDto = null;
            try {
                rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
            } catch (Exception exception) {
                log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes));
                return;
            }
    
            if (rpcMessageDto.isRequest()) {
                MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx);
    
                if(commonResp == null) {
                    log.debug("当前消息为 null,忽略处理。");
                    return;
                }
    
                writeResponse(rpcMessageDto, commonResp, ctx);
            } else {
                final String traceId = rpcMessageDto.getTraceId();
    
                // 丢弃掉 traceId 为空的信息
                if(StringUtil.isBlank(traceId)) {
                    log.debug("[Server Response] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));
                    return;
                }
    
                // 添加消息
                invokeService.addResponse(traceId, rpcMessageDto);
            }
        }
    }
    

    rpc 消息体定义

    为了统一标准,我们的 rpc 消息体 RpcMessageDto 定义如下:

    package com.github.houbb.mq.common.rpc;
    
    /**
     * @author binbin.hou
     * @since 1.0.0
     */
    public class RpcMessageDto implements Serializable {
    
        /**
         * 请求时间
         */
        private long requestTime;
    
        /**
         * 请求标识
         */
        private String traceId;
    
        /**
         * 方法类型
         */
        private String methodType;
    
        /**
         * 是否为请求消息
         */
        private boolean isRequest;
    
        private String respCode;
    
        private String respMsg;
    
        private String json;
    
        //getter&setter
    
    }
    

    消息分发

    对于接收到的消息体 RpcMessageDto,分发逻辑如下:

    /**
     * 消息的分发
     *
     * @param rpcMessageDto 入参
     * @param ctx 上下文
     * @return 结果
     */
    private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {
        final String methodType = rpcMessageDto.getMethodType();
        final String json = rpcMessageDto.getJson();
        String channelId = ChannelUtil.getChannelId(ctx);
        log.debug("channelId: {} 接收到 method: {} 内容:{}", channelId,
                methodType, json);
    
        // 消息发送
        if(MethodType.P_SEND_MESSAGE.equals(methodType)) {
            // 日志输出
            log.info("收到服务端消息: {}", json);
            // 如果是 broker,应该进行处理化等操作。
            MqCommonResp resp = new MqCommonResp();
            resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
            resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
            return resp;
        }
        throw new UnsupportedOperationException("暂不支持的方法类型");
    }
    

    这里对于接收到的消息,只做一个简单的日志输出,后续将添加对应的业务逻辑处理。

    结果回写

    收到请求以后,我们需要返回对应的响应。

    基于 channel 的回写实现如下:

    /**
     * 结果写回
     *
     * @param req  请求
     * @param resp 响应
     * @param ctx  上下文
     */
    private void writeResponse(RpcMessageDto req,
                               Object resp,
                               ChannelHandlerContext ctx) {
        final String id = ctx.channel().id().asLongText();
        RpcMessageDto rpcMessageDto = new RpcMessageDto();
        // 响应类消息
        rpcMessageDto.setRequest(false);
        rpcMessageDto.setTraceId(req.getTraceId());
        rpcMessageDto.setMethodType(req.getMethodType());
        rpcMessageDto.setRequestTime(System.currentTimeMillis());
        String json = JSON.toJSONString(resp);
        rpcMessageDto.setJson(json);
        // 回写到 client 端
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
        ctx.writeAndFlush(byteBuf);
        log.debug("[Server] channel {} response {}", id, JSON.toJSON(rpcMessageDto));
    }
    

    调用管理类

    为了方便管理异步返回的请求结果,我们统一定义了 IInvokeService 类,用于管理请求与响应。

    接口

    package com.github.houbb.mq.common.support.invoke;
    
    import com.github.houbb.mq.common.rpc.RpcMessageDto;
    
    /**
     * 调用服务接口
     * @author binbin.hou
     * @since 1.0.0
     */
    public interface IInvokeService {
    
        /**
         * 添加请求信息
         * @param seqId 序列号
         * @param timeoutMills 超时时间
         * @return this
         * @since 1.0.0
         */
        IInvokeService addRequest(final String seqId,
                                  final long timeoutMills);
    
        /**
         * 放入结果
         * @param seqId 唯一标识
         * @param rpcResponse 响应结果
         * @return this
         * @since 1.0.0
         */
        IInvokeService addResponse(final String seqId, final RpcMessageDto rpcResponse);
    
        /**
         * 获取标志信息对应的结果
         * @param seqId 序列号
         * @return 结果
         * @since 1.0.0
         */
        RpcMessageDto getResponse(final String seqId);
    
    }
    

    实现

    实现本身也不难。

    package com.github.houbb.mq.common.support.invoke.impl;
    
    /**
     * 调用服务接口
     * @author binbin.hou
     * @since 1.0.0
     */
    public class InvokeService implements IInvokeService {
    
        private static final Log logger = LogFactory.getLog(InvokeService.class);
    
        /**
         * 请求序列号 map
         * (1)这里后期如果要添加超时检测,可以添加对应的超时时间。
         * 可以把这里调整为 map
         *
         * key: seqId 唯一标识一个请求
         * value: 存入该请求最长的有效时间。用于定时删除和超时判断。
         * @since 0.0.2
         */
        private final ConcurrentHashMap<String, Long> requestMap;
    
        /**
         * 响应结果
         * @since 1.0.0
         */
        private final ConcurrentHashMap<String, RpcMessageDto> responseMap;
    
        public InvokeService() {
            requestMap = new ConcurrentHashMap<>();
            responseMap = new ConcurrentHashMap<>();
    
            final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
            Executors.newScheduledThreadPool(1)
                    .scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);
        }
    
        @Override
        public IInvokeService addRequest(String seqId, long timeoutMills) {
            logger.debug("[Invoke] start add request for seqId: {}, timeoutMills: {}", seqId,
                    timeoutMills);
    
            final long expireTime = System.currentTimeMillis()+timeoutMills;
            requestMap.putIfAbsent(seqId, expireTime);
    
            return this;
        }
    
        @Override
        public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) {
            // 1. 判断是否有效
            Long expireTime = this.requestMap.get(seqId);
            // 如果为空,可能是这个结果已经超时了,被定时 job 移除之后,响应结果才过来。直接忽略
            if(ObjectUtil.isNull(expireTime)) {
                return this;
            }
    
            //2. 判断是否超时
            if(System.currentTimeMillis() > expireTime) {
                logger.debug("[Invoke] seqId:{} 信息已超时,直接返回超时结果。", seqId);
                rpcResponse = RpcMessageDto.timeout();
            }
    
            // 这里放入之前,可以添加判断。
            // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。
            // 通知所有等待方
            responseMap.putIfAbsent(seqId, rpcResponse);
            logger.debug("[Invoke] 获取结果信息,seqId: {}, rpcResponse: {}", seqId, JSON.toJSON(rpcResponse));
            logger.debug("[Invoke] seqId:{} 信息已经放入,通知所有等待方", seqId);
    
            // 移除对应的 requestMap
            requestMap.remove(seqId);
            logger.debug("[Invoke] seqId:{} remove from request map", seqId);
    
            // 同步锁
            synchronized (this) {
                this.notifyAll();
                logger.debug("[Invoke] {} notifyAll()", seqId);
            }
    
    
            return this;
        }
    
        @Override
        public RpcMessageDto getResponse(String seqId) {
            try {
                RpcMessageDto rpcResponse = this.responseMap.get(seqId);
                if(ObjectUtil.isNotNull(rpcResponse)) {
                    logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
                    return rpcResponse;
                }
    
                // 进入等待
                while (rpcResponse == null) {
                    logger.debug("[Invoke] seq {} 对应结果为空,进入等待", seqId);
    
                    // 同步等待锁
                    synchronized (this) {
                        this.wait();
                    }
    
                    logger.debug("[Invoke] {} wait has notified!", seqId);
    
                    rpcResponse = this.responseMap.get(seqId);
                    logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
                }
    
                return rpcResponse;
            } catch (InterruptedException e) {
                logger.error("获取响应异常", e);
                throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED);
            }
        }
    
    }
    

    这里 getResponse 获取不到会进入等待,直到 addResponse 唤醒。

    但是这也有一个问题,如果一个请求的响应丢失了怎么办?

    总不能一直等待吧。

    TimeoutCheckThread 超时检测线程

    超时检测线程就可以帮我们处理一些超时未返回的结果。

    package com.github.houbb.mq.common.support.invoke.impl;
    
    import com.github.houbb.heaven.util.common.ArgUtil;
    import com.github.houbb.mq.common.rpc.RpcMessageDto;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 超时检测线程
     * @author binbin.hou
     * @since 0.0.2
     */
    public class TimeoutCheckThread implements Runnable {
    
        /**
         * 请求信息
         * @since 0.0.2
         */
        private final ConcurrentHashMap<String, Long> requestMap;
    
        /**
         * 请求信息
         * @since 0.0.2
         */
        private final ConcurrentHashMap<String, RpcMessageDto> responseMap;
    
        /**
         * 新建
         * @param requestMap  请求 Map
         * @param responseMap 结果 map
         * @since 0.0.2
         */
        public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
                                  ConcurrentHashMap<String, RpcMessageDto> responseMap) {
            ArgUtil.notNull(requestMap, "requestMap");
            this.requestMap = requestMap;
            this.responseMap = responseMap;
        }
    
        @Override
        public void run() {
            for(Map.Entry<String, Long> entry : requestMap.entrySet()) {
                long expireTime = entry.getValue();
                long currentTime = System.currentTimeMillis();
    
                if(currentTime > expireTime) {
                    final String key = entry.getKey();
                    // 结果设置为超时,从请求 map 中移除
                    responseMap.putIfAbsent(key, RpcMessageDto.timeout());
                    requestMap.remove(key);
                }
            }
        }
    
    }
    

    处理逻辑就是定时检测,如果超时了,就默认设置结果为超时,并且从请求集合中移除。

    消息生产者实现

    启动核心类

    public class MqProducer extends Thread implements IMqProducer {
    
        private static final Log log = LogFactory.getLog(MqProducer.class);
    
        /**
         * 分组名称
         */
        private final String groupName;
    
        /**
         * 端口号
         */
        private final int port;
    
        /**
         * 中间人地址
         */
        private String brokerAddress  = "";
    
        /**
         * channel 信息
         * @since 0.0.2
         */
        private ChannelFuture channelFuture;
    
        /**
         * 客户端处理 handler
         * @since 0.0.2
         */
        private ChannelHandler channelHandler;
    
        /**
         * 调用管理服务
         * @since 0.0.2
         */
        private final IInvokeService invokeService = new InvokeService();
    
        /**
         * 获取响应超时时间
         * @since 0.0.2
         */
        private long respTimeoutMills = 5000;
    
        /**
         * 可用标识
         * @since 0.0.2
         */
        private volatile boolean enableFlag = false;
    
        /**
         * 粘包处理分隔符
         * @since 1.0.0
         */
        private String delimiter = DelimiterUtil.DELIMITER;
    
        //set 方法
    
        
    
        @Override
        public synchronized void run() {
            // 启动服务端
            log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",
                    groupName, port, brokerAddress);
    
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                // channel handler
                this.initChannelHandler();
    
                // 省略,同以前
    
                // 标识为可用
                enableFlag = true;
            } catch (Exception e) {
                log.error("MQ 生产者启动遇到异常", e);
                throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
            }
        }
    
    }
    

    其中初始化 handler 的实现如下:

    private void initChannelHandler() {
        final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(delimiter);
    
        final MqProducerHandler mqProducerHandler = new MqProducerHandler();
        mqProducerHandler.setInvokeService(invokeService);
    
        // handler 实际上会被多次调用,如果不是 @Shareable,应该每次都重新创建。
        ChannelHandler handler = new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline()
                        .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                        .addLast(mqProducerHandler);
            }
        };
        this.channelHandler = handler;
    }
    

    MqProducerHandler 生产者处理逻辑

    和消费者处理逻辑类似。

    这里最核心的就是添加响应结果:invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);

    package com.github.houbb.mq.producer.handler;
    
    /**
     * @author binbin.hou
     * @since 1.0.0
     */
    public class MqProducerHandler extends SimpleChannelInboundHandler {
    
        private static final Log log = LogFactory.getLog(MqProducerHandler.class);
    
        /**
         * 调用管理类
         */
        private IInvokeService invokeService;
    
        public void setInvokeService(IInvokeService invokeService) {
            this.invokeService = invokeService;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf)msg;
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
    
            String text = new String(bytes);
            log.debug("[Client] channelId {} 接收到消息 {}", ChannelUtil.getChannelId(ctx), text);
    
            RpcMessageDto rpcMessageDto = null;
            try {
                rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
            } catch (Exception exception) {
                log.error("RpcMessageDto json 格式转换异常 {}", JSON.parse(bytes));
                return;
            }
    
            if(rpcMessageDto.isRequest()) {
                // 请求类
                final String methodType = rpcMessageDto.getMethodType();
                final String json = rpcMessageDto.getJson();
            } else {
                // 丢弃掉 traceId 为空的信息
                if(StringUtil.isBlank(rpcMessageDto.getTraceId())) {
                    log.debug("[Client] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));
                    return;
                }
    
                invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);
                log.debug("[Client] response is :{}", JSON.toJSON(rpcMessageDto));
            }
        }
    }
    

    消息的发送

    关心请求结果的:

    public SendResult send(MqMessage mqMessage) {
        String messageId = IdHelper.uuid32();
        mqMessage.setTraceId(messageId);
        mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);
        MqCommonResp resp = callServer(mqMessage, MqCommonResp.class);
        if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
            return SendResult.of(messageId, SendStatus.SUCCESS);
        }
        return SendResult.of(messageId, SendStatus.FAILED);
    }
    

    不关心请求结果的发送:

    public SendResult sendOneWay(MqMessage mqMessage) {
        String messageId = IdHelper.uuid32();
        mqMessage.setTraceId(messageId);
        mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);
        this.callServer(mqMessage, null);
        return SendResult.of(messageId, SendStatus.SUCCESS);
    }
    

    其中 callServer 实现如下:

    /**
     * 调用服务端
     * @param commonReq 通用请求
     * @param respClass 类
     * @param <T> 泛型
     * @param <R> 结果
     * @return 结果
     * @since 1.0.0
     */
    public <T extends MqCommonReq, R extends MqCommonResp> R callServer(T commonReq, Class<R> respClass) {
        final String traceId = commonReq.getTraceId();
        final long requestTime = System.currentTimeMillis();
        RpcMessageDto rpcMessageDto = new RpcMessageDto();
        rpcMessageDto.setTraceId(traceId);
        rpcMessageDto.setRequestTime(requestTime);
        rpcMessageDto.setJson(JSON.toJSONString(commonReq));
        rpcMessageDto.setMethodType(commonReq.getMethodType());
        rpcMessageDto.setRequest(true);
        // 添加调用服务
        invokeService.addRequest(traceId, respTimeoutMills);
    
        // 遍历 channel
        // 关闭当前线程,以获取对应的信息
        // 使用序列化的方式
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
        //负载均衡获取 channel
        Channel channel = channelFuture.channel();
        channel.writeAndFlush(byteBuf);
        String channelId = ChannelUtil.getChannelId(channel);
    
        log.debug("[Client] channelId {} 发送消息 {}", channelId, JSON.toJSON(rpcMessageDto));
        if (respClass == null) {
            log.debug("[Client] 当前消息为 one-way 消息,忽略响应");
            return null;
        } else {
            //channelHandler 中获取对应的响应
            RpcMessageDto messageDto = invokeService.getResponse(traceId);
            if (MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())) {
                throw new MqException(MqCommonRespCode.TIMEOUT);
            }
            String respJson = messageDto.getJson();
            return JSON.parseObject(respJson, respClass);
        }
    }
    

    测试代码

    启动消费者

    MqConsumerPush mqConsumerPush = new MqConsumerPush();
    mqConsumerPush.start();
    

    启动日志如下:

    [DEBUG] [2022-04-21 19:55:26.346] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
    [INFO] [2022-04-21 19:55:26.369] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress: 
    [INFO] [2022-04-21 19:55:27.845] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口
    

    启动生产者

    MqProducer mqProducer = new MqProducer();
    mqProducer.start();
    
    //等待启动完成
    while (!mqProducer.isEnableFlag()) {
        System.out.println("等待初始化完成...");
        DateUtil.sleep(100);
    }
    
    String message = "HELLO MQ!";
    MqMessage mqMessage = new MqMessage();
    mqMessage.setTopic("TOPIC");
    mqMessage.setTags(Arrays.asList("TAGA", "TAGB"));
    mqMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));
    
    SendResult sendResult = mqProducer.send(mqMessage);
    System.out.println(JSON.toJSON(sendResult));
    

    生产者日志:

    [INFO] [2022-04-21 19:56:39.609] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527
    ...
    [DEBUG] [2022-04-21 19:56:39.895] [main] [c.g.h.m.c.s.i.i.InvokeService.addRequest] - [Invoke] start add request for seqId: a70ea2c4325641d6a5b198323228dc24, timeoutMills: 5000
    ...
    [DEBUG] [2022-04-21 19:56:40.282] [main] [c.g.h.m.c.s.i.i.InvokeService.getResponse] - [Invoke] seq a70ea2c4325641d6a5b198323228dc24 对应结果已经获取: com.github.houbb.mq.common.rpc.RpcMessageDto@a8f0b4
    ...
    {"messageId":"a70ea2c4325641d6a5b198323228dc24","status":"SUCCESS"}
    

    消费者日志:

    [DEBUG] [2022-04-21 19:56:40.179] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - channelId: 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 接收到 method: P_SEND_MESSAGE 内容:{"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}
    
    [INFO] [2022-04-21 19:56:40.180] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - 收到服务端消息: {"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}
    
    [DEBUG] [2022-04-21 19:56:40.234] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.writeResponse] - [Server] channel 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 response {"requestTime":1650542200182,"traceId":"a70ea2c4325641d6a5b198323228dc24","request":false,"methodType":"P_SEND_MESSAGE","json":"{\"respCode\":\"0000\",\"respMessage\":\"成功\"}"}
    

    可以看到消费者成功的获取到了生产者的消息。

    小结

    到这里,我们就实现了一个消息生产者调用消费者的实现。

    但是你可能会问,这不就是 rpc 吗?

    没有解耦。

    是的,为了解决耦合问题,我们将在下一节引入 broker 消息的中间人。

    希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

    我是老马,期待与你的下次重逢。

    开源地址

    The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

    拓展阅读

    rpc-从零开始实现 rpc https://github.com/houbb/rpc

  • 相关阅读:
    【基于 python 云计算的娱乐资讯软件 V1.0操作指导及说明书】
    Docker镜像仓库:存储与分发Docker镜像的中央仓库
    C#学习笔记--变量类型的转换
    小红书运营怎么做,快速提升品牌印象
    配置Tomcat修改默认ROOT路径为自己的路径
    数据库系统及应用复习——第九章关系查询处理和查询优化
    resetFields()报错
    【JVM】JVM异常不打印堆栈信息 [ -XX:-OmitStackTraceInFastThrow ]
    用VS Code搞Qt6:编译附加模块
    云服务器CentOS8.2安装部署Docker一文详解
  • 原文地址:https://www.cnblogs.com/houbbBlogs/p/16182062.html