• 云原生中间件RocketMQ-生产者消息返回状态,延迟消息,自定义消息发送规则,netty框架部分代码分析


    生产者消息返回状态

    FLUSH_DISK_TIMEOUT

    如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的时间内完成刷盘,就会收到此状态码。

    FLUSH_SLAVE_TIMEOUT

    如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定时间内完成同步,就会收到此状态码。

    SLAVE_NOT_AVAILABLE

    如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。

    SEND_OK

    这个状态可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。需要注意的是,SEND_OK 并不意味着可靠,如果想严格确保没有消息丢失,需要开启 SYNC_MASTER or SYNC_FLUSH。
    如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着消息会丢失,有2个选择,一是无所谓,适用于消息不需要保证不丢失的场景,二是重发,但可能产生消息重复,这就需要consumer进行去重控制。如果收到了 SLAVE_NOT_AVAILABLE 就要及时进行处理了。

    延迟消息

    延迟消息:消息发到Broker后,要特定的时间才会被Consumer消费。
    目前只支持固定精度的定时消息,可以在rocketmq-store模块MessageStoreConfig配置类中看到对应的定时消息的配置。

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
    • 1

    在这里插入图片描述
    设置延迟发送消息等级:

    //	创建消息
    //	主题
    Message message = new Message("test_quick_topic",
                                  //	标签
                                  "TagA",
                                  // 	用户自定义的key ,唯一的标识
                                  "key" + i,
                                  //	消息内容实体(byte[])
                                  ("Hello RocketMQ" + i).getBytes());	
    // 设置延迟等级为3,也就是10s发送一条
    message.setDelayTimeLevel(3);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    自定义消息发送规则 MessageQueueSelector

    如何把消息发送到指定的队列(Message Queue)?

    // 同步发送消息,直接获取发送结果(指定第二个队列)
    SendResult sr = producer.send(message, new MessageQueueSelector() {
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer queueNumber = (Integer)arg;
            return mqs.get(queueNumber);
        }
    }, 2);
    System.err.println(sr);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Netty底层框架解析

    在这里插入图片描述
    在这里插入图片描述
    NettyRemotingServer实现Netty服务器端功能,接受数据包,在服务器端处理后发送给客户端。
    NettyRemotingClient实现Netty客户端功能。

    NettyRemotingServer

    start() 方法

    start方法主要启动Netty服务器,并在绑定端口后阻塞主线程。这里主要看看Netty服务器端装配了哪些ChannelHandler:

    ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                // SocketChannel添加ChannelHandler
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    serverHandler
                                );
                        }
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    HandshakeHandler:检测传输的包体是否使用TLS协议(传输层安全性协议,Transport Layer Security)传输 ,如果包体使用TSL协议,将会在Pipeline中加入处理TSL协议握手的Handler。

    ctx.pipeline()
        .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
        .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
    
    • 1
    • 2
    • 3

    在这里插入图片描述
    image.png
    NettyEncoder/NettyDecoder:RocketMQ自定义的编解码Handler,其中编码器将RemotingCommand(RocketMQ的服务器端和客户端交互的数据结构)序列化,其中序列化的方式有json或者二进制,具体编解码方式这里不讨论了。而解码器NettyDecoder继承LengthFieldBasedFrameDecoder,基于长度编解码方式,将二进制反序列化为RemotingCommand。
    IdleStateHandler:Netty包中定义的心跳检测包。读写超时时间由NettyServerConfig.serverChannelMaxIdleTimeSeconds变量控制,默认时间120s。
    NettyConnectManageHandler:Channel连接的管理handler,当发生channel连接的激活、失效、超时和异常时,NettyRemotingServer会生成一个Netty事件,管理连接的组件相应的会处理事件。

    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
    
    • 1
    • 2
    • 3
    • 4

    NettyServerHandler:处理RemotingCommand消息,并且返回相应的处理结果。具体实现如下:

    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    而processMessageReceived()方法在NettyRemotingServer的父类NettyRemotingAbstract中实现:

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            final RemotingCommand cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case REQUEST_COMMAND:
                        processRequestCommand(ctx, cmd);
                        break;
                    case RESPONSE_COMMAND:
                        processResponseCommand(ctx, cmd);
                        break;
                    default:
                        break;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在看processRequestCommand和processResponseCommand这两个方法之前,先了解一下RemotingCommand这个对象:

    public class RemotingCommand {
        private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
        static {
            // 获取配置的序列化方式
            final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
            if (!isBlank(protocol)) {
                try {
                    serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
                } catch (IllegalArgumentException e) {
                    throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
                }
            }
        }
     
        private int code;  // 请求类型
        private LanguageCode language = LanguageCode.JAVA;
        private int version = 0; // RocketMQ版本编号
        private int opaque = requestId.getAndIncrement(); // 请求序号
        private int flag = 0;  // 标记请求是普通请求,还是无回应的请求
        private String remark; // 失败提示
        private HashMap<String, String> extFields;  // 参数字段的数值
        private transient CommandCustomHeader customHeader;  // 参数的类型
     
        private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
     
        private transient byte[] body;  // 解码时缓存的字节流
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    NettyRemotingClient

    NettyRemotingClient的start()方法与NettyRemotingServer类似,在添加ChannelHandler处理包的handler是NettyClientHandler,其功能与NettyServerHandler一样。

     public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
            long beginStartTime = System.currentTimeMillis();
            final Channel channel = this.getAndCreateChannel(addr);
            if (channel != null && channel.isActive()) {
                try {
                    doBeforeRpcHooks(addr, request);
                    long costTime = System.currentTimeMillis() - beginStartTime;
                    if (timeoutMillis < costTime) {
                        throw new RemotingTimeoutException("invokeSync call timeout");
                    }
                    RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                    return response;
                } catch (Exception e) {
                    // .....
                    this.closeChannel(addr, channel);
                }
            } else {
                this.closeChannel(addr, channel);
                throw new RemotingConnectException(addr);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    invokeSync方法根据发送命令的地址取出或创建一个Channel,然后调用父类的invokeSyncImpl()方法,阻塞等待返回结果。
    当addr为null时,取出连接到namesrv的channel。也就是说,当调用NettyRemotingClient.invokeSync(null, request, 3000)时,请求会发送到namesrv。getAndCreateNameserverChannel()方法,从定时检测刷新的namesrvAddrList中按序取出自己绑定的nameserver的地址。然后从缓存中取出channel,如果缓存中的channel不存在或失活,那么重新连接。
    类似的,createChannel()方法,主要逻辑实现连接到目标服务器,并将生成的channel放入到channelTables缓存中,下一次发送命令时,如果channel依然存活,那么从缓存中取出channel使用。

  • 相关阅读:
    如何快速用一条命令配置好本地yum源(6/7/8版本)
    webpack(一)模块化
    CFCA证书——基于SM2/3算法的安全信任
    PromQL基础语法(下)-聚合运算符、内置函数【prometheus查询语句】
    Android 11 inputflinger分析(触摸优先级)
    【每日思考】---成事的正确思维方式
    基于JAVA人事管理系统计算机毕业设计源码+数据库+lw文档+系统+部署
    linux crontab定时任务
    使用 Google Breakpad 来助力解决程序崩溃
    LeetCode精选200道--字符串篇
  • 原文地址:https://blog.csdn.net/qq_35427589/article/details/126091919