Akka

Akka 是一个建立在 Actors 概念和可组合 Futures 之上的并发框架, Akka 设计灵感来源于 Erlang;Erlang 是基于 Actor 模型构建的。它通常被用来取代阻塞锁如同步、读写锁及类似的更高级别的异步抽象。Netty 是一个异步网络库,使 Java NIO 的功能更好用。Akka 针对 IO 操作有一个抽象,这和 netty 是一样的。使用 Akka 可以用来创建计算集群,Actor 在不同的机器之间传递消息。从这个角度来看,Akka 相对于 Netty 来说,是一个更高层次的抽象

Akka 是一种高度可扩展的软件,这不仅仅表现在性能方面,也表现在它所适用的应用的大小。Akka 的核心,Akka-actor 是非常小的,可以非常方便地放进你的应用中,提供你需要的异步无锁并行功能,不会有任何困扰。你可以任意选择 Akka 的某些部分集成到你的应用中,也可以使用完整的包——Akka 微内核,它是一个独立的容器,可以直接部署你的 Akka 应用。随着 CPU 核数越来越多,即使你只使用一台电脑,Akka 也可作为一种提供卓越性能的选择。Akka 还同时提供多种并发范型,允许用户选择正确的工具来完成工作。

Actor 声明与实例化

最简单的 Actor 声明即是继承自 AbstractActor,并且实现了 createReceive 方法,该方法会自定义消息的处理方式。

class PrintMyActorRefActor extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .matchEquals(
        "printit",
        p -> {
          // 创建新的子 Actor 实例
          ActorRef secondRef = getContext()
            .actorOf(Props.empty(), "second-actor");
          System.out.println("Second: " + secondRef);
        }
      )
      .build();
  }
}Copy to clipboardErrorCopied
// 精确匹配某个值
.matchEquals
// 匹配某个类对象
.match(String.class, s -> {
log.info("Received String message: {}", s);
})
// 匹配任意对象
.matchAny(o -> log.info("received unknown message"))Copy to clipboardErrorCopied

然后在 Actor System 中注册该 Actor 的实例对象:

ActorSystem system = ActorSystem.create("testSystem");
// 创建某个 Actor
ActorRef firstRef = system.actorOf(Props.create(PrintMyActorRefActor.class), "first-actor");
System.out.println("First: " + firstRef);
// 发送消息
firstRef.tell("printit", ActorRef.noSender());
// 终止该系统
system.terminate();
// First: Actor[akka://testSystem/user/first-actor#1053618476]
// Second: Actor[akka://testSystem/user/first-actor/second-actor#-1544706041]Copy to clipboardErrorCopied

Props是一个配置类,用于指定创建参与者的选项,可以将它看作是一个不可变的,因此可以自由共享的方法,用于创建包含相关部署信息的参与者。

Props props1 = Props.create(MyActor.class);
// 该方法不建议使用
Props props2 = Props.create(ActorWithArgs.class,
  () -> new ActorWithArgs("arg"));
Props props3 = Props.create(ActorWithArgs.class, "arg");Copy to clipboardErrorCopied

消息传递

消息定义

在真实场景下,我们往往会使用 POJO 或者 Enum 做消息的结构化传递:

// GreeterActor
public static enum Msg {
    GREET, DONE;
}
@Override
public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(Msg.GREET, m -> {
        System.out.println("Hello World!");
        sender().tell(Msg.DONE, self());
    })
    .build();
}Copy to clipboardErrorCopied

Future 异步调用

类似于 Java 中的 Future,可以将一个 actor 的返回结果重定向到另一个 actor 中进行处理,主 actor 或者进程无需等待 actor 的返回结果。

ActorSystem system = ActorSystem.create("strategy", ConfigFactory.load("akka.config"));
ActorRef printActor = system.actorOf(Props.create(PrintActor.class), "PrintActor");
ActorRef workerActor = system.actorOf(Props.create(WorkerActor.class), "WorkerActor");
//等等future返回
Future<Object> future = Patterns.ask(workerActor, 5, 1000);
int result = (int) Await.result(future, Duration.create(3, TimeUnit.SECONDS));
System.out.println("result:" + result);
//不等待返回值,直接重定向到其他actor,有返回值来的时候将会重定向到printActor
Future<Object> future1 = Patterns.ask(workerActor, 8, 1000);
Patterns.pipe(future1, system.dispatcher()).to(printActor);
workerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());]Copy to clipboardErrorCopied

LifeCycle | 生命周期

image

class StartStopActor1 extends AbstractActor {
  @Override
  public void preStart() {
    ...
    getContext().actorOf(Props.create(StartStopActor2.class), "second");
  }
  @Override
  public void postStop() {
    ...
  }
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("stop", s -> {
          getContext().stop(getSelf());
        })
        .build();
  }
}Copy to clipboardErrorCopied

各类型 Actor

Typed Actor & AbstractActor

早期 Akka 库中常见的是 TypedActor 与 UntypedActor,TypedActor 的优势在于其内置了静态协议,而不需要用户自定义消息类型;并且 UntypedActor 已经被弃用,而应该使用 AbstractActor。

AbstractLoggingActor

public static class Terminator extends AbstractLoggingActor {
    private final ActorRef ref;
    public Terminator(ActorRef ref) {
        this.ref = ref;
        getContext().watch(ref);
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder()
        .match(Terminated.class, t -> {
            log().info("{} has terminated, shutting down system", ref.path());
            getContext().system().terminate();
        })
        .build();
    }
}
// 可以用来监控其他的 Actor
ActorRef actorRef = system.actorOf(Props.create(HelloWorld.class), "helloWorld");
system.actorOf(Props.create(Terminator.class, actorRef), "terminator");

持久化

Akka 持久化可以使有状态的 actor 能够保持其内部状态,以便在启动、JVM 崩溃后重新启动、或在集群中迁移时,恢复它们的内部状态。Akka 持久性关键点在于,只有对 actor 内部状态的更改才会被持久化,而不会直接保持其当前状态(可选快照除外)。这些更改只会追加到存储,没有任何修改,这允许非常高的事务速率和高效的复制;通过加载持久化的数据 stateful actors 可以重建内部状态。

Akka 持久性扩展依赖一些内置持久性插件,包括基于内存堆的日志,基于本地文件系统的快照存储和基于 LevelDB 的日志。

<dependency>
    <groupId>org.iq80.leveldb</groupId>
    <artifactId>leveldb</artifactId>
    <version>0.7</version>
</dependency>Copy to clipboardErrorCopied

然后我们还需要针对持久化策略添加相关的配置:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = falseCopy to clipboardErrorCopied

当我们声明某个可持久化的 Actor 时,需要使其继承自 AbstractPersistentActor:

class ExamplePersistentActor extends AbstractPersistentActor {}Copy to clipboardErrorCopied

然后复写 createReceiveRecover 与 createReceive 方法;createReceive 是正常的处理消息的方法,而 createReceiveRecover 则是用于在恢复阶段处理接收到的消息的方法。

@Override
public Receive createReceiveRecover() {
    return receiveBuilder()
        // 恢复之前在上一个快照点之后发布的 Event
        .match(Evt.class, e -> state.update(e))
        // 恢复之前保存的状态
        .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
        .build();
}
@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(Cmd.class, c -> {
            final String data = c.getData();
            final Evt evt = new Evt(data + "-" + getNumEvents());
            // 持久化消息
            persist(evt, (Evt event) -> {
                state.update(event);
                getContext().system().eventStream().publish(event);
            });
        })
        // 触发持久化当前状态
        .matchEquals("snap", s -> saveSnapshot(state.copy()))
        .matchEquals("print", s -> System.out.println(state))
        .build();
}Copy to clipboardErrorCopied

在外部调用时,我们可以手动地触发进行状态存储:

persistentActor.tell(new Cmd("foo"), null);
persistentActor.tell("snap", null);
persistentActor.tell(new Cmd("buzz"), null);
persistentActor.tell("print", null);

邮箱与路由

Mailbox

整个 akka 的 Actor 系统是通过消息进行传递的,其实还可以使用 Inbox 消息收件箱来给某个 actor 发消息,并且可以进行交互。

// 创建 Inbox 收件箱
Inbox inbox = Inbox.create(system);
// 监听一个 actor
inbox.watch(inboxTest);
//通过inbox来发送消息
inbox.send(inboxTest, Msg.WORKING);
inbox.send(inboxTest, Msg.DONE);
inbox.send(inboxTest, Msg.CLOSE);Copy to clipboardErrorCopied

然后在 Inbox 中可以循环等待消息回复:

while(true){
    try {
        Object receive = inbox.receive(Duration.create(1, TimeUnit.SECONDS));
        if(receive == Msg.CLOSE){//收到的inbox的消息
            System.out.println("inboxTextActor is closing");
        }else if(receive instanceof Terminated){//中断,和线程一个概念
            System.out.println("inboxTextActor is closed");
            system.terminate();
            break;
        }else {
            System.out.println(receive);
        }
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}Copy to clipboardErrorCopied

路由与投递

通常在分布式任务调度系统中会有这样的需求:一组 actor 提供相同的服务,我们在调用任务的时候只需要选择其中一个 actor 进行处理即可。其实这就是一个负载均衡或者说路由策略,akka 作为一个高性能支持并发的 actor 模型,可以用来作为任务调度集群使用,当然负载均衡就是其本职工作了,akka 提供了 Router 来进行消息的调度。

// RouterActor,用于分发消息的 Actor
// 创建多个子 Actor
ArrayList<Routee> routees = new ArrayList<>();
for(int i = 0; i < 5; i ++) {
    //借用上面的 inboxActor
    ActorRef worker = getContext().actorOf(Props.create(InboxTest.class), "worker_" + i);
    getContext().watch(worker);//监听
    routees.add(new ActorRefRoutee(worker));
}
/**
* 创建路由对象
* RoundRobinRoutingLogic: 轮询
* BroadcastRoutingLogic: 广播
* RandomRoutingLogic: 随机
* SmallestMailboxRoutingLogic: 空闲
*/
router = new Router(new RoundRobinRoutingLogic(), routees);
@Override
public void onReceive(Object o) throws Throwable {
    if(o instanceof InboxTest.Msg){
        // 进行路由转发
        router.route(o, getSender());
    }else if(o instanceof Terminated){
        // 发生中断,将该actor删除
        router = router.removeRoutee(((Terminated)o).actor());
        System.out.println(((Terminated)o).actor().path() + " 该actor已经删除。router.size=" + router.routees().size());
        // 没有可用 actor 了
        if(router.routees().size() == 0){
            System.out.print("没有可用actor了,系统关闭。");
            flag.compareAndSet(true, false);
            getContext().system().shutdown();
        }
    }else {
        unhandled(o);
    }
}
// 外部系统照常发送消息
ActorRef routerActor = system.actorOf(Props.create(RouterActor.class), "RouterActor");
routerActor.tell(Msg.WORKING, ActorRef.noSender());
下一节:RxJava 是 JVM 的响应式扩展(ReactiveX),它是通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。