• 【netty从入门到放弃】netty转发tcp数据到多客户端


    接到一个需求,需要实现转发通讯模块tcp数据其他的服务器,也就是转发tcp数据到多客户端

    任务拆解:

    • 首先需要建立多客户端,每个客户端有一个独立的clientId和对应的tcp通道对应
    • 能动态的根据clientId关闭对应的转发任务
    • 停止服务的时候,需要断开所有的客户端连接,减少开销
    • 客户端需要实现断线重连(考虑到断网的清空)

    注意:本篇文章是只是实现转发操作,不支持转发的服务器,反向控制设备,需要做特殊处理,如果大家感兴趣,给我留言

    下面我们根据我们头脑风暴的结果,来想办法实现上面的过程

    创建数据库表

    CREATE TABLE `station_message_transmit` (
      `id` bigint(32) NOT NULL COMMENT '主键',
      `station_id` int(11) NOT NULL COMMENT '站点id',
      `host` varchar(50) DEFAULT NULL COMMENT '主机ip',
      `port` int(11) DEFAULT NULL COMMENT '端口',
      `create_by` varchar(64) DEFAULT NULL COMMENT '创建人',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_by` varchar(64) DEFAULT NULL COMMENT '修改人',
      `update_time` datetime DEFAULT NULL COMMENT '创建时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 所有的转发数据,都是基于单个站点(单个设备)
    • id是唯一的,后续会通过该id绑定tcp通道,来实现发数据,关闭连接等操作

    xml

    
    <project
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
            xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <modelVersion>4.0.0modelVersion>
    
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.0.5.RELEASEversion>
            <relativePath />
        parent>
    
        <groupId>boot.base.tcp.clientgroupId>
        <artifactId>boot-example-base-tcp-client-2.0.5artifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>boot-example-base-tcp-client-2.0.5name>
        <url>http://maven.apache.orgurl>
        <properties>
            <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding>
            <java.version>1.8java.version>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
            dependency>
            <dependency>
                <groupId>io.springfoxgroupId>
                <artifactId>springfox-swagger2artifactId>
                <version>2.9.2version>
            dependency>
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <version>1.18.20version>
                <scope>providedscope>
            dependency>
        dependencies>
        <build>
            <plugins>
                
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackagegoal>
                            goals>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 所需要的依赖,这里只是实现一个简单的demo,来实践一下,我的设想是否能实现。

    实体类

    • yml配置文件就不需要配置,一切从简,默认的端口是8080
    package com.test;
    
    import lombok.Data;
    
    /**
     * @author wu
     * @version 1.0
     * @date 2023/10/18 16:39
     */
    @Data
    public class StationMessageTransmit {
        /** 唯一编号 */
        private Long id;
    
        /** 站点id */
        private Integer stationId;
    
        /** 主机ip */
        private String host;
    
        /** 端口 */
        private Integer port;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    启动类

    package com.test;
    import io.netty.channel.ChannelHandlerContext;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextClosedEvent;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.context.event.ContextStartedEvent;
    import org.springframework.context.event.ContextStoppedEvent;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * wu
     */
    @SpringBootApplication
    @EnableAsync
    @EnableScheduling
    public class BootNettyClientApplication implements CommandLineRunner, ApplicationListener {
        public static void main( String[] args ) {
    		SpringApplication app = new SpringApplication(BootNettyClientApplication.class);
    		app.run(args);
    
            System.out.println( "Hello World!" );
        }
    
        @Async
    	@Override
    	public void run(String... args) throws Exception {
    		StationMessageTransmit tran = new StationMessageTransmit();
    		tran.setId(1L);
    		tran.setHost("192.168.10.128");
    		tran.setPort(5000);
    		tran.setStationId(13);
    
    		StationMessageTransmit tran1 = new StationMessageTransmit();
    		tran1.setId(2L);
    		tran1.setHost("192.168.10.128");
    		tran1.setPort(5001);
    		tran1.setStationId(13);
    
    		List<StationMessageTransmit> traces = new ArrayList<StationMessageTransmit>();
    		traces.add(tran);
    		traces.add(tran1);
    
    		for (StationMessageTransmit trace : traces) {
    			BootNettyClientThread thread = new BootNettyClientThread(trace);
    			thread.start();
    		}
    
    	}
    
    	@Override
    	public void onApplicationEvent(ApplicationEvent applicationEvent) {
    		if(applicationEvent instanceof ContextClosedEvent){
    			System.out.println("应用关闭事件");
    			for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {
    				ChannelHandlerContext channelHandlerContext = entry.getValue();
    				if(channelHandlerContext != null){
    					System.out.println("关闭链接:"+entry.getKey());
    					channelHandlerContext.close();
    				}
    			}
    		}else if(applicationEvent instanceof ContextRefreshedEvent){
    			System.out.println("应用刷新事件");
    		}else if(applicationEvent instanceof ContextStartedEvent){
    			System.out.println("应用开启事件");
    		}else if(applicationEvent instanceof ContextStoppedEvent){
    			System.out.println("应用停止事件");
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • run方法里面主要干的活,是一个伪代码,模拟从数据拿数据,再初始化创建多个客户端。

    • onApplicationEvent方法主要是监控服务停止的事件,这是考虑到,tcp是长链接,跟其他服务器链接是一直没有中断,会存在多次重建连接的问题,所以需要再关闭事件中,关闭所有的tcp客户端连接

    线程类

    package com.test;
    
    /**
     *
     * netty 客户端
     * wu
     */
    public class BootNettyClientThread extends Thread {
    
    	private StationMessageTransmit trace;
    	public BootNettyClientThread(StationMessageTransmit trace){
    		this.trace = trace;
    	}
    
    	@Override
    	public void run() {
    		try {
    			new BootNettyClient().connect(trace);
    		} catch (Exception e) {
    			throw new RuntimeException(e);
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 传实体类,主要是为了保证clientId和通道保证对应

    客户端代码

    package com.test;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.DatagramChannel;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioDatagramChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;
    
    import java.net.InetSocketAddress;
    
    /**
     *
     * netty 客户端
     * wu
     */
    public class BootNettyClient {
    
    	/**
    	 * 连接数据
    	 *
    	 * @param trace
    	 * @throws Exception
    	 */
    	public void connect(StationMessageTransmit trace) throws Exception {
    		if (trace.getNetProtocol() == 0) {
    			tcp(trace);
    		} else {
    			bind(trace);
    		}
    	}
    
    	private void tcp(StationMessageTransmit trace) throws InterruptedException {
    		/**
    		 * 客户端的NIO线程组
    		 *
    		 */
    		EventLoopGroup group = new NioEventLoopGroup();
    
    		try {
    			/**
    			 * Bootstrap 是一个启动NIO服务的辅助启动类 客户端的
    			 */
    			Bootstrap bootstrap = new Bootstrap();
    			bootstrap = bootstrap.group(group);
    			bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
    			/**
    			 * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
    			 */
    			bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    				@Override
    				protected void initChannel(SocketChannel socketChannel) throws Exception {
    					socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));
    					socketChannel.pipeline().addLast(new StringDecoder());
    					socketChannel.pipeline().addLast(new TcpHandler(trace));
    				}
    			});
    			/**
    			 * 连接服务端
    			 */
    			ChannelFuture future = bootstrap.connect(trace.getHost(), trace.getPort()).sync();
    			if (future.isSuccess()) {
    				System.out.println("netty client start success=" + trace.toString());
    				/**
    				 * 等待连接端口关闭
    				 */
    				future.channel().closeFuture().sync();
    			}
    		} finally {
    			/**
    			 * 退出,释放资源
    			 */
    			group.shutdownGracefully().sync();
    		}
    	}
    
    
    	/**
    	 * udp协议
    	 * @param trace
    	 * @throws InterruptedException
    	 */
    	public static void bind(StationMessageTransmit trace) throws InterruptedException {
    		EventLoopGroup group = new NioEventLoopGroup();
    		try {
    			Bootstrap bootstrap = new Bootstrap();
    			bootstrap.group(group)
    					.channel(NioDatagramChannel.class)
    					.option(ChannelOption.SO_BROADCAST, true)
    		            .handler(new ChannelInitializer<DatagramChannel>() {
    						@Override
    						protected void initChannel(DatagramChannel socketChannel) throws Exception {
    							socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));
    							socketChannel.pipeline().addLast(new StringDecoder());
    							socketChannel.pipeline().addLast(new UdpDataHandler(trace));
    						}
    					});
    
    			Channel channel = bootstrap.bind(0).sync().channel();
    			channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("Hello, Server", CharsetUtil.UTF_8),
    					new InetSocketAddress(trace.getHost(), trace.getPort()))).sync();
    			channel.closeFuture().await();
    		} finally {
    			group.shutdownGracefully();
    		}
    	}
    
    
    	public static void main(String[] args) throws InterruptedException {
    		StationMessageTransmit tran1 = new StationMessageTransmit();
    		tran1.setId(2L);
    		tran1.setHost("192.168.10.128");
    		tran1.setPort(8001);
    		tran1.setStationId(13);
    		tran1.setNetProtocol(1);
    		bind(tran1);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126

    tcp handle

    package com.test;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.EventLoop;
    
    /**
     *
     * I/O数据读写处理类
     * wu
     */
    @ChannelHandler.Sharable
    public class TcpHandler extends ChannelInboundHandlerAdapter{
    
        private static ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(5);
    
        private StationMessageTransmit trace;
    
        public TcpHandler(StationMessageTransmit trace){
            this.trace = trace;
        }
    
        /**
         * 从服务端收到新的数据时,这个方法会在收到消息时被调用
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
            if(msg == null){
                return;
            }
        	System.out.println("channelRead:read msg:"+msg.toString());
    
            //回应服务端
            //ctx.write("I got server message thanks server!");
        }
    
        /**
         * 从服务端收到新的数据、读取完成时调用
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        	System.out.println("channelReadComplete");
        	ctx.flush();
        }
    
        /**
         * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        	System.out.println("exceptionCaught");
            cause.printStackTrace();
            ctx.close();//抛出异常,断开与客户端的连接
        }
    
        /**
         * 客户端与服务端第一次建立连接时 执行
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
            super.channelActive(ctx);
            InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = inSocket.getAddress().getHostAddress();
            System.out.println("服务器ip:"+clientIp+",clientId:"+trace.getId());
    
            BootNettyClientGroupCache.save(trace.getId().toString(), ctx);
        }
    
        /**
         * 客户端与服务端 断连时 执行
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
            super.channelInactive(ctx);
            InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = inSocket.getAddress().getHostAddress();
            ctx.close(); //断开连接时,必须关闭,否则造成资源浪费
            System.out.println("channelInactive:"+clientIp);
    
            //客户端重连
            //reset();
        }
    
        /**
         * 客户端重连
         */
        public  void reset(){
            //增加一个伪代码,从服务器查询id对应的转发数据是否存在,不存在,则不继续运行转发任务
            SCHEDULED_EXECUTOR.schedule(() -> {
                try {
                    System.err.println("服务端链接不上,开始重连操作...");
                    new BootNettyClient().connect(trace);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 3, TimeUnit.SECONDS);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • reset方法是为了实现客户端重连,3秒钟调用一次
    • channelInactive方法,客户端和服务器断开连接时会触发
    • channelActive方法,客户端和服务器建立连接时,需要实现client和通道的绑定关系,方便后续回写数据

    udp handle

    package com.test;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.util.CharsetUtil;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    /**
     *  wu
     * @author 10249
     */
    public class UdpDataHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    	private StationMessageTransmit trace;
    
    	public UdpDataHandler(StationMessageTransmit trace){
    		this.trace = trace;
    	}
    
    	/**
    	 * 客户端返回数据
    	 * @param ctx
    	 * @param msg
    	 * @throws Exception
    	 */
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
    		try {
    			String strData = msg.content().toString(CharsetUtil.UTF_8);
    			//打印收到的消息
                System.out.println("msg---"+strData);
                // 与BootNettyUdpClient中的channel.id().toString()是一样的值
                System.out.println(ctx.channel().id().toString());
                //	收到服务端原路返回的消息后,不需要再次向服务端发送消息, 可以在这里暴力关闭,也可以在 channelReadComplete(ChannelHandlerContext ctx)内
                //  ctx.close();
    			ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("Hello, Server123123", CharsetUtil.UTF_8),msg.sender()));
    		} catch (Exception e) {
    			System.out.println(e.toString());
    		}
    	}
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    	/**
    	 * 客户端与服务端第一次建立连接时 执行
    	 */
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
    		super.channelActive(ctx);
    		System.out.println("udp客户端:"+trace.getId());
    		BootNettyClientGroupCache.save(trace.getId(), ctx);
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    controller类

    package com.test;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import java.util.Map;
    
    /**
     * 	wu
     */
    @RestController
    public class BootNettyClientController {
    
    	/**
    	 * 给所有客户端发送消息
    	 * @param content
    	 * @return
    	 */
    	@PostMapping("/reportAllClientDataToServer")
    	public String reportAllClientDataToServer(@RequestParam(name="content", required = true) String content) {
    		for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {
    			ChannelHandlerContext ctx = entry.getValue();
    			ctx.writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
    		}
    		return "ok";
    	}
    
    	/**
    	 * 停止指定的客户端
    	 * @param code
    	 * @return
    	 * @throws InterruptedException
    	 */
    	@PostMapping("/stopStationByCode")
    	public String downDataToClient(@RequestParam(name="code", required = true) String code) throws InterruptedException {
    		ChannelHandlerContext ctx =  BootNettyClientGroupCache.get(code);
    		ctx.close();
    		BootNettyClientGroupCache.remove(code);
    		return "success";
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 主要是提供两个测试方法,可以通过apifox调试工具进行模拟请求

    缓存tcp链接

    package com.test;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.EventLoopGroup;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     *  wu
     */
    public class BootNettyClientGroupCache {
    
        /**
         * 存放所有的连接,key是转发id,value是对应的数据
         */
        public static volatile Map<String, ChannelHandlerContext> groupMapCache = new ConcurrentHashMap<String, ChannelHandlerContext>();
    
        public static void add(String code, ChannelHandlerContext group){
            groupMapCache.put(code,group);
        }
    
        public static ChannelHandlerContext get(String code){
            return groupMapCache.get(code);
        }
    
        public static void remove(String code){
            groupMapCache.remove(code);
        }
    
        public static void save(String code, ChannelHandlerContext channel) {
            if(groupMapCache.get(code) == null) {
                add(code,channel);
            }
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 存放所有的通道
  • 相关阅读:
    【Unity细节】如何让组件失活而不是物体失活
    < Linux > 进程概念(1)
    测试报告 数据分析平台
    springboot项目Html页面引入css文件不生效
    【C++】构造函数与析构函数用途 ( 代码示例 - 构造函数与析构函数用途 )
    Python程序化生成三维场景【PyPRT】
    mybatis各阶段的详解
    尝试 vue 实现 SEO
    将设计思维应用于人工智能
    OrangePi Kunpeng Pro 开发板测评 | AI 边缘计算 & 大模型部署
  • 原文地址:https://blog.csdn.net/qq_16855077/article/details/133920687