Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
读取yml 配置中的多端口配置
//netty:
// port: {8300: A, 8500: B}
@Data
@Configuration
@ConfigurationProperties(prefix = "netty")
public class PortDefinition {
Map<Integer, String> port;
}
多端口判断使用的常量类
public class GatewayType {
//个人设备
public final static String GERNESHEBEI_SHOUHUA="A";
}
实现Netty服务端
@Slf4j
@RefreshScope
@Component
public class NettyServer {
@Autowired
private PortDefinition portDefinition;
public void start() throws InterruptedException {
/**
* 创建两个线程组 bossGroup 和 workerGroup
* bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
* 两个都是无线循环
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组
bootstrap.group(bossGroup, workerGroup)
//使用NioServerSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
.childOption(ChannelOption.TCP_NODELAY, true)
//可以给 bossGroup 加个日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
//监听多个端口
.childHandler(new SocketChannelInitHandler(portDefinition.getPort()))
;
// 监听多个端口
Map<Integer, String> ports = portDefinition.getPort();
log.info("netty服务器在{}端口启动监听", JSONObject.toJSONString(ports));
for (Map.Entry<Integer, String> p : ports.entrySet()) {
final int port = p.getKey();
// 绑定端口
ChannelFuture cf = bootstrap.bind(new InetSocketAddress(port)).sync();
if (cf.isSuccess()) {
log.info("netty 启动成功,端口:{}", port);
} else {
log.info("netty 启动失败,端口:{}", port);
}
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}
} finally {
//发送异常关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
根据多端口去判断去执行那个业务的 Handler 方法
@Slf4j
public class SocketChannelInitHandler extends ChannelInitializer<SocketChannel> {
/**
* 用来存储每个连接上来的设备
*/
public static final Map<ChannelId, ChannelPipeline> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* 端口信息,用来区分这个端口属于哪种类型的连接 如:8300 属于 A
*/
Map<Integer, String> ports;
public SocketChannelInitHandler(Map<Integer, String> ports) {
this.ports = ports;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//每次连接上来 对通道进行保存
CHANNEL_MAP.put(socketChannel.id(), socketChannel.pipeline());
ChannelPipeline pipeline = socketChannel.pipeline();
int port = socketChannel.localAddress().getPort();
String type = ports.get(port);
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
log.info("【initChannel】端口号: "+port+" 类型: "+type);
//不同类型连接,处理链中加入不同处理协议
switch (type) {
case GatewayType.GERNESHEBEI_SHOUHUA:
//手环
pipeline.addLast(new NettyServerHandler());
break;
default:
log.error("当前网关类型并不存在于配置文件中,无法初始化通道");
break;
}
}
}
业务员的Handler 方法
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
//private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);
protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {
log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj);
String message = (String)obj;
//之后写自己的业务逻辑即可
String b=message.replace("[", "")
.replace("]","");
String[] split1 = b.split("\\*");
String key = split1[1];
SocketChannel socketChannel = (SocketChannel) context.channel();
//调用业务代码类并执行
WristWatchSocket.runSocket(key,message,socketChannel,true);
ReferenceCountUtil.release(obj);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@Slf4j
//@Data
public class WristWatchSocket{
//业务代码
public static void runSocket(String key, String message, SocketChannel socketChannel, Boolean isNotFirst){
try {
if(message==null||message.trim().equals("")){
return;
}
String restr="测试发送";
if(!"".equals(restr)){
socketChannel.writeAndFlush(restr);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
netty:
port: {8300: A, 8500: B}
在启动类中配置
@EnableAsync
@EnableSwagger2
@EnableFeignClients
@EnableTransactionManagement
@Slf4j
@MapperScan({"com.yuandian.platform.mapper"})
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class YunYiplatformApplication {
public static void main(String[] args) {
ApplicationContext run = SpringApplication.run(YunYiplatformApplication.class, args);
log.info("\n\n【【【【平台成功启动!】】】】\n");
//netty 启动配置
try {
run.getBean(NettyServer.class).start();
}catch (Exception e){
e.printStackTrace();
}
}
}
截图