• 13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)


    【README】

    • 1.本文总结了 RPC 的概念,包括定义,RPC实现,及其优缺点;
    • 2.本文po出了RPC的简单代码实现,总结自 B站《netty-尚硅谷》;
    • 3.本文部分内容总结自 :What Is Remote Procedure Call (RPC)? Definition from SearchAppArchitecture
    • 4.本文还po出了 RPC 与 RMI 的区别;
    • 5.基于socket的rpc是阻塞式的,而基于netty的rpc是非阻塞式

    【1】RPC 

    【1.1】RPC定义

    1)RPC定义: Remote Procedure Call, 远程过程调用协议

    远程过程调用是一种软件通讯协议,基于该协议,一个程序可以通过网络调用另一台计算机上的服务而无需关心细节,就像调用本地系统的服务一样。

    【补充】RMI:Remote Method Invocation,远程方法调用

    • RMI 能够让jvm上的客户端像调用本地方法一样,调用远程jvm服务器上对象的方法;
    • 显然, RMI是 RPC 的 java版本的实现

    2)常见的 RPC 框架有:

    • 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。

    【1.2】RPC的实现

    1)当触发远程过程调用,调用环境会被挂起,过程参数会通过网络传送到过程执行环境(过程参数可以理解为是类名,方法名,方法参数,方法参数类型等)。

    2)当服务器过程执行完成,结果会被回传到调用环境;只要过程调用返回则客户端立即恢复执行;

    3)RPC调用过程中,会经过如下步骤:

    • 步骤1:客户端调用客户端存根(代理);该调用是本地过程调用,并把参数压入栈;
    • 步骤2:客户端代理封装过程参数到报文,并发送报文;把过程参数打包称为 参数编组
    • 步骤3:客户端本地操作系统发送报文到远程服务器;
    • 步骤4:服务器操作系统把请求数据包送入 服务器存根(代理)
    • 步骤5:服务器代理把参数解包,这个过程称为 参数解组
    • 步骤6:当服务器过程调用完成,把结果返回给服务器代理;服务器代理把返回值封装到报文;服务器代理接着把报文传递给传输层
    • 步骤7:服务器传输层发送响应报文到客户端传输层,接着回传给客户端代理
    • 步骤8:客户端代理解组返回的参数,返回到调用者的执行点;

    4)RPC 调用流程图

    【图解】 在RPC中:

    • 1)客户端称为服务消费者;服务端称为服务提供者;
    • 2)编码是把对象转为字节数组(字节码),以便网络传输,即序列化
    • 3)解码是把字节数组(字节)转为对象,以便业务逻辑处理,即反序列化
    • 4)综上,RPC需要用到动态代理, 序列化与反序列化技术等;

     【1.3】RPC的优缺点

    1)优点:

    • 在高级语言中,有助于客户端通过传统过程调用与服务器通讯;
    • 能够被用于分布式环境,远程调用就像本地调用一样;
    • 支持面向进程与面向线程模型;
    • 对用户隐藏了内部报文传输机制;
    • 仅需极少的工作即可重写和重新开发代码;
    • 删除了许多协议层以提高性能;

    2)缺点:

    • 客户端与服务器各自使用不同执行环境,使用资源非常复杂,如文件。因此 RPC系统不适合传输大量数据;
    • RPC极易失败,因为它涉及到通讯系统,另一外机器和另一个处理过程;
    • RPC没有统一的标准,可以通过多种方式实现;
    • RPC 只是基于交互的,因此,它对硬件架构没有提供任何灵活性;

    【2】RPC的Socket实现(阻塞式)

    部分代码转自: 简单RPC之Socket实现_归田的博客-CSDN博客_rpc socket

    0)目录结构:

    【图解】RPC的实现代码包含3部分:

    1)公共协议:

    • 约定的报文格式(封装了过程调用参数,包括全限定类名,方法名,参数类型数组,参数值数组),以便于服务端(服务提供方)根据反射创建对应类与调用方法
    • 约定的过程名(如方法名):把方法名封装到接口;

    2)客户端(服务消费方):

    • 通过代理模式创建服务的代理对象;调用方法,实际上调用的是代理对象方法,代理对象方法底层封装报文通过socket请求服务器获取调用结果;(客户端在整个调用过程是阻塞的,直到结果返回

    3)服务端(服务提供方):

    • 根据报文中的 类名通过反射创建服务实例,根据方法名,参数类型,参数列表通过反射调用方法,并将结果回写到socket

    注意(报文的序列化与反序列化):

    • 客户端把报文对象写出到 socket 前需要序列化为字节数组;服务器从socket读入字节数组后,需要反序列化为报文对象;

    【2.1】协议报文与过程调用方法定义:

    报文:

    1. /**
    2. * @Description 报文(注意:要可序列化)
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月12日
    6. */
    7. public class MyRpcMessage implements Externalizable {
    8. /** 全限定类名 */
    9. private String className;
    10. /** 方法名 */
    11. private String methodName;
    12. /** 参数类型clazz数组 */
    13. private Class[] parameterTypes;
    14. /** 参数值 */
    15. private Object[] parameterValues;
    16. /**
    17. * @description 私有构造器
    18. * @author xiao tang
    19. * @date 2022/9/12
    20. */
    21. public MyRpcMessage() {
    22. }
    23. /**
    24. * @description 创建生成器
    25. * @return 生成器
    26. * @author xiao tang
    27. * @date 2022/9/12
    28. */
    29. public static Builder builder() {
    30. Builder builder = new Builder();
    31. return builder;
    32. }
    33. public String getClassName() {
    34. return className;
    35. }
    36. public void setClassName(String className) {
    37. this.className = className;
    38. }
    39. public String getMethodName() {
    40. return methodName;
    41. }
    42. public void setMethodName(String methodName) {
    43. this.methodName = methodName;
    44. }
    45. public Class[] getParameterTypes() {
    46. return parameterTypes;
    47. }
    48. public void setParameterTypes(Class[] parameterTypes) {
    49. this.parameterTypes = parameterTypes;
    50. }
    51. public Object[] getParameterValues() {
    52. return parameterValues;
    53. }
    54. public void setParameterValues(Object[] parameterValues) {
    55. this.parameterValues = parameterValues;
    56. }
    57. @Override
    58. public void writeExternal(ObjectOutput out) throws IOException {
    59. out.writeUTF(className);
    60. out.writeUTF(methodName);
    61. out.writeObject(parameterTypes);
    62. out.writeObject(parameterValues);
    63. }
    64. @Override
    65. public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    66. this.className = in.readUTF();
    67. this.methodName = in.readUTF();
    68. this.parameterTypes = (Class[]) in.readObject();
    69. this.parameterValues = (Object[]) in.readObject();
    70. }
    71. /**
    72. * @description 生成器
    73. * @author xiao tang
    74. * @date 2022/9/12
    75. */
    76. public static class Builder {
    77. /** rpc报文对象 */
    78. private MyRpcMessage myRpcMessage;
    79. /**
    80. * @description 私有构造器
    81. * @author xiao tang
    82. * @date 2022/9/12
    83. */
    84. private Builder() {
    85. this.myRpcMessage = new MyRpcMessage();
    86. }
    87. /**
    88. * @description 设置全限定类名
    89. * @return 生成器
    90. * @author xiao tang
    91. * @date 2022/9/12
    92. */
    93. public Builder className(String className) {
    94. this.myRpcMessage.setClassName(className);
    95. return this;
    96. }
    97. /**
    98. * @description 设置方法名
    99. * @return 生成器
    100. * @author xiao tang
    101. * @date 2022/9/12
    102. */
    103. public Builder methodName(String methodName) {
    104. this.myRpcMessage.setMethodName(methodName);
    105. return this;
    106. }
    107. /**
    108. * @description 设置参数类型数组
    109. * @return 生成器
    110. * @author xiao tang
    111. * @date 2022/9/12
    112. */
    113. public Builder parameterTypes(Class[] parameterTypes) {
    114. this.myRpcMessage.setParameterTypes(parameterTypes);
    115. return this;
    116. }
    117. /**
    118. * @description 设置参数值数组
    119. * @return 生成器
    120. * @author xiao tang
    121. * @date 2022/9/12
    122. */
    123. public Builder parameterValues(Object[] parameterValues) {
    124. this.myRpcMessage.setParameterValues(parameterValues);
    125. return this;
    126. }
    127. /**
    128. * @description 获得rpc报文对象
    129. * @return rpc报文对象
    130. * @author xiao tang
    131. * @date 2022/9/12
    132. */
    133. public MyRpcMessage build() {
    134. return myRpcMessage;
    135. }
    136. }
    137. }

    方法接口定义:

    1. /**
    2. * @Description 过程调用接口
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public interface IHelloRpc {
    8. String hello(String name);
    9. }

    【2.2】服务器:

    1. /**
    2. * @Description rpc服务器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class MyRpcSocketServer {
    8. /** 线程池 */
    9. private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
    10. public static void main(String[] args) {
    11. try {
    12. startServer0(8089);
    13. } catch (IOException e) {
    14. e.printStackTrace();
    15. } finally {
    16. THREAD_POOL.shutdown();
    17. }
    18. }
    19. /**
    20. * @description 开启服务器
    21. * @param port 端口
    22. * @author xiao tang
    23. * @date 2022/9/11
    24. */
    25. private static void startServer0(int port) throws IOException {
    26. ServerSocket serverSocket = new ServerSocket(port);
    27. System.out.println("服务器启动成功.");
    28. while(true) {
    29. try {
    30. // 阻塞直到有客户端请求
    31. Socket socket = serverSocket.accept();
    32. // 处理器请求
    33. THREAD_POOL.execute(new BusiTask(socket));
    34. } catch (Exception e) {
    35. e.printStackTrace();
    36. System.out.println("抛出异常");
    37. throw e;
    38. }
    39. }
    40. }
    41. /**
    42. * @description 业务作业,Runnable子类
    43. * @author xiao tang
    44. * @date 2022/9/11
    45. */
    46. static class BusiTask implements Runnable {
    47. private Socket socket;
    48. public BusiTask(Socket socket) {
    49. this.socket = socket;
    50. }
    51. @Override
    52. public void run() {
    53. // 获得字节输入输出流
    54. try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
    55. ObjectOutputStream outputMsg = new ObjectOutputStream(socket.getOutputStream());) {
    56. // 获取过程参数(报文),包括类名,方法名,方法参数类型,方法参数列表
    57. MyRpcMessage inputMsg = (MyRpcMessage)inputStream.readObject();
    58. // 通过反射创建对象
    59. Class clazz = Class.forName(inputMsg.getClassName());
    60. Object procedureObj = clazz.newInstance();
    61. // // 通过反射创建方法
    62. Method method = clazz.getDeclaredMethod(inputMsg.getMethodName(), inputMsg.getParameterTypes());
    63. // // 通过反射调用方法
    64. Object result = method.invoke(procedureObj, inputMsg.getParameterValues());
    65. // 把结果回写
    66. outputMsg.writeObject(result);
    67. } catch (Exception e) {
    68. e.printStackTrace();
    69. } finally {
    70. try {
    71. socket.close();
    72. } catch (IOException e) {
    73. e.printStackTrace();
    74. }
    75. }
    76. }
    77. }
    78. }

    过程调用接口实现类:

    1. /**
    2. * @Description 过程调用接口实现
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class IHelloRpcServerImpl implements IHelloRpc {
    8. @Override
    9. public String hello(String name) {
    10. return "hello " + name + ", nice to meet you, i am a server named RPC.";
    11. }
    12. }

    【2.3】客户端:

    1. /**
    2. * @Description RPC存根(代理)工厂
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class MyRpcProxyFactory {
    8. /**
    9. * @description 私有构造器
    10. * @author xiao tang
    11. * @date 2022/9/11
    12. */
    13. private MyRpcProxyFactory() {
    14. }
    15. /**
    16. * @description 创建代理对象
    17. * @param targetClazz 目标对象clazz
    18. * @param className 类名
    19. * @return 代理对象
    20. * @author xiao tang
    21. * @date 2022/9/11
    22. */
    23. public static T newProxy(Class targetClazz, String className) {
    24. return (T) Proxy.newProxyInstance(
    25. targetClazz.getClassLoader()
    26. , new Class[]{targetClazz}
    27. , (Object proxy, Method method, Object[] args) ->{
    28. Socket socket = new Socket("localhost",8089);
    29. // 获取输出流
    30. try(ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
    31. ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
    32. // 写出过程参数到服务器,包括类名,方法名,方法参数类型,方法参数列表
    33. MyRpcMessage message = MyRpcMessage.builder()
    34. .className(className)
    35. .methodName(method.getName())
    36. .parameterTypes(method.getParameterTypes())
    37. .parameterValues(args)
    38. .build();
    39. outputStream.writeObject(message);
    40. outputStream.flush();
    41. // 获取输入流
    42. Object result = inputStream.readObject();
    43. if (result instanceof Throwable) {
    44. throw (Throwable)result;
    45. }
    46. return result;
    47. } finally {
    48. socket.close();
    49. }
    50. });
    51. }
    52. }

    测试用例入口:

    1. public class MyRpcSocketClient {
    2. public static void main(String[] args) {
    3. IHelloRpc helloRpc = MyRpcProxyFactory.newProxy(IHelloRpc.class, "com.my.netty.rpc.socket.server.IHelloRpcServerImpl");
    4. String result = helloRpc.hello("成都");
    5. System.out.println(result);
    6. }
    7. }

    【2.4】运行效果:

    hello 成都, nice to meet you, i am a server named RPC.

    【3】RPC的netty 实现

    1)背景: 基于socket的rpc实现时阻塞式的,吞吐量不高; 本文尝试使用netty 实现rpc ;

    2)netyt优势:

    • netty是非阻塞的,能够很好利用多核cpu的算力,增大系统吞吐量;
    • netty底层采用零拷贝,报文通讯性能更优;

    3)代码目录:

    【图解】 代码还是分为3部分(常量不算)

    1)公共协议:

    • 把过程方法抽象为接口,客户端与服务器以该接口进行通讯;

    2)客户端:

    • 通过代理模式创建服务的代理对象;调用方法,实际上调用的是代理对象方法,代理对象方法底层封装报文(一个简单的字符串)通过 netty非阻塞式连接服务器,并把报文写出到channel,送入服务器(客户端在整个调用过程是非阻塞的);
    • 当前线程睡眠直到服务端的响应报文回写到客户端;

    3)服务端:

    • 服务器在接收到客户端请求报文后,若协议报文头以 HelloServer#hello# 开头,则调用预设  HelloService.hello() 方法;
    • 调用hello() 方法完成后,把结果回写到netty的channel通道
    • 补充:为了简化编程,基于netty的rpc实现没有使用反射来创建类和调用方法,而是写死 方法;当然了,像上文的socket版本的rpc一样,可以把类名,方法名,参数类型,参数值列表等过程参数封装到报文头,服务器可以根据上述报文头通过反射来创建类和调用方法

    【3.1】公共协议(约定)

    1)公共接口;

    1. /**
    2. * @Description 公共接口,服务提供方,服务消费方都需要
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public interface HelloService {
    8. String hello(String msg);
    9. }

    常量:

    1. public class MyDubboConst {
    2. /** 协议头 */
    3. public static final String PROTOCOL_HEAD = "HelloServer#hello#";
    4. }

    【3.2】 客户端

    1)netty客户端代理:

    1. /**
    2. * @Description 客户端-服务消费者
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class MyDubboNettyClient {
    8. // 创建线程池
    9. private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    10. // 客户端处理器
    11. private static MyDubboNettyClientHandler clientHandler;
    12. private int counter = 0;
    13. // 编写方法,使用代理模式,获取代理对象
    14. public Object getBean(final Class serviceClass, final String protocolHead) {
    15. return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
    16. new Class[]{serviceClass}, (Object proxy, Method method, Object[] args)->{
    17. System.out.println("Object proxy, Method method, Object[] args 进入了,第 " + (++counter) + " 次进入");
    18. // 下面的代码, 客户端每调用一次hello,就会进入到该代码
    19. if (clientHandler == null) {
    20. initClientHandler(); // 初始化客户端处理器
    21. }
    22. // 设置要发送给服务器的信息
    23. // protocolHead 协议头部
    24. // args[0] 就是客户端调用 hello的参数
    25. System.out.println("clientHandler.setParameter");
    26. clientHandler.setParameter(protocolHead + args[0]);
    27. System.out.println("executorService.submit");
    28. return executorService.submit(clientHandler).get();
    29. });
    30. }
    31. // 初始化客户端处理器
    32. private static void initClientHandler() {
    33. clientHandler = new MyDubboNettyClientHandler();
    34. // 创建 EventLoopGroup
    35. NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    36. Bootstrap bootstrap = new Bootstrap();
    37. bootstrap.group(eventLoopGroup)
    38. .channel(NioSocketChannel.class)
    39. .option(ChannelOption.TCP_NODELAY, true)
    40. .handler(new ChannelInitializer() {
    41. @Override
    42. protected void initChannel(SocketChannel socketChannel) throws Exception {
    43. ChannelPipeline pipeline = socketChannel.pipeline();
    44. pipeline.addLast(new StringDecoder());
    45. pipeline.addLast(new StringEncoder());
    46. pipeline.addLast(clientHandler);
    47. }
    48. });
    49. // 启动客户端
    50. ChannelFuture channelFuture = null;
    51. try {
    52. channelFuture = bootstrap.connect("127.0.0.1", 8089).sync();
    53. } catch (InterruptedException e) {
    54. e.printStackTrace();
    55. }
    56. System.out.println("客户端启动成功");
    57. }
    58. }

    2)netty客户端业务逻辑处理器

    1. /**
    2. * @Description dubbo客户端处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class MyDubboNettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    8. private ChannelHandlerContext context; // 上下文
    9. private String result; // 返回结果
    10. private String parameter; // 客户端调用方法时,传入的参数
    11. // 与服务器连接创建成功后,就会被调用 (第1个被调用 )
    12. @Override
    13. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    14. System.out.println("MyDubboNettyClientHandler.channelActive");
    15. context = ctx; // 因为在其他方法也会用到这个 context
    16. }
    17. // 收到服务器的数据后,就会调用方法
    18. @Override
    19. public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    20. System.out.println("MyDubboNettyClientHandler.channelRead");
    21. result = msg.toString();
    22. notify(); // 唤醒等待的线程
    23. }
    24. @Override
    25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    26. System.out.println("MyDubboNettyClientHandler.exceptionCaught");
    27. ctx.close();
    28. }
    29. // 被代理对象调用, 发送数据给服务器 -> 然后 wait -> 等待被唤醒 -> 返回结果 (第3个被调用)
    30. @Override
    31. public synchronized Object call() throws Exception {
    32. System.out.println("发送数据到服务器, parameter = " + parameter);
    33. context.writeAndFlush(parameter); // 发送数据给服务器
    34. System.out.println("发送数据到服务成功");
    35. wait(); // 等待直到服务器返回结果,服务器返回后调用 channelRead 方法来唤醒;
    36. System.out.println("after wait().");
    37. return result; // 服务提供方返回的结果
    38. }
    39. // 第2个被调用,设置参数
    40. public void setParameter(String parameter) {
    41. System.out.println("setParameter");
    42. this.parameter = parameter;
    43. }
    44. }

    3)客户端启动引导程序:

    1. public class MyDubboClientBootstrap {
    2. public static void main(String[] args) {
    3. // 创建访问消费者-netty客户端
    4. MyDubboNettyClient customer = new MyDubboNettyClient();
    5. // 创建代理对象
    6. HelloService helloService = (HelloService)customer.getBean(HelloService.class, MyDubboConst.PROTOCOL_HEAD);
    7. // 通过代理对象调用服务提供者的服务
    8. String result = helloService.hello("hello服务器");
    9. System.out.println("服务器响应结果 = " + result);
    10. }
    11. }

    【3.3】服务器

    1)过程调用(方法调用)的具体实现

    1. /**
    2. * @Description 服务提供方实现接口
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class HelloServiceImpl implements HelloService {
    8. private int counter;
    9. // 当有消费方调用该方法时,就返回一个字符串
    10. @Override
    11. public String hello(String msg) {
    12. System.out.println("收到客户端消息 = " + msg);
    13. // 根据msg 返回不同结果
    14. if (msg != null) {
    15. return "你好客户端,我收到你的消息 ["+msg+"]这是第" + (++counter) + "次收到消息";
    16. } else {
    17. return "你好客户端,我收到的消息为空" ;
    18. }
    19. }
    20. }

    2)netty服务器

    1. /**
    2. * @Description 基于netty的 Dubbo 服务器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class MyDubboNettyServer {
    8. public static void startServer(String hostName, int port) throws InterruptedException {
    9. startServer0(hostName, port);
    10. }
    11. private static void startServer0(String hostName, int port) throws InterruptedException {
    12. EventLoopGroup bossGroup = new NioEventLoopGroup();
    13. EventLoopGroup workerGroup = new NioEventLoopGroup();
    14. try {
    15. ServerBootstrap serverBootstrap = new ServerBootstrap();
    16. serverBootstrap.group(bossGroup, workerGroup)
    17. .channel(NioServerSocketChannel.class)
    18. .childHandler(new ChannelInitializer() {
    19. @Override
    20. protected void initChannel(SocketChannel socketChannel) throws Exception {
    21. ChannelPipeline pipeline = socketChannel.pipeline();
    22. pipeline.addLast(new StringDecoder());
    23. pipeline.addLast(new StringEncoder());
    24. pipeline.addLast(new MyDubboNettyServerHandler()) ; // 自定义业务处理器
    25. }
    26. });
    27. // 启动服务器
    28. ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
    29. System.out.println("服务提供方启动成功,开始提供服务");
    30. // 监听关闭
    31. channelFuture.channel().closeFuture().sync();
    32. } finally {
    33. bossGroup.shutdownGracefully();
    34. workerGroup.shutdownGracefully();
    35. }
    36. }
    37. }

    3)netty服务器业务处理器

    1. /**
    2. * @Description 基于netty的 Dubbo 服务器处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月11日
    6. */
    7. public class MyDubboNettyServerHandler extends ChannelInboundHandlerAdapter {
    8. @Override
    9. public void channelRead(ChannelHandlerContext ctx, Object msg) {
    10. // 获取客户端发送的消息 并 调用公共接口服务
    11. System.out.println("接收的msg = " + msg);
    12. // 客户端在调用服务器的api时,我们需要定义一个协议
    13. // 如 msg 以 "HelloServer#hello#" 开头,才调用api
    14. String msgString = msg.toString();
    15. if (msgString.startsWith(MyDubboConst.PROTOCOL_HEAD)) {
    16. String result = new HelloServiceImpl().hello(msgString.substring(msgString.lastIndexOf("#")+1));
    17. ctx.writeAndFlush(result);
    18. } else {
    19. ctx.writeAndFlush("协议头非法");
    20. }
    21. }
    22. @Override
    23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    24. cause.printStackTrace();
    25. ctx.close();
    26. }
    27. }

    4)服务器启动引导程序:

    1. public class MyDubboServerBootstrap {
    2. public static void main(String[] args) throws InterruptedException {
    3. // 代码代填
    4. MyDubboNettyServer.startServer("127.0.0.1", 8089);
    5. }
    6. }

    【3.4】运行效果:

    1)服务器日志:

    1. 服务提供方启动成功,开始提供服务
    2. [DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
    3. [DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
    4. [DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
    5. [DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
    6. [DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
    7. [DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@e62edfd
    8. 接收的msg = HelloServer#hello#hello服务器
    9. 收到客户端消息 = hello服务器

    2)客户端日志:

    1. 客户端启动成功
    2. clientHandler.setParameter
    3. MyDubboNettyClientHandler.channelActive
    4. setParameter
    5. executorService.submit
    6. 发送数据到服务器, parameter = HelloServer#hello#hello服务器
    7. [DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
    8. [DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
    9. [DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
    10. [DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
    11. 发送数据到服务成功
    12. [DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
    13. [DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@89e890f
    14. MyDubboNettyClientHandler.channelRead
    15. after wait().
    16. 服务器响应结果 = 你好客户端,我收到你的消息 [hello服务器]这是第1次收到消息

     

  • 相关阅读:
    HCIE Routing&Switching之MPLS LDP理论
    CLUE模型构建方法、模型验证及土地利用变化情景预测实践技术
    mysql 闪回表工具
    1201. Ugly Number III && 264. Ugly Number ll
    [附源码]计算机毕业设计springboot大学生考勤管理系统论文
    require、loadfile、dofile、load、loadstring
    vscode连接服务器
    LabVIEW中图像显示错误
    kubelet gc 源码分析
    Hopcroft–Karp algorithm
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126808608