一般来说有两种策略用来在并发线程中进行通信:共享数据和消息传递 。
共享数据 :通过改变共享存储器地址内的数据,让不同的并发线程进行通信。使用这种通信类型的并发程序,通常需要应用某种锁定的方式来达成线程间的同步,这些锁定技术包括mutex,semaphore,或monitor等。
消息传递 :消息传递方式采取的是线程(进程)之间的直接通信,不同的线程(进程)之间通过显式的发送消息来达到交互目的。
Akka是另外一种解决并发问题的思路,通过线程进程之间传递消息,避免对共享资源的竞争,Akka提供了一种称之为Actor的并发模型,粒度比线程还要小(但并不等同于协程),这表明你可以在系统当中创建及其大量的Actor,Akka不仅可以在单机上构建高并发程序,还可以构建高性能高吞吐量分布式程序。
并行工作者(Parallel worker)
- 多个相互独立的执行流
- 共享内存(状态)
- 抢占式的调度(任务顺序是不确定的)
- 依赖锁,信号量等同步机制
多线程程序容易编写(因为写的是顺序程序),但是难分析、难调试,更容易出错,常见的有竞争条件,死锁、活锁、资源耗尽、优先级反转… 等等。
流水线模型(反应器/事件驱动)
Actor 和 Channels 是两种比较类似的流水线模型。
在Actor模型中每个工作者被称为actor。Actor之间可以直接异步地发送和处理消息。
Actor可以被用来实现一个或多个像前文描述的那样的作业处理流水线。
下图给出了Actor模型:
而在Channel模型中,工作者之间不直接进行通信。相反,它们在不同的通道中发布自己的消息(事件)。其他工作者们可以在这些通道上监听消息,发送者无需知道谁在监听。
下图给出了Channel模型:
什么是actor模型
Actor模式是消息传递并发模型 ,在1973年于Carl Hewitt、Peter Bishop及Richard Steiger的论文中提出。
它已经被用作并发计算的理论理解框架和并发系统的实际实现基础。
通过组件方式定义并发编程范式,避免使用者直接接触多线程并发或线程池等基础概念,其消息传递更加符合面向对象的原始意图。
所有的线程(或进程)通过消息传递的方式进行合作,这些线程(或进程)称为Actor。
actor组成和通信
Actor 模型的三要素是状态、行为和消息,有一个很流行的等式:
Actor 模型 =(状态 + 行为)+ 消息
状态(State) :
Actor 组件本身的信息,相当于 OOP 对象中的属性。
Actor 的状态会受 Actor 自身行为的影响,且只能被自己修改。
行为(Behavior ) :
Actor 的计算处理操作,相当于 OOP 对象中的成员函数。
Actor 之间不能直接调用其他 Actor 的计算逻辑。
Actor 只有收到消息才会触发自身的计算行为。
消息(Mail) :
Actor 的消息以邮件形式在多个 Actor 之间通信传递,每个 Actor 会有一个自己的邮箱(MailBox),用于接收来自其他 Actor 的消息,因此 Actor 模型中的消息也称为邮件。
一般情况下,对于邮箱里面的消息,Actor 是按照消息达到的先后顺序(FIFO)进行读取和处理的。
Actor 工作原理:
3 个 Actor 之间基于消息和消息队列的工作流程进行说明。
这 3 个 Actor 的工作流程:
消息交互过程 :
- Actor1 和 Actor3 先后向 Actor2 发送消息,消息被依次放入 Actor2 的 MailBox 队列的队尾 ;
- Actor2 从 MailBox 队列的队首依次取出消息执行相应的操作,由于 Actor1 先把消息发送给 Actor2,因此 Actor2 先处理 Actor1 的消息;
- Actor2 处理完 Actor1 的消息后,更新内部状态,并且向其他 Actor 发送消息,然后处理 Actor3 发送的消息。
小结
在 Actor 模型里,每个 Actor 相当于系统中的一个组件,都是基本的计算单元。
Actor 模式采用了异步模式,并且每个 Actor 封装了自己的数据、方法等,一个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则,从而巧妙避免了多线程写争夺。
和共享数据方式相比,消息传递机制最大的优点就是不会产生数据竞争状态。
Actor模型的特点是:
- 万物皆是Actor
- Actor之间完全独立,只允许消息传递,不允许其他”任何”共享
- 每个Actor最多同时只能进行一样工作
- 每个Actor都有一个专属的命名Mailbox(非匿名)
- 消息的传递是完全异步的
- 消息是不可变的
Actor的概念来自于Erlang,在AKKA中可以认为一个Actor就是一个容器,用来存储状态、行为、邮箱Mailbox、子Actor、Supervisor策略。
Actor之间并不直接通信,而是通过邮件Mail来互通有无。
Actor模型的本质就是消息传递,作为一种计算实体,Actor与原子类似。参与者是一个运算实体,回应接收到的消息,同时并行的发送有限数量的消息给其他参与者、创建有限数量的新参与者、指定接收到下一个消息时的行为。
AKKA是调度模块化的,它由许多拥有不同特性的JAR组成。
- akka-actor – 经典角色、类型角色、IO角色等
- akka-agent – 代理、整合了Scala的STM特性
- akka-cluster – 集群成员管理、弹性路由
- akka-kernel – AKKA微内核,运行着一个极简应用服务器
- akka-remote – 远程角色
- akka-slf4j – SLF4J Logger (事件总线监听器)
- akka-testkit – 测试角色系统的工具包Toolkit for testing Actor systems
- akka-transactor – Transactors 即支持事务的 actors,集成了Scala STM
- akka-filebased-mmailbox – 支持基于文件的mailbox
Akka与Java内存模型
Akka是如何在并发应用中访问共享内存的。
Java内存模型(JMM)
JMM
中定义了一些先行发生的关系,天然存在的,只有以下几种:
-
程序次序规则
(Program Order Rule)
: 一个线程内,按照程序代码顺序,写在前面的操作先行发生于后面的操作。 -
管程锁定规则
(Monitor Lock Rule)
: 一个unlock
操作先行发生于后面对同一个锁的lock
操作。 -
volatile
变量规则(Volatile Variable Rule)
: 对一个volatile
变量的写操作先行发生于后面对这个变量的读操作。 -
线程启动规则
(Thread Start Rule)
:Thread
对象的start()
方法先行发生于此线程的其他所有动作。 -
线程终止规则
(Thread Termination Rule)
: 线程的所有操作先行发生于该线程的终止检测。 -
线程中断规则
(Thread Interruption Rule)
: 对线程的interrupt()
方法调用先行发生于被中断的线程的代码检测到中断事件的发生。 -
对象终结规则
(Finalizer Rule)
: 一个对象的初始化完成先行发生于它的finalize()
方法的开始。 -
传递性
(Transitivity)
:
若操作A先行发生于操作B,B先行发生于操作C,那操作A一定先行发生于操作C。
Actors与Java内存模型
使用Akka中的Actor实现,有两种方法让多个线程对共享的内存进行操作:
- 如果一条消息被(例如,从另一个actor)发送到一个actor,大多数情况下消息是不可变的,但是如果这条消息不是一个正确创建的不可变对象,如果没有 “发生先于” 规则, 有可能接收方会看到部分初始化的数据,甚至可能看到无中生有的数据(long/double)。
- 如果一个actor在处理某条消息时改变了自己的内部状态,而之后又在处理其它消息时又访问了这个状态。一条很重要的需要了解的规则是,在使用actor模型时你无法保证,同一个线程会在处理不同的消息时使用同一个actor。
为了避免actor中的可见性和重排序问题,Akka保证以下两条 “发生在先” 规则:
- actor发送规则 : 一条消息的发送动作先于同一个actor对同一条消息的接收。
- actor后续处理规则 : 一条消息的处理先于同一个actor的下一条消息处理。
注意
通俗地说,这意味着当这个actor处理下一个消息的时候,对actor的内部字段的改变是可见的。因此,在你的actor中的域不需要是volitale或是同等可见性的。
这两条规则都只应用于同一个actor实例,对不同的actor则无效。
Akka工作原理
Akka中的角色
- ProducerActor(ServerActor)
- ConsumerActor(WorkerActor,onReceive方法接收消息)
- ActorRef(tell方法,发送消息给MessageDispatcher消息派发器)
- ActorSystem(actorOf方法,创建ActorRef,ActorRef就是ConsumerActor的Proxy)
- MailBox
- Dispatcher
- Message
Akka工作流程
- 创建一个叫ActorSystem的对象
- ActorSystem创建一个叫ActorRef(ServerActor)的对象。然后将消息发送给ActorRef(WorkerActor)
- ActorRef(ServerActor)将消息发送给Dispatcher
- Dispatcher(ServerActor)将消息投递到目标Actor(WorkerActor)的邮箱中
- Dispatcher(WorkerActor)将Mailbox扔给一个线程去执行
- MailBox将消息出队并最终将其委托给真实的WorkerActor的接收方法去处理
创建ActorSystem
ActorSystem是进入到Actor的世界的一扇大门。
通过它你可以创建或中止Actor,甚至还可以把整个Actor环境给关闭掉。
Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object角色——也就是说,它是所有Actor的根对象。
当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。
创建ActorRef(ConsumerActor(ServerActor)的Proxy)
ActorRef server = system.actorOf(Props.create(ServerActor.class), “server”);
actorOf是ActorSystem中创建Actor的方法,但是它并不会返回我们所需要的ServerActor。
它返回的是一个ActorRef,这个ActorRef扮演了真实的Actor的一个代理的角色,客户端并不会直接和Actor通信。
这也正是Actor模型中避免直接访问ServerActor中任何的自定义/私有方法或者变量的一种方式。
将消息发送给代理
worker.tell("server", server);
serverActor将”server“字符串,发送给workerActor,消息会发送到workerActor的MailBox,等待worker处理。
分发器及邮箱
当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。ActorRef把消息处理功能委托给了Dispatcher。
邮箱 :每个Actor都有一个MailBox,邮箱里也有一个队列来以FIFO(默认,可修改)的方式来存储并处理消息。
分发器 :从ActorRef中取出一条消息然后将它传给了MailBox。Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。
消费ConsumerActor(WorkerActor)
当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。
当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。
Actor生命周期
actorRef
Actor 交给开发者的是一个引用,这个引用包括 path和UID,即可定位一个 Actor。
select
上面程序中使用到了
ActorSelection selection =actorSystem.actorSelection("akka://Hello/user/server")
;进行actor的选择。
工作过程中可能会存在成千上万的actor,可以通过selection方便的选择actor进行消息投递,其支持通配符匹配getContext().actorSelection("/user/worker\_*")
。
ActorPath是通过字符串描述Actor的层级关系,并唯一标识一个Actor的方法。
ActorPath包含协议,位置和Actor层级关系。
//本地path
"akka://my-sys/user/service-a/worker1"
//远程path
"akka.tcp://my-sys@host.example.com:5678/user/service-b"
//akka集群
"cluster://my-cluster/service-c"
start
preStart()
节点 start 后,才会处理第一条消息。
stop
postStop()
节点 Stop 时,会先调用其所有子 actor 的 postStop() 方法。postStop()在actor 停止之前调用,调用后不再接收消息。
restart
preRestart()
在重启 Actor 之前在旧实例上调用。
postRestart()
在重启Actor之后在新实例上调用,Actor.scala默认的postRestart()方法调用了preStart()方法。
resume
Actor 继续处理下一条消息;为了控制 Actor 的 restart 和 resume,需要重写 supervisorStrategy() 方法,子actor抛出的异常都会被父actor接收,对于不同异常可以定义不同的处理方式。
下面的代码对 NullPointerException 进行 restart,对 IllegalArgumentException 进行 resume。
HelloWord
maven引用
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
配置文件
Akka的所有配置信息装在 ActorSystem的实例中, 或者换个说法, 从外界看来, ActorSystem 是配置信息的唯一消费者. 在构造一个actor系统时,你可以传进来一个 Config object,如果不传,就相当于传进来 ConfigFactory.load() (使用正确的classloader)。
这意味着将会读取classpath根目录下的所有application.conf
, application.json
and application.properties
这些文件(请参阅之前推荐的文档)以了解细节,然后actor系统会合并classpath根目录下的 reference.conf 来组成其内部使用的缺省配置。
# 你可以在这个配置文件中覆盖掉reference files的配置.
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
# 日志级别
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "WARNING"
# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "DEBUG"
akka.actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
actor {
provider = "akka.cluster.ClusterActorRefProvider"
default-dispatcher {
# Throughput for default Dispatcher, set to 1 for as fair as possible
throughput = 10
}
}
}
Java代码
/**
* Worker节点Actor,接受服务器请求
*
* @author yankun
* @since 2021年01月11日15:31:02
*/
@Slf4j
public class WorkerActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("worker", e -> log.info("i get a message test{}", e))
.match(String.class, e -> log.info("i get a message {}", e))
.matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
.build();
}
@Override
public void preStart() {
log.info("worker starting.");
}
@Override
public void postStop() throws Exception {
log.info("worker stoping..");
}
@Override
public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
super.preRestart(reason, message);
log.info("preRestart hashCode=" + this.hashCode());
}
@Override
public void postRestart(Throwable reason) throws Exception {
super.postRestart(reason);
log.info("postRestart hashCode=" + this.hashCode());
}
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(3, Duration.create(3, TimeUnit.SECONDS),
DeciderBuilder
.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
.match(NullPointerException.class, e -> SupervisorStrategy.restart())
.match(ArithmeticException.class, e -> {
return SupervisorStrategy.restart();
})
.match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
.matchAny(o -> SupervisorStrategy.escalate())
.build());
}
}
/**
* Server节点Actor,发送服务器请求
*
* @author yankun
* @since 2021年01月11日15:31:02
*/
@Slf4j
public class ServerActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("server", e -> log.info("i get a message test", e))
.match(String.class, e -> log.info("i get a message {}", e))
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
}
public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create("hello");
final Props props = Props.create(ServerActor.class);
final ActorRef server = actorSystem.actorOf(props, "server");
final Props props2 = Props.create(WorkerActor.class);
final ActorRef worker = actorSystem.actorOf(props2, "worker");
worker.tell(1, server);
worker.tell("server", server);
server.tell("worker", worker);
// worker.tell("close", ActorRef.noSender());
// 中断worker
// worker.tell(PoisonPill.getInstance(), ActorRef.noSender());
ActorSelection actorSelection = actorSystem.actorSelection("akka://hello/user/worker");
actorSelection.tell("worker", server);
for (int i = 0; i < 10; i++) {
actorSelection.tell("restart", server);
}
// 停止所有正在运行的演员
// actorSystem.terminate();
}
并发编程的七个模型:
www.cnblogs.com/barrywxx/p/10406978.html
分布式概念:
https://www.pianshen.com/article/60731431723/
Akka官方文档:
https://doc.akka.io/docs/akka/current/typed/guide/modules.html
Akka学习教程:
https://blog.csdn.net/liubenlong007/article/details/53782966
*文|秋泉