• Scala--- Actor通信模型


    一、概念理解

    1、Java中的并发编程

    Java中的并发编程是基于共享数据和加锁的一种机制,即会有一个共享的数据,然后有若干个线程去访问这个共享的数据(主要是对这个共享的数据进行修改),同时Java利用加锁的机制(即synchronized)来确保同一时间只有一个线程对我们的共享数据进行访问,进而保证共享数据的一致性。Java中的并发编程存在资源争夺和死锁等多种问题,因此程序越大问题越麻烦。

    2、Scala中的并发编程

    Scala中的并发编程思想与Java中的并发编程思想完全不一样,Scala中的Actor是一种不共享数据,依赖于消息传递的一种并发编程模式, 避免了死锁、资源争夺等情况。在具体实现的过程中,Scala中的Actor会不断的循环自己的邮箱,并通过receive偏函数进行消息的模式匹配并进行相应的处理。

    如果Actor A和 Actor B要相互沟通的话,首先A要给B传递一个消息,B会有一个收件箱,然后B会不断的循环自己的收件箱, 若看见A发过来的消息,B就会解析A的消息并执行,处理完之后就有可能将处理的结果通过邮件的方式发送给A。

    Actor Model是用来编写并行计算或分布式系统的高层次抽象(类似java中的Thread)让程序员不必为多线程模式下共享锁而烦恼,被用在Erlang 语言上, 高可用性99.9999999 % 一年只有31ms 宕机Actors将状态和行为封装在一个轻量的进程/线程中,但是不和其他Actors分享状态,每个Actors有自己的世界观,当需要和其他Actors交互时,通过发送事件和消息,发送是异步的,非堵塞的(fire-andforget),发送消息后不必等另外Actors回复,也不必暂停,每个Actors有自己的消息队列,进来的消息按先来后到排列,这就有很好的并发策略和可伸缩性,可以建立性能很好的事件驱动系统。

    Actor的特征:

    • ActorModel是消息传递模型,基本特征就是消息传递
    • 消息发送是异步的,非阻塞的
    • 消息一旦发送成功,不能修改
    • Actor之间传递时,自己决定决定去检查消息,而不是一直等待,是异步非阻塞的

    什么是Akka

    Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和Scala 的 Actor 模型应用,底层实现就是Actor,Akka是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用。使构建高并发的分布式应用更加容易。

    spark1.6版本之前,spark分布式节点之间的消息传递使用的就是Akka,底层也就是actor实现的。1.6之后使用的netty传输。

    二、例:Actor简单例子发送接收消息

    1. 1.import scala.actors.Actor
    2. 2.
    3. 3.class myActor extends Actor{
    4. 4.
    5. 5. def act(){
    6. 6. while(true){
    7. 7. receive {
    8. 8. case x:String => println("get String ="+ x)
    9. 9. case x:Int => println("get Int")
    10. 10. case _ => println("get default")
    11. 11. }
    12. 12. }
    13. 13. }
    14. 14.}
    15. 15.
    16. 16.object Lesson_Actor {
    17. 17. def main(args: Array[String]): Unit = {
    18. 18.
    19. 19. //创建actor的消息接收和传递
    20. 20. val actor =new myActor()
    21. 21. //启动
    22. 22. actor.start()
    23. 23. //发送消息写法
    24. 24. actor ! "i love you !"
    25. 25.
    26. 26. }
    27. 27.}

    三、例:Actor与Actor之间通信

    1. 1.case class Message(actor:Actor,msg:Any)
    2. 2.
    3. 3.class Actor1 extends Actor{
    4. 4. def act(){
    5. 5. while(true){
    6. 6. receive{
    7. 7. case msg :Message => {
    8. 8. println("i sava msg! = "+ msg.msg)
    9. 9.
    10. 10. msg.actor!"i love you too !"
    11. 11. }
    12. 12. case msg :String => println(msg)
    13. 13. case _ => println("default msg!")
    14. 14. }
    15. 15. }
    16. 16. }
    17. 17.}
    18. 18.
    19. 19.class Actor2(actor :Actor) extends Actor{
    20. 20. actor ! Message(this,"i love you !")
    21. 21. def act(){
    22. 22. while(true){
    23. 23. receive{
    24. 24. case msg :String => {
    25. 25. if(msg.equals("i love you too !")){
    26. 26. println(msg)
    27. 27. actor! "could we have a date !"
    28. 28. }
    29. 29. }
    30. 30. case _ => println("default msg!")
    31. 31. }
    32. 32. }
    33. 33. }
    34. 34.}
    35. 35.
    36. 36.object Lesson_Actor2 {
    37. 37. def main(args: Array[String]): Unit = {
    38. 38. val actor1 = new Actor1()
    39. 39. actor1.start()
    40. 40. val actor2 = new Actor2(actor1)
    41. 41. actor2.start()
    42. 42. }
    43. 43.}

  • 相关阅读:
    速卖通选品推荐:韩国市场有哪些潜力机会商品?
    GJB软件需求规格说明-编制指南
    面试突击81:什么是跨域问题?如何解决?
    一百八十四、大数据离线数仓完整流程——步骤三、在Hive中建基础库维度表并加载MySQL中的维度表数据
    手机移动端ui前端在线点餐美食欧米香APP的设计与制作
    cesium 在大屏自适应插件下放大位置偏移
    druid数据源配置项参数解读
    442 - Matrix Chain Multiplication (UVA)
    通达信交易系统接口实现自动交易策略的方法分享
    redis
  • 原文地址:https://blog.csdn.net/yaya_jn/article/details/134510136