• 基于SpringBoot+Netty实现即时通讯(IM)功能


    简单记录一下实现的整体框架,具体细节在实际生产中再细化就可以了。

    第一步 引入netty依赖

    SpringBoot的其他必要的依赖像Mybatis、Lombok这些都是老生常谈了 就不在这里放了

           <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>4.1.85.Finalversion>
            dependency>

     

    第二步 接下来就是准备工作。

    消息服务类(核心代码) 聊天服务的功能就是靠这个类的start()函数来启动的 绑定端口8087 之后可以通socket协议访问这个端口来执行通讯

    复制代码
    import com.bxt.demo.im.handler.WebSocketHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @Description: 即时通讯服务类
     * @author: bhw
     * @date: 2023年09月27日 13:44
     */
    @Slf4j
    public class IMServer {
      // 用来存放连入服务器的用户集合
    public static final Map USERS = new ConcurrentHashMap<>(1024);   // 用来存放创建的群聊连接 public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void start() throws InterruptedException { log.info("IM服务开始启动"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); // 绑定端口 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 添加http编码解码器 pipeline.addLast(new HttpServerCodec()) //支持大数据流 .addLast(new ChunkedWriteHandler()) // 对http消息做聚合操作 FullHttpRequest FullHttpResponse .addLast(new HttpObjectAggregator(1024*64)) //支持websocket .addLast(new WebSocketServerProtocolHandler("/")) .addLast(new WebSocketHandler()); } }); ChannelFuture future = bootstrap.bind(8087).sync(); log.info("服务器启动开始监听端口: {}", 8087); future.channel().closeFuture().sync(); //关闭主线程组 bossGroup.shutdownGracefully(); //关闭工作线程组 workGroup.shutdownGracefully(); } }
    复制代码

     

     创建聊天消息实体类

    复制代码
    /**
     * @Description: 聊天消息对象 可以自行根据实际业务扩展
     * @author: seizedays
     */
    @Data
    public class ChatMessage extends IMCommand {
        //消息类型
        private Integer type;
        //消息目标对象
        private String target;
        //消息内容
        private String content;
    
    }
    复制代码

    连接类型枚举类,暂时定义为建立连接、发送消息和加入群组三种状态码

    复制代码
    @AllArgsConstructor
    @Getter
    public enum CommandType {
    
        //建立连接
        CONNECT(10001),
        //发送消息
        CHAT(10002),
        //加入群聊
        JOIN_GROUP(10003),
        ERROR(-1)
        ;
    
    
        private Integer code;
    
        public static CommandType match(Integer code){
            for (CommandType value : CommandType.values()) {
                if (value.code.equals(code)){
                    return value;
                }
            }
            return ERROR;
        }
    
    }
    复制代码

    命令动作为聊天的时候 消息类型又划分为私聊和群聊两种 枚举类如下:

    复制代码
    @AllArgsConstructor
    @Getter
    public enum MessageType {
    
        //私聊
        PRIVATE(1),
        //群聊
        GROUP(2),
        ERROR(-1)
        ;
        private Integer type;
    
        public static MessageType match(Integer code){
            for (MessageType value : MessageType.values()) {
                if (value.type.equals(code)){
                    return value;
                }
            }
            return ERROR;
        }
    
    }
    复制代码

     

    创建连接请求的拦截器

    复制代码
    import com.alibaba.fastjson2.JSON;
    import com.bxt.common.vo.Result;
    import com.bxt.demo.im.cmd.IMCommand;
    import com.bxt.demo.im.server.IMServer;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    
    /**
     * @Description: 用户连接到服务端的拦截器
     * @author: bhw
     * @date: 2023年09月27日 14:28
     */
    public class ConnectionHandler {
        public static void execute(ChannelHandlerContext ctx, IMCommand command) {
            if (IMServer.USERS.containsKey(command.getNickName())) {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已经在线,不能重复连接"))));
                ctx.channel().disconnect();
                return;
            }
    
            IMServer.USERS.put(command.getNickName(), ctx.channel());
    
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系统消息:" + command.getNickName() + "与服务端连接成功"))));
    
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet())))));
        }
    }
    复制代码

    加入群组功能的拦截器

    复制代码
    /**
     * @Description: 加入群聊拦截器
     * @author: bhw
     * @date: 2023年09月27日 15:07
     */
    public class JoinGroupHandler {
        public static void execute(ChannelHandlerContext ctx) {
            try {
                IMServer.GROUP.add(ctx.channel());
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系统默认群组成功!"))));
            } catch (Exception e) {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
            }
    
        }
    }
    复制代码

    发送聊天到指定对象的功能拦截器

    复制代码
    import com.alibaba.excel.util.StringUtils;
    import com.alibaba.fastjson2.JSON;
    import com.bxt.common.vo.Result;
    import com.bxt.demo.im.cmd.ChatMessage;
    import com.bxt.demo.im.cmd.MessageType;
    import com.bxt.demo.im.server.IMServer;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    
    import java.util.Objects;
    
    /**
     * @Description: 聊天拦截器
     * @author: bhw
     * @date: 2023年09月27日 15:07
     */
    public class ChatHandler {
        public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
            try {
                ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class);
                MessageType msgType = MessageType.match(message.getType());
    
                if (msgType.equals(MessageType.PRIVATE)) {
                    if (StringUtils.isBlank(message.getTarget())){
                        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,请选择消息发送对象"))));
                        return;
                    }
                    Channel channel = IMServer.USERS.get(message.getTarget());
                    if (Objects.isNull(channel) || !channel.isActive()){
                        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,对方不在线"))));
                        IMServer.USERS.remove(message.getTarget());
                        return;
                    }
                    channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊消息(" + message.getTarget() + "):" + message.getContent()))));
    
                } else if (msgType.equals(MessageType.GROUP)) {
                    IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群消息:发送者(" + message.getNickName() + "):" + message.getContent()))));
                }else {
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:不支持的消息类型"))));
                }
    
    
            } catch (Exception e) {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
            }
    
        }
    }
    复制代码

    最后是websocket拦截器 接收到客户端的指令后选择对应的拦截器实现相应的功能:

    复制代码
    import com.alibaba.fastjson2.JSON;
    import com.bxt.common.vo.Result;
    import com.bxt.demo.im.cmd.CommandType;
    import com.bxt.demo.im.cmd.IMCommand;
    import com.bxt.demo.im.server.IMServer;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * @Description: websocket拦截器
     * @author: bhw
     * @date: 2023年09月27日 13:59
     */
    @Slf4j
    public class WebSocketHandler extends SimpleChannelInboundHandler {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
            System.out.println(frame.text());
            try {
                IMCommand command = JSON.parseObject(frame.text(), IMCommand.class);
                CommandType cmdType = CommandType.match(command.getCode());
                if (cmdType.equals(CommandType.CONNECT)){
                    ConnectionHandler.execute(ctx, command);
                } else if (cmdType.equals(CommandType.CHAT)) {
                    ChatHandler.execute(ctx,frame);
                } else if (cmdType.equals(CommandType.JOIN_GROUP)) {
                    JoinGroupHandler.execute(ctx);
                } else {
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支持的code"))));
                }
            }catch (Exception e){
                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage()))));
            }
    
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // 当连接断开时被调用
            Channel channel = ctx.channel();
            // 从 USERS Map 中移除对应的 Channel
            removeUser(channel);
            super.channelInactive(ctx);
        }
    
        private void removeUser(Channel channel) {
            // 遍历 USERS Map,找到并移除对应的 Channel
            IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel);
        }
    }
    复制代码

    第三步 启动服务

    复制代码
    @SpringBootApplication
    public class DemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
           // 启动IM服务
            try {
                IMServer.start();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    
    }
    复制代码

    现在 客户端通过socket协议访问8087端口即可实现基本的聊天室功能了!

  • 相关阅读:
    jQuery总结
    Codesys 获取系统年、月、日、时、分、秒、星期几 +解决时区问题+ ST语言编程实现代码
    科普系列:AUTOSAR与OSEK网络管理比较(上)
    微店request方法构造下单页面
    FPBJXDN224、FPBJXDV224插头式电比例节流阀放大器
    欠拟合、过拟合及优化:岭回归
    使用Bat_To_Exe_Converter生成exe的方法(绝对可行)
    【XSS & CSRF 】访问时篡改密码——以DVWA-High为例
    上周热点回顾(1.1-1.7)
    Java api中文在线版
  • 原文地址:https://www.cnblogs.com/seizedays/p/17772433.html