Java的Pipe是一种新的线程通信机制,传统的线程通信可以是通过共享内存的方式,socket等方式,而Pipe是通过Java NIO 通信的方式实现共享内存,优点类似于go语言的管道
先上代码
- public static void main(String[] args) throws IOException {
- Pipe open = Pipe.open();
- Pipe.SinkChannel sinkChannel = open.sink();
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(32);
- byteBuffer.clear();
- byteBuffer.put("你好 pipe".getBytes());
- byteBuffer.flip();
- sinkChannel.write(byteBuffer);
- Pipe.SourceChannel source = open.source();
- ByteBuffer byteBuffer1 = ByteBuffer.allocateDirect(32);
- source.read(byteBuffer1);
-
- }
这段代码我没有开启多线程读写
只讲底层实现
首先 SinkChannel 和 SourceChannel 是两个内部类 这个两个类都有一个SocketChannel
但是SinkChannel只提供了写方法 SourceChannel提供了读方法,相当于是对原始SocketChannel的一个封装,使其达到了一个SocketChannel 只能读一个SocketChannel 只能写的效果。所以就可以做到线程间通信
- ServerSocketChannel var1 = null;
- SocketChannel var2 = null;
- SocketChannel var3 = null;
-
- try {
- ByteBuffer var4 = ByteBuffer.allocate(16);
- ByteBuffer var5 = ByteBuffer.allocate(16);
- InetAddress var6 = InetAddress.getByName("127.0.0.1");
-
- assert var6.isLoopbackAddress();
-
- InetSocketAddress var7 = null;
-
- while(true) {
- if (var1 == null || !var1.isOpen()) {
- var1 = ServerSocketChannel.open();
- var1.socket().bind(new InetSocketAddress(var6, 0));
- var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());
- }
-
- var2 = SocketChannel.open(var7);
- PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());
-
- do {
- var2.write(var4);
- } while(var4.hasRemaining());
-
- var4.rewind();
- var3 = var1.accept();
-
- do {
- var3.read(var5);
- } while(var5.hasRemaining());
-
- var5.rewind();
- if (var5.equals(var4)) {
- PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);
- PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);
- break;
- }
-
- var3.close();
- var2.close();
- }
首先初始化一个ServerSocketChannel 然后绑定本地回环地址分配端口号
然后var2通过Open和ServerSocketChannel进行连接 ,紧接着var3通过acceept的方式来接受连接。如此这两个SocketChannel通过ServerSocketChannel建立了连接从而实现了跨线程通信。

而且通信底层使用了直接内存效率会高一些
而且SocketChannel还可以是非阻塞的