在Dubbo中发起异步调用机制,从而提高对应的服务的调用的吞吐能力和调用机制
从2.7.0开始,Dubbo 的所有异步编程接口开始以CompletableFuture为基础,基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。
首先需要服务提供者事先定义CompletableFuture签名的服务,接口定义指南如下:
Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响,异步执行无异于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。
Provider端异步执行和Consumer端异步调用是相互独立的,任意正交组合两端配置
<properties>
<source.level>1.8source.level>
<target.level>1.8target.level>
<dubbo.version>3.0.2.1dubbo.version>
<spring.version>4.3.16.RELEASEspring.version>
<junit.version>4.12junit.version>
<maven-compiler-plugin.version>3.7.0maven-compiler-plugin.version>
properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframeworkgroupId>
<artifactId>spring-framework-bomartifactId>
<version>${spring.version}version>
<type>pomtype>
<scope>importscope>
dependency>
<dependency>
<groupId>org.apache.dubbogroupId>
<artifactId>dubbo-bomartifactId>
<version>${dubbo.version}version>
<type>pomtype>
<scope>importscope>
dependency>
<dependency>
<groupId>org.apache.dubbogroupId>
<artifactId>dubbo-dependencies-zookeeperartifactId>
<version>${dubbo.version}version>
<type>pomtype>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>${junit.version}version>
dependency>
dependencies>
dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dubbogroupId>
<artifactId>dubboartifactId>
dependency>
<dependency>
<groupId>org.apache.dubbogroupId>
<artifactId>dubbo-dependencies-zookeeperartifactId>
<type>pomtype>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<scope>testscope>
dependency>
<dependency>
<groupId>org.springframeworkgroupId>
<artifactId>spring-testartifactId>
<scope>testscope>
dependency>
dependencies>
<profiles>
<profile>
<id>javax.annotationid>
<activation>
<jdk>[1.11,)jdk>
activation>
<dependencies>
<dependency>
<groupId>javax.annotationgroupId>
<artifactId>javax.annotation-apiartifactId>
<version>1.3.2version>
dependency>
dependencies>
profile>
profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
<version>${maven-compiler-plugin.version}version>
<configuration>
<source>${source.level}source>
<target>${target.level}target>
configuration>
plugin>
plugins>
build>
public interface AsyncService {
CompletableFuture<String> execute(String name);
}
public class AsyncServiceImpl implements AsyncService {
@Override
public CompletableFuture<String> execute(String name) {
return CompletableFuture.supplyAsync(() -> {
//TODO 功能实现机制
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "async response from provider.";
});
}
}
通过return CompletableFuture.supplyAsync() ,业务执行已从Dubbo线程切换到业务线程,避免了对 Dubbo 线程池的阻塞。
Dubbo提供了一个类似 Servlet 3.0 的异步接口AsyncContext,在没有 CompletableFuture修饰接口的情况下,也可以实现 Provider端的异步执行。
public interface AsyncService {
String execute(String name);
}
服务暴露,和普通服务完全一致
<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>
public class AsyncServiceImpl implements AsyncService {
public String execute(String name) {
final AsyncContext asyncContext = RpcContext.startAsync();
new Thread(() -> {
// 如果要使用上下文,则必须要放在第一句执行
asyncContext.signalContextSwitch();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 写回响应
asyncContext.write("Hello " + name + ", response from provider.");
}).start();
return null;
}
}
注意接口的返回类型是 CompletableFuture。
<dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.AsyncService"/>
// 调用直接返回CompletableFuture
// 获取到对应的asyncService服务实现类对象
CompletableFuture<String> future = asyncService.execute("async call request");
// 增加回调
future.whenComplete((v, t) -> {
if (t != null) {
t.printStackTrace();
} else {
System.out.println("Response: " + v);
}
});
// 早于结果输出
System.out.println("Executed before response return.");
在 consumer.xml 中配置
<dubbo:reference id="asyncService" interface="org.apache.dubbo.samples.governance.api.AsyncService">
<dubbo:method name="execute" async="true" />
dubbo:reference>
// 此调用会立即返回null
asyncService.execute("world");
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
// 为Future添加回调
helloFuture.whenComplete((retValue, exception) -> {
if (exception == null) {
System.out.println(retValue);
} else {
exception.printStackTrace();
}
});
或者,也可以这样做异步调用
CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(
() -> {
asyncService.sayHello("oneway call request1");
}
);
future.get();
异步总是不等待返回,你也可以设置是否等待消息发出
<dubbo:method name="findFoo" async="true" sent="true" />
如果你只是想异步,完全忽略返回值,可以配置 return=“false”,以减少 Future 对象的创建和管理成本
<dubbo:method name="findFoo" async="true" return="false" />