• Netty系列(三):Netty服务端发送消息到客户端


    通常客户端只会主动发送心跳消息,目的是为了保持与服务端连接,而其他消息往往需要服务端发送消息至客户端调取。

    实现步骤

    1. 客户端在第一次与服务端建立连接时,将此连接的通道在 Map 中保存下来,为了保证线程安全,可以使用线程安全的 ConcurrentHashMap

    2. 在发送消息给客户端时,通过设备标识遍历 ConcurrentHashMap 找到目标客户端连接通道。找到后先判断通道是否存活,如果连接是存活状态,就通过此通道发送消息给客户端,如果不是存活状态,就从 Map 中删除此通道信息。

    3. 将消息发送至客户端后,服务端正常接收客户端传回的信息。

    实现代码

    前两篇文章中已经提供了 netty 的整体框架代码,这里只提供一些核心的关键代码,其余代码不再赘述。

    指路:

    1. Netty系列(一):Springboot整合Netty,自定义协议实现
    2. Netty系列(二):Netty拆包/沾包问题的解决方案

    新建一个 ChannelMap 类,在客户端第一次连接时保存 channel 连接。后续服务端向客户端发送消息时,先从 Map 中找到对应的客户端消息通道连接,再向通道中写入消息进行发送。

    
    /**
     * @Author 鳄鱼儿
     * @Description 连接通道保存MAP
     * @date 2022/11/27 16:30
     * @Version 1.0
     */
    
    public class ChannelMap {
        /**
         * 存放客户端标识ID(消息ID)与channel的对应关系
         */
        private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
    
        private ChannelMap() {
        }
        
        public static ConcurrentHashMap<String, Channel> getChannelMap() {
            if (null == channelMap) {
                synchronized (ChannelMap.class) {
                    if (null == channelMap) {
                        channelMap = new ConcurrentHashMap<>();
                    }
                }
            }
            return channelMap;
        }
    
        public static Channel getChannel(String id) {
            return getChannelMap().get(id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在客户端建立连接(服务端收到心跳消息)时,将channel加入map中。

    
    public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {
        private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
    
        /**
         * 设备接入连接时处理
         *
         * @param ctx
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
        }
    
        /**
         * 数据处理
         *
         * @param ctx
         * @param msg
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
            // 获取消息实例中的消息体
            String content = msg.getContent();
            // 对不同消息类型进行处理
            MessageEnum type = MessageEnum.getStructureEnum(msg);
            switch (type) {
                case CONNECT:
                    // 将通道加入ChannelMap
                    ChannelMap.getChannelMap().put(msg.getId(), ctx.channel());
                    
                    // 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
                    AttributeKey<String> key = AttributeKey.valueOf("id");
                    ctx.channel().attr(key).setIfAbsent(msg.getId());
    
                    // TODO 心跳消息处理
                case STATE:
                    // TODO 设备状态
                default:
                    System.out.println(type.content + "消息内容" + content);
            }
        }
    
        /**
         * 设备下线处理
         *
         * @param ctx
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            log.info("设备下线了:{}", ctx.channel().id().asLongText());
            // map中移除channel
            removeId(ctx);
        }
    
        /**
         * 设备连接异常处理
         *
         * @param ctx
         * @param cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // 打印异常
            log.info("异常:{}", cause.getMessage());
            // map中移除channel
            removeId(ctx);
            // 关闭连接
            ctx.close();
        }
    
        private void removeId(ChannelHandlerContext ctx) {
            AttributeKey<String> key = AttributeKey.valueOf("id");
            // 获取channel中id
            String id = ctx.channel().attr(key).get();
            // map移除channel
            ChannelMap.getChannelMap().remove(id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    写一个服务端发送消息的业务层类,并通过客户端id在map中获取到channel通道,将消息转化成json字符串后,通过writeAndFlush发送至客户端。

    
    /**
     * @Author 鳄鱼儿
     * @Description 向客户端发送消息
     * @date 2022/11/27 17:29
     * @Version 1.0
     */
    
    @Service
    public class PushMsgServiceImpl implements PushMsgService {
    
        /**
         * 向一个客户端发送消息
         *
         * @param msg
         */
        @Override
        public void push(Message msg) {
            // 客户端ID
            String id = msg.getId();
            Channel channel = ChannelMap.getChannel(id);
            if (null == channel) {
                throw new RuntimeException("客户端已离线");
            }
            channel.writeAndFlush(msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    注意:writeAndFlush参数是自定义编码的泛型对象实例。如本文自定义的Message消息解析类。

    public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
    
        private static String delimiter;
    
        public MessageEncodeHandler(String delimiter) {
            this.delimiter = delimiter;
        }
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
            out.writeBytes(
                    (message.toJsonString() + delimiter)
                            .getBytes(CharsetUtil.UTF_8)
            );
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    之后再编写一个Controller类(这里省略),在Controller类中调用PushMsgService中pushff,就可以完成对客户端的消息发送。

  • 相关阅读:
    【Linux】Qt Remote之Remote开发环境搭建填坑小记
    Java笔记二
    Vue3 实现文件预览 Word Excel pdf 图片 视频等格式 大全!!!!
    嵌入式进阶——EEPROM读写
    Python 操作pdf文件(pdfplumber读取PDF写入Excel)
    最新中文版本FLStudio21水果音乐软件更新下载
    [补题记录] Atcoder Beginner Contest 308(C~E)
    6、数据结构
    cpu设计和实现(pc跳转和延迟槽)
    孩子自律性不够?猿辅导:计划表要注意“留白”给孩子更多掌控感
  • 原文地址:https://blog.csdn.net/Ber_Bai/article/details/128155253