您的位置:首页 > 编程语言 > Java开发

akka介绍

2017-03-06 19:25 169 查看
1、概述

Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。

Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递。Akka框架支持两种语言Java和Scala。Akka是一个运行时与编程模型一致的系统。

2、Akka中的Actor是什么

Actor本质上就是接收消息并采取行动处理消息的对象。它从消息源中解耦出来,只负责正确识别接收到的消息类型,并采取相应的行动。

收到一条消息之后,一个Actor可能会采取以下一个或多个行动:

执行一些本身的操作(例如进行计算、持久化数据、调用外部的Web服务等);

把消息或衍生消息转发给另外一个Actor;

实例化一个新的Actor并把消息转发给它。

或者,如果这个Actor认为合适的话,可能会完全忽略这条消息(也就是说,它可能选择不响应)。

3、Akka使用场景

任何需要高吞吐率和低延迟的系统都可以使用Akka。

Actor使你能够进行服务失败管理(监管者),负载管理(缓和策略、超时和隔离),水平和垂直方向上的可扩展性(增加cpu核数和/或增加更多的机器)管理。

4、准备工作

Akka要求你安装了 Java 1.6或更高版本。

下载

下载Akka有几种方法。你可以下载包含微内核的完整发布包(包含所有的模块). 或者也可以从Maven或sbt从Akka Maven仓库下载对akka的依赖。

模块

Akka的模块化做得非常好,它为不同的功能提供了不同的Jar包。

akka-actor-2.0.jar – 标准Actor, 有类型Actor,等等

akka-remote-2.0.jar – 远程Actor

akka-slf4j-2.0.jar – SLF4J事件处理监听器

akka-testkit-2.0.jar – 用于测试Actor的工具包

akka-kernel-2.0.jar – Akka微内核,可运行一个基本的最小应用服务器

akka--mailbox-2.0.jar – Akka可容错邮箱

要查看每个Akka模块的jar包依赖见 依赖 章节. 虽然不重要不过akka-actor 没有外部依赖 (除了 scala-library.jar JAR包).

使用发布版:http://akka.io/downloads,下载发布包并解压.

5、maven配置

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.1.2</version>
</dependency>


6、akka使用

(1)作为一个库,以普通jar包的形式放在classpath上,或放到web应用中的 WEB-INF/lib位置

(2)作为一个独立的应用程序,使用 Microkernel(微内核),自己有一个main类来初始化Actor系统

将Akka作为一个库。如果你是编写web应用,你估计要使用这种方式。通过添加更多的模块,可以有多种以库的形式使用Akka的方式。

7、akka的Java API

(1)Creating Actors

/*
Creating Actors
xtending the UntypedActor class and implementing the onReceive method
*/
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);

@override
public void onReceive(Object message) throws Exception {
if (message instanceof String)
log.info("Received String message: {}", message);
else
unhandled(message);
}
}


UntypedActor只定义了一个抽象方法,就是上面提到的onReceive(Objectmessage), 用来实现actor的行为。

如果当前actor的行为与收到的消息不匹配,则会调用unhandled方法, 它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)。

另外,它还包括:

getSelf(); //代表本actor的 ActorRef
getSender(); //代表最近收到的消息的发送actor,通常用于下面将讲到的回应消息中
supervisorStrategy(); //用户可重写它来定义对子actor的监管策略
getContext(); //暴露actor和当前消息的上下文信息


(2)Send messages

tell 意思是“fire-and-forget”, 异步发送一个消息并立即返回。这是发送消息的推荐方式。不会阻塞地等待消息。它拥有最好的并发性和可扩展性。ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。

public class MyReceivedTimeoutUntypedActor extends UntypedActor {

public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}

public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}


如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。

Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

Future<Object> future = Patterns.ask(queryActor, param, timeout);

future.onComplete(new OnComplete<Object>() {

public void onComplete(Throwable failure, Object result) {

if (failure != null) {

// time out exception occurs
if (failure.getMessage() != null) {
logger.error(failure.getMessage(), failure);
}

// mark as timeout error
res.resume(Response.ok().entity(result).build());

} else {
res.resume(Response.ok().entity(result).build());
}

logger.info("query cost " + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " query is " + param);
}
}, actorSystem.dispatcher());
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 分布式