为了加深对 RPC 框架的理解,自己动手做了个简单的 RPC 框架,名字随便起个,就叫 lsf 吧。
lsf GitHub 地址:https://github.com/buyulian/lsf
目录
1、注册中心、消费者、生产者 spring bean 标签定义


- <dependency>
- <groupId>com.me</groupId>
- <artifactId>lsf-client</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>


- <lsf:registry id="registry" host="127.0.0.1" port="25000"/>
-
- <lsf:provider id="helloWorldServiceLsf" alias="test"
- interface="com.me.lsf.provider.api.HelloWorldService"
- registry="registry"
- ref="helloWorldService"/>

- <dependency>
- <groupId>com.me</groupId>
- <artifactId>lsf-provider-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.me</groupId>
- <artifactId>lsf-client</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <lsf:registry id="registry" host="127.0.0.1" port="25000"/>
-
- <lsf:consumer id="helloWorldService"
- interface="com.me.lsf.provider.api.HelloWorldService"
- registry="registry"
- alias="test"/>

- <?xml version="1.0" encoding="UTF-8" ?>
- <schema xmlns="http://www.w3.org/2001/XMLSchema"
- targetNamespace="http://www.me.com/schema/lsf"
- elementFormDefault="qualified">
-
- <element name="registry">
- <complexType>
- <attribute name="id" type="string"/>
- <attribute name="host" type="string"/>
- <attribute name="port" type="string"/>
- </complexType>
- </element>
- <element name="provider">
- <complexType>
- <attribute name="id" type="string"/>
- <attribute name="alias" type="string"/>
- <attribute name="interface" type="string"/>
- <attribute name="ref" type="string"/>
- <attribute name="registry" type="string"/>
- </complexType>
- </element>
- <element name="consumer">
- <complexType>
- <attribute name="id" type="string"/>
- <attribute name="alias" type="string"/>
- <attribute name="interface" type="string"/>
- <attribute name="registry" type="string"/>
- </complexType>
- </element>
- </schema>

- public class RpcParam {
-
- /**
- * 调用类
- */
- private String rClass;
-
- /**
- * 调用方法
- */
- private String method;
-
- /**
- * 参数列表
- */
- private String[] args;
-
- /**
- * 序列化方式
- */
- private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
- }
-
- public class Consumerbean {
-
- /**
- * 调用接口
- */
- private Class interfaceClass;
-
- /**
- * 调用接口名
- */
- private String interfaceName;
-
- /**
- * 别名
- */
- private String alias;
-
- /**
- * 预留
- */
- private Boolean register;
-
- /**
- * 存活生产者连接
- */
- private List
aliveConnectionList; -
- /**
- * 手工指定生产者连接
- */
- private List
fixedConnectionList; -
- /**
- * 父对象
- */
- private ParentObject parentObject;
-
- /**
- * 序列化方式
- */
- private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
-
- /**
- * 注册中心 bean
- */
- private RegistryBean registryBean;
- }
- public class ConsumerBeanInvocationHandler implements InvocationHandler {
-
- private static Logger logger = LoggerFactory.getLogger(ConsumerBeanInvocationHandler.class);
-
- private Consumerbean consumerbean;
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-
- //获取调用的类
- Class tClass = consumerbean.getInterfaceClass();
-
- String canonicalName = tClass.getCanonicalName();
- String methodName = method.getName();
-
- ParentObject parentObject = consumerbean.getParentObject();
-
- boolean isNoRpc = isNoRpc(methodName, parentObject);
-
- if (isNoRpc) {
- return method.invoke(parentObject, args);
- }
- logger.debug("执行了 rpc 调用, class {}, method {}, args {}",canonicalName, methodName, Arrays.toString(args));
-
- //获取序列化方式
- String serializeType = consumerbean.getSerializeType();
-
- //组装 rpc 参数
- RpcParam rpcParam = getRpcParam(args, canonicalName, methodName, serializeType, method);
-
- //获取可用的生产者连接
- LsfConnection connection = consumerbean.getConnection();
-
- //调用并得到字符串结果
- String rpcResponseParamStr = getBody(rpcParam, connection);
-
- RpcResponseParam rpcResponseParam = JSON.parseObject(rpcResponseParamStr, RpcResponseParam.class);
-
- if (ErrorCodeEnum.SUCCESS.getCode().equals(rpcResponseParam.getCode())) {
- //获取序列化处理类
- LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
- //反序列化结果
- Object result = lsfSerialize.deSerializeResult(method, rpcResponseParam.getResult());
- return result;
- } else {
- //生产者抛出的异常处理
- throw new RuntimeException(rpcResponseParam.getException());
- }
-
- }
-
- private String getBody(RpcParam rpcParam, LsfConnection connection) {
- LsfClient client = LsfHttpClientFactory.getClient();
- String host = connection.getHost();
- int port = connection.getPort();
- ClientParam clientParam = new ClientParam();
- clientParam.setHost(host);
- clientParam.setPort(port);
- clientParam.setUrl("/");
-
- String rpcBody = JSON.toJSONString(rpcParam);
- clientParam.setBody(rpcBody);
- // netty 执行网络调用
- return client.post(clientParam);
- }
-
-
- private RpcParam getRpcParam(Object[] args, String canonicalName, String methodName, String serializeType, Method method) {
- RpcParam rpcParam = new RpcParam();
-
- rpcParam.setrClass(canonicalName);
- rpcParam.setMethod(methodName);
- rpcParam.setSerializeType(serializeType);
-
- //获取序列化方式
- LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
-
- //序列化参数
- String[] argsStrs = lsfSerialize.serializeParam(method, args);
-
- rpcParam.setArgs(argsStrs);
- return rpcParam;
- }
-
- private boolean isNoRpc(String methodName, ParentObject parentObject) {
- boolean isNoRpc = false;
- Method[] declaredMethods = parentObject.getClass().getDeclaredMethods();
- for (Method declaredMethod : declaredMethods) {
- if (declaredMethod.getName().equals(methodName)) {
- isNoRpc = true;
- break;
- }
- }
- return isNoRpc;
- }
- }
- public static String dealRequest(String body) {
-
- logger.info("center asyncDeal request {}",body);
-
- //解析 rpc 调用参数
- RpcParam rpcParam = JSON.parseObject(body, RpcParam.class);
-
- String rClassStr = rpcParam.getrClass();
- Object provider = getProvider(rClassStr);
-
- if (provider == null) {
- throw new RuntimeException("没有 对应的 provider");
- }
-
- String method = rpcParam.getMethod();
- String[] argsStr = rpcParam.getArgs();
-
- Class> aClass = provider.getClass();
-
- String resultStr = "error";
- RpcResponseParam rpcResponseParam = new RpcResponseParam();
- try {
- Method declaredMethod = null;
-
- //通过反射获取rpc调用的方法
- Method[] declaredMethods = aClass.getDeclaredMethods();
- for (Method declaredMethod1 : declaredMethods) {
- if (declaredMethod1.getName().equals(method)) {
- declaredMethod = declaredMethod1;
- break;
- }
- }
-
- if (declaredMethod == null) {
- throw new RuntimeException("没有这个方法 " + method);
- }
-
- //获取序列化实现类
- LsfSerialize lsfSerialize = LsfSerializeFactory.get(rpcParam.getSerializeType());
-
- //反序列参数
- Object[] inArgs = lsfSerialize.deSerializeParam(declaredMethod, argsStr);
-
- //调用实现类
- Object result = declaredMethod.invoke(provider, inArgs);
-
- //序列化执行结果
- String result1 = lsfSerialize.serializeResult(declaredMethod, result);
- rpcResponseParam.setCode(ErrorCodeEnum.SUCCESS.getCode());
- rpcResponseParam.setResult(result1);
-
- } catch (Exception e) {
- //若原始方法发生异常,则封装异常信息并返回给消费者
- rpcResponseParam.setCode(ErrorCodeEnum.EXCEPTION.getCode());
- rpcResponseParam.setException(e.toString());
- logger.error("lsf rpc exception rpc param {}", JSON.toJSONString(rpcParam), e);
- }
- resultStr = JSON.toJSONString(rpcResponseParam);
-
- logger.info("center asyncDeal result {}", resultStr);
- return resultStr;
- }
对扩展开放。新的序列化方式可通过实现这个接口,并注册到序列化工厂里去实现。
/**
 * Pluggable serialization scheme for RPC arguments and results.
 *
 * <p>Each operation receives the reflected {@link Method} being called so an
 * implementation can consult its parameter and return types.
 */
public interface LsfSerialize {

    /**
     * Serializes call arguments on the consumer side.
     *
     * @param method the method being called
     * @param args   the call arguments
     * @return the serialized arguments
     */
    String[] serializeParam(Method method, Object[] args);

    /**
     * Restores call arguments on the provider side.
     *
     * @param method   the method about to be invoked
     * @param contents the serialized arguments
     * @return the argument objects to pass to the method
     */
    Object[] deSerializeParam(Method method, String[] contents);

    /**
     * Serializes a method result on the provider side.
     *
     * @param method the method that produced the result
     * @param result the value returned by the method
     * @return the serialized result
     */
    String serializeResult(Method method, Object result);

    /**
     * Restores a method result on the consumer side.
     *
     * @param method  the method that was called
     * @param content the serialized result
     * @return the result object
     */
    Object deSerializeResult(Method method, String content);

}
-
- public class JsonAutoTypeSerialize implements LsfSerialize {
-
- private static Logger logger = LoggerFactory.getLogger(JsonAutoTypeSerialize.class);
-
- {
- ParserConfig.getGlobalInstance().setSafeMode(false);
- ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
- }
-
- @Override
- public String[] serializeParam(Method method, Object[] args) {
-
- String[] argsStrs = null;
- if (args != null) {
- argsStrs = new String[args.length];
- for (int i = 0; i < args.length; i++) {
- argsStrs[i] = JSON.toJSONString(args[i], SerializerFeature.WriteClassName);
- }
- }
- return argsStrs;
- }
-
- @Override
- public Object[] deSerializeParam(Method method, String[] contents) {
- Class>[] parameterTypes = method.getParameterTypes();
- return getInArgs(contents, parameterTypes, method);
- }
-
- private Object[] getInArgs(String[] strs, Class>[] parameterTypes, Method method) {
-
- if (strs == null) {
- return null;
- }
- Type[] genericParameterTypes = method.getGenericParameterTypes();
- Object[] inArgs = new Object[strs.length];
- for (int i = 0; i < parameterTypes.length; i++) {
- Class> parameterType = parameterTypes[i];
- inArgs[i] = getObjectSuper(strs[i], parameterType, genericParameterTypes[i]);
- }
- return inArgs;
- }
-
- @Override
- public String serializeResult(Method method, Object result) {
- return JSON.toJSONString(result, SerializerFeature.WriteClassName);
- }
-
- @Override
- public Object deSerializeResult(Method method, String content) {
- Type genericReturnType = method.getGenericReturnType();
- Class> returnType = method.getReturnType();
- return getObjectSuper(content, returnType, genericReturnType);
- }
-
- private Object getObjectSuper(String content, Class> returnType, Type genericReturnType) {
- Object result = JSON.parseObject(content, returnType);
- return result;
- }
- }
先启动注册中心,然后启动生产者,最后启动消费者。