• 网络编程入门详解(Socket、TCP通信、Channel通信)


    入门知识

    软件结构

    • C/S结构 :全称为Client/Server结构,是指客户端和服务器结构。常见程序有QQ、迅雷等软件。
    • B/S结构 :全称为Browser/Server结构,是指浏览器和服务器结构。常见浏览器有谷歌、火狐等。

    两种架构各有优势,但是无论哪种架构,都离不开网络的支持。

    网络编程:就是在一定的协议下,实现两台计算机的通信的程序。

    网络通信的大致流程为:一个数据包经由应用程序产生,进入到协议栈中进行各种报文头的包装,然后操作系统调用网卡驱动程序指挥硬件,把数据发送到对端主机。

    整个过程的大体的图示如下:

    在这里插入图片描述

    协议栈其实是位于操作系统中的一些协议的堆叠,这些协议包括 TCP、UDP、ARP、ICMP、IP等。

    通常某个协议的设计都是为了解决特定问题的,比如:

    • TCP 的设计就负责安全可靠的传输数据
    • UDP 设计就是报文小,传输效率高
    • ARP 的设计是能够通过 IP 地址查询物理(Mac)地址
    • ICMP 的设计目的是返回错误报文给主机
    • IP 设计的目的是为了实现大规模主机的互联互通

    网络通信常见协议:UDP|TCP

    UDP:面向无连接的协议,通信的双方不用建立连接,可以直接发送数据

    • 好处:效率高,耗资小

    • 弊端:不安全,容易丢失数据


    TCP:面向连接协议,客户端和服务器端必须经过3次握手建立逻辑连接,才能通信

    • 好处:安全

    • 弊端:效率低

    三次握手:TCP协议中,在发送数据的准备阶段,客户端与服务器之间的三次交互,以保证连接的可靠。

    1. 第一次握手,客户端向服务器端发出连接请求,等待服务器确认。// 服务器你死了吗?

    2. 第二次握手,服务器端向客户端回送一个响应,通知客户端收到了连接请求。// 我活着 啊!!

    3. 第三次握手,客户端再次向服务器端发送确认信息,确认连接。// 我知道了!!


    TCP/IP 协议

    TCP/IP协议参考模型

    TCP/IP协议参考模型把所有的TCP/IP系列协议归类到四个抽象层中

    • 应用层:TFTP,HTTP,SNMP,FTP,SMTP,DNS,Telnet 等等
    • 传输层:TCP,UDP
    • 网络层:IP,ICMP,OSPF,EIGRP,IGMP
    • 数据链路层:SLIP,CSLIP,PPP,MTU

    每一抽象层建立在低一层提供的服务上,并且为高一层提供服务。

    在这里插入图片描述


    IP地址

    IP地址:就相当于计算机的身份号(唯一)

    ip地址的作用:具有唯一性,在网络中可以通过ip地址找到另外一台计算机

    ip地址分类:

    • ipv4:ip地址由4个字节组成,一个字节8位(比特位 1,0)

      二进制:11001101.11001100.11000001.11001111

      十进制:192.168.0.106

      每个字节的范围:0-255(2^8),ip地址第一位不能为0

      ip地址的数量:42亿(2^32=4294967296个)

      问题:随着计算机的增多,ip地址面临枯竭(全球IPv4地址在2011年2月分配完毕)不够用,就出了ipv6地址

    • ipv6:ip地址由16个字节组成,一个字节8位(比特位 1,0)

      ip地址的数量:2^128=3.4028236692093846346337460743177e+38

      号称可以为地球上每一粒沙子编写一个ip地址

      为了表示方便使用十六进制:fe80::a8a6:b83c:8b8b:2685%17

    一些常用dos命令:dos窗口 win+r ==> cmd ==> dos窗口

    1.查看电脑的IP信息
    	命令:ipconfig
        --------------------------------------------------
       	Windows IP 配置
       	连接特定的 DNS 后缀 . . . . . . . :
       	本地链接 IPv6 地址. . . . . . . . : fe80::a8a6:b83c:8b8b:2685%17
       	IPv4 地址 . . . . . . . . . . . . : 192.168.0.106
       	子网掩码  . . . . . . . . . . . . : 255.255.255.0
       	默认网关. . . . . . . . . . . . . : 192.168.0.1
    	--------------------------------------------------
    
    2.测试你的电脑和指定ip地址的电脑是否可以连通
    	命令:ping ip地址
    	--------------------------------------------------
    	C:\Users\Administrator>ping 192.168.0.222  没有ping通
        正在 Ping 192.168.0.222 具有 32 字节的数据:
        来自 192.168.0.106 的回复: 无法访问目标主机。
        来自 192.168.0.106 的回复: 无法访问目标主机。
        来自 192.168.0.106 的回复: 无法访问目标主机。
        来自 192.168.0.106 的回复: 无法访问目标主机。
    	--------------------------------------------------
        C:\Users\Administrator>ping www.baidu.com
        正在 Ping www.a.shifen.com [61.135.169.121] 具有 32 字节的数据:
        来自 61.135.169.121 的回复: 字节=32 时间=6ms TTL=56
        来自 61.135.169.121 的回复: 字节=32 时间=4ms TTL=56
        来自 61.135.169.121 的回复: 字节=32 时间=4ms TTL=56
        来自 61.135.169.121 的回复: 字节=32 时间=4ms TTL=56
    	--------------------------------------------------
    
    3.ping本机的ip地址(你自己电脑的ip地址) 
    	命令:ping 127.0.0.1	或	ping localhost 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    端口号

    端口号是一个逻辑端口,无法直接看到,使用一些软件可以看到(电脑管家,360.…)

    当打开网络软件(联网使用)时,操作系统就会为这个网络软件分配一个随机的端口号或者网络软件在打开的时候和操作系统要指定的端口号

    端口号是由两个字节组成,表示的范围:2^16=0-65535 之间


    1024之前的端口号,不能使用,已经被操作系统分配给一些已知的网络软件。

    注意:各个网络软件的端口号是不能重复

    常用的端口号:

    • 80端口:网络端口

    • 数据库:mysql:3306, oracle:1521

    • Tomcat服务:8080


    保证数据能准确无误发送到对方计算机的某一个软件上,使用 ip地址:端口号

    测试端口号是否连通:telnet ip地址:端口号


    InetAddress类:获取IP地址

    java.net.InetAddress:描述计算机的ip地址

    此类表示互联网协议 (IP) 地址。

    可以使用InetAddress类中的方法获取到计算机的ip地址

    创建对象的方式:静态方法

    static InetAddress getLocalHost() 			// 返回本地主机(你自己电脑的ip地址对象)。
    static InetAddress getByName(String host) 	// 在给定主机名的情况下确定主机的 IP 地址。
    /* 参数:
     	String host:可以传递主机名称、ip地址、域名
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ​ 非静态的方法:

    String getHostAddress() 	// 返回 IP 地址字符串(以文本表现形式)。
    String getHostName() 		// 获取此 IP 地址的主机名。
    
    • 1
    • 2

    Socket:套接字

    应用程序比如浏览器、电子邮件、文件传输服务器等产生的数据,会通过传输层协议进行传输。而应用程序是不会和传输层直接建立联系的,而是有一个能够连接应用层和传输层之间的套件,这个套件就是 Socket

    在这里插入图片描述


    阻塞/非阻塞、同步/异步

    • 阻塞:等待结果,什么事都不能做

    • 非阻塞:可以做别的事情

    • 同步:主动获取结果

    • 异步:等待通知结果


    • BIO:Block(阻塞的) IO 【同步、阻塞】
    • NIO:Non-Block(非阻塞的)(同步)IO 【同步、非阻塞】——JDK1.4开始
    • AIO:Asynchronous(异步-非阻塞)IO 【异步、非阻塞】 ——JDK1.7开始

    TCP/IP 通信

    TCP 通信的客户端:Socket

    作用:主动和服务器经过3次握手建立连接通路,给服务器发送数据,读取服务器回写的数据

    表示客户端的类:java.net.Socket:此类实现客户端套接字(也可以就叫“套接字”)。

    套接字:封装了IP地址和端口号的网络单位

    构造方法:

    public Socket(InetAddress address, int port)	// 创建一个流套接字并将其连接到指定 IP 地址的指定端口号。
    public Socket(String host, int port) 			// 创建一个流套接字并将其连接到指定主机上的指定端口号。
    /* 参数:
       	InetAddress address | String host:传递服务器的ip地址
       	int port:服务器的端口号
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ​ 成员方法:

    OutputStream getOutputStream()	// 返回此套接字的输出流。
    InputStream getInputStream() 	// 返回此套接字的输入流。
    
    void shutdownOutput() 			// 禁用此套接字的输出流。
        							// 对于 TCP 套接字,任何以前写入的数据都将被发送,并且后跟 TCP 的正常连接终止序列
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:

    1. 创建客户端Socket对象的时候,客户端会根据服务器的ip地址和端口号和服务器经过三次握手连接连接通路

      • 服务器已经启动了,服务器的ip地址和端口号填写正确:握手成功,创建好Socket对象

      • 服务器没有启动,服务器的ip地址和端口号填写错误:握手失败,会抛出连接异常

        ConnectException: Connection refused: connect

    2. 客户端和服务器之间进行数据传输,不能使用自己创建的流对象(只能和本地硬盘之间进行读写)。

      使用Socket对象中提供的网络流对象


    TCP 通信的服务端:ServerSocket

    作用:接收客户端的请求和客户端经过3次握手建立连接通路;读取客户端发送的数据,给客户端回写(发送)数据

    表示服务器的类:java.net.ServerSocket;此类实现服务器套接字。

    构造方法:

    public ServerSocket(int port) // 创建绑定到特定端口的服务器套接字。
    
    • 1

    成员方法:

    Socket accept() 	// 侦听并接受到此套接字的连接。
    /* 使用accpet方法,会一直监听客户端的请求
    		有客户端请求服务器,accept方法就会获取到请求的客户端Socket对象
    		没有客户端请求服务器,accept方法会进入到阻塞状态,一直等待
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:

    ​ 服务器启动的时候,抛出了以下的异常:说明服务器使用的端口号已经被占用了,需要更换端口号

    ​ java.net.BindException: Address already in use: JVM_Bind


    文件上传案例

    文件上传的客户端

    读取本地文件,上传到服务器中,读取服务器回写的"上传成功!"

    文件上传就是文件的复制:

    ​ 数据源:c:\1.jpg

    ​ 目的地:服务器中

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    
    public class Demo01TCPClient {
        public static void main(String[] args) throws IOException {
            //1.创建本地字节输入流FileInputStream对象,构造方法中绑定要读取的数据源
            FileInputStream fis = new FileInputStream("c:\\1.jpg");
            
            //2.创建客户端Socket对象,构造方法绑定服务器的ip地址和端口号
            Socket socket = new Socket("127.0.0.1", 9999);
            
            //3.使用客户端Socket对象中的方法getOutputStream,获取网络字节输出流OutputStream对象
            OutputStream os = socket.getOutputStream();
            //4.使用本地字节输入流FileInputStream对象中的方法read,读取要上传的而文件
            byte[] bytes = new byte[1024];
            int len = 0;
            while ((len = fis.read(bytes)) != -1){
                //5.使用网络字节输出流OutputStream对象中的方法write,把读取到的文件上传到服务器中
                os.write(bytes, 0, len);
            }
            // 上传结束
            socket.shutdownOutput();
            
            //6.使用客户端Socket对象中的方法getInputStream,获取网络字节输入流InputStream对象
            InputStream is = socket.getInputStream();
            //7.使用网络字节输入流InputStream对象中的方法read,读取服务器回写的"上传成功!"
            while ((len = is.read(bytes)) != -1){
                System.out.println(new String(bytes, 0, len));
            }
            
            //8.释放资源(FileInputStream对象, Socket)
            fis.close();
            socket.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    文件上传的服务器端(多线程)

    读取客户端上传的文件,把文件保存到服务器的硬盘上,给客户端回写"上传成功!"

    文件上传就是文件的复制:

    ​ 数据源: 客户端上传的文件 1.jpg

    ​ 目的地: 服务器的硬盘中 d:\upload\1.jpg

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class Demo02TCPServer {
        public static void main(String[] args) throws IOException {
            //1.判断d盘有没有upload文件夹,没有则创建
            File file = new File("d:\\upload");
            if(!file.exists()){
                file.mkdir();
            }
            //2.创建服务器ServerSocket对象,和系统要指定的端口号9999
            ServerSocket server = new ServerSocket(9999);
            
            // 一直循环监听客户端的请求(轮询)
            while(true){
                 //3.使用服务器ServerSocket对象中的方法accpet,监听并获取请求的客户端Socket对象
            	Socket socket = server.accept();
                
                // 开启一个新的线程完成这个客户端的文件上传
                new Thread(()->{
                    try {
                        //4.使用客户端Socket对象中的方法getInputStream,获取网络字节输入流InputStream对象
            			InputStream is = socket.getInputStream();
            			
            			/*
            			    自定义一个文件的名称,防止名称的重复,覆盖之前的文件
            			    规则:不重复 ==> 自己写 ==> 域名 + 毫秒值 + 随机数
            			 */
            			String fileName = "cormorant" + System.currentTimeMillis() 
                            + new Random().nextInt(9999999) + ".jpg";
            			
            			//5.创建本地字节输出流FileOutputStream对象,绑定要输出的目的地
            			//FileOutputStream fos = new FileOutputStream(file + "\\1.jpg");  //d:\\upload\\1.jpg
            			FileOutputStream fos = new FileOutputStream(file + File.separator + fileName);
            			
                        //6.使用网络字节输入流InputStream对象中的方法read,读取客户端上传的文件
            			byte[] bytes = new byte[1024];
            			int len = 0;
            			while ((len = is.read(bytes)) != -1){
            			    //7.使用本地字节输出流FileOutputStream对象中的方法write,把读取到的文件,写到服务器的硬盘中保存
            			    fos.write(bytes, 0, len);
            			}
                        
            			//8.使用客户端Socket对象中的方法getOutputStream,获取网络字节输出流OutputStream对象
            			//9.使用网络字节输出流OutputStream对象中的方法write,给客户端回写"上传成功!"
            			socket.getOutputStream().write("上传成功".getBytes());
    			
            			//10.释放资源(fos, Socket, ServerScoket)
            			fos.close();
            			socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            
            //让服务器一直启动,不在关闭了
            //server.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    文件上传的阻塞问题

    在这里插入图片描述

    /*
        解决:上传完图片之后,给服务器写一个结束标记,告之服务器文件已经上传完毕,无需在等待了
        Socket对象中的方法
            void shutdownOutput() 禁用此套接字的输出流。
            对于 TCP 套接字,任何以前写入的数据都将被发送,并且后跟 TCP 的正常连接终止序列
     */
    socket.shutdownOutput();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Channel(通道)

    Buffer类(缓冲区)

    java.nio.Buffer(抽象类):用于特定原始类型(基本类型)的数据的容器。Channel进行通信时,底层全部使用Buffer。

    它的几个子类:

    • ByteBuffer:里面可以封装一个byte[]数组。【重点掌握】
    • ShortBuffer:里面可以封装一个short[]数组。
    • CharBuffer:里面可以封装一个char[]数组
    • IntBuffer:里面可以封装一个int[]数组。
    • LongBuffer:里面可以封装一个long[]数组。
    • FloatBuffer:里面可以封装一个float[]数组。
    • DoubleBuffer:里面可以封装一个double[]数组。
    • 没有boolean类型对应的Buffer

    属性

    • capacity(容量):Buffer所能够包含的元素的最大数量。定义了Buffer后,容量是不可变的。

      int capacity()		//返回此缓冲区的容量
      
      • 1
    • limit(限制):表示如果设置“限制为某一个位置,那么此位置及其后面的位置将不可用”。

      int limit()					// 获取此缓冲区的限制
      Buffer limit(int newLimit)	// 设置此缓冲区的限制
      
      • 1
      • 2
    • position(位置):当前可写入的索引。位置不能小于0,并且不能大于"限制"。

      int position()				// 获取当前可写入位置索引
      Buffer position(int p)		// 更改当前可写入位置索引
      
      • 1
      • 2
    • mark(标记):当调用缓冲区的reset()方法时,会将缓冲区的position位置重置为该mark设置的索引。

      ​ 不能小于0,不能大于position。

      Buffer mark() 		// 在此缓冲区的当前位置(索引)设置标记
      Buffer reset() 		// 将此缓冲区的位置重置为以前标记的位置
      					/* 当我们调用缓冲区的reset方法时,会将缓冲区的position索引位置重置为mark标记的位置 */
      
      • 1
      • 2
      • 3

    创建ByteBuffer

    java.nio.ByteBuffer:字节缓冲区,里边封装了一个byte类型的数组

    创建对象的方式:使用静态方法

    static ByteBuffer allocate(int capacity)		// 使用一个“容量”来创建一个“间接字节缓存区”——程序的“堆”空间中创建。
    static ByteBuffer allocateDirect(int capacity)	// 使用一个“容量”来创建一个“直接字节缓存区”——系统内存。
        											// 可以直接和系统交互,效率高
    static ByteBuffer wrap(byte[] byteArray)		// 使用一个“byte[]数组”创建一个“间接字节缓存区”。
    
    • 1
    • 2
    • 3
    • 4

    ByteBuffer成员方法

    /* 向ByteBuffer添加数据 */
    ByteBuffer put(byte b)				// 向当前可用位置添加数据,一次添加一个字节
    ByteBuffer put(byte[] byteArray)	// 向当前可用位置添加一个byte[]数组
    ByteBuffer put(byte[] byteArray, int offset, int len)		// 添加一个byte[]数组的一部分
    	/* 参数:int offset:数组的开始索引,从哪个索引开始添加
            	int len:添加个数
    	*/
    ByteBuffer put(int index, byte b) 	// 往指定索引处添加一个byte字节(替换)
    
    byte[] array()		// 获取此缓冲区的 byte 数组
    
    boolean isReadOnly()	// 获取当前缓冲区是否只读
    boolean isDirect()		// 获取当前缓冲区是否为直接缓冲区
    int remaining()			// 获取position与limit之间的元素数量
    Buffer clear()			// 还原缓冲区的状态。
    	/* 	将position设置为0
    		将限制limit设置为容量capacity
    		丢弃标记mark
    	*/
    Buffer flip()		// 缩小limit的范围。获取读取的有效数据0到position之间的数据
    	/* 	将limit设置为当前position位置; [0, 1, 2, 0, 0, 0, 0, 0, 0, 0]  position=3 limit=10
    		将当前position位置设置为0;   position=0 limit=3  new String(bytes, 0, len)
    		丢弃标记
    	*/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    示例:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    
    public class Demo02put {
        public static void main(String[] args) {
            //创建一个长度为10的ByteBuffer ==> 包含了一个长度为10的数组,默认值:{0,0,0,..0}
            ByteBuffer buffer = ByteBuffer.allocate(10);
            System.out.println(buffer);		//java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
    
            //- byte[] array()获取此缓冲区的 byte 数组
            byte[] arr = buffer.array();
            System.out.println(Arrays.toString(arr));//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
            //- public ByteBuffer put(byte b):向当前可用位置添加数据,一次添加一个字节
            buffer.put((byte)1);	//1默认是int类型,put方法的参数需要byte类型,需要强转
            byte b1 = 2;
            buffer.put(b1);
            System.out.println(Arrays.toString(arr));//[1, 2, 0, 0, 0, 0, 0, 0, 0, 0]
    
            //- public ByteBuffer put(byte[] byteArray):向当前可用位置添加一个byte[]数组
            byte[] bytes = {10,20,30,40,50};
            buffer.put(bytes);
            System.out.println(Arrays.toString(arr));//[1, 2, 10, 20, 30, 40, 50, 0, 0, 0]
    
            /*
                - public ByteBuffer put(byte[] byteArray,int offset,int len):添加一个byte[]数组的一部分
                int offset:数组的开始索引,从哪个索引开始添加
                int len:添加个数
             */
            buffer.put(bytes,3,2);//40,50
            System.out.println(Arrays.toString(arr));//[1, 2, 10, 20, 30, 40, 50, 40, 50, 0]
    
            //ByteBuffer put(int index, byte b) 往指定索引处添加一个byte字节(替换)
            buffer.put(1,(byte)88);
            System.out.println(Arrays.toString(arr));//[1, 88, 10, 20, 30, 40, 50, 40, 50, 0]
            
            
            //创建一个长度为10的ByteBuffer
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put((byte)0);
            buffer.put((byte)1);
            buffer.put((byte)2);
            System.out.println(Arrays.toString(buffer.array()));//[0, 1, 2, 0(positon), 0, 0, 0, 0, 0, 0(limit 10)]
    
            //public int remaining():获取position与limit之间的元素数量。
            System.out.println(buffer.remaining());//7[3-9]
    
            //public boolean isReadOnly():获取当前缓冲区是否只读。
            System.out.println(buffer.isReadOnly());//false:既能读,有能写 true:只能读,不能写(只读)
    
            //public boolean isDirect():获取当前缓冲区是否为直接缓冲区。
            System.out.println(buffer.isDirect());//false:间接字节缓冲区(堆) true:直接字节缓冲区(系统)
            System.out.println(ByteBuffer.allocateDirect(10).isDirect());//true
    
            buffer.limit(5);//设置限制为5
            System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:3 限制:5
    
            /*
                public Buffer clear():还原缓冲区的状态。
                - 将position设置为:0
                - 将限制limit设置为容量capacity;
                - 丢弃标记mark。
             */
            //buffer.clear();
            //System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:0 限制:10
            //System.out.println(Arrays.toString(buffer.array()));//[0, 1, 2, 0, 0, 0, 0, 0, 0, 0]
    
            /*
                public Buffer flip():缩小limit的范围。 获取读取的有效数据:0到position之间的数据
                - 将limit设置为当前position位置; [0, 1, 2, 0, 0, 0, 0, 0, 0, 0]  position=3 limit=10
                - 将当前position位置设置为0;   position=0 limit=3  new String(bytes,0,len)
                - 丢弃标记。
             */
            buffer.flip();
            System.out.println("位置:"+buffer.position()+" 限制:"+buffer.limit());//位置:0 限制:3
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    Channel 概述

    1).java.nio.channels.Channel(接口):用于 I/O 操作的连接。

    • 表示:通道。
    • 可以是“文件通道-FileChannel”、“网络通道-SocketChannel和ServerSockecChannel”。
    • 它类似于IO流,但比IO流更强大。read(byte[]) write(byte[])
    • IO流是“单向”的,Channel是“双向的”。

    2).Channel全部使用Buffer实现读、写。read(ByteBuffer) write(ByteBuffer)


    BIO / FileChannel(文件通道)

    java.nio.channels.FileChannel:用于读取、写入、映射和操作文件的通道。

    ​ 类似IO流,用于读取文件,写入文件,复制文件

    FileChannel是一个抽象类,无法直接创建对象,获取对象的方式:

    /* 使用字节输入流(FileInputStream)中的方法获取读取文件的FileChannel */
    FileChannel getChannel() 	// 返回与此文件输入流有关的唯一 FileChannel 对象
    /* 使用字节输出流(FileOutputStream)中的方法获取写入文件的FileChannel */
    FileChannel getChannel() 	// 返回与此文件输出流有关的唯一 FileChannel 对象
    
    • 1
    • 2
    • 3
    • 4

    FileChannel的成员方法:

    int read(ByteBuffer dst)  	// 读取多个字节存储到ByteBuffer中,相当于FileInputStream中的read(byte[])
    int write(ByteBuffer src)  	// 将ByteBuffer中的数据写入到文件中,相当于FileOutputStream中的write(byte[])
    
    • 1
    • 2

    示例:

    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class Demo01FileChannel {
        public static void main(String[] args) throws IOException {
            //1.创建FileInputStream对象,构造方法中绑定要读取的数据源
            FileInputStream fis = new FileInputStream("c:\\1.jpg");
            //2.创建FileOutputStream对象,构造方法中绑定要写入的目的地
            FileOutputStream fos = new FileOutputStream("d:\\1.jpg");
            //3.使用FileInputStream对象中的方法getChannel,获取读取文件的FileChannel对象
            FileChannel fisChannel = fis.getChannel();
            //4.使用FileOutputStream对象中的方法getChannel,获取写入文件的FIleChannel对象
            FileChannel fosChannel = fos.getChannel();
            //一读一写复制文件
            //5.使用读取文件的FileChannel对象中的方法read,读取文件 int read(ByteBuffer dst)
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int len = 0;
            while ((len = fisChannel.read(buffer)) != -1){
                //6.使用写入文件的FIleChannel对象中的方法write,把读取到的数据写入到文件中 int write(ByteBuffer src)
                //使用flip方法缩小limit的范围:最后一次读取的不一定是1024个字节
                System.out.println("flip前:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
                buffer.flip();
                System.out.println("flip后:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
                //write方法是从position(0)开始往目的地写数据,写到limit(有效字节),写完之后positon会变的
                System.out.println("write方法写数据:从" + buffer.position() + "写到:" + buffer.limit());
                fosChannel.write(buffer);//position(0)-limit(有效字节)之间的数据
                //初始化ByteBuffer的状态
                System.out.println("clear前:position的位置:" + buffer.position() + ",limit:" + buffer.limit());
                buffer.clear();//将position设置为0,将limit设置为容量(1024)
                System.out.println("clear后:position的位置:" + buffer.position()+",limit:" + buffer.limit());
                System.out.println("-----------------------------------------------");
            }
            //7.释放资源
            fosChannel.close();
            fisChannel.close();
            fos.close();
            fis.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    MappedByteBuffer:高效读写

    java.io.RandomAccessFile类:可以设置读、写模式的IO流类

    构造方法:

    public RandomAccessFile(String name, String mode)
    /* 参数:
    		String name;要读取的数据源,或者写入的目的地
    		String mode:设置流的读写模式
    			"r":只读,必须是小写
    			"rw":读写,必须是小写
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    使用 RandomAccessFile 类中的方法 getChannel 获取 FileChannel

    FileChannel getChannel() 	// 返回与此文件关联的唯一 FileChannel 对象
    
    • 1

    使用 FileChannel 类中的方法 map 获取 MappedByteBuffer

    MappedByteBuffer map(FileChannel.MapMode mode, long position, long size)  // 将此通道的文件区域直接映射到内存中
    /* 参数:
    		FileChannel.MapMode mode:设置读写的模式
    			READ_ONLY:只读映射模式
    			READ_WRITE:读取/写入映射模式
    	   long position:文件中的位置,映射区域从此位置开始,一般都是从0开始
    	   long size:要映射的区域大小,就是要复制文件的大小,单位字节
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    java.nio.MappedByteBuffer:它可以创建“直接缓存区”,将文件的磁盘数据映射到内存。

    ​ 系统内存中,和系统直接交互数据,速度快,效率高

    注意:

    • 它最大可以映射:Integer.MAX_VALUE个字节(2G)左右,直接复制的文件不能超过2G,超过了就需要分块复制。

    • 磁盘和内存实时映射:内存中直接缓冲区中的文件改变,映射的硬盘中的文件也会实时改变。效率高(内存和内存进行读写)

    MappedByteBuffer中的成员方法:

    byte get(int index)  				// 获取缓冲区中指定索引处的字节
    ByteBuffer put(int index, byte b)	// 把字节写入到指定的索引处
    
    • 1
    • 2

    示例:使用FileChannel结合MappedByteBuffer实现高效读写复制2g以上的文件

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class Demo03FileChannel {
        public static void main(String[] args) throws IOException {
            long s = System.currentTimeMillis();
            //1.创建读取文件的RandomAccessFile对象,构造方法中封装要读取的数据源和设置只读模式("r")
            RandomAccessFile inRAF = new RandomAccessFile("c:\\2g.rar", "r");
            //2.创建写入文件的RandomAccessFile对象,构造方法中封装要写入的目的地和设置读写模式("rw")
            RandomAccessFile outRAF = new RandomAccessFile("d:\\2g.rar", "rw");
            //3.使用读取文件的RandomAccessFile对象中的方法getChannel,获取读取文件的FileChannel对象
            FileChannel inRAFChannel = inRAF.getChannel();
            //4.使用写入文件的RandomAccessFile对象中的方法getChannel,获取读取写入的FileChannel对象
            FileChannel outRAFChannel = outRAF.getChannel();
            //5.使用读取文件的FileChannel对象中的方法size,获取读取文件的大小(单位字节)
            long size = inRAFChannel.size();
            System.out.println(size);//2355126731 字节
    
            //定义复制文件需要使用的变量
            long count = 1;  //复制文件的块数,默认值是1
            long startIndex = 0;  //每次复制每块文件的开始索引
            long everySize = 512*1024*1024;  //分块,每块的大小 512M
            long copySize = size;  //每次复制文件的大小,默认等于文件的总大小
    
            //判断要复制的文件大小是否大于每块文件的大小
            if(size > everySize){
                //复制的文件大于512M,进行分块
                //计算复制文件可以分成几块  2242.56M/512M
                count = size%everySize==0 ? size/everySize : size/everySize+1;
                System.out.println("文件的大小:" + size + "字节,可以分成:" + count + "块");
    
                //第一次复制文件的大小等于每块的大小
                copySize = everySize;
            }
    
            //定义一个for循环,分几块就循环复制几次
            for (int i = 0; i < count; i++) {
                //6.使用读取文件的FileChannel对象中的方法map,创建读取文件的直接缓冲区MappedByteBuffer对象
                MappedByteBuffer inMap = inRAFChannel.map(FileChannel.MapMode.READ_ONLY, startIndex, copySize);
                //7.使用写入文件的FileChannel对象中的方法map,创建写入文件的直接缓冲区MappedByteBuffer对象
                MappedByteBuffer outMap = outRAFChannel.map(
                    FileChannel.MapMode.READ_WRITE, startIndex, copySize);
                System.out.println("每块文件的开始复制的索引:" + startIndex);
                System.out.println("每块文件的大小:" + copySize + "字节");
                System.out.println("--------------------------------------------");
                //8.创建for循环,循环size次
                for (int j = 0; j < copySize; j++) {
                    //9.使用取文件的直接缓冲区MappedByteBuffer对象中的方法get,读取数据源指定索引处的文件
                    byte b = inMap.get(j);
                    //10.使用写入文件的直接缓冲区MappedByteBuffer对象中的方法put,把读取到的字节写入到目的地指定的索引处
                    outMap.put(j, b);
                }
    
                //复制完每块文件,重新计算startIndex和copySize的大小
                startIndex += copySize;
                copySize = size-startIndex>everySize ? everySize : size-startIndex;
            }
    
            //11.释放资源
            outRAFChannel.close();
            inRAFChannel.close();
            outRAF.close();
            inRAF.close();
            long e = System.currentTimeMillis();
            System.out.println("复制文件共耗时:"+(e-s)+"毫秒!");//复制文件共耗时:4912毫秒!
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    NIO / SocketChannel(网络通道)

    服务器端

    相关的类:java.nio.channels.ServerSocketChannel:用于面向流的侦听套接字的可选通道。

    获取对象的方式:使用静态方法open

    static ServerSocketChannel open() 		// 打开服务器插槽通道
    
    • 1

    成员方法:

    ServerSocketChannel bind(SocketAddress local) 		// 给服务器绑定指定的端口号
    SocketChannel accept() 		// 监听客户端的请求
    SelectableChannel configureBlocking(boolean block) 		// 设置服务器的阻塞模式 true:阻塞(不写默认) false:非阻塞
    
    • 1
    • 2
    • 3

    客户端

    相关的类:java.nio.channels.SocketChannel:用于面向流的连接插座的可选通道。

    获取对象的方法:使用静态方法open

    static SocketChannel open() 		// 打开套接字通道
    
    • 1

    成员方法:

    boolean connect(SocketAddress remote) 		// 根据服务器的ip地址和端口号连接服务器
    /* 参数:
    		SocketAddress remote:封装服务器的ip地址和端口号,用的时候直接new
                返回值:boolean
                    连接服务器成功:true
                    连接服务器失败:false
    */
    SelectableChannel configureBlocking(boolean block) 	// 设置客户端的阻塞模式。true:阻塞(不写默认) false:非阻塞
    /*
    	1.客户端设置为阻塞模式:connect方法会多次尝试连接服务器
    		connect连接成功服务器,返回true
    		connect连接服务器失败,会抛出连接异常ConnectException: Connection refused: connect
    	2.客户端设置为非阻塞模式:connect方法只会连接一次服务器
    		connect方法无论连接是否成功还是失败都会false
    		所以客户端设置为非阻塞模式没有意义,结束不了轮询
    */
        
    int write(ByteBuffer src) 		// 给服务器发送数据
    int read(ByteBuffer dst) 		// 读取服务器回写的数据
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    示例

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    
    /*
        实现同步非阻塞的服务器:轮询监听客户端的请求
     */
    public class NIOTCPServer {
        public static void main(String[] args) throws IOException, InterruptedException {
            //1.使用open方法获取ServerSocketChannel对象
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //2.使用ServerSocketChannel对象中的方法bind给服务器绑定指定的端口号
            serverSocketChannel.bind(new InetSocketAddress(8888));
    
            //SelectableChannel configureBlocking(boolean block) 设置服务器的阻塞模式 true:阻塞(不写默认) false:非阻塞
            serverSocketChannel.configureBlocking(false);
    
            //轮询监听客户端的请求 ==> 死循环一直执行,监听客户端
            while (true){
                //3.使用ServerSocketChannel对象中的方法accept监听客户端的请求
                System.out.println("服务器等待客户端的连接...");
                SocketChannel socketChannel = serverSocketChannel.accept();//accpet:非阻塞 不会等待客户端请求
    
                //对客户端SocketChannel对象进行一个非空判断,没有客户端连接服务器,accpet方法返回null
                if(socketChannel != null){
                    System.out.println("有客户端连接服务器,服务器读取客户端发送的数据,给客户端回写数据...");
    
                    //int read(ByteBuffer dst) 读取客户端发送的数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = socketChannel.read(buffer);
                    String msg = new String(buffer.array(), 0, len);
                    System.out.println("服务器读取客户端发送的数据:" + msg);
    
                    //int write(ByteBuffer src) 服务器给客户端发送数据
                    socketChannel.write(ByteBuffer.wrap("收到,谢谢".getBytes()));
    
                    System.out.println("服务器读写数据完成,结束轮询...");
                    break;//结束轮询
                }else{
                    System.out.println("没有客户端连接服务器,休息2秒钟,干点其他事情,在继续下一次轮询监听客户端连接...");
                    Thread.sleep(2000);
                }
            }
    
            //释放资源
            serverSocketChannel.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    /*
        客户端:轮询连接服务器。客户端轮询连接服务器成功,给服务器发送数据,读取服务器回写的数据
     */
    public class NIOTCPClient {
        public static void main(String[] args) {
            //客户端轮询连接服务器,连接成功,结束轮询
            while (true){
                try {
                    //1.使用open方法获取客户端SocketChannel对象
                    SocketChannel socketChannel = SocketChannel.open();
                    System.out.println("客户端开始连接服务器...");
    
                    //2.使用SocketChannel对象中的方法connect根据服务器的ip地址和端口号连接服务器
                    boolean b = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
                    System.out.println(b);
                    System.out.println("客户端连接服务器成功,给服务器发送数据,读取服务器回写的数据...");
                    //int write(ByteBuffer src) 给服务器发送数据
                    ByteBuffer buffer = ByteBuffer.wrap("你好服务器".getBytes());
                    System.out.println("容量:"+buffer.capacity());
                    System.out.println("索引:"+buffer.position());
                    System.out.println("限定:"+buffer.limit());
                    socketChannel.write(buffer);
    
                    //int read(ByteBuffer dst) 读取服务器回写的数据
                    ByteBuffer buffer2 = ByteBuffer.allocate(1024);
                    int len = socketChannel.read(buffer2);
                    //len:读取的有效字节个数
                    //System.out.println("客户端读取服务器发送的数据:" + new String(buffer2.array(), 0, len));
    
                    buffer2.flip();//缩小limit的范围: position=0 limit=position(读取的有效字节个数)
                    System.out.println("客户端读取服务器发送的数据:"+new String(buffer2.array(),0,buffer2.limit()));
    
                    System.out.println("客户端读写数据完毕,结束轮询...");
                    break;
                } catch (IOException e) {
                    System.out.println("客户端connect方法连接服务器失败,休息2秒钟,干点其他事情,在继续下一次连接服务器...");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    多路复用 / 选择器:Selector

    多路复用的概念

    选择器Selector是NIO中的重要技术之一。它与SelectableChannel联合使用实现了非阻塞的多路复用。使用它可以节省CPU资源,提高程序的运行效率。

    "多路"是指:服务器端同时监听多个“端口”的情况。每个端口都要监听多个客户端的连接。

    • 服务器端的非多路复用效果

      在这里插入图片描述

      如果不使用“多路复用”,服务器端需要开很多线程处理每个端口的请求。如果在高并发环境下,造成系统性能下降。

    • 服务器端的多路复用效果

      在这里插入图片描述

      使用了多路复用,只需要一个线程就可以处理多个通道,降低内存占用率,减少CPU切换时间,在高并发、高频段业务环境下有非常重要的优势


    选择器Selector_服务器端实现多路注册

    相关的类: java.nio.channels.Selector:SelectableChannel对象的多路复用器。

    获取Selector对象的方式:

    static Selector open() 		// 打开选择器
    
    • 1

    注册Channel(服务器通道)到Selector上:使用ServerSocketChannel中的方法

    SelectionKey register(Selector sel, int ops) 		// 使用给定的选择器注册此频道,返回一个选择键。
    /* 参数:
       	Selector sel:传递要注册的选择器对象
       	int ops:传递对应的事件
           	使用SelectionKey中的常量:SelectionKey.OP_ACCEPT(固定写法,把服务器通道注册到选择器上)
               OP_ACCEPT:监听客户端件事
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    选择器Selector_常用方法

    // 返回一个Set集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象
    Set<SelectionKey> keys()
    
    // 返回一个Set集合,表示:当前已连接的通道的集合。每个已连接通道同一封装为一个SelectionKey对象
    Set<SelectionKey> selectedKeys()
        
    int select()	// 会阻塞,直到至少有1个客户端连接。返回一个int值,表示有几个客户端连接了服务器。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    示例:多路信息接收

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    
    /*
        服务器
     */
    public class TCPServer {
        public static void main(String[] args) throws IOException, InterruptedException {
            //1.创建3个ServerSocketChannel服务器对象
            ServerSocketChannel channel01 = ServerSocketChannel.open();
            ServerSocketChannel channel02 = ServerSocketChannel.open();
            ServerSocketChannel channel03 = ServerSocketChannel.open();
            //2.分别给3个ServerSocketChannel服务器对象绑定不同的端口号
            channel01.bind(new InetSocketAddress(7777));
            channel02.bind(new InetSocketAddress(8888));
            channel03.bind(new InetSocketAddress(9999));
            //3.设置3个ServerSocketChannel服务器对象为非阻塞模式(只要使用Selector选择器,服务器必须是非阻塞的)
            channel01.configureBlocking(false);
            channel02.configureBlocking(false);
            channel03.configureBlocking(false);
            //4.获取Selector对象
            Selector selector = Selector.open();
            //5.使用服务器ServerSocketChannel对象中的方法register,把3个服务器通道注册到选择器Selector上
            channel01.register(selector, SelectionKey.OP_ACCEPT);
            channel02.register(selector, SelectionKey.OP_ACCEPT);
            channel03.register(selector, SelectionKey.OP_ACCEPT);
    
            //Selector的keys()方法 此方法返回一个Set集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象。
            Set<SelectionKey> keys = selector.keys();
            System.out.println("已注册服务器通道的数量:" + keys.size());	//3
    
            //服务器轮询监听客户端的请求
            while (true){
                //Selector的select()方法:获取客户端连接的数量,没有客户端连接服务器,此方法会一直阻塞
                int select = selector.select();
                System.out.println("连接服务器的客户端数量:" + select);
    
                //Selector的selectedKeys()方法:获取当前已经连接的通道集合
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                System.out.println("已经到服务器的通道数量:" + selectionKeys.size());
    
                //处理Selector监听到客户端的请求事件:遍历Set集合,获取每一个SelectionKey对象
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()){
                    SelectionKey selectionKey = it.next();
                    //获取SelectionKey里边封装的服务器ServerSocketChannel对象
                    ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
                    System.out.println("获取当前通道ServerSocketChannel监听的端口号:" + channel.getLocalAddress());
                    //处理监听的accept事件==>获取请求服务器的客户单SocketChannel对象
                    SocketChannel socketChannel = channel.accept();
                    //读取客户端SocketChannel发送的数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = socketChannel.read(buffer);
                    System.out.println("服务器读取到客户端发送的数据:" + new String(buffer.array(), 0, len));
    
                    //处理完SelectionKey监听到的事件,要在Set集合中移除已经处理完的SelectionKey对象
                    it.remove();//使用迭代器对象移除集合中的元素,不会抛出并发修改异常(移除的就是it.next()方法取出的对象)
                }
    
                //获取完一个客户端的连接,睡眠2秒,在进行下一次轮询
                Thread.sleep(2000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    /*
        开启三个线程,每个线程分别创建一个客户端对象,连接服务器的三个端口
     */
    public class TCPClient {
        public static void main(String[] args) {
            new Thread(()->{
                //创建客户端对象,轮询连接服务器
                while (true){
                    try(SocketChannel socketChannel = SocketChannel.open();) {
                        System.out.println("客户端开始连接7777端口...");
                        socketChannel.connect(new InetSocketAddress("127.0.0.1",7777));
    
                        System.out.println("客户端连接7777端口成功,给服务器发送数据");
                        socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接7777端口号的客户端对象!".getBytes()));
    
                        System.out.println("客户端7777发送数据完毕,结束轮询...");
                        break;
                    } catch (IOException e) {
                        System.out.println("客户端连接7777端口异常");
                    }
                }
            }).start();
    
            new Thread(()->{
                //创建客户端对象,轮询连接服务器
                while (true){
                    try(SocketChannel socketChannel = SocketChannel.open();) {
                        System.out.println("客户端开始连接8888端口...");
                        socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
    
                        System.out.println("客户端连接8888端口成功,给服务器发送数据");
                        socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接8888端口号的客户端对象!".getBytes()));
    
                        System.out.println("客户端8888发送数据完毕,结束轮询...");
                        break;
                    } catch (IOException e) {
                        System.out.println("客户端连接8888端口异常");
                    }
                }
            }).start();
    
            new Thread(()->{
                //创建客户端对象,轮询连接服务器
                while (true){
                    try(SocketChannel socketChannel = SocketChannel.open();) {
                        System.out.println("客户端开始连接9999端口...");
                        socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));
    
                        System.out.println("客户端连接9999端口成功,给服务器发送数据");
                        socketChannel.write(ByteBuffer.wrap("你好服务器,我是连接9999端口号的客户端对象!".getBytes()));
    
                        System.out.println("客户端9999发送数据完毕,结束轮询...");
                        break;
                    } catch (IOException e) {
                        System.out.println("客户端连接9999端口异常");
                    }
                }
            }).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    AIO / 异步通道:AsynchronousServerSocketChannel

    概述

    服务器端:java.nio.channels.AsynchronousServerSocketChannel:用于面向流的侦听套接字的异步通道。

    获取对象的方法:

    static AsynchronousServerSocketChannel open() 	// 打开异步服务器套接字通道
    
    • 1

    成员方法:

    AsynchronousServerSocketChannel bind(SocketAddress local) 	// 给服务器绑定指定的端口号
    void accept(A attachment, CompletionHandler<?> handler) 	// 监听客户端的请求,默认就是非阻塞的
    /* 参数:
            A attachment:附件,可以传递null
            CompletionHandler handler:事件处理的接口,用于处理accept方法监听到的事件
    */
        
    Future<Integer> write(ByteBuffer src) 			// 给客户端发送数据。阻塞方法,会一直等待客户端发送数据
    // 读取客户端发送的数据。非阻塞的读取方法
    Future<Integer> read(ByteBuffer dst)
    void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<?> handler)
    /* 参数:
    		ByteBuffer dst: 用来存储读取到的数据
    		long timeout: 完成I / O操作的最长时间
    		TimeUnit unit: timeout参数的时间单位(TimeUnit.SECONDS:秒)
    		A attachment: 要附加到I / O操作的对象; 可以是null
    		CompletionHandler handler: 消费结果的处理程序,是一个回调函数
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    java.nio.channels.CompletionHandler接口:用于消除异步I / O操作结果的处理程序。

    CompletionHandler 也叫回调函数,accept或read方法执行成功,服务器就会自动执行这个回调函数,来读取客户端发送的数据

    接口中的方法:

    void completed(V result, A attachment) 		// 主方法执行成功,执行的方法
    void failed(Throwable exc, A attachment) 	// 主方法执行失败,执行的方法
    
    • 1
    • 2

    客户端: java.nio.channels.AsynchronousSocketChannel:用于面向流的连接插座的异步通道。

    获取对象的方法:

    static AsynchronousSocketChannel open() 		// 打开异步套接字通道
    
    • 1

    成员方法:

    Future<Void> connect(SocketAddress remote) 		// 连接服务器的方法,参数传递服务器的ip地址和端口号
    /* 注意:
    		connect是一个非阻塞的方法,不会等待方法运行完毕,连接服务器成功在执行下边的代码
    		客户端连接服务器需要时间的,如果没有连接成功,就给服务器使用write方法发送数据,会抛出异常
    */
    
    Future<Integer> write(ByteBuffer src) 			// 给服务器发送数据
    Future<Integer> read(ByteBuffer dst) 			// 读取服务器发送的数据
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    java.util.concurrent.Future接口

    接口中的方法:

    boolean isDone() 			// 如果此任务完成,则返回 true
    /*	返回true:连接服务器成功
        返回false:还没有连接上服务器(客户端连接服务器是需要时间的)
    */
    
    • 1
    • 2
    • 3
    • 4

    示例:AIO异步连接,异步阻塞读写

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /*
    	服务器端
    */
    public class AIOTCPServer {
        public static void main(String[] args) throws IOException, InterruptedException {
            //1.获取异步非阻塞的服务器AsynchronousServerSocketChannel对象
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            //2.使用bind方法给AsynchronousServerSocketChannel对象绑定指定的端口号
            serverSocketChannel.bind(new InetSocketAddress(8888));
            //3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端的请求
            System.out.println("accept方法开始执行了...");
            serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    System.out.println("客户端连接服务器成功...");
                    //读取客户端发送的数据  Future read(ByteBuffer dst)
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //read方法是一个阻塞的方法,会一直等待客户端发送数据
                    Future<Integer> future = result.read(buffer);
                    Integer len = 0;
                    try {
                        len = future.get();//获取客户端发送数据长度
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    System.out.println("服务器读取客户端发送的数据:"+new String(buffer.array(),0,len));
                }
    
                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("客户端连接服务器失败...");
                }
            });
            System.out.println("accept方法执行结束了...");
            //accept方法是一个非阻塞的方法,我们执行完accept方法,可以去做其他的事情
            //死循环的目的不让程序停止(工作中:写一些具体的需求);当有客户端请求服务器,会自动执行回调函数中的方法
            while (true){
                System.out.println("正在忙其他的事情!");
                Thread.sleep(2000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.Future;
    
    /*
    	客户端
     */
    public class AIOTCPClient {
        public static void main(String[] args) throws IOException, InterruptedException {
            //1.创建异步非阻塞的客户端AsynchronousSocketChannel对象
            AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
            //2.使用AsynchronousSocketChannel对象中的方法connect连接服务器
            Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
            System.out.println(future.isDone());	//false:还未连接上服务器
            System.out.println(111);
            //休眠5秒钟,等待客户端连接服务器成功,再给服务器发送数据
            Thread.sleep(5000);
            System.out.println(future.isDone());	//true:已经连接服务器成功
            if(future.isDone()){
                socketChannel.write(ByteBuffer.wrap("你好服务器".getBytes()));
            }
            //释放资源
            socketChannel.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    示例:AIO异步连接,异步非阻塞读写

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    /*
    	服务器端
    */
    public class AIOTCPServer {
        public static void main(String[] args) throws IOException, InterruptedException {
            //1.获取异步非阻塞的服务器AsynchronousServerSocketChannel对象
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            //2.使用bind方法给AsynchronousServerSocketChannel对象绑定指定的端口号
            serverSocketChannel.bind(new InetSocketAddress(8888));
            //3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端的请求
            System.out.println("accept方法开始执行了...");
            serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    System.out.println("客户端连接服务器成功...");
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //当客户端连接服务器成功,read方法只会等待客户端10秒钟,10秒钟发送了数据,执行completed方法;10秒钟之后还没有发送数据,执行failed
                    result.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                        @Override
                        public void completed(Integer result, Object attachment) {
                            System.out.println("服务器读取客户端发送数据成功,执行completed方法");
                            //服务器读取客户端发送的数据
                            String msg = new String(buffer.array(), 0, result);//把读取到的有效数据,转换为字符串
                            System.out.println("服务器读取客户端发送的数据为:"+msg);
                        }
    
                        @Override
                        public void failed(Throwable exc, Object attachment) {
                            System.out.println("服务器读取客户端发送数据失败,执行failed方法");
                        }
                    });
    
                }
    
                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("客户端连接服务器失败...");
                }
            });
            System.out.println("accept方法执行结束了...");
            //accept方法是一个非阻塞的方法,我们执行完accept方法,可以去做其他的事情
            //死循环的目的不让程序停止(工作中:写一些具体的需求);当有客户端请求服务器,会自动执行回调函数中的方法
            while (true){
                System.out.println("正在忙其他的事情!");
                Thread.sleep(2000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.Future;
    
    /*
    	客户端
     */
    public class AIOTCPClient {
        public static void main(String[] args) throws IOException, InterruptedException {
            //1.创建异步非阻塞的客户端AsynchronousSocketChannel对象
            AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
            //2.使用AsynchronousSocketChannel对象中的方法connect连接服务器
            Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
            System.out.println(future.isDone());//false:还未连接上服务器
            System.out.println(111);
            //休眠5秒钟,等待客户端连接服务器成功,再给服务器发送数据
            Thread.sleep(15000);
            System.out.println(future.isDone());//true:已经连接服务器成功
            if(future.isDone()){
                socketChannel.write(ByteBuffer.wrap("你好服务器".getBytes()));
            }
            //释放资源
            socketChannel.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    Socket 通信

    概述

    Linux 内核协议簇中有几十种通讯协议,AF-INET就是常见 TCP/IP 的通讯方式,AF-UNIX 是用于本机线程间通讯一种IPC机制,从用户角度看,所采用通讯模式相差不大,但就原理上看,相差较大。


    AF_INET 域 socket 通信过程

    典型的TCP/IP四层模型的通信过程:

    在这里插入图片描述

    发送方、接收方依赖 IP:Port 来标识,即将本地的 socket 绑定到对应的 IP 端口上,发送数据时,指定对方的 IP 端口,经过Internet,可以根据此 IP 端口最终找到接收方;接收数据时,可以从数据包中获取到发送方的IP端口。

    发送方通过系统调用 send() 将原始数据发送到操作系统内核缓冲区中。内核缓冲区从上到下依次经过TCP层、IP层、链路层的编码,分别添加对应的头部信息,经过网卡将一个数据包发送到网络中。经过网络路由到接收方的网卡。网卡通过系统中断将数据包通知到接收方的操作系统,再沿着发送方编码的反方向进行解码,即依次经过链路层、IP层、TCP层去除头部、检查校验等,最终将原始数据上报到接收方进程。


    AF_UNIX 域 socket 通信过程

    典型的本地 IPC,类似于管道,依赖路径名标识发送方和接收方。即发送数据时,指定接收方绑定的路径名,操作系统根据该路径名可以直接找到对应的接收方,并将原始数据直接拷贝到接收方的内核缓冲区中,并上报给接收方进程进行处理。同样的接收方可以从收到的数据包中获取到发送方的路径名,并通过此路径名向其发送数据。

    在这里插入图片描述


    异同及应用场景

    相同点

    • 操作系统提供的接口 socket(),bind(),connect(),accept(),send(),recv(),以及用来对其进行多路复用事件检测的 select(),poll(),epoll() 都是完全相同的。收发数据的过程中,上层应用感知不到底层的差别。

    不同点

    • 建立 socket 传递的地址域,及bind()的地址结构稍有区别:

      • socket() 分别传递不同的域 AF_INET 和 AF_UNIX
      • bind() 的地址结构分别为sockaddr_in(制定IP端口)和 sockaddr_un(指定路径名)
    • AF_INET 需经过多个协议层的编解码,消耗系统cpu,并且数据传输需要经过网卡,受到网卡带宽的限制。AF_UNIX 数据到达内核缓冲区后,由内核根据指定路径名找到接收方socket对应的内核缓冲区,直接将数据拷贝过去,不经过协议层编解码,节省系统 cpu,并且不经过网卡,因此不受网卡带宽的限制。

    • AF_UNIX 的传输速率远远大于 AF_INET

    • AF_INET 不仅可以用作本机的跨进程通信,同样的可以用于不同机器之间的通信,其就是为了在不同机器之间进行网络互联传递数据而生。而 AF_UNIX 则只能用于本机内进程之间的通信。


    应用场景

    • AF_UNIX 由于其对系统 cpu 的较少消耗,不受限于网卡带宽,及高效的传递速率,本机通信则首选 AF_UNIX 域
    • AF_INET 多用于跨机器之间的通信

    AFUNIX Server Socket 通信

    Java AFUNIXServerSocket类

    参考:https://vimsky.com/examples/detail/java-class-org.newsclub.net.unix.AFUNIXServerSocket.html

    依赖:

            <dependency>
                <groupId>com.kohlschutter.junixsocketgroupId>
                <artifactId>junixsocket-coreartifactId>
                <version>2.3.2version>
            dependency>
            <dependency>
                <groupId>com.kohlschutter.junixsocketgroupId>
                <artifactId>junixsocket-commonartifactId>
                <version>2.3.2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    使用示例1:run

    import org.newsclub.net.unix.AFUNIXServerSocket; 
    
    @AllArgsConstructor
    public class SocketJob implements Runnable{
        private String path;
        
        public void run() throws IOException {
            File socketFile = new File(path);
            socketFile.deleteOnExit();
    
            final ExecutorService executorService = Executors.newCachedThreadPool();
    
            try (AFUNIXServerSocket server = AFUNIXServerSocket.newInstance()) {
                // 绑定路径
                server.bind(new AFUNIXSocketAddress(socketFile));
                System.out.println("server: " + server);
    
                while (!Thread.interrupted()) {
                    System.out.println("Waiting for connection...");
                    executorService.execute(new ClientConnection(this, server.accept()));
                }
            } finally {
                executorService.shutdown();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    使用示例2:main

    import org.newsclub.net.unix.AFUNIXServerSocket; 
    
    public static void main(String[] args) throws IOException {
        final File socketFile =
                new File(new File(System.getProperty("java.io.tmpdir")), "junixsocket-test.sock");
    
        try (AFUNIXServerSocket server = AFUNIXServerSocket.newInstance()) {
            server.bind(new AFUNIXSocketAddress(socketFile));
            System.out.println("server: " + server);
    
            while (!Thread.interrupted()) {
                System.out.println("Waiting for connection...");
                try (Socket sock = server.accept()) {
                    System.out.println("Connected: " + sock);
    
                    try (InputStream is = sock.getInputStream(); 
                         OutputStream os = sock.getOutputStream()) {
                        byte[] buf = new byte[128];
                        int read = is.read(buf);
                        System.out.println("Client's response: " + new String(buf, 0, read));
    
                        System.out.println("Saying hello to client " + os);
                        os.write("Hello, dear Client".getBytes());
                        os.flush();
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    socket 通讯消息接收

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    
    /**
     * AF_UNIX 域 socket 通讯消息接收
     */
    @AllArgsConstructor
    @Slf4j
    public class SocketMsgReceiveBO implements Runnable {
    
        /**
         * AF_UNIX 域 docker socket 客户端
         */
        private Socket socketClient;
    
        @Override
        public void run() {
            log.info("start socket client receive msg");
    
            if (socketClient == null || socketClient.isClosed()){
                log.error("socket client is unavailable");
                return;
            }
    
            StringBuffer acceptMsgBuffer = new StringBuffer();
            try (InputStream is = socketClient.getInputStream()) {
                byte[] buf = new byte[2048];
                int readLenth;
                // 退出当前循环依赖于socketClient端主动关闭链接,否则当前线程一致等待通讯
                while ((readLenth = is.read(buf)) != -1){
                    acceptMsgBuffer.append(new String(buf, 0, readLenth, StandardCharsets.UTF_8));
                    log.info("server accept msg:{}", acceptMsgBuffer);
    
                    ArrayList<String> validJsonMsgList = new ArrayList<>();
                    // 获取socket回调消息中的符合json格式的字符串
                    String validJsonMsg = getValidJsonStrFromMsg(acceptMsgBuffer.toString());
                    while (StringUtils.isNotBlank(validJsonMsg)){
                        // 添加到待解析消息集合中
                        validJsonMsgList.add(validJsonMsg);
    
                        /**
                         * 比较当前合法json格式的消息和原始消息的长度
                         * 若一致,则原始消息即为一个完整的符合json格式的消息,清空消息缓存Buffer
                         * 若不一致,则从原始消息的符合json格式片段的最后一个字符之后开始截取,等待拼接到下一次的消息
                         */
                        String originMsg = acceptMsgBuffer.toString();
                        if (validJsonMsg.length() == originMsg.length()){
                            acceptMsgBuffer = new StringBuffer();
                            break;
                        } else {
                            acceptMsgBuffer = new StringBuffer(originMsg.substring(validJsonMsg.length()));
                        }
    
                        log.info("all wait parse valid json msgs:{} and nextBuffer:{}", JSON.toJSONString(validJsonMsgList), acceptMsgBuffer);
    
                        // 解析合法的消息
                        validJsonMsgList.forEach(msg -> parseSocketMsg(msg));
                    }
    
                }
            } catch (Exception e){
                log.error("failed to receive socket client msg", e);
            } finally {
                String msg = acceptMsgBuffer.toString();
                if (StringUtils.isNotBlank(msg) && isValidSocketMsg(msg)){
                    log.info("finally parse last msg:{}", msg);
                    parseSocketMsg(msg);
                }
                if (!socketClient.isClosed()){
                    // 若socket客户端未自动关闭,则主动关闭
                    try {
                        socketClient.close();
                        log.info("success to close socket client");
                    } catch (IOException e){
                        log.error("failed to close socket client");
                    }
                }
            }
        }
    
        /**
         * 获取socket回调消息中的合法json格式的字符串
         */
        private String getValidJsonStrFromMsg(String msg) {
            if (StringUtils.isBlank(msg)){
                return "";
            }
            /**
             * 为防止出现粘包现象,对该消息进行循环判断,一旦发现内部存在合法的json格式,即任务是有效的消息
             * 依赖每次对消息进行截取长度少1的方式,以保证当不能找到匹配的JSON也能退出循环
             */
            while (!isValidSocketMsg(msg) && StringUtils.isNoneBlank(msg)){
                // 从0开始截取到当前字符串的最后一个字符。substring(startIndex, endIndex)方法入参前闭后开区间
                msg = msg.substring(0, msg.length()-1);
                int lastLeftBraceIndex = msg.lastIndexOf("}");
                if (lastLeftBraceIndex < 0){
                    // 当字符串中没有"}"字符时,退出循环
                    break;
                }
                // 从0截取到字符串的最后一个"}"字符
                msg = msg.substring(0, lastLeftBraceIndex + 1);
            }
            if (isValidSocketMsg(msg)){
                return msg;
            }
            return "";
        }
    
        /**
         * 如果接收到的消息非空,且符合json格式,则认为是合法的消息,可以进行解析
         */
        private boolean isValidSocketMsg(String msg) {
            if(StringUtils.isBlank(msg)) return false;
            try {
                JSON.parse(msg);
                return true;
            } catch (Exception e){
                return false;
            }
        }
    
        /**
         * 解析符合json格式的消息
         */
        private void parseSocketMsg(String msgStr) {
            JSONObject msgJsonObj = JSON.parseObject(msgStr);
            // TODO 根据约定的通讯字段进行相关业务逻辑
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
  • 相关阅读:
    Python究竟属不属于嵌入式语言?
    UE4 C++设计模式:装饰模式
    使用 PPO 算法进行 RLHF 的 N 步实现细节
    基于php食堂外卖系统
    我国跨国企业外汇风险管理——以海尔公司为例
    leetcode分类刷题:队列(Queue)(二、优先队列解决TopK简单问题)
    什么是Jmeter?Jmeter使用的原理步骤是什么?
    vue+electron 修改默认安装目录
    【Linux】 grep命令使用
    简单说说ConcurrentHashMap的结构和实现
  • 原文地址:https://blog.csdn.net/footless_bird/article/details/126289727