自定义基于TCP的应用层通信协议。实现客户端对服务器的远程调用
编写服务器及客户端代码
基于TCP的自定义应用层协议
一、请求
1.请求格式

type:哪个方法
length:payload的长度
payload:调用的方法的参数
2.创建Request类

二、响应
1.响应格式

type:哪个方法
length:payload的长度
payload:调用的方法的结果
2.创建Response类

三、客户端-服务器交互

四、type
- 0X1 创建channel
- 0X2 销毁channel
- 0X3 创建交换机 exchangeDeclare
- 0X4 删除交换机 exchangeDelete
- 0X5 创建队列 queueDeclare
- 0X6 删除队列 queueDelete
- 0X7 创建绑定 queueBind
- 0X8 删除绑定 queueUnbind
- 0X9 发布消息 basicPublish
- 0Xa 订阅消息 basicConsume
- 0xb 确认消息 basicAck
- 0xc 服务器给客户端推送消息(响应独有)
五、请求payload
1.BasicAruguments(方法公共参数)
- rid(一次请求/响应)
- channelId(一次逻辑上的连接)
2.每个方法的参数
需要继承BasicArguments
ExchangeDelareArguments

ExchangeDeleteArguments

QueueDeclareArguments

QueueDeleteArguments

QueueBindArguments

QueueUnbindArguments

BasicPublishArguments

BasicConsumeArguments

BasicAckArguments

以ExchangeDeclare方法为例,具体的请求格式如下:

六、响应payload
1.BasicReturns(返回结果公共参数)
- rid (一次请求/响应)
- channelId (一次逻辑上的连接)
- ok (方法运行结果)
以exchangeDeclare为例,具体的响应格式:

其他的方法返回的响应payload都是BasicReturns序列化后的结果,除了0xc,是响应独有的。
2.SubscribeReturns
服务器通过Consumer接口实现推送消息给客户端(队列收到消息的时候会调用回调方法)

响应具体格式:

编写服务器代码
一、创建BrokerServer

二、初始化ServerSocket
给Serversocket初始化,监听一个端口

三、开启服务器

四、停止服务器(便于测试)
- runnable设置成false
- 抛弃线程池的所有任务
- 关闭ServerSocket服务器连接

五、处理连接
- 读取请求并解析
- 根据请求计算响应
- 将响应返回给客户端
- 关闭客户端连接
- 清除断开连接的socket的会话信息(channelId-socket)

1.读取请求并解析

2.根据请求计算响应

3.将响应返回给客户端

4.清除有关断开连接的socket对应的会话信息

编写客户端代码
一、ConnectionFactory 连接工厂
创建Connection对象

二、Connection 一次TCP连接
- socket对象 socket=new socket(host,port)
- 多个channel 对象 (创建channel对象)
- 写入请求
- 读取响应
- 处理响应
1.Connection 属性

2.初始化

3.写入请求

4.读取响应

5.创建channel

6.处理响应
此处在构造方法中,补充创建一个扫描线程,当连接未断开时,不停的扫描(读取)服务器返回的响应。处理响应。
如果是SubScirbleReturns,使用线程池执行消费者的回调。
如果是BasicReturns,将响应的basicReturns放入对应channel的basicReturnsMap中。


7.关闭连接

三、Channel 逻辑上的连接
1.属性

2.API(远程调用服务器的)
生成rid:

创建channel

销毁channel

创建交换机

删除交换机

创建队列

删除队列

创建绑定

删除绑定

发布消息

订阅消息

确认消息

3.实现阻塞等待服务器的响应
waitResult
根据rid,在basicReturnsMap集合中找;如果找到了,就返回,找不到阻塞等待。

putResult
将rid-basicReturns 放入basicReturnsMap集合中

编写Demo
实现消息队列生产者-消费者模型。
1.生产者客户端

2.消费者客户端

测试
1. 启动服务器


2.启动生产者

3.启动消费者
