一般来说有两种策略用来在并发线程中进行通信:共享数据和消息传递 。
共享数据 :通过改变共享存储器地址内的数据,让不同的并发线程进行通信。使用这种通信类型的并发程序,通常需要应用某种锁定的方式来达成线程间的同步,这些锁定技术包括mutex,semaphore,或monitor等。
消息传递 :消息传递方式采取的是线程(进程)之间的直接通信,不同的线程(进程)之间通过显式的发送消息来达到交互目的。
Akka是另外一种解决并发问题的思路,通过线程进程之间传递消息,避免对共享资源的竞争,Akka提供了一种称之为Actor的并发模型,粒度比线程还要小(但并不等同于协程),这表明你可以在系统当中创建及其大量的Actor,Akka不仅可以在单机上构建高并发程序,还可以构建高性能高吞吐量分布式程序。
并行工作者(Parallel worker)
- 多个相互独立的执行流
- 共享内存(状态)
- 抢占式的调度(任务顺序是不确定的)
- 依赖锁,信号量等同步机制
多线程程序容易编写(因为写的是顺序程序),但是难分析、难调试,更容易出错,常见的有竞争条件,死锁、活锁、资源耗尽、优先级反转… 等等。
流水线模型(反应器/事件驱动)
Actor 和 Channels 是两种比较类似的流水线模型。
在Actor模型中每个工作者被称为actor。Actor之间可以直接异步地发送和处理消息。
Actor可以被用来实现一个或多个像前文描述的那样的作业处理流水线。
下图给出了Actor模型:
而在Channel模型中,工作者之间不直接进行通信。相反,它们在不同的通道中发布自己的消息(事件)。其他工作者们可以在这些通道上监听消息,发送者无需知道谁在监听。
下图给出了Channel模型:
什么是actor模型
Actor模式是消息传递并发模型 ,在1973年于Carl Hewitt、Peter Bishop及Richard Steiger的论文中提出。
它已经被用作并发计算的理论理解框架和并发系统的实际实现基础。
通过组件方式定义并发编程范式,避免使用者直接接触多线程并发或线程池等基础概念,其消息传递更加符合面向对象的原始意图。
所有的线程(或进程)通过消息传递的方式进行合作,这些线程(或进程)称为Actor。
actor组成和通信
Actor 模型的三要素是状态、行为和消息,有一个很流行的等式:
Actor 模型 =(状态 + 行为)+ 消息
状态(State) :
Actor 组件本身的信息,相当于 OOP 对象中的属性。
Actor 的状态会受 Actor 自身行为的影响,且只能被自己修改。
行为(Behavior ) :
Actor 的计算处理操作,相当于 OOP 对象中的成员函数。
Actor 之间不能直接调用其他 Actor 的计算逻辑。
Actor 只有收到消息才会触发自身的计算行为。
消息(Mail) :
Actor 的消息以邮件形式在多个 Actor 之间通信传递,每个 Actor 会有一个自己的邮箱(MailBox),用于接收来自其他 Actor 的消息,因此 Actor 模型中的消息也称为邮件。
一般情况下,对于邮箱里面的消息,Actor 是按照消息达到的先后顺序(FIFO)进行读取和处理的。
Actor 工作原理:
3 个 Actor 之间基于消息和消息队列的工作流程进行说明。
这 3 个 Actor 的工作流程:
消息交互过程 :
- Actor1 和 Actor3 先后向 Actor2 发送消息,消息被依次放入 Actor2 的 MailBox 队列的队尾 ;
- Actor2 从 MailBox 队列的队首依次取出消息执行相应的操作,由于 Actor1 先把消息发送给 Actor2,因此 Actor2 先处理 Actor1 的消息;
- Actor2 处理完 Actor1 的消息后,更新内部状态,并且向其他 Actor 发送消息,然后处理 Actor3 发送的消息。
小结
在 Actor 模型里,每个 Actor 相当于系统中的一个组件,都是基本的计算单元。
Actor 模式采用了异步模式,并且每个 Actor 封装了自己的数据、方法等,一个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则,从而巧妙避免了多线程写争夺。
和共享数据方式相比,消息传递机制最大的优点就是不会产生数据竞争状态。
Actor模型的特点是:
- 万物皆是Actor
- Actor之间完全独立,只允许消息传递,不允许其他”任何”共享
- 每个Actor最多同时只能进行一样工作
- 每个Actor都有一个专属的命名Mailbox(非匿名)
- 消息的传递是完全异步的
- 消息是不可变的
Actor的概念来自于Erlang,在AKKA中可以认为一个Actor就是一个容器,用来存储状态、行为、邮箱Mailbox、子Actor、Supervisor策略。
Actor之间并不直接通信,而是通过邮件Mail来互通有无。
Actor模型的本质就是消息传递,作为一种计算实体,Actor与原子类似。参与者是一个运算实体,回应接收到的消息,同时并行的发送有限数量的消息给其他参与者、创建有限数量的新参与者、指定接收到下一个消息时的行为。
AKKA是调度模块化的,它由许多拥有不同特性的JAR组成。
- akka-actor – 经典角色、类型角色、IO角色等
- akka-agent – 代理、整合了Scala的STM特性
- akka-cluster – 集群成员管理、弹性路由
- akka-kernel – AKKA微内核,运行着一个极简应用服务器
- akka-remote – 远程角色
- akka-slf4j – SLF4J Logger (事件总线监听器)
- akka-testkit – 测试角色系统的工具包Toolkit for testing Actor systems
- akka-transactor – Transactors 即支持事务的 actors,集成了Scala STM
- akka-filebased-mmailbox – 支持基于文件的mailbox
Akka与Java内存模型
Akka是如何在并发应用中访问共享内存的。
Java内存模型(JMM)
JMM中定义了一些先行发生的关系,天然存在的,只有以下几种:
- 
程序次序规则 (Program Order Rule): 一个线程内,按照程序代码顺序,写在前面的操作先行发生于后面的操作。
- 
管程锁定规则 (Monitor Lock Rule): 一个unlock操作先行发生于后面对同一个锁的lock操作。
- 
volatile变量规则(Volatile Variable Rule): 对一个volatile变量的写操作先行发生于后面对这个变量的读操作。
- 
线程启动规则 (Thread Start Rule):Thread对象的start()方法先行发生于此线程的其他所有动作。
- 
线程终止规则 (Thread Termination Rule): 线程的所有操作先行发生于该线程的终止检测。
- 
线程中断规则 (Thread Interruption Rule): 对线程的interrupt()方法调用先行发生于被中断的线程的代码检测到中断事件的发生。
- 
对象终结规则 (Finalizer Rule): 一个对象的初始化完成先行发生于它的finalize()方法的开始。
- 
传递性 (Transitivity):
若操作A先行发生于操作B,B先行发生于操作C,那操作A一定先行发生于操作C。
Actors与Java内存模型
使用Akka中的Actor实现,有两种方法让多个线程对共享的内存进行操作:
- 如果一条消息被(例如,从另一个actor)发送到一个actor,大多数情况下消息是不可变的,但是如果这条消息不是一个正确创建的不可变对象,如果没有 “发生先于” 规则, 有可能接收方会看到部分初始化的数据,甚至可能看到无中生有的数据(long/double)。
- 如果一个actor在处理某条消息时改变了自己的内部状态,而之后又在处理其它消息时又访问了这个状态。一条很重要的需要了解的规则是,在使用actor模型时你无法保证,同一个线程会在处理不同的消息时使用同一个actor。
为了避免actor中的可见性和重排序问题,Akka保证以下两条 “发生在先” 规则:
- actor发送规则 : 一条消息的发送动作先于同一个actor对同一条消息的接收。
- actor后续处理规则 : 一条消息的处理先于同一个actor的下一条消息处理。
注意
通俗地说,这意味着当这个actor处理下一个消息的时候,对actor的内部字段的改变是可见的。因此,在你的actor中的域不需要是volitale或是同等可见性的。
这两条规则都只应用于同一个actor实例,对不同的actor则无效。
Akka工作原理
Akka中的角色
- ProducerActor(ServerActor)
- ConsumerActor(WorkerActor,onReceive方法接收消息)
- ActorRef(tell方法,发送消息给MessageDispatcher消息派发器)
- ActorSystem(actorOf方法,创建ActorRef,ActorRef就是ConsumerActor的Proxy)
- MailBox
- Dispatcher
- Message
Akka工作流程
- 创建一个叫ActorSystem的对象
- ActorSystem创建一个叫ActorRef(ServerActor)的对象。然后将消息发送给ActorRef(WorkerActor)
- ActorRef(ServerActor)将消息发送给Dispatcher
- Dispatcher(ServerActor)将消息投递到目标Actor(WorkerActor)的邮箱中
- Dispatcher(WorkerActor)将Mailbox扔给一个线程去执行
- MailBox将消息出队并最终将其委托给真实的WorkerActor的接收方法去处理
创建ActorSystem
ActorSystem是进入到Actor的世界的一扇大门。
通过它你可以创建或中止Actor,甚至还可以把整个Actor环境给关闭掉。
Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object角色——也就是说,它是所有Actor的根对象。
当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。
创建ActorRef(ConsumerActor(ServerActor)的Proxy)
ActorRef server = system.actorOf(Props.create(ServerActor.class), “server”);
actorOf是ActorSystem中创建Actor的方法,但是它并不会返回我们所需要的ServerActor。
它返回的是一个ActorRef,这个ActorRef扮演了真实的Actor的一个代理的角色,客户端并不会直接和Actor通信。
这也正是Actor模型中避免直接访问ServerActor中任何的自定义/私有方法或者变量的一种方式。
将消息发送给代理
worker.tell("server", server);
serverActor将”server“字符串,发送给workerActor,消息会发送到workerActor的MailBox,等待worker处理。
分发器及邮箱
当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。ActorRef把消息处理功能委托给了Dispatcher。
邮箱 :每个Actor都有一个MailBox,邮箱里也有一个队列来以FIFO(默认,可修改)的方式来存储并处理消息。
分发器 :从ActorRef中取出一条消息然后将它传给了MailBox。Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。
消费ConsumerActor(WorkerActor)
当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。
当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。
Actor生命周期
actorRef
Actor 交给开发者的是一个引用,这个引用包括 path和UID,即可定位一个 Actor。
select
上面程序中使用到了
ActorSelection selection =actorSystem.actorSelection("akka://Hello/user/server");进行actor的选择。
工作过程中可能会存在成千上万的actor,可以通过selection方便的选择actor进行消息投递,其支持通配符匹配getContext().actorSelection("/user/worker\_*")。
ActorPath是通过字符串描述Actor的层级关系,并唯一标识一个Actor的方法。
ActorPath包含协议,位置和Actor层级关系。
//本地path                       
"akka://my-sys/user/service-a/worker1"           
//远程path
"akka.tcp://my-sys@host.example.com:5678/user/service-b"                  
//akka集群
"cluster://my-cluster/service-c"             
start
preStart()
节点 start 后,才会处理第一条消息。
stop
postStop()
节点 Stop 时,会先调用其所有子 actor 的 postStop() 方法。postStop()在actor 停止之前调用,调用后不再接收消息。
restart
preRestart()
在重启 Actor 之前在旧实例上调用。
postRestart()
在重启Actor之后在新实例上调用,Actor.scala默认的postRestart()方法调用了preStart()方法。
resume
Actor 继续处理下一条消息;为了控制 Actor 的 restart 和 resume,需要重写 supervisorStrategy() 方法,子actor抛出的异常都会被父actor接收,对于不同异常可以定义不同的处理方式。
下面的代码对 NullPointerException 进行 restart,对 IllegalArgumentException 进行 resume。
HelloWord
maven引用
  <!-- akka remote -->                     
        <dependency>  
            <groupId>com.typesafe.akka</groupId>    
            <artifactId>akka-remote_2.13</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>   
            <groupId>com.typesafe.akka</groupId>    
            <artifactId>akka-slf4j_2.13</artifactId>
            <version>${akka.version}</version> 
        </dependency>
配置文件
Akka的所有配置信息装在 ActorSystem的实例中, 或者换个说法, 从外界看来, ActorSystem 是配置信息的唯一消费者. 在构造一个actor系统时,你可以传进来一个 Config object,如果不传,就相当于传进来 ConfigFactory.load() (使用正确的classloader)。
这意味着将会读取classpath根目录下的所有application.conf, application.json and  application.properties这些文件(请参阅之前推荐的文档)以了解细节,然后actor系统会合并classpath根目录下的 reference.conf 来组成其内部使用的缺省配置。
# 你可以在这个配置文件中覆盖掉reference files的配置.
                          
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
                          
  # 日志级别
  # Options: OFF, ERROR, WARNING, INFO, DEBUG    
  loglevel = "WARNING"
                          
  # Log level for the very basic logger activated during ActorSystem startup.    
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG             
  stdout-loglevel = "DEBUG"
                          
  akka.actor.default-mailbox {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  }
                          
  actor {                       
    provider = "akka.cluster.ClusterActorRefProvider"
                          
    default-dispatcher {
      # Throughput for default Dispatcher, set to 1 for as fair as possible  
      throughput = 10 
    }     
  }                     
}                   
Java代码
/**                     
 * Worker节点Actor,接受服务器请求                     
 *
 * @author yankun                      
 * @since 2021年01月11日15:31:02       
 */
@Slf4j                     
public class WorkerActor extends AbstractActor {
                        
    @Override
    public Receive createReceive() {
                        
        return receiveBuilder()
                .matchEquals("worker", e -> log.info("i get a message test{}", e))
                .match(String.class, e -> log.info("i get a message {}", e))
                .matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
                .build();                   
    }
                       
                        
    @Override 
    public void preStart() {           
        log.info("worker starting.");
    }
                        
    @Override                   
    public void postStop() throws Exception {           
        log.info("worker stoping..");
    }
                        
    @Override                   
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {           
        super.preRestart(reason, message);
        log.info("preRestart    hashCode=" + this.hashCode());
    }
                        
    @Override                   
    public void postRestart(Throwable reason) throws Exception {           
        super.postRestart(reason);      
        log.info("postRestart    hashCode=" + this.hashCode());
    }
                        
    @Override                   
    public SupervisorStrategy supervisorStrategy() {           
        return new OneForOneStrategy(3, Duration.create(3, TimeUnit.SECONDS),  
            DeciderBuilder
                .match(ArithmeticException.class, e -> SupervisorStrategy.resume())
                .match(NullPointerException.class, e -> SupervisorStrategy.restart())
                .match(ArithmeticException.class, e -> {
                    return SupervisorStrategy.restart();                  
                })
                .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())      
                .matchAny(o -> SupervisorStrategy.escalate())
                .build());
    }
}
                        
/**                       
 * Server节点Actor,发送服务器请求
                        
 *                     
 * @author yankun                      
 * @since 2021年01月11日15:31:02       
 */
@Slf4j                     
public class ServerActor extends AbstractActor {
                        
    @Override
    public Receive createReceive() {
                        
        return receiveBuilder()
                .matchEquals("server", e -> log.info("i get a message test", e))
                .match(String.class, e -> log.info("i get a message {}", e))
                .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
                .build();
                        
    }
}                   
                        
public static void main(String[] args) {         
    ActorSystem actorSystem = ActorSystem.create("hello");
                        
    final Props props = Props.create(ServerActor.class);
    final ActorRef server = actorSystem.actorOf(props, "server");
    
    final Props props2 = Props.create(WorkerActor.class);
    final ActorRef worker = actorSystem.actorOf(props2, "worker");
    worker.tell(1, server);
    worker.tell("server", server);
    server.tell("worker", worker);
                        
    // worker.tell("close", ActorRef.noSender());
                        
    // 中断worker
    // worker.tell(PoisonPill.getInstance(), ActorRef.noSender());
                        
    ActorSelection actorSelection = actorSystem.actorSelection("akka://hello/user/worker");   
    actorSelection.tell("worker", server);
                        
    for (int i = 0; i < 10; i++) {
        actorSelection.tell("restart", server);
    }
    // 停止所有正在运行的演员                   
    // actorSystem.terminate();
                        
}
并发编程的七个模型:
www.cnblogs.com/barrywxx/p/10406978.html
分布式概念:
https://www.pianshen.com/article/60731431723/
Akka官方文档:
https://doc.akka.io/docs/akka/current/typed/guide/modules.html
Akka学习教程:
https://blog.csdn.net/liubenlong007/article/details/53782966
*文|秋泉
