• AKKA 互相调用


    SpringBoot 集成 AKKA 可以参考此文:SpringBoot 集成 AKKA

    场景1:bossActor 收到信息,然后发给 worker1Actor 和 worker2Actor

    controller 入口,初次调用 ActorRef.noSender()

    @Tag(name = "test")
    @RestController
    @RequestMapping("/test")
    @Validated
    @Inner(value = false)
    public class TestController {
    
        @Resource
        private ActorSystem actorSystem;
    
        @PostMapping("/test")
        @Operation(summary = "test")
        public R search( ) {
            ActorRef pcm = actorSystem.actorOf(Props.create(BossActor.class));
            pcm.tell("I AM MASTER.TELLING BOSS", ActorRef.noSender());
            return R.ok();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    BossActor
    接受 TestController 发送的信息,同时发送消息给 worker1Actor 和 worker2Actor

    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.UntypedAbstractActor;
    
    public class BossActor extends UntypedAbstractActor {
    
        private ActorRef worker1Actor;
        private ActorRef worker2Actor;
    
        public BossActor() {
            this.worker1Actor = this.context().actorOf(Props.create(Worker1Actor.class));
            this.worker2Actor = this.context().actorOf(Props.create(Worker2Actor.class));
        }
    
        @Override
        public void onReceive(Object message) {
            System.out.println("Boss 收到 master 的消息:" + message);
            worker1Actor.tell("I AM BOSS, TELLING WORKER1", self());
            worker2Actor.tell("I AM BOSS, TELLING WORKER2", self());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Worker1Actor 和 Worker2Actor 获取 BossActor 的消息

    import akka.actor.UntypedAbstractActor;
    
    public class Worker1Actor extends UntypedAbstractActor {
    
        @Override
        public void onReceive(Object message) {
            System.out.println("worker1 收到 boss 消息:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    import akka.actor.UntypedAbstractActor;
    
    public class Worker2Actor extends UntypedAbstractActor {
    
        @Override
        public void onReceive(Object message) {
            System.out.println("worker2 收到 boss 消息:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    打印输出
    在这里插入图片描述
     
     
    场景2boss 发消息给 worker1,worker1 收到后发给 worker2,worker2 完成后返回 worker1,worker1再返回给 boss

    分析难点:
    1、由于 Actor 只能记住最后发消息给自己的人,所以 worker1 接到 worker2 消息后,getSender 只记住 worker2 ,找不到 boss 的地址
    2、Actor 的 onReceive 方法接受的是消息,没有针对发送人,worker1 需要针对不同发送人 boss 和 worker2 做不同处理

    针对问题2,我们可以设计一个实体 ReceiveDTO,里面包含 sender 发送人和 message 具体传参,所有消息严格实现这个类型

    @Data
    public class ReceiveDTO {
        String sender;
        Object message;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    controller 入口,初次调用 ActorRef.noSender()

      @Resource
        private ActorSystem actorSystem;
    
        @PostMapping("/test")
        @Operation(summary = "test")
        public R search() {
            ActorRef pcm = actorSystem.actorOf(Props.create(BossActor.class));
    
            ReceiveDTO receiveDTO = new ReceiveDTO();
            receiveDTO.setSender("start");
            receiveDTO.setMessage("start");
            pcm.tell(receiveDTO, ActorRef.noSender());
    
            return R.ok();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    BossActor,根据 receiveDTO.getSender() 判断发送人是谁

    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.UntypedAbstractActor;
    
    public class BossActor extends UntypedAbstractActor {
    
        private ActorRef worker1Actor;
    
        public BossActor() {
            this.worker1Actor = this.context().actorOf(Props.create(Worker1Actor.class));
        }
    
        @Override
        public void onReceive(Object message) {
            ReceiveDTO receiveDTO = (ReceiveDTO) message;
    
            if (receiveDTO.getSender().equals("start")) {
                receiveDTO.setMessage("I AM BOSS, TELLING WORKER1");
                receiveDTO.setSender("boss");
                System.out.println("boss start");
                worker1Actor.tell(receiveDTO, self());
            }
    
            if (receiveDTO.getSender().equals("worker1")) {
                System.out.println("boss end 收到 worker1 消息: " + receiveDTO.getMessage());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Worker1Actor 接收 boss 信息时,同时把 boss 的 ActorRef 存到 bossActor,解决难题1

    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.UntypedAbstractActor;
    
    public class Worker1Actor extends UntypedAbstractActor {
    
        private ActorRef bossActor;
        private ActorRef worker2Actor;
    
        public Worker1Actor() {
            this.worker2Actor = this.context().actorOf(Props.create(Worker2Actor.class));
        }
    
        @Override
        public void onReceive(Object message) {
    
            ReceiveDTO receiveDTO = (ReceiveDTO) message;
    
            if (receiveDTO.getSender().equals("boss")) {
                System.out.println("worker1 收到 boss 消息: " + receiveDTO.getMessage());
                this.bossActor = getSender();
                ReceiveDTO receive1 = new ReceiveDTO();
                receive1.setMessage("I AM WORKER1, TELLING WORKER2");
                worker2Actor.tell(receive1, self());
            }
    
            if (receiveDTO.getSender().equals("worker2")) {
                System.out.println("worker1 收到 worker2 消息: " + receiveDTO.getMessage());
                ReceiveDTO receive1 = new ReceiveDTO();
                receive1.setMessage("I AM WORKER1, TELLING boss, job is over");
                receive1.setSender("worker1");
                bossActor.tell(receive1, self());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Worker2Actor

    import akka.actor.UntypedAbstractActor;
    
    public class Worker2Actor extends UntypedAbstractActor {
    
        @Override
        public void onReceive(Object message) {
    
            ReceiveDTO receiveDTO = (ReceiveDTO) message;
            System.out.println("worker2 收到 worker1 消息:" + receiveDTO.getMessage());
    
            ReceiveDTO receive1 = (ReceiveDTO) message;
            receive1.setSender("worker2");
            receive1.setMessage("I AM WORKER2, TELLING WORKER1");
            getSender().tell(receive1, self());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    打印输出

    在这里插入图片描述

  • 相关阅读:
    蓝桥杯官网练习题(旋转)
    HCIP---VLAN
    【C++】SLT — list的使用 + 模拟实现
    Source map 超详细学习攻略_番茄出品
    古记事法:Windows 下 16 位汇编环境搭建指南(DOSBox-X 篇)
    C++20 实现字符串类型的转换操作
    态路小课堂丨光纤跳线的使用与维护小指南
    微服务项目:尚融宝(41)(核心业务流程:借款额度审批)
    SentinelResource注解之blockHander和fallback
    实现本地访问云主机,以及在云主机搭建FTP站点
  • 原文地址:https://blog.csdn.net/weixin_42555971/article/details/133277743