Spring 整合 Dubbo 源码: https://blog.csdn.net/menxu_work/article/details/126418487?spm=1001.2014.3001.5502
服务导出的入口为ServiceBean中的export()方法
当Spring启动完之后,通过接收Spring的ContextRefreshedEvent事件来触发export()方法的执行。
一个ServiceBean对象就表示一个Dubbo服务,ServiceBean对象中的参数就表示服务的参数,比如timeout,该对象的参数值来至@Service注解中所定义的。
服务导出主要得做两件事情:
但是在做这两件事情之前得先把服务的参数确定好,因为一个Dubbo服务的参数,除开可以在@Service注解中去配置,还会继承Dubbo服务所属应用(Application)上的配置,还可以在配置中心或JVM环境变量中去配置某个服务的参数,所以首先要做的是确定好当前服务最终的(优先级最高)的参数值。
确定好服务参数之后,就根据所配置的协议启动对应的网络服务器。在启动网络服务器时,并且在网络服务器接收请求的过程中,都可以从服务参数中获取信息,比如最大连接数,线程数,socket超时时间等等。
启动完网络服务器之后,就将服务信息注册到注册中心。同时还有向注册中心注册监听器,监听Dubbo的中的动态配置信息变更。

ServiceBean extends ApplicationListener
ServiceBean.onApplicationEvent(ContextRefreshedEvent event)
ServiceBean.export(); // 服务导出(服务注册)
super.export()
ServcieConfig.export()
ServcieConfig.checkAndUpdateSubConfigs(); // 准备服务参数
ServcieConfig.doExport(); // 导出服务
doExportUrls();
registry://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&logger=log4j&pid=42222®istry=zookeeper&release=2.7.0×tamp=1661236388599ServcieConfig.doExportUrlsFor1Protocol(protocolConfig, registryURLs); //导出一个单独的服务,注册到各个注册中心
dubbo://192.168.1.5:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider1-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.1.5&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=42222&release=2.7.0&side=provider×tamp=1661236769839for registryURLs //遍历注册中心–将服务注册到注册中心
registry://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&logger=log4j&pid=42222®istry=zookeeper&release=2.7.0×tamp=1661236388599
Invoker> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 1. 生成一个当前服务接口的代理对象
// 2. 使用代理生成一个Invoker,Invoker表示服务提供者的代理,可以使用Invoker的invoke方法执行服务
// 3. 对应的url为registry://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.5%3A20881%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-provider1-application%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.1.5%26bind.port%3D20881%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26logger%3Dlog4j%26methods%3DsayHello%26pid%3D42222%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1661236769839&logger=log4j&pid=42222®istry=zookeeper&release=2.7.0×tamp=1661236388599
// 4. 这个Invoker中包括了服务的实现者、服务接口类、服务的注册地址(针对当前服务的,参数export指定了当前服务)
// 5. 此invoker表示一个可执行的服务,调用invoker的invoke()方法即可执行服务,同时此invoker也可用来导出
Exporter> exporter = protocol.export(wrapperInvoker);
// 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter (借助SPI ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();)
// 1. 先使用RegistryProtocol进行服务注册
// 2. 注册完了之后,使用DubboProtocol进行导出
RegistryProtocol.export(wrapperInvoker) // 特定的协议来对服务进行导出
zookeeper://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.5%3A20881%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-provider1-application%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.1.5%26bind.port%3D20881%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26logger%3Dlog4j%26methods%3DsayHello%26pid%3D42222%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1661236769839&logger=log4j&pid=42222&release=2.7.0×tamp=1661236388599dubbo://192.168.1.5:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider1-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.1.5&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=42222&release=2.7.0&side=provider×tamp=1661236769839overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
exporter = doLocalExport(originInvoker, providerUrl); // 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务

channel.close();registry.register(registeredProviderUrl); //注册服务,把简化后的服务提供者url注册到registryUrl中去
服务目录
代理对象.method() ----- invoker.invoke(Invocation)
@Override
public Object getObject() {
return get();
}
public synchronized T get() {
.....
if (ref == null) {
// 入口
init();
}
return ref; // Invoke代理
}
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 这里的recreate方法很重要,他会调用AppResponse的recreate方法,
// 如果AppResponse对象中存在exception信息,则此方法中会throw这个异常
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
AbstractClusterInvoker.invoke(invocation).doInvoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation); //路由
LoadBalance loadbalance = initLoadBalance(invokers, invocation); //负载均衡
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance); //服务容错
}
FailoverClusterInvoker



@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//构建DefaultFuture、同时添加一个超时任务task
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
//sent=true 等待消息发出,消息发送失败抛异常
//sent=false 不等待消息发出,将消息放入IO队列,直接返回
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
…
public static void received(Channel channel, Response response, boolean timeout) {
try {
// response的id,
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}