在面向对象编程中,一个对象可以访问或修改另一个对象的值,在高并发情况下,由于机器性能的瓶颈,当有多个对象对同一竞争资源进行操作时,可能会出现数据错误的问题(即实际读取的数据不是预期数据,而是前面阶段到这一阶段未修改完成的数据)。Actor模型对此进行了修改,它不是直接对对象进行操作,而是通过消息传递的方式与外界进行交互。如图所示:
Actor一次只接收处理一个消息,未处理消息会被放入队列等待处理。
Actor有几个重要概念:
在一个应用中,所有Actor组成了ActorSystem(Actor系统),它是一个层级结构,除顶级Actor外所有Actor都有一个父Actor,当子Actor在处理消息时出现异常情况,父Actor可以通过预先指定的方式来处理子Actor,处理方式有:恢复子Actor、重启子Actor、停止子Actor以及扩大化失败。在ActorSystem创建时,默认会启动三个Actor。
所有Actor都有自己的生命周期,Akka提供了对应的函数来响应不同的生命周期。常见的操作是构建一个Actor来处理其他Actor死亡时传递的消息,这个Actor也被称为Death Wath
。
因为Actor是通过消息进行通信的,所以对于其他Actor是在本地还是在远程它都不在乎,Actor仅仅操作它的引用。
粗略了解了Actor的相关知识后,接下来,我们就开始Akka的学习。
我的环境为:
操作系统:Windows10
jdk版本:jdk11
Akka依赖:
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.13</artifactId> <version>2.6.0</version> </dependency> 复制代码
Akka是一个用于高并发、分布式、弹性伸缩场景下的消息驱动应用开发框架,它基于Actor模型,为开发者提供了消息控制状态的开发思想。前面提到,Actor通过操作模型的引用来控制内部状态的变化,Akka对此的实现是ActorRef对象,Akka通过ActorSystem.create()
方法获取Actor系统,然后调用API得到指定Actor的引用,通过引用发送消息来与其它Actor通信。
首先,让我们来尝试一下Akka版的Hello World
:
public class TestAkka{ static ExecutorService threadPool = Executors.newFixedThreadPool(10000); public static void test(){ Demo demo = new Demo(); for (int i = 0; i < 100000; i++) { threadPool.execute(new TestAkkaThread(demo)); } } public static void testAkka(){ // 获取Actor系统 ActorSystem sys = ActorSystem.create(); // 获取指定Actor ActorRef ref = sys.actorOf(Props.create(AkkaDemo.class), "startActor"); for (int i = 0; i < 100000; i++) { threadPool.execute(new TestAkkaThread(ref)); } } public static void main(String[] args) { // test(); testAkka(); } } class AkkaDemo extends UntypedAbstractActor { private static int cnt = 1; public void onReceive(Object message){ // 当Actor接收到消息时,自动调用此方法 System.out.println(String.format("第:%d次接收消息", cnt++)); } } class Demo { private static int cnt = 1; public void tell(){ System.out.println(String.format("第:%d次接收消息", cnt++)); } } class TestAkkaThread implements Runnable{ private Object ref; public TestAkkaThread(Object ref) { this.ref = ref; } @Override public void run() { if (ref instanceof ActorRef) // 通过ActorRef向对应Actor传送消息 ((ActorRef)ref).tell("", ActorRef.noSender()); else ((Demo)ref).tell(); } } 复制代码
getSelf():获取当前Actor的引用
getSender():返回当前Actor接收的消息的发送者的引用,比如如果Actor A向Actor B发送消息,则当B调用getSender()时,它将返回Actor A的引用。在这里可以简单理解为:返回来发送回应消息的目标的引用。
Akka提供两种发送消息的机制,分别为tell
和ask
,两者的主要区别有:
ask的应用如下:
public class StartAkka extends UntypedAbstractActor { @Override public void onReceive(Object message){ System.out.println("接收消息:" + message); getSender().tell("返回消息", getSelf()); } public static void main(String[] args) { ActorSystem sys = ActorSystem.create(); ActorRef ref = sys.actorOf(Props.create(StartAkka.class), "startAkka"); ref.tell("Hello Akka!", ActorRef.noSender()); Timeout timeout = new Timeout(10, TimeUnit.SECONDS); Future<Object> akka_ask = Patterns.ask(ref, "Akka Ask", timeout); System.out.println("ask..."); akka_ask.onComplete(new Function1<Try<Object>, Object>() { @Override public Object apply(Try<Object> v1) { // 获取回复成功的处理逻辑 if (v1.isSuccess()) System.out.println("发送成功,收到消息:" + v1.get()); // 获取回复失败的处理逻辑 if (v1.isFailure()) System.out.println("发送失败:" + v1.get()); return null; } }, sys.dispatcher()); System.out.println("continue..."); } } 复制代码
Patterns.ask 方法会异步执行,假如Actor返回消息超时了,会产生一个akka.pattern.AskTimeoutException
sys.dispatcher()
:返回当前Akka的消息分发器,该内容到后面会讲到。
在Actor模型
中我们了解到一个Actor系统其实就是一棵树,每一个Actor都是一个节点,对于已存在的Actor,我们可以通过路径(当前路径/绝对路径)来查找:
public class SearchAkka extends UntypedAbstractActor { private ActorRef target = getContext().actorOf(Props.create(Target.class), "targetActor"); @Override public void onReceive(Object message) throws Throwable, Throwable { if (message instanceof String) { if ("find".equals(message)){ /* LookupActor在收到"find"消息后,会通过ActorContext查找出ActorSelection对象. ActorSelection发送Identify时,需要指定一个messageId(用来区分Actor), 消息发送后,当前Actor会收到一个ActorIdentity,可以通过ActorIdentity.getRef() 方法来获取指定的ActorRef */ ActorSelection targetActor = getContext().actorSelection("targetActor"); // 异步查找Actor Timeout timeout = new Timeout(10, TimeUnit.SECONDS); Future<Object> find = Patterns.ask(targetActor, "find", timeout); find.onComplete(new Function1<Try<Object>, Object>() { @Override public Object apply(Try<Object> v1) { if (v1.isSuccess()) targetActor.tell(new Identify("A001"), getSelf()); if (v1.isFailure()) System.out.println("查找失败"); return null; } }, getContext().dispatcher()); } } else if (message instanceof ActorIdentity){ ActorIdentity actorIdentity = (ActorIdentity) message; if (actorIdentity.correlationId().equals("A001")) { Optional<ActorRef> ref = actorIdentity.getActorRef(); if (!ref.isEmpty()){ System.out.println("ActorIdentity is:" + actorIdentity.correlationId() + " " + ref); ref.get().tell("hello target", getSelf()); } } } else unhandled(message); } public static void main(String[] args) { ActorSystem system = ActorSystem.create("sys"); ActorRef actorRef = system.actorOf(Props.create(SearchAkka.class), "askActorDemo"); Timeout timeout = new Timeout(10, TimeUnit.MINUTES); Future<Object> akka_ask = Patterns.ask(actorRef, "find", timeout); akka_ask.onComplete(new Function1<Try<Object>, Object>() { @Override public Object apply(Try<Object> v1) { if (v1.isSuccess()){ System.out.println("收到消息:" + v1.get()); } else if (v1.isFailure()){ System.out.println("湖获取消息失败"); } return null; } }, system.dispatcher()); } } /** * 被查找的对象 */ class Target extends UntypedAbstractActor{ @Override public void onReceive(Object message) throws Throwable, Throwable { System.out.println("target actor reveive: " + message); } } 复制代码
Actor在运行时中会经历不同的阶段,有创建并启动、恢复运行、重启、停止。Akka针对Actor不同的状态提供了对应的响应API:
preStart():启动前。
aroundPreStart():可以覆盖preStart()方法,默认情况下调用preStart()。
preRestart():重启前。(将被废弃)
aroundPreResrat():可以覆盖preRestart()方法,默认情况下调用preRestart()。
postRestart():重启后。
aroundPostRestart():可以覆盖postRestart()方法,默认情况下调用postRestart()。
aroundPostStop():可以覆盖postStop()方法,默认情况下调用postStop()。
postStop():停止后。
对于Actor的停止有三种方法:
调用ActorSystem或getContext()的stop方法:
sys.stop(ref);
给Actor发送一个PoisonPill (毒丸)消息:
ref.tell(PoisonPill.getInstance(), ActorRef.noSender());
给Actor 发送一个Kill 的消息, 此时会抛出ActorKilledException 异常:
ref.tell(Kill.getInstance(), ActorRef.noSender());
当Actor停止时,它会执行以下流程:
当我们处理业务时,可能需要针对不同的消息采用不同的处理逻辑。我们可以将多个状态的处理过程封装为对应组件,然后进行组装。在Akka中,提供了Producer实现,它有两个方法:become(切换为某个行为),unbecome(切换为上一个行为)。实例如下:
public class StateAkka extends UntypedAbstractActor { private PartialFunction<Object, BoxedUnit> procedure1 = new PartialFunction<>() { @Override public BoxedUnit apply(Object param) { System.out.println(param); if ("break".equals(param)) getContext().unbecome(); else System.out.println("state1:" + param); return null; } @Override public boolean isDefinedAt(Object x) { return true; } }; private PartialFunction<Object, BoxedUnit> procedure2 = new PartialFunction<>() { @Override public BoxedUnit apply(Object param) { System.out.println(param); if ("break".equals(param)) getContext().unbecome(); else System.out.println("state2:" + param); return null; } @Override public boolean isDefinedAt(Object x) { return true; } }; @Override public void onReceive(Object message) { /* 当Procedue执行了unbecome方法后,计算流程会重新进入onReceive内。 当调用了一次become之后,新Producre的代码逻辑会被保存进一个执行栈中, 此时可以通过调用UNbecome来返回到上一个Procure。你也可以在become方法中 传递第二个参数为false来表示不存储当前行为。 */ System.out.println("开始执行模式:" + message); if ("1".equals(message)) getContext().become(procedure1); if ("2".equals(message)) getContext().become(procedure2); } public static void main(String[] args) { ActorSystem sys = ActorSystem.create("sys"); ActorRef ref = sys.actorOf(Props.create(StateAkka.class), "statActor"); ref.tell("1", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("break", ActorRef.noSender()); ref.tell("2", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); } } 复制代码
Actor系统采用"父监督"的模式进行管理,即父Actor会监督子Actor的异常情况,然后根据默认或者预设的处理逻辑来确定到底是该恢复Actor 、停止Actor 、重启Actor还是把错误提交到父级。
Akka 提供了两种监督策略:
One-For-One Strategy
(默认监督策略):当一个子Actor出现异常时,只对该Actor 做处理。All-For-One Stratw
,当一个子Actor出现异常时,对所有Actor 都做
处理。当程序中没有显式指定策略时,会启动一个默认策略,该策略遵循下列规则:
ActorlnitializationException
和ActorKilledException
时,会终止子Actor。Exception
时,会重启子Actor。Throwable
异常时, 会上溯到父级。自定义容错策略:
public class StrategyAkka extends UntypedAbstractActor { // 定义监督策略 private SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.apply("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable err) throws Exception { if (err instanceof IOException) { System.out.println("-----------IOException-----------"); return SupervisorStrategy.resume(); // 恢复运行 } else if (err instanceof IndexOutOfBoundsException) { System.out.println("-----------IndexOutOfBoundsException-----------"); return SupervisorStrategy.restart(); // 重启 } else if (err instanceof SQLException) { System.out.println("-----------SQLException-----------"); return SupervisorStrategy.stop(); // 停止 } else { System.out.println("-----------UnkownException-----------"); return SupervisorStrategy.escalate(); // 升级失败 } } }); @Override public SupervisorStrategy supervisorStrategy() { return strategy; } @Override public void preStart() throws Exception { ActorRef ref = getContext().actorOf(Props.create(WorkActor.class), "workActor"); // 监控生命周期 getContext().watch(ref); ref.tell("Hello", ActorRef.noSender()); ref.tell(new IOException(), ActorRef.noSender()); ref.tell(new IndexOutOfBoundsException(), ActorRef.noSender()); Timeout timeout = new Timeout(10, TimeUnit.SECONDS); Future<Object> akka_ask = Patterns.ask(ref, "getValue", timeout); System.out.println("ask..."); akka_ask.onComplete(new Function1<Try<Object>, Object>() { @Override public Object apply(Try<Object> v1) { if (v1.isSuccess()) System.out.println("发送成功,收到消息:" + v1.get()); if (v1.isFailure()) System.out.println("发送失败:" + v1.get()); return null; } }, getContext().dispatcher()); System.out.println("continue..."); super.preStart(); } @Override public void onReceive(Object message) throws Throwable, Throwable { if (message instanceof Terminated) System.out.println(((Terminated)message).getActor() + "已经停止"); else System.out.println("stateCount:" + message); } public static void main(String[] args) { ActorSystem sys = ActorSystem.create("sys"); ActorRef ref = sys.actorOf(Props.create(StrategyAkka.class), "strategyActor"); } } class WorkActor extends UntypedAbstractActor { private int state = 1; // 状态参数 @Override public void preStart() throws Exception, Exception { System.out.println("start, state is:" + state++); super.preStart(); } @Override public void postStop() throws Exception { System.out.println("stop"); super.postStop(); } @Override public void postRestart(Throwable reason) throws Exception { System.out.println("postRestart"); super.postRestart(reason); } @Override public void onReceive(Object message) throws Exception { // 模拟计算任务 this.state++; System.out.println("message:" + message); if (message instanceof Exception) throw (Exception) message; else if ("getValue".equals(message)) getSender().tell(state, getSelf()); else unhandled(message); } } 复制代码
OneForOneStrategy对象需要三个参数:
maxNrOfRetries
:指定时间内的最大重启次数。withinTimeRange
:指定时间大小。decider
:接收一个Function对象,通过apply
方法返回监督指令:
- SupervisorStrategy.resume() :恢复运行
- SupervisorStrategy.restart(): 重启
- SupervisorStrategy.stop():停止
- SupervisorStrategy.escalate():升级失败
在分布环境下,可能由于网络或其它问题导致系统级联失败,为了防止系统不断重试而造成资源大量耗费,Actor提供了熔断机制。即当尝试指定次数后仍然失败,则反馈错误信息。熔断机制有以下三种状态:
public class CricuitBreakAkka extends UntypedAbstractActor { private ActorRef workChild; private static SupervisorStrategy strategy = new OneForOneStrategy(20, Duration.ofMinutes(1), new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable param) throws Exception { // 直接恢复运行 return SupervisorStrategy.resume(); } }); @Override public SupervisorStrategy supervisorStrategy() { return strategy; } @Override public void preStart() throws Exception { super.preStart(); workChild = getContext().actorOf(Props.create(CricuitWorkActor.class), "workActor"); } @Override public void onReceive(Object message) throws Throwable { workChild.tell(message, getSender()); } public static void main(String[] args) { ActorSystem sys = ActorSystem.create(); ActorRef ref = sys.actorOf(Props.create(CricuitBreakAkka.class), "cricuitBreakActor"); for (int i = 0; i < 15; i++) { ref.tell("block Hello " + i, ActorRef.noSender()); } } } class CricuitWorkActor extends UntypedAbstractActor{ private CircuitBreaker breaker; @Override public void preStart() throws Exception { super.preStart(); /** * 在启动阶段创建CircuitBreaker对象,当向CricuitWorkActor发送"block"开头的字符串信息后,会阻塞3s,来触发超时并计数一次。 * 当计数达到3次后,CircuitBreaker会处于Open状态,触发onOpen函数。5s后会进入Half-Open状态,此时调用onHalfOpen函数。 * 然后继续发送消息,如果消息被成功处理,那么CircuitBreaker进入Closed状态,并调用onClose函数,否则又进入Open状态。 */ this.breaker = new CircuitBreaker(getContext().dispatcher(), getContext().system().scheduler(), 3, Duration.ofSeconds(2), Duration.ofSeconds(5)).onOpen(new Function0<BoxedUnit>() { @Override public BoxedUnit apply() { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak 开启"); return null; } }).onHalfOpen(new Function0<BoxedUnit>() { @Override public BoxedUnit apply() { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak 半开启"); return null; } }).onClose(new Function0<BoxedUnit>() { @Override public BoxedUnit apply() { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak 关闭"); return null; } }); } @Override public void onReceive(Object message) throws Throwable { if (message instanceof String) { String msg = (String) message; if (msg.startsWith("block")) { getSender().tell(breaker.callWithCircuitBreaker(new Callable() { @Override public String call() throws Exception { System.out.println("msg is: " + msg); Thread.sleep(3000); return null; } }), getSelf()); } } } } 复制代码
在Akka 中, Actor的消息通信和任务执行建立在一个完全透明的调度机制之上,它屏蔽了底层线程池的使用,暴露出了一个消息分发器作为线程的调度,这个消息分发器就是Dispatcher。
与Dispatcher相关联的为Executor,它为Dispatcher提供了执行异步任务的策略,Executor有两种类型:
自定义Dispatcher:
在类路径下创建application.conf
文件,在里面写上:
my-forkjoin-dispatcher{ # 自定义的Dispatcher的名字 type = Dispatcher # dispatcher 类型 executor = "fork-join-executor" fork-join-executor { # 配置forkjoin线程池 parallelism-min = 3 # 最小并发线程数 parallelism-factor = 3.0 # 并发因子 parallelism-max = 16 # 最大并发数 } throughput = 1executor # 对于一个Actor,某个线程在处理下一个Actor之前能处理的最大消息数 } my-pinned-dispatcher{ executor = "thread-pool-executor" type = PinnedDispatcher # PinnedDispatcher是另一种Dispatcher,为每个Actor提供只有一个线程的线程池 } 复制代码
parallelism-factor作为线程池的并发因子,影响线程池的最大 可用线程数:最大线程数=处理器个数*并发因子。
type包括:
Dispatcher:基于事件的调度器,将一组Actor绑定到线程池。
PinnedDispatcher:为每个Actor 提供只有一个线程的线程池。
CallingThreadDispatcher:不创建执行线程,当前线程执行Actor调用。
在代码中使用自己指定的Dispatcher:
// 在创建Actor引用时,指定策略 ActorRef ref = sys.actorOf(Props.create(CustomActorDemo.class). withDispatcher("my-forkjoin-dispatcher"), "customActor"); 复制代码
我们知道,当我们给一个Actor发送消息时,并不是直接传递给它,而是将消息发送到"邮箱",由邮箱来进行调度,决定什么时候发送。Actor的邮箱其实就是一个消息队列,默认遵循先进先出的原则。当然,我们基于某些场景自定义它。
邮箱分为有界(Unbounded)和无界(Unbounded),Actor默认 采用UnboundedMailbox。
UnboundedMailbox一个基于链表的队列结构,元素从队尾入队,从队首出队,同时它使用CAS保证多线程下的安全,保证了性能的高效。
Akka包含了很多自定义的邮箱,主要有:
邮箱 | 说明 | 是否阻塞 | 是否有界 |
---|---|---|---|
UnboundedMailbox | 基于ConcurrentLinkedQueue实现,先进先出。 | N | N |
SingleConsumerOnlyUnboundedMailbox | 多生产者——单消费者模式(比ConcurrentLinkedQueue慢)。 | Y | N |
NonBlockingBoundedMailbox | 多生产者——单消费者模式,直接将多余消息废弃,所以不需要mailbox-push-timeout-time 。 |
N | Y |
UnboundedControlAwareMailbox | 优先发送实现ControlMessage 的控制消息。 |
N | N |
UnboundedPriorityMailbox | 允许对其内容进行优先排序。扩展这个类并在构造函数中提供比较器。 | N | N |
UnboundedStablePriorityMailbox | 与UnboundedPriorityMailbox类似,但它保留同等优先级邮件的顺序。 | N | N |
BoundedMailbox | 参与者使用的默认有界邮箱类型。 | Z | N |
BoundedPriorityMailbox | 允许对其内容进行优先排序的有界邮箱。 | Z | Y |
BoundedStablePriorityMailbox | 允许对其内容进行优先排序,保留同等优先级邮件的顺序的有界邮箱。 | Z | Y |
BoundedControlAwareMailbox | 优先发送实现ControlMessage 控制消息的有界有偶像。 |
Z | Y |
Z表示:当
mailbox-push-timeout-time
非0 时,可能会阻塞, 反之则不会。
首先,我们要创建一个自定义的邮箱类:
class CustomEmail extends UnboundedStablePriorityMailbox { public CustomEmail(ActorSystem.Settings settings, Config config){ /* 返回值越小,优先级越高 */ super(new PriorityGenerator() { @Override public int gen(Object message) { if (message instanceof String) { String msg = (String) message; if (msg.startsWith("张")) return 0; if (msg.startsWith("李")) return 1; if (msg.startsWith("王")) return 2; } return 3; } }); } } 复制代码
然后,修改application.conf文件内容:
my-mailbox{ mailbox-type = "cn.bigkai.akka.CustomEmail" # 绑定邮箱 mailbox capacity = 1000 # 邮箱容量 mailbox-push-timeout-time = 10s # 入队超时时间(对于有界邮箱) } 复制代码
测试时,将自定义邮箱关联起来:
ActorRef ref = sys.actorOf(Props.create(CustomEmailActorDemo.class).withMailbox("my-mailbox")); 复制代码
你也可以在配置文件的dispatcher中配置邮箱,然后在代码中直接关联dispatcher:
my-forkjoin-dispatcher{ mailbox-type = "cn.bigkai.akka.CustomEmail" # 绑定邮箱 } 复制代码或者直接在你的Actor上继承对应的邮箱接口:
// 给该Actor发送的ControlMessage消息将会被优先处理 public class CustomMailBoxAkka extends UntypedAbstractActor implements RequiresMessageQueue<UnboundedControlAwareMailbox> { @Override public void onReceive(Object message) throws Throwable { System.out.println(message); } } 复制代码邮箱队列接口与类型对应:
接口 类型 UnboundedMessageQueueSemantics UnboundedMailbox BoundedMessageQueueSemantics BoundedMailbox DequeBasedMessageQueueSemantics UnboundedDequeBasedMailbox UnboundedDequeBasedMessageQueueSemantics UnboundedDequeBasedMailbox BoundedDequeBasedMessageQueueSemantics BoundedDequeBasedMailbox MultipleConsumerSemantics UnboundedMailbox AwareMessageQueueSemantics UnboundedControlAwareMailbox UnboundedControlAwareMessageQueueSemantics UnboundedControlAwareMailbox BoundedControlAwareMessageQueueSemantics BoundedControlAwareMailbox LoggerMessageQueueSemantics LoggerMailboxType
在上面我们通过继承对应的邮箱类型的接口来自定义邮箱,但是如果有的时候我们想自己修改队列的实现或者完成更细粒度的操作该怎么办呢?这时候就需要实现它们的父类接口:MessageQueue:
class BusinessMsgQueue implements MessageQueue{ private Queue<Envelope> queue = new ConcurrentLinkedDeque<>(); @Override public void enqueue(ActorRef receiver, Envelope handle) { queue.offer(handle); } @Override public Envelope dequeue() { return queue.poll(); } @Override public int numberOfMessages() { return queue.size(); } @Override public boolean hasMessages() { return !queue.isEmpty(); } @Override public void cleanUp(ActorRef owner, MessageQueue deadLetters) { for (Envelope ev : queue) deadLetters.enqueue(owner, ev); } } class BusinessMailBoxType implements MailboxType, ProducesMessageQueue<BusinessMsgQueue>{ // 必须要构建一个带Settings、Config的构造参数 public BusinessMailBoxType(ActorSystem.Settings settings, Config config) { } // 指定自定义的队列 @Override public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) { return new BusinessMsgQueue(); } } 复制代码
Akka除了为简单的消息传递提供了tell、ask等API,同时通过路由的方式也为轮询、广播等复杂消息投递逻辑实现了对应方法。下面,就让我们尝试一下Akka的路由机制。
在进行代码编写前,我们先理解两个概念:Router
和Route
:
下面是一段简单的测试代码:
public class RouteAkka extends UntypedAbstractActor{ private Router router; @Override public void preStart() throws Exception { super.preStart(); ArrayList<Routee> list = new ArrayList<>(); for (int i = 0; i < 2; i++) list.add(new ActorRefRoutee(getContext().actorOf(Props.create(RouteeActor.class), "routeeActor" + i))); router = new Router(new RoundRobinRoutingLogic(), list); // 轮询投递 } @Override public void onReceive(Object message) throws Throwable { router.route(message, getSender()); } public static void main(String[] args) { ActorSystem sys = ActorSystem.create(); ActorRef ref = sys.actorOf(Props.create(RouteAkka.class), "routeActor"); for (int i = 0; i < 10; i++) { ref.tell("Hello " + i, ActorRef.noSender()); } } } class RouteeActor extends UntypedAbstractActor { @Override public void onReceive(Object message) throws Throwable { System.out.println(getSelf() + "——>" + message); } } 复制代码
RoundRobinRoutingLogic继承了RoutingLogic,并重写了它的select方法,通过一个AtomicLong原子类递增并求余,来获取当前routee的下标。
private final AtomicLong next = new AtomicLong(); public Routee select(final Object message, final IndexedSeq<Routee> routees) { Object var10000; if (routees.nonEmpty()) { int size = routees.size(); int index = (int)(this.next().getAndIncrement() % (long)size); var10000 = (Routee)routees.apply(index < 0 ? size + index : index); } else { var10000 = .MODULE$; } return (Routee)var10000; } 复制代码
在Router的route方法中,通过send发送消息。
public void route(final Object message, final ActorRef sender) { BoxedUnit var3; if (message instanceof Broadcast) { Broadcast var5 = (Broadcast)message; Object msg = var5.message(); // 发送消息 (new SeveralRoutees(this.routees())).send(msg, sender); var3 = BoxedUnit.UNIT; } else { this.send(this.logic().select(message, this.routees()), message, sender); var3 = BoxedUnit.UNIT; } } 复制代码
内置路由类型:
路由 | 说明 | 支持pool | 支持group |
---|---|---|---|
RoundRobinRouting | 轮询发送消息。 | Y | Y |
RandomRouting | 随机发送消息 。 | Y | Y |
BalancingRouting | 动态分配Routee的任务,努力达到执行时间的平衡。 | Y | N |
SmallestMailbox | 尝试给消息较少的非挂起的Routee 发送消息。 | Y | N |
Broadcast | 会以广播的形式发送给所有Routee。 | Y | Y |
ScatterGatherFirstComp leted | 将消息发送给所有Routee ,并等待一个最快回复(如果指定时间内没有回复,则抛出一个异常)。 | Y | Y |
TailChopping | 首先发送消息给一个随机选取的Routee ,一段时间时间后发给第二个随机选取的Routee ,直到获取回复。 | Y | Y |
ConsistentHashing | 使用一致性Hash算法来选择Routee 。 | Y | Y |
一个路由Actor有两种模式:
pool:路由器Actor 会创建子Actor 作为其Routee 并对其监督和监控,当某个Routee 终止时将其移除出去,如上面实例所示。
group:可以将Routee 的生产方式放在外部(即在conf文件中设置),然后路由器Actor 通过路径( path )对这些目标进行消息发送。
my-dispatcher{ executor = "thread-pool-executor" type = PinnedDispatcher router = boradcast-group routees.path = ["/default/user/routeActor/routeeActor1", "/default/user/routeActor/routeeActor2"] } 复制代码