RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)
在core模块下的com.dcyrpc
的enumeration
包
在包内创建ResponseCode
枚举:定义响应码枚举
/**
* 响应码枚举
*/
public enum ResponseCode {
SUCCESS((byte) 1, "成功"), FAIL((byte) 2, "失败");
private byte code;
private String desc;
ResponseCode(byte code, String desc) {
this.code = code;
this.desc = desc;
}
}
在core模块下com.dcyrpc
的transprt.message
包下
创建DcyRpcResponse
类:服务提供方回复的响应
/**
* 服务提供方回复的响应
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DcyRpcResponse {
// 请求的id
private long requestId;
// 压缩的类型
private byte compressType;
// 序列化的方式
private byte serializeType;
// 响应码类型:1 成功,2 异常
private byte code;
// 具体的消息体
private Object body;
}
在core模块channelhandler.handler
包下创建DcyRpcResponseEncoder
类:对调用结果进行编码
/**
* 编码器
*
* 4B magic(魔数值) -- Drpc.getBytes()
* 1B version(版本) -- 1
* 2B header length(首部的长度)
* 4B full length(报文的总长度)
* 1B serialize (序列化类型的长度)
* 1B compress(压缩类型的长度)
* 1B code(响应码)
* 8B requestId
*
* Object body
*/
@Slf4j
public class DcyRpcResponseEncoder extends MessageToByteEncoder<DcyRpcResponse> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, DcyRpcResponse dcyRpcResponse, ByteBuf byteBuf) throws Exception {
// 4个字节魔数值
byteBuf.writeBytes(MessageFormatConstant.MAGIC);
// 1个字节版本号
byteBuf.writeByte(MessageFormatConstant.VERSION);
// 2个字节的头部的长度
byteBuf.writeShort(MessageFormatConstant.HEADER_LENGTH);
// 总长度未知,不知道body的长度
byteBuf.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_FIELD_LENGTH);
// 响应码
byteBuf.writeByte(dcyRpcResponse.getCode());
// 序列化类型
byteBuf.writeByte(dcyRpcResponse.getSerializeType());
// 压缩类型
byteBuf.writeByte(dcyRpcResponse.getCompressType());
// 8个字节的请求id
byteBuf.writeLong(dcyRpcResponse.getRequestId());
// 写入请求体body(requestPayload)
byte[] body = getBodyBytes(dcyRpcResponse.getBody());
if (body != null) {
byteBuf.writeBytes(body);
byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
}
int bodyLength = body ==null ? 0 : body.length;
// 重新处理报文的总长度
// 先获取当前的写指针的位置
int writerIndex = byteBuf.writerIndex();
// 将写指针的位置移动到总长度的位置上
byteBuf.writerIndex(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH);
byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + bodyLength);
// 将写指针归位
byteBuf.writerIndex(writerIndex);
}
private byte[] getBodyBytes(Object body) {
// 心跳请求没有payload
if (body == null) {
return null;
}
// 对象序列化成字节数组
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(baos);
outputStream.writeObject(body);
return baos.toByteArray();
} catch (IOException e) {
log.error("序列化时出现异常");
throw new RuntimeException(e);
}
}
}
修改DcyRpcBootstrap
的start()
方法
DcyRpcResponseEncoder
响应编码器// 略....
// 3.配置服务器
serverBootstrap = serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// TODO 核心内容,需要添加很多入栈和出栈的handler
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
// 对报文进行解码
.addLast(new DcyRpcRequestDecoder())
// 根据请求进行方法调用
.addLast(new MethodCallHandler())
// 对响应结果进行编码
.addLast(new DcyRpcResponseEncoder());
}
});
// 略....
在core模块channelhandler.handler
包下创建DcyRpcResponseEncoder
类:服务请求方对响应结果进行解码
/**
* 解码器
*/
@Slf4j
public class DcyRpcResponseDecoder extends LengthFieldBasedFrameDecoder {
public DcyRpcResponseDecoder() {
// 找到当前报文的总长度,截取报文,截取出来的报文可以进行解析
super(
// 最大帧的长度,超过这个maxFrameLength值,会直接丢弃
MessageFormatConstant.MAX_FRAME_LENGTH,
// 长度字段偏移量
MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH,
// 长度的字段的长度
MessageFormatConstant.FULL_FIELD_LENGTH,
// todo 负载的适配长度
-(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH + MessageFormatConstant.FULL_FIELD_LENGTH),
// 跳过的字段
0);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object decode = super.decode(ctx, in);
if (decode instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf) decode;
return decodeFrame(byteBuf);
}
return null;
}
private Object decodeFrame(ByteBuf byteBuf) {
// 1.解析魔数
byte[] magic = new byte[MessageFormatConstant.MAGIC.length];
byteBuf.readBytes(magic);
// 检测魔数值是否匹配
for (int i = 0; i < magic.length; i++) {
if (magic[i] != MessageFormatConstant.MAGIC[i]) {
throw new RuntimeException("获得的请求类型不匹配");
}
}
// 2.解析版本号
byte version = byteBuf.readByte();
if (version > MessageFormatConstant.VERSION) {
throw new RuntimeException("获得的请求版本不被支持");
}
// 3.解析头部的长度
short headLength = byteBuf.readShort();
// 4.解析总长度
int fullLength = byteBuf.readInt();
// 5.解析响应码
byte responseCode = byteBuf.readByte();
// 6.解析序列化类型
byte serializeType = byteBuf.readByte();
// 7.解析压缩型
byte compressType = byteBuf.readByte();
// 8.解析请求Id
long requestId = byteBuf.readLong();
DcyRpcResponse dcyRpcResponse = new DcyRpcResponse();
dcyRpcResponse.setCode(responseCode);
dcyRpcResponse.setCompressType(compressType);
dcyRpcResponse.setSerializeType(serializeType);
dcyRpcResponse.setRequestId(requestId);
// 9.解析消息体payload
int bodyLength = fullLength - headLength;
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
// 解压缩和反序列化
// todo 解压缩
// 反序列化
try (ByteArrayInputStream bis = new ByteArrayInputStream(body);
ObjectInputStream ois = new ObjectInputStream(bis)
) {
Object object = ois.readObject();
dcyRpcResponse.setBody(object);
} catch (IOException | ClassNotFoundException e) {
log.error("请求【{}】反序列化时出现异常", requestId, e);
throw new RuntimeException(e);
}
return dcyRpcResponse;
}
}
修改channelHandler
包下的ConsumerChannelInitializer
类:添加入站的解码器 DcyRpcResponseDecoder()
public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// netty自带的日志处理器
.addLast(new LoggingHandler(LogLevel.INFO))
// 消息编码器
.addLast(new DcyRpcRequestEncoder())
// 入站的解码器
.addLast(new DcyRpcResponseDecoder())
// 处理结果
.addLast(new MySimpleChannelInboundHandler());
}
}
修改channelhandler.handler
包下的MySimpleChannelInboundHandler
类:将响应结果的ByteBuf改成DcyRpcResponse
/**
* 处理响应结果
*/
public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<DcyRpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DcyRpcResponse dcyRpcResponse) throws Exception {
// 异步
// 服务提供方,给予的结果
Object returnValue = dcyRpcResponse.getBody();
// 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
completableFuture.complete(returnValue);
}
}
在core模块channelhandler.handler
包的MethodCallHandler
类:封装响应结果
// 略...
// 3.封装响应
DcyRpcResponse dcyRpcResponse = DcyRpcResponse.builder()
.code(ResponseCode.SUCCESS.getCode())
.requestId(dcyRpcRequest.getRequestId())
.compressType(dcyRpcRequest.getCompressType())
.serializeType(dcyRpcRequest.getSerializeType())
.body(result)
.build();
// 4.写出响应
channelHandlerContext.channel().writeAndFlush(dcyRpcResponse);
// 略...
在当前项目中,我们需要给请求一个唯一标识,用来标识一个请求和响应的关联关系,我们要求请求的id必须唯一,且不能占用过大的空间,可用的方案如下:
1.自增id,单机的自增id不能解决不重复的问题,微服务情况下我们需要一个稳定的发号服务才能保证,但是这样做性能偏低。
2.uuid,将uuid作为唯一标识占用空间太大
3.雪花算法,最优解。
雪花算法(snowflake)最早是twitter内部使用分布式环境下的唯一ID生成算法,他使用64位long类型的数据存储
id,具体如下:
0 - 0000000000 0000000000 0000000000 0000000000 0 - 0000000000 - 000000000000
符号位 时间戳 机器码 序列号
最高位表示符号位,其中0代表整数,1代表负数,而id一般都是正数,所以最高位为0。
通过雪花算法实现 – (世界上没有一片雪花是一样的) 5+5+42+12=64位
时间戳 (42) 机房号 (5) 机器号 (5) 序列号 (12)
101010101010101010101010101010101010101011 10101 10101 101011010101
在common块下的com.dcy
包下创建IdGenerator
类
/**
* 请求id的生成器:雪花算法
*/
public class IdGenerator {
// 起始时间戳
private static final long START_STAMP = DateUtil.get("2022-1-1").getTime();
// 机房号
public static final long DATA_CENTER_BIT = 5L;
// 机器号
public static final long MACHINE_BIT = 5L;
// 序列化号
public static final long SEQUENCE_BIT = 5L;
// 机房号的最大值
public static final long DATA_CENTER_MAX = ~(-1L << DATA_CENTER_BIT);
// 机器号的最大值
public static final long MACHINE_MAX = ~(-1L << MACHINE_BIT);
// 序列号的最大值
public static final long SEQUENCE_MAX = ~(-1L << SEQUENCE_BIT);
// 时间戳需要左移的位数
public static final long TIMESTAMP_LEFT = DATA_CENTER_BIT + MACHINE_BIT + SEQUENCE_BIT;
// 机房号需要左移的位数
public static final long DATA_CENTER_LEFT = MACHINE_BIT + SEQUENCE_BIT;
// 机器号需要左移的位数
public static final long MACHINE_LEFT = SEQUENCE_BIT;
private long dataCenterId;
private long machineId;
private LongAdder sequenceId = new LongAdder();
private long lastTimeStamp = -1;
public IdGenerator(long dataCenterId, long machineId) {
//参数是否合法
if (dataCenterId > DATA_CENTER_MAX || machineId > MACHINE_MAX) {
throw new IllegalArgumentException("传入的数据中心编号和机器编号不合法");
}
this.dataCenterId = dataCenterId;
this.machineId = machineId;
}
public long getId() {
// 1.处理时间戳的问题
long currentTime = System.currentTimeMillis();
long timeStamp = currentTime - START_STAMP;
// 2.判断时钟回拨
if (timeStamp < lastTimeStamp) {
throw new RuntimeException("服务器进行了时钟回调");
}
// 3.对sequenceId做一些处理:如果是同一个时间节点,必须自增
if (timeStamp == lastTimeStamp) {
sequenceId.increment();
if (sequenceId.sum() >= SEQUENCE_MAX) {
timeStamp = getNextTimeStamp();
sequenceId.reset();
}
} else {
sequenceId.reset();
}
// 执行结束将时间戳赋值给lastTimeStamp
lastTimeStamp = timeStamp;
long sequence = sequenceId.sum();
return timeStamp << TIMESTAMP_LEFT | dataCenterId << DATA_CENTER_LEFT | machineId << MACHINE_LEFT | sequence;
}
private long getNextTimeStamp() {
// 获取当前的时间戳
long current = System.currentTimeMillis() - START_STAMP;
// 如果一样就一直循环,直到下一个时间戳
while (current == lastTimeStamp) {
current = System.currentTimeMillis() - START_STAMP;
}
return current;
}
}
在common块下的com.dcy
包下创建DateUtil
类:用于时间日期相关的工具类
/**
* 时间日期相关的工具类
*/
public class DateUtil {
public static Date get(String pattern) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
try {
return sdf.parse(pattern);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}
在DcyRpcBootstrap
类中添加IdGenerator
// 略...
private int port = 8088;
public static final IdGenerator ID_GENERATOR = new IdGenerator(1, 2);
// 略...
在RpcConsumerInvocationHandler
类的封装报文位置,将requestId的值设置为通过id生成器获取
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
.compressType((byte) 1)
.serializeType((byte) 1)
.requestType(RequestType.REQUEST.getId())
.requestPayload(requestPayload)
.build();
在core模块下创建serialize
包
在该包下创建Serializer
接口:序列化器
public interface Serializer {
/**
* 序列化
* @param object 待序列化的对象实例
* @return 字节数组
*/
byte[] serializer(Object object);
/**
* 反序列化
* @param bytes 待反序列化的字节数组
* @param clazz 目标类的class对象
* @return 目标实例
* @param 目标类泛型
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
在serialize
包下创建impl
包,创建JdkSerializer
类,实现Serializer
接口:jdk序列化器
@Slf4j
public class JdkSerializer implements Serializer{
@Override
public byte[] serializer(Object object) {
// 心跳请求没有payload
if (object == null) {
return null;
}
// 对象序列化成字节数组
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(baos)
) {
outputStream.writeObject(object);
return baos.toByteArray();
} catch (IOException e) {
log.error("序列化对象【{}】时出现异常", object);
throw new SerializeException(e);
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (bytes == null || clazz == null) {
return null;
}
// 字节数组转成对象序列化
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(bais)
) {
return (T) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
log.error("反序列化对象【{}】时出现异常", clazz);
throw new SerializeException(e);
}
}
}
在common的exceptions
中创建SerializeException
类:序列化异常处理
public class SerializeException extends RuntimeException{
public SerializeException() {
super();
}
public SerializeException(String message) {
super(message);
}
public SerializeException(Throwable cause) {
super(cause);
}
}
在consumer
包下的Application
启动类中添加序列化方法
// 略...
DcyRpcBootstrap.getInstance()
.application("first-dcyrpc-consumer")
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.serialize("jdk")
.reference(reference);
// 略...
在DcyRpcBootstrap
类中,添加序列化配置项和方法
// 略...
public static String SERIALIZE_TYPE = "jdk";
// 略...
/**
* 配置序列化的方式
* @param serializeType
* @return
*/
public DcyRpcBootstrap serialize(String serializeType) {
SERIALIZE_TYPE = serializeType;
return this;
}
在core模块下的serialize
包下创建SerializerWrapper
类:序列化包装类
@NoArgsConstructor
@AllArgsConstructor
@Data
public class SerializerWrapper {
private byte code;
private String type;
private Serializer serializer;
}
在core模块下的serialize
包下创建SerializerFactory
类:序列化工厂类
/**
* 序列化工厂类
*/
public class SerializerFactory {
private final static Map<String, SerializerWrapper> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);
private final static Map<Byte, SerializerWrapper> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);
static {
SerializerWrapper jdk = new SerializerWrapper((byte) 1, "jdk", new JdkSerializer());
SerializerWrapper json = new SerializerWrapper((byte) 2, "json", new JsonSerializer());
SERIALIZER_CACHE.put("jdk", jdk);
SERIALIZER_CACHE.put("json", json);
SERIALIZER_CACHE_CODE.put((byte) 1, jdk);
SERIALIZER_CACHE_CODE.put((byte) 2, json);
}
/**
* 使用工厂方法获取一个SerializerWrapper
* @param serializeType 序列化的类型
* @return
*/
public static SerializerWrapper getSerializer(String serializeType) {
return SERIALIZER_CACHE.get(serializeType);
}
public static SerializerWrapper getSerializer(byte serializeCode) {
return SERIALIZER_CACHE_CODE.get(serializeCode);
}
}
修改DcyRpcRequestEncoder
类,在请求类,对请求添加有关序列化的代码
//略...
// 8个字节的请求id
byteBuf.writeLong(dcyRpcRequest.getRequestId());
// 写入请求体body(requestPayload)
// 1.根据配置的序列化方式进行序列化
Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcRequest.getRequestPayload());
// 2.根据配置的压缩方式进行压缩
if (body != null) {
byteBuf.writeBytes(body);
byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
}
//略...
修改RpcConsumerInvocationHandler
类:修改填写序列化器的代码
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
.requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
.compressType((byte) 1)
.serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
.requestType(RequestType.REQUEST.getId())
.requestPayload(requestPayload)
.build();
// 略...
修改DcyRpcRequestDecoder
类,在响应类,对请求添加反序列化的代码
// 略...
// 9.解析消息体payload
int payloadLength = fullLength - headLength;
byte[] payload = new byte[payloadLength];
byteBuf.readBytes(payload);
// 解压缩和反序列化
// todo 解压缩
// 反序列化
Serializer serializer = SerializerFactory.getSerializer(serializeType).getSerializer();
RequestPayload requestPayload = serializer.deserialize(payload, RequestPayload.class);
dcyRpcRequest.setRequestPayload(requestPayload);
return dcyRpcRequest;
修改DcyRpcResponseEncoder
类:在响应类,对响应添加序列化的代码
// 略...
// 8个字节的请求id
byteBuf.writeLong(dcyRpcResponse.getRequestId());
// 写入请求体body(requestPayload)
// 对响应做序列化器
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcResponse.getBody());
if (body != null) {
byteBuf.writeBytes(body);
byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
}
// 略...
修改DcyRpcResponseDecoder
类,在请求类,对响应添加反序列化的代码
// 略...
// 9.解析消息体payload
int payloadLength = fullLength - headLength;
byte[] payload = new byte[payloadLength];
byteBuf.readBytes(payload);
// 解压缩和反序列化
// todo 解压缩
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
Object body = serializer.deserialize(payload, Object.class);
dcyRpcResponse.setBody(body);
return dcyRpcResponse;
// 略...
Hessian序列化是一种支持动态类型、跨语言、基于对象传输的网络协议,Java对象序列化的二进制流可以被其他语言(如,c++,python)。特性如下:
序列化操作:
反序列化操作:
/**
* hessian序列化器
*/
@Slf4j
public class HessianSerializer implements Serializer {
@Override
public byte[] serializer(Object object) {
// 心跳请求没有payload
if (object == null) {
return null;
}
// 对象序列化成字节数组
try (
ByteArrayOutputStream baos = new ByteArrayOutputStream();
) {
Hessian2Output hessian2Output = new Hessian2Output(baos);
hessian2Output.writeObject(object);
hessian2Output.flush();
log.info("对象使用hessian【{}】完成了序列化", object);
return baos.toByteArray();
} catch (IOException e) {
log.error("使用hessian序列化对象【{}】时出现异常", object);
throw new SerializeException(e);
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (bytes == null || clazz == null) {
return null;
}
// 字节数组转成对象序列化
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
) {
HessianInput hessianInput = new HessianInput(bais);
T t = (T) hessianInput.readObject();
log.info("对象使用hessian【{}】完成了反序列化", clazz);
return t;
} catch (IOException e) {
log.error("使用hessian反序列化对象【{}】时出现异常", clazz);
throw new SerializeException(e);
}
}
}