RPC(RemoteProcedureCall)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
HTTP方式直接调用不叫RPC, RPC调用就好像类似调用本地方法一样
常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift,Spring 旗下的 Spring Cloud。
在RPC中, 调用者称为消费者, 被调用者称为服务提供者。 RPC框架就是将上面时序图的步骤封装起来, 让远程调用, 像在本地调用方法一样。
技术选型: 注册中心: netty实现, 客户端远程调用: netty实现, 服务提供: netty实现
都是最最简单实现版本, 用于学习!
@Data
public class MessageProtocol {
/**
* 调用的类名称
*/
private String className;
/**
* 内容
*/
private String methodName;
/**
* 方法参数类型
*/
private Object[] methodParameterTypes;
/**
* 方法参数
*/
private Object[] methodParameter;
/**
* 每次请求唯一ID
*/
private String requestId;
/**
* 调用结果
*/
private InvokeResult invokeResult;
}
@Data
public class InvokeResult {
/**
* 0=成功, 1=失败
*/
private Integer resultCode;
/**
* 失败原因
*/
private String failMessage;
/**
* 调用后的结果
*/
private Object invokeResult;
public InvokeResult(Integer resultCode, String failMessage) {
this.resultCode = resultCode;
this.failMessage = failMessage;
}
public InvokeResult(Integer resultCode, Object invokeResult) {
this.resultCode = resultCode;
this.invokeResult = invokeResult;
}
public InvokeResult() {
}
}
@Data
@EqualsAndHashCode
public class RegistrationInfo {
/**
* 0=注册, 1=拉取
*/
private Integer type;
/**
* 服务名称
*/
private String serviceName;
/**
* 地址
*/
private String ip;
/**
* 端口号
*/
private Integer port;
}
/**
* @Author: ZhiHao
* @Date: 2022/6/29 16:17
* @Description: 注册中心 (简单提供的功能是, 接受服务提供者注册服务, 接受消费者拉取对应需要的服务提供者元数据)
* @Versions 1.0
**/
@Slf4j
public class MyRegistrationCenter {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap = serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new MyChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 8888));
channelFuture.addListener(el -> {
if (el.isSuccess()) {
log.info("MyRegistrationCenter===注册中心启动成功, 等待服务提供者或者消费者访问!");
}
});
try {
channelFuture.sync().channel().closeFuture().sync();
} catch (Exception e) {
log.info("MyRegistrationCenter===注册中心发生异常, 消息:{}", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
private static final DefaultEventLoopGroup DEFAULT_EVENT_LOOP_GROUP = new DefaultEventLoopGroup(16);
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
// 将ByteBuf转成为 json内容的ByteBuf
pipeline.addLast(new JsonObjectDecoder());
// 在转字符串
pipeline.addLast(new StringDecoder());
// 自定义业务处理器
pipeline.addLast(DEFAULT_EVENT_LOOP_GROUP,new MyChannelHandler());
}
}
@Slf4j
public class MyRegisteredChannelHandler extends SimpleChannelInboundHandler<String> {
private static final ConcurrentMap<String, RegistrationInfo> CONCURRENT_MAP = new ConcurrentHashMap<>();
private static final int REGISTERED = 0;
private static final int PULL_REGISTERED_INFO = 1;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
RegistrationInfo registrationInfo = JSONUtil.toBean(msg, RegistrationInfo.class);
if (Objects.nonNull(registrationInfo)) {
Integer type = Optional.ofNullable(registrationInfo.getType()).orElse(-1);
switch (type) {
case REGISTERED:
CONCURRENT_MAP.put(registrationInfo.getServiceName(), registrationInfo);
log.info("MyChannelHandler===注册成功一个服务提供者:{}",registrationInfo);
JSONObject jsonObject = new JSONObject();
jsonObject.putOnce("result", "信息注册成功!");
String jsonStr1 = jsonObject.toString();
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr1.getBytes(StandardCharsets.UTF_8)));
break;
case PULL_REGISTERED_INFO:
registrationInfo = CONCURRENT_MAP.get(registrationInfo.getServiceName());
if (Objects.isNull(registrationInfo)){
JSONObject jsonObject2 = new JSONObject();
jsonObject2.putOnce("result", "服务对应的注册信息不存在!");
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonObject2.toString().getBytes(StandardCharsets.UTF_8)));
return;
}
String jsonStr2 = JSONUtil.toJsonStr(registrationInfo);
log.info("MyChannelHandler===成功拉取服务提供者地址返回给客户端:{}",jsonStr2);
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr2.getBytes(StandardCharsets.UTF_8)));
break;
default:
JSONObject jsonObject3 = new JSONObject();
jsonObject3.putOnce("result", "type类型正确!");
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonObject3.toString().getBytes(StandardCharsets.UTF_8)));
}
}
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.close();
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("MyChannelHandler===发生异常, 信息:{}", cause);
ctx.close();
super.exceptionCaught(ctx, cause);
}
}
18:02:44.914 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.registrationcenter.MyRegistrationCenter - MyRegistrationCenter===注册中心启动成功, 等待服务提供者或者消费者访问!
1