• 【Dubbo3高级特性】「框架与服务」框架与服务的异步调用实践以及开发模式


    异步调用

    在Dubbo中发起异步调用机制,从而提高对应的服务的调用的吞吐能力和调用机制

    特性说明

    技术背景

    从2.7.0开始,Dubbo 的所有异步编程接口开始以CompletableFuture为基础,基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。

    使用场景

    使用CompletableFuture修饰的接口
    1. 首先需要服务提供者事先定义CompletableFuture签名的服务,接口定义指南如下:

    2. Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响,异步执行无异于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。

    3. Provider端异步执行和Consumer端异步调用是相互独立的,任意正交组合两端配置

      • Consumer同步 - Provider同步
      • Consumer异步 - Provider同步
      • Consumer同步 - Provider异步
      • Consumer异步 - Provider异步
    Maven依赖配置
     <properties>
            <source.level>1.8source.level>
            <target.level>1.8target.level>
            <dubbo.version>3.0.2.1dubbo.version>
            <spring.version>4.3.16.RELEASEspring.version>
            <junit.version>4.12junit.version>
            <maven-compiler-plugin.version>3.7.0maven-compiler-plugin.version>
        properties>
    <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframeworkgroupId>
                    <artifactId>spring-framework-bomartifactId>
                    <version>${spring.version}version>
                    <type>pomtype>
                    <scope>importscope>
                dependency>
    
                <dependency>
                    <groupId>org.apache.dubbogroupId>
                    <artifactId>dubbo-bomartifactId>
                    <version>${dubbo.version}version>
                    <type>pomtype>
                    <scope>importscope>
                dependency>
    
                <dependency>
                    <groupId>org.apache.dubbogroupId>
                    <artifactId>dubbo-dependencies-zookeeperartifactId>
                    <version>${dubbo.version}version>
                    <type>pomtype>
                dependency>
    
                <dependency>
                    <groupId>junitgroupId>
                    <artifactId>junitartifactId>
                    <version>${junit.version}version>
                dependency>
            dependencies>
        dependencyManagement>
    		
    		<dependencies>
            <dependency>
                <groupId>org.apache.dubbogroupId>
                <artifactId>dubboartifactId>
            dependency>
    
            <dependency>
                <groupId>org.apache.dubbogroupId>
                <artifactId>dubbo-dependencies-zookeeperartifactId>
                <type>pomtype>
            dependency>
    
            <dependency>
                <groupId>junitgroupId>
                <artifactId>junitartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>org.springframeworkgroupId>
                <artifactId>spring-testartifactId>
                <scope>testscope>
            dependency>
        dependencies>
    
        <profiles>
            
            <profile>
                <id>javax.annotationid>
                <activation>
                    <jdk>[1.11,)jdk>
                activation>
                <dependencies>
                    <dependency>
                        <groupId>javax.annotationgroupId>
                        <artifactId>javax.annotation-apiartifactId>
                        <version>1.3.2version>
                    dependency>
                dependencies>
            profile>
        profiles>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-compiler-pluginartifactId>
                    <version>${maven-compiler-plugin.version}version>
                    <configuration>
                        <source>${source.level}source>
                        <target>${target.level}target>
                    configuration>
                plugin>
            plugins>
        build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    方案1:定义CompletableFuture修饰的接口
    服务接口定义
    public interface AsyncService {
        CompletableFuture<String> execute(String name);
    }
    
    • 1
    • 2
    • 3
    服务接口实现(服务提供者)
    public class AsyncServiceImpl implements AsyncService {
        @Override
        public CompletableFuture<String> execute(String name) {
            return CompletableFuture.supplyAsync(() -> {
    						//TODO 功能实现机制
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "async response from provider.";
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    服务接口返回

    通过return CompletableFuture.supplyAsync() ,业务执行已从Dubbo线程切换到业务线程,避免了对 Dubbo 线程池的阻塞。

    方案2:使用AsyncContext

    Dubbo提供了一个类似 Servlet 3.0 的异步接口AsyncContext,在没有 CompletableFuture修饰接口的情况下,也可以实现 Provider端的异步执行。

    服务接口定义
    public interface AsyncService {
        String execute(String name);
    }
    
    • 1
    • 2
    • 3

    服务暴露,和普通服务完全一致

    <bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/>
    <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>
    
    • 1
    • 2
    服务实现(服务提供者)
    public class AsyncServiceImpl implements AsyncService {
        public String execute(String name) {
            final AsyncContext asyncContext = RpcContext.startAsync();
            new Thread(() -> {
                // 如果要使用上下文,则必须要放在第一句执行
                asyncContext.signalContextSwitch();
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 写回响应
                asyncContext.write("Hello " + name + ", response from provider.");
            }).start();
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    注意接口的返回类型是 CompletableFuture

    XML引用服务(服务消费者者)
    <dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.AsyncService"/>
    
    • 1
    调用远程服务(服务消费者)
    // 调用直接返回CompletableFuture
    // 获取到对应的asyncService服务实现类对象
    CompletableFuture<String> future = asyncService.execute("async call request");
    // 增加回调
    future.whenComplete((v, t) -> {
        if (t != null) {
            t.printStackTrace();
        } else {
            System.out.println("Response: " + v);
        }
    });
    // 早于结果输出
    System.out.println("Executed before response return.");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    方案3:使用RpcContext

    在 consumer.xml 中配置

    <dubbo:reference id="asyncService" interface="org.apache.dubbo.samples.governance.api.AsyncService">
          <dubbo:method name="execute" async="true" />
    dubbo:reference>
    
    • 1
    • 2
    • 3
    调用代码
    // 此调用会立即返回null
    asyncService.execute("world");
    // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
    CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
    // 为Future添加回调
    helloFuture.whenComplete((retValue, exception) -> {
        if (exception == null) {
            System.out.println(retValue);
        } else {
            exception.printStackTrace();
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    或者,也可以这样做异步调用

    CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(
        () -> {
            asyncService.sayHello("oneway call request1");
        }
    );
    
    future.get();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    异步总是不等待返回,你也可以设置是否等待消息发出

    <dubbo:method name="findFoo" async="true" sent="true" />
    
    • 1
    • sent=“true” 等待消息发出,消息发送失败将抛出异常。

    如果你只是想异步,完全忽略返回值,可以配置 return=“false”,以减少 Future 对象的创建和管理成本

    <dubbo:method name="findFoo" async="true" return="false" />
    
    • 1
    • sent=“false” 不等待消息发出,将消息放入 IO 队列,即刻返回。
  • 相关阅读:
    渗透面试经验分享
    【Linux练习生】Linux多线程
    Docker基础管理
    echarts超好用的可视化图表库 在react中使用
    认识NAT技术
    IDEA(2023)修改默认缓存目录
    三步学会如何构建平衡二叉树(简单好理解)
    手把手教你VMware14虚拟机安装教程「图文附软件」
    大模型微调发展-学习调研总结
    Worthington用于细胞收获的胰蛋白酶&细胞释放程序
  • 原文地址:https://blog.csdn.net/l569590478/article/details/127693911