Actor模型 - 分布式应用框架Akka

技术
前言

一般来说有两种策略用来在并发线程中进行通信:共享数据和消息传递

共享数据 :通过改变共享存储器地址内的数据,让不同的并发线程进行通信。使用这种通信类型的并发程序,通常需要应用某种锁定的方式来达成线程间的同步,这些锁定技术包括mutexsemaphore,或monitor等。

消息传递 :消息传递方式采取的是线程(进程)之间的直接通信,不同的线程(进程)之间通过显式的发送消息来达到交互目的

Akka是另外一种解决并发问题的思路,通过线程进程之间传递消息,避免对共享资源的竞争,Akka提供了一种称之为Actor的并发模型,粒度比线程还要小(但并不等同于协程),这表明你可以在系统当中创建及其大量的Actor,Akka不仅可以在单机上构建高并发程序,还可以构建高性能高吞吐量分布式程序。

一、并发编程模型

并行工作者(Parallel worker)

picture.image

  • 多个相互独立的执行流
  • 共享内存(状态)
  • 抢占式的调度(任务顺序是不确定的)
  • 依赖锁,信号量等同步机制

多线程程序容易编写(因为写的是顺序程序),但是难分析、难调试,更容易出错,常见的有竞争条件,死锁、活锁、资源耗尽、优先级反转… 等等。

流水线模型(反应器/事件驱动)

picture.image

Actor 和 Channels 是两种比较类似的流水线模型。

在Actor模型中每个工作者被称为actor。Actor之间可以直接异步地发送和处理消息。

Actor可以被用来实现一个或多个像前文描述的那样的作业处理流水线。

下图给出了Actor模型:

picture.image

而在Channel模型中,工作者之间不直接进行通信。相反,它们在不同的通道中发布自己的消息(事件)。其他工作者们可以在这些通道上监听消息,发送者无需知道谁在监听。

下图给出了Channel模型:

picture.image

二、Actor模型

什么是actor模型

Actor模式是消息传递并发模型 ,在1973年于Carl Hewitt、Peter Bishop及Richard Steiger的论文中提出。

它已经被用作并发计算的理论理解框架和并发系统的实际实现基础。

通过组件方式定义并发编程范式,避免使用者直接接触多线程并发或线程池等基础概念,其消息传递更加符合面向对象的原始意图。

所有的线程(或进程)通过消息传递的方式进行合作,这些线程(或进程)称为Actor。

actor组成和通信

picture.image

Actor 模型的三要素是状态、行为和消息,有一个很流行的等式:

Actor 模型 =(状态 + 行为)+ 消息

状态(State)

Actor 组件本身的信息,相当于 OOP 对象中的属性。

Actor 的状态会受 Actor 自身行为的影响,且只能被自己修改。

行为(Behavior

Actor 的计算处理操作,相当于 OOP 对象中的成员函数。

Actor 之间不能直接调用其他 Actor 的计算逻辑。

Actor 只有收到消息才会触发自身的计算行为。

消息(Mail)

Actor 的消息以邮件形式在多个 Actor 之间通信传递,每个 Actor 会有一个自己的邮箱(MailBox),用于接收来自其他 Actor 的消息,因此 Actor 模型中的消息也称为邮件。

一般情况下,对于邮箱里面的消息,Actor 是按照消息达到的先后顺序(FIFO)进行读取和处理的。

Actor 工作原理

3 个 Actor 之间基于消息和消息队列的工作流程进行说明。

这 3 个 Actor 的工作流程:

picture.image

消息交互过程

  1. Actor1 和 Actor3 先后向 Actor2 发送消息,消息被依次放入 Actor2 的 MailBox 队列的队尾 ;
  2. Actor2 从 MailBox 队列的队首依次取出消息执行相应的操作,由于 Actor1 先把消息发送给 Actor2,因此 Actor2 先处理 Actor1 的消息;
  3. Actor2 处理完 Actor1 的消息后,更新内部状态,并且向其他 Actor 发送消息,然后处理 Actor3 发送的消息。

小结

在 Actor 模型里,每个 Actor 相当于系统中的一个组件,都是基本的计算单元。

Actor 模式采用了异步模式,并且每个 Actor 封装了自己的数据、方法等,一个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则,从而巧妙避免了多线程写争夺。

和共享数据方式相比,消息传递机制最大的优点就是不会产生数据竞争状态。

Actor模型的特点是:

  • 万物皆是Actor
  • Actor之间完全独立,只允许消息传递,不允许其他”任何”共享
  • 每个Actor最多同时只能进行一样工作
  • 每个Actor都有一个专属的命名Mailbox(非匿名)
  • 消息的传递是完全异步的
  • 消息是不可变的
三、分布式应用框架Akka

Actor的概念来自于Erlang,在AKKA中可以认为一个Actor就是一个容器,用来存储状态、行为、邮箱Mailbox、子Actor、Supervisor策略。

Actor之间并不直接通信,而是通过邮件Mail来互通有无。

Actor模型的本质就是消息传递,作为一种计算实体,Actor与原子类似。参与者是一个运算实体,回应接收到的消息,同时并行的发送有限数量的消息给其他参与者、创建有限数量的新参与者、指定接收到下一个消息时的行为。

AKKA是调度模块化的,它由许多拥有不同特性的JAR组成。

  • akka-actor – 经典角色、类型角色、IO角色等
  • akka-agent – 代理、整合了Scala的STM特性
  • akka-cluster – 集群成员管理、弹性路由
  • akka-kernel – AKKA微内核,运行着一个极简应用服务器
  • akka-remote – 远程角色
  • akka-slf4j – SLF4J Logger (事件总线监听器)
  • akka-testkit – 测试角色系统的工具包Toolkit for testing Actor systems
  • akka-transactor – Transactors 即支持事务的 actors,集成了Scala STM
  • akka-filebased-mmailbox – 支持基于文件的mailbox

Akka与Java内存模型

Akka是如何在并发应用中访问共享内存的。

Java内存模型(JMM)

JMM中定义了一些先行发生的关系,天然存在的,只有以下几种:

  1. 程序次序规则 (Program Order Rule): 一个线程内,按照程序代码顺序,写在前面的操作先行发生于后面的操作。

  2. 管程锁定规则 (Monitor Lock Rule): 一个unlock操作先行发生于后面对同一个锁lock操作。

  3. volatile 变量规则 (Volatile Variable Rule): 对一个volatile变量的写操作先行发生于后面对这个变量的读操作。

  4. 线程启动规则(Thread Start Rule)Thread对象的start()方法先行发生于此线程的其他所有动作。

  5. 线程终止规则 (Thread Termination Rule): 线程的所有操作先行发生于该线程的终止检测。

  6. 线程中断规则 (Thread Interruption Rule): 对线程的interrupt()方法调用先行发生于被中断的线程的代码检测到中断事件的发生。

  7. 对象终结规则 (Finalizer Rule): 一个对象的初始化完成先行发生于它的finalize()方法的开始。

  8. 传递性 (Transitivity)

若操作A先行发生于操作B,B先行发生于操作C,那操作A一定先行发生于操作C。

Actors与Java内存模型

使用Akka中的Actor实现,有两种方法让多个线程对共享的内存进行操作:

  • 如果一条消息被(例如,从另一个actor)发送到一个actor,大多数情况下消息是不可变的,但是如果这条消息不是一个正确创建的不可变对象,如果没有 “发生先于” 规则, 有可能接收方会看到部分初始化的数据,甚至可能看到无中生有的数据(long/double)。
  • 如果一个actor在处理某条消息时改变了自己的内部状态,而之后又在处理其它消息时又访问了这个状态。一条很重要的需要了解的规则是,在使用actor模型时你无法保证,同一个线程会在处理不同的消息时使用同一个actor。

为了避免actor中的可见性和重排序问题,Akka保证以下两条 “发生在先” 规则:

  • actor发送规则 : 一条消息的发送动作先于同一个actor对同一条消息的接收。
  • actor后续处理规则 : 一条消息的处理先于同一个actor的下一条消息处理。

注意

通俗地说,这意味着当这个actor处理下一个消息的时候,对actor的内部字段的改变是可见的。因此,在你的actor中的域不需要是volitale或是同等可见性的。

这两条规则都只应用于同一个actor实例,对不同的actor则无效。

Akka工作原理

picture.image

Akka中的角色

  • ProducerActor(ServerActor)
  • ConsumerActor(WorkerActor,onReceive方法接收消息)
  • ActorRef(tell方法,发送消息给MessageDispatcher消息派发器)
  • ActorSystem(actorOf方法,创建ActorRef,ActorRef就是ConsumerActor的Proxy)
  • MailBox
  • Dispatcher
  • Message

Akka工作流程

  1. 创建一个叫ActorSystem的对象
  2. ActorSystem创建一个叫ActorRef(ServerActor)的对象。然后将消息发送给ActorRef(WorkerActor)
  3. ActorRef(ServerActor)将消息发送给Dispatcher
  4. Dispatcher(ServerActor)将消息投递到目标Actor(WorkerActor)的邮箱中
  5. Dispatcher(WorkerActor)将Mailbox扔给一个线程去执行
  6. MailBox将消息出队并最终将其委托给真实的WorkerActor的接收方法去处理

创建ActorSystem

ActorSystem是进入到Actor的世界的一扇大门。

通过它你可以创建或中止Actor,甚至还可以把整个Actor环境给关闭掉。

Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object角色——也就是说,它是所有Actor的根对象。

当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。

创建ActorRef(ConsumerActor(ServerActor)的Proxy)

ActorRef server = system.actorOf(Props.create(ServerActor.class), “server”);

actorOf是ActorSystem中创建Actor的方法,但是它并不会返回我们所需要的ServerActor。

它返回的是一个ActorRef,这个ActorRef扮演了真实的Actor的一个代理的角色,客户端并不会直接和Actor通信。

这也正是Actor模型中避免直接访问ServerActor中任何的自定义/私有方法或者变量的一种方式。

将消息发送给代理

worker.tell("server", server);

serverActor将”server“字符串,发送给workerActor,消息会发送到workerActor的MailBox,等待worker处理。

分发器及邮箱

当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。ActorRef把消息处理功能委托给了Dispatcher。

邮箱 :每个Actor都有一个MailBox,邮箱里也有一个队列来以FIFO(默认,可修改)的方式来存储并处理消息。

分发器 :从ActorRef中取出一条消息然后将它传给了MailBox。Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。

消费ConsumerActor(WorkerActor)

当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。

当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。

Actor生命周期

picture.image

actorRef

Actor 交给开发者的是一个引用,这个引用包括 path和UID,即可定位一个 Actor。

select

上面程序中使用到了

ActorSelection selection =actorSystem.actorSelection("akka://Hello/user/server");进行actor的选择。

工作过程中可能会存在成千上万的actor,可以通过selection方便的选择actor进行消息投递,其支持通配符匹配getContext().actorSelection("/user/worker\_*")

ActorPath是通过字符串描述Actor的层级关系,并唯一标识一个Actor的方法。

ActorPath包含协议,位置Actor层级关系

//本地path                       
"akka://my-sys/user/service-a/worker1"           
//远程path
"akka.tcp://my-sys@host.example.com:5678/user/service-b"                  
//akka集群
"cluster://my-cluster/service-c"             

start

preStart()

节点 start 后,才会处理第一条消息。

stop

postStop()

节点 Stop 时,会先调用其所有子 actor 的 postStop() 方法。postStop()在actor 停止之前调用,调用后不再接收消息。

restart

preRestart()

在重启 Actor 之前在旧实例上调用。

postRestart()

在重启Actor之后在新实例上调用,Actor.scala默认的postRestart()方法调用了preStart()方法。

resume

Actor 继续处理下一条消息;为了控制 Actor 的 restart 和 resume,需要重写 supervisorStrategy() 方法,子actor抛出的异常都会被父actor接收,对于不同异常可以定义不同的处理方式。

下面的代码对 NullPointerException 进行 restart,对 IllegalArgumentException 进行 resume。

HelloWord

maven引用

  <!-- akka remote -->                     
        <dependency>  
            <groupId>com.typesafe.akka</groupId>    
            <artifactId>akka-remote_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>   
            <groupId>com.typesafe.akka</groupId>    
            <artifactId>akka-slf4j_2.13</artifactId>
            <version>${akka.version}</version> 
        </dependency>

配置文件

Akka的所有配置信息装在 ActorSystem的实例中, 或者换个说法, 从外界看来, ActorSystem 是配置信息的唯一消费者. 在构造一个actor系统时,你可以传进来一个 Config object,如果不传,就相当于传进来 ConfigFactory.load() (使用正确的classloader)。

这意味着将会读取classpath根目录下的所有application.conf, application.json and application.properties这些文件(请参阅之前推荐的文档)以了解细节,然后actor系统会合并classpath根目录下的 reference.conf 来组成其内部使用的缺省配置。

# 你可以在这个配置文件中覆盖掉reference files的配置.
                          
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
                          
  # 日志级别
  # Options: OFF, ERROR, WARNING, INFO, DEBUG    
  loglevel = "WARNING"
                          
  # Log level for the very basic logger activated during ActorSystem startup.    
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG             
  stdout-loglevel = "DEBUG"
                          
  akka.actor.default-mailbox {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  }
                          
  actor {                       
    provider = "akka.cluster.ClusterActorRefProvider"
                          
    default-dispatcher {
      # Throughput for default Dispatcher, set to 1 for as fair as possible  
      throughput = 10 
    }     
  }                     
}                   

Java代码

/**                     
 * Worker节点Actor,接受服务器请求                     
 *
 * @author yankun                      
 * @since 2021年01月11日15:31:02       
 */
@Slf4j                     
public class WorkerActor extends AbstractActor {
                        
    @Override
    public Receive createReceive() {
                        
        return receiveBuilder()
                .matchEquals("worker", e -> log.info("i get a message test{}", e))
                .match(String.class, e -> log.info("i get a message {}", e))
                .matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
                .build();                   
    }
                       
                        
    @Override 
    public void preStart() {           
        log.info("worker starting.");
    }
                        
    @Override                   
    public void postStop() throws Exception {           
        log.info("worker stoping..");
    }
                        
    @Override                   
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {           
        super.preRestart(reason, message);
        log.info("preRestart    hashCode=" + this.hashCode());
    }
                        
    @Override                   
    public void postRestart(Throwable reason) throws Exception {           
        super.postRestart(reason);      
        log.info("postRestart    hashCode=" + this.hashCode());
    }
                        
    @Override                   
    public SupervisorStrategy supervisorStrategy() {           
        return new OneForOneStrategy(3, Duration.create(3, TimeUnit.SECONDS),  
            DeciderBuilder
                .match(ArithmeticException.class, e -> SupervisorStrategy.resume())
                .match(NullPointerException.class, e -> SupervisorStrategy.restart())
                .match(ArithmeticException.class, e -> {
                    return SupervisorStrategy.restart();                  
                })
                .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())      
                .matchAny(o -> SupervisorStrategy.escalate())
                .build());
    }
}
                        
/**                       
 * Server节点Actor,发送服务器请求
                        
 *                     
 * @author yankun                      
 * @since 2021年01月11日15:31:02       
 */
@Slf4j                     
public class ServerActor extends AbstractActor {
                        
    @Override
    public Receive createReceive() {
                        
        return receiveBuilder()
                .matchEquals("server", e -> log.info("i get a message test", e))
                .match(String.class, e -> log.info("i get a message {}", e))
                .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
                .build();
                        
    }
}                   

                        
public static void main(String[] args) {         
    ActorSystem actorSystem = ActorSystem.create("hello");
                        
    final Props props = Props.create(ServerActor.class);
    final ActorRef server = actorSystem.actorOf(props, "server");
    
    final Props props2 = Props.create(WorkerActor.class);
    final ActorRef worker = actorSystem.actorOf(props2, "worker");
    worker.tell(1, server);
    worker.tell("server", server);
    server.tell("worker", worker);
                        
    // worker.tell("close", ActorRef.noSender());
                        
    // 中断worker
    // worker.tell(PoisonPill.getInstance(), ActorRef.noSender());
                        
    ActorSelection actorSelection = actorSystem.actorSelection("akka://hello/user/worker");   
    actorSelection.tell("worker", server);
                        
    for (int i = 0; i < 10; i++) {
        actorSelection.tell("restart", server);
    }
    // 停止所有正在运行的演员                   
    // actorSystem.terminate();
                        
}
四、参考链接

并发编程的七个模型:

www.cnblogs.com/barrywxx/p/10406978.html

分布式概念:

https://www.pianshen.com/article/60731431723/

Akka官方文档:

https://doc.akka.io/docs/akka/current/typed/guide/modules.html

Akka学习教程:

https://blog.csdn.net/liubenlong007/article/details/53782966

*文|秋泉

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论