消费端是基于接口来调用 Provider 端提供的服务的,因此消费端并不存在该公共服务接口的实现类。
- /**
-  * Consumer-side REST controller mapped under "/consumer" that forwards
-  * requests to the remote {@code CountryService}.
-  */
- @RestController
- @RequestMapping("/consumer")
- public class CountryController {
-
-     /** Dubbo reference to CountryService, declared with version "2.0" and lazy = true. */
-     @DubboReference(lazy = true, version = "2.0")
-     CountryService countryService;
-
-     /**
-      * Handles GET "/getCountry".
-      *
-      * @return a JSON object whose "msg" entry holds the value returned by
-      *         {@code countryService.getCountry()}
-      */
-     @GetMapping("/getCountry")
-     public JSONObject getCountry() {
-         JSONObject response = new JSONObject();
-         // The service result is passed straight into put(...) so its static type is unchanged.
-         response.put("msg", countryService.getCountry());
-         return response;
-     }
- }
ReferenceAnnotationBeanPostProcessor 会把目标服务接口 CountryService 在 IOC 注册表中对应的 class 属性重置为 ReferenceBean。
- public class ReferenceBean
implements FactoryBean{ - @Override
- public T getObject() {
- if (lazyProxy == null) {
- // Build the proxy for the target interface the first time it is requested
- createLazyProxy();
- }
- return (T) lazyProxy;
- }
-
- private void createLazyProxy() {
-
- ProxyFactory proxyFactory = new ProxyFactory();
- proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource()); // the real target is supplied by the TargetSource declared below
- proxyFactory.addInterface(interfaceClass);
- Class>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
- for (Class> anInterface : internalInterfaces) {
- proxyFactory.addInterface(anInterface);
- }
- if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) { // also expose the interface named by interfaceName when it differs from interfaceClass
- Class> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
- proxyFactory.addInterface(serviceInterface);
- }
- // JDK interface-based dynamic proxy
- this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
- }
-
- private Object getCallProxy() throws Exception {
- // get reference proxy
- return referenceConfig.get();
- }
-
- private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
-
- @Override
- protected Object createObject() throws Exception { // delegates to getCallProxy(), i.e. referenceConfig.get()
- return getCallProxy();
- }
-
- @Override
- public synchronized Class> getTargetClass() {
- return getInterfaceClass();
- }
- }
- }
使用 JDK 动态代理生成目标接口代理类的过程中,需要特别注意 DubboReferenceLazyInitTargetSource 的 lazyTarget 属性【属性赋值时机、属性使用时机】。
DubboReferenceLazyInitTargetSource(TargetSource 的一种实现)中 lazyTarget 属性值的初始化,其实是由其父类——抽象类 AbstractLazyCreationTargetSource 完成的。
- public abstract class AbstractLazyCreationTargetSource implements TargetSource {
- /**
- * Returns the lazy-initialized target object,
- * creating it on-the-fly if it doesn't exist already.
- * @see #createObject()
- */
- @Override
- public synchronized Object getTarget() throws Exception {
- if (this.lazyTarget == null) {
- // Triggers creation of the CountryService proxy
- this.lazyTarget = createObject();
- }
- return this.lazyTarget;
- }
- }
根据上面的注释可知,lazyTarget 属性是以懒加载方式赋值的,即首次获取 lazyTarget 对象时才会真正触发赋值。但实际观察到的情况是:在创建目标接口的代理类时,赋值操作就已经发生了【原因尚不清楚;一种可能(有待验证)是调试时 IDE 为展示变量而调用了代理对象的 toString(),该调用同样会进入下面的 invoke() 并触发 getTarget()】。
- class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
-
- // Proxies the target methods of the target class
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- Object oldProxy = null;
- boolean setProxyContext = false;
- // TargetSource obtained through ProxyFactory; in this walkthrough it is DubboReferenceLazyInitTargetSource
- TargetSource targetSource = this.advised.targetSource;
- Object target = null;
-
- ...
-
- Object retVal;
- // Obtains the proxy of the target interface CountryService, i.e. the value of the lazyTarget field
- target = targetSource.getTarget();
- Class> targetClass = (target != null ? target.getClass() : null);
-
- // Get the interception chain for this method.
- List
-
- if (chain.isEmpty()) {// holds by default
- Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
- retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
- }
- else {
- ...
- }
- ...
- return retVal;
- }
- }
综上所述:目标接口代理类的生成过程中还涉及 lazyTarget 属性的赋值。另外可以发现,目标接口代理类的类名与 lazyTarget 属性值的类名均为 CountryServiceDubboProxy0。两者的区别在于:前者实例中持有的 InvocationHandler 类型为 JdkDynamicAopProxy,后者持有的 InvocationHandler 类型为 InvokerInvocationHandler。
接口Protocol的扩展类如下图所示:
- public interface Protocol {
- // Provider side: exports (registers) the target interface
- @Adaptive
-
Exporter export(Invoker invoker) throws RpcException; - // Consumer side: refers to the target interface brought in via the @DubboReference annotation
- @Adaptive
-
Invoker refer(Class type, URL url) throws RpcException; - default List
getServers() { - return Collections.emptyList();
- }
- }
ProtocolSerializationWrapper
- public class ServiceDiscoveryRegistry extends FailbackRegistry {
-
- private final AbstractServiceNameMapping serviceNameMapping;
-
- @Override
- public void doSubscribe(URL url, NotifyListener listener) {
- url = addRegistryClusterKey(url);
- serviceDiscovery.subscribe(url, listener);
- boolean check = url.getParameter(CHECK_KEY, false);
- String key = ServiceNameMapping.buildMappingKey(url);
- Lock mappingLock = serviceNameMapping.getMappingLock(key);
- mappingLock.lock();
- Set
subscribedServices = serviceNameMapping.getCachedMapping(url); - MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
- subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
- mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
- subscribeURLs(url, listener, subscribedServices);
- }
-
- protected void subscribeURLs(URL url, NotifyListener listener, Set
serviceNames) { - //
- serviceNames = toTreeSet(serviceNames);
- String serviceNamesKey = toStringKeys(serviceNames);
- String protocolServiceKey = url.getProtocolServiceKey();
- logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));
-
- // register ServiceInstancesChangedListener
- Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
- try {
- appSubscriptionLock.lock();
- ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
- if (serviceInstancesChangedListener == null) {
- serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
- serviceInstancesChangedListener.setUrl(url);
- for (String serviceName : serviceNames) {//
- // 获取提供端服务的IP、端口号等信息
- List
serviceInstances = serviceDiscovery.getInstances(serviceName); - if (CollectionUtils.isNotEmpty(serviceInstances)) {
- serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
- }
- }
- serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
- }
-
- if (!serviceInstancesChangedListener.isDestroyed()) {
- serviceInstancesChangedListener.setUrl(url);
- listener.addServiceListener(serviceInstancesChangedListener);
- serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
- serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
- } else {
- logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
- serviceListeners.remove(serviceNamesKey);
- }
- } finally {
- appSubscriptionLock.unlock();
- }
- }
- }
-
-
- public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {
-
- @Override
- public Set
getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) { - String key = ServiceNameMapping.buildMappingKey(subscribedURL);
- // use previously cached services.
- Set
mappingServices = this.getCachedMapping(key); - // Asynchronously register listener in case previous cache does not exist or cache expired.
- if (CollectionUtils.isEmpty(mappingServices)) {
- ...
- } else {
- ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
- .getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
- executorService.submit(new AsyncMappingTask(listener, subscribedURL, true)); // cache hit: refresh the mapping in the background
- }
-
- return mappingServices;
- }
-
- private class AsyncMappingTask implements Callable
> { - private final MappingListener listener;
- private final URL subscribedURL;
- private final boolean notifyAtFirstTime;
-
- public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
- this.listener = listener;
- this.subscribedURL = subscribedURL;
- this.notifyAtFirstTime = notifyAtFirstTime;
- }
-
- @Override
- public Set
call() throws Exception { - synchronized (mappingListeners) {
- Set
mappedServices = emptySet(); - try {
- String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
- if (listener != null) {
- // Fetch the set of provider-side service names through the zk client
- mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
- Set
listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>()); - listeners.add(listener);
- if (CollectionUtils.isNotEmpty(mappedServices)) {
- if (notifyAtFirstTime) {
- // Add the provider service names to the local cache serviceNameMapping
- // DefaultMappingListener: keeps the provider service names consistent between the local cache and the zk server
- listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
- }
- }
- } else {
- mappedServices = get(subscribedURL);
- if (CollectionUtils.isNotEmpty(mappedServices)) {
- AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
- }
- }
- } catch (Exception e) {
- logger.error("Failed getting mapping info from remote center. ", e);
- }
- return mappedServices;
- }
- }
- }
- }
NettyChannel:真正通过 Netty 发送数据。
CodecSupport:Netty 相关协议。