您的位置:首页 > 其它

Akka和ProtoBuf的简单实例

2012-07-19 18:38 316 查看
Akka是什么?

可扩展的实时事务处理

我们相信编写出正确的具有容错性和可扩展性的并发程序太困难了。这多数是因为我们使用了错误的工具和错误的抽象级别。Akka就是为了改变这种状况而生的。通过使用Actor模型我们提升了抽象级别,为构建正确的可扩展并发应用提供了一个更好的平台。在容错性方面我们采取了“let it crash”(让它崩溃)模型,人们已经将这种模型用在了电信行业,构建出“自愈合”的应用和永不停机的系统,取得了巨大成功。Actor还为透明的分布式系统以及真正的可扩展高容错应用的基础进行了抽象。其实简单一点来说,就是面向对象的高度封装,所有Actor之间的交互都是通过消息来传递来实现的。

Akka是开源的,可以通过Apache 2许可获得。

http://akka.io/downloads/ 下载


Akka实现了独特的混合模型


Actors

Actors为你提供:

对并发/并行程序的简单的、高级别的抽象。
异步、非阻塞、高性能的事件驱动编程模型。
非常轻量的事件驱动处理(1G内存可容纳约270万个actors)。

参阅 Actors (Scala) 和 Actors
(Java)


容错性

使用“let-it-crash”语义和监管者树形结构来实现容错。非常适合编写永不停机、自愈合的高容错系统。监管者树形结构可以跨多个JVM来提供真正的高容错系统。
参阅 容错性 (Scala) 和 容错性
(Java)


位置透明性

Akka的所有元素都为分布式环境而设计:所有actor都仅通过发送消息进行互操作,所有操作都是异步的。
了解远程调用请参阅 事务透明性


事务性actors

事务性Actor是actor与STM(Software Transactional Memory)的组合。它使你能够使用自动重试和回滚来组合出原子消息流。
参阅 事务性actor (Scala) 和 事务性actor
(Java)


Akka平台提供哪些有竞争力的功能?

Akka提供可扩展的实时事务处理。
Akka是一个运行时与编程模型一致的系统,为以下目标设计:

垂直扩展(并发)
水平扩展(远程调用)
高容错

在Akka的世界里,只有一个内容需要学习和管理,具有高内聚和高一致的语义。
Akka是一种高度可扩展的软件,这不仅仅表现在性能方面,也表现在它所适用的应用的大小。Akka的核心,Akka-actor是非常小的,可以非常方便地放进你的应用中,提供你需要的异步无锁并行功能,不会有任何困扰。
你可以任意选择Akka的某些部分集成到你的应用中,也可以使用完整的包——Akka 微内核,它是一个独立的容器,可以直接部署你的Akka应用。随着CPU核数越来越多,即使你只使用一台电脑,Akka也可作为一种提供卓越性能的选择。Akka还同时提供多种并发范型,允许用户选择正确的工具来完成工作。


什么场景下特别适合使用Akka?

我们看到Akka被成功运用在众多行业的众多大企业,从投资业到商业银行、从零售业到社会媒体、仿真、游戏和赌博、汽车和交通系统、数据分析等等等等。任何需要高吞吐率和低延迟的系统都是使用Akka的候选。
Actor使你能够进行服务失败管理(监管者),负载管理(缓和策略、超时和隔离),水平和垂直方向上的可扩展性(增加cpu核数和/或增加更多的机器)管理。
下面的链接中有一些Akka用户关于他们如何使用Akka的描述:http://stackoverflow.com/questions/4493001/good-use-case-for-akka

首先将以下jar包到classpth
akka-actor-2.0.2.jar
akka-remote-2.0.2.jar

scala-library.jar

protobuf-java-2.4.0a.jar

下面是ProtoBuf的配置文件,使用命令编译成java文件
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/Customer.proto


option java_package = "akka.test.bean";
option java_outer_classname = "PbCustomer";

message Customer {
required string username = 1;
required int32 age = 2;
optional string tel = 3;
}


序列化的配置文件,放在src目录即可,文件名称可以任意,如serial.conf

akka {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
}

serialization-bindings {
"java.lang.String" = java
"akka.test.bean.Customer" = java
"com.google.protobuf.Message" = proto
}
}
}


说明:Message类采用ProtobufSerializer来序列化,Customer是一个普通的POJO,采用java序列化

下面是主要的Java测试类

package akka.test;

import java.io.File;

import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.serialization.Serializer;
import akka.test.bean.PbCustomer.Customer;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class TestAkkaSerial {
private static final Config config = ConfigFactory.parseFile(new File(
"serial.conf"));

public static class ServerActor extends UntypedActor {

private final Serializer serializer;

public ServerActor(Serializer serializer) {
this.serializer = serializer;
}

@Override
public void onReceive(Object message) throws Exception {
// TODO Auto-generated method stub
if (message instanceof byte[]) {
// 反序列化成对象
Customer customer = (Customer) serializer.fromBinary(
(byte[]) message, Customer.class);

// 重新设置后发送给client
getSender().tell(
serializer.toBinary(Customer.newBuilder()
.setAge(customer.getAge() + 1)
.setTel("10000000000")
.setUsername(customer.getUsername()).build()));
} else {
unhandled(message);
}
}

}

public static class ClientActor extends UntypedActor {
private final Serializer serializer;
private final ActorRef server;

public ClientActor(Serializer serializer, ActorRef server) {
this.serializer = serializer;
this.server = server;
}

@Override
public void onReceive(Object message) throws Exception {
// TODO Auto-generated method stub
if (message instanceof String && message.equals("start")) {
for (int i = 0; i < 10; i++) {
server.tell(
serializer.toBinary(Customer.newBuilder()
.setUsername("Yang").setTel("136666666")
.setAge(i).build()), getSelf());
}
} else if (message instanceof byte[]) {
Customer back = (Customer) serializer.fromBinary(
(byte[]) message, Customer.class);
System.out.println(back);
}
}

}

public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create("example", config);
Serialization serialization = SerializationExtension.get(system);
final Serializer serializer = serialization
.serializerFor(Customer.class);

final ActorRef server = system.actorOf(new Props(
new UntypedActorFactory() {

private static final long serialVersionUID = 1L;

@Override
public Actor create() {
// TODO Auto-generated method stub
return new ServerActor(serializer);
}
}), "server");

final ActorRef client = system.actorOf(new Props(
new UntypedActorFactory() {

private static final long serialVersionUID = 1L;

@Override
public Actor create() {
// TODO Auto-generated method stub
return new ClientActor(serializer, server);
}
}), "client");

client.tell("start");

}
}
程序说明,一个client Actor,一个server Actor,client通过循环向server发送序列化后的对象,server端反序列化后简单处理,随即将结果重新序列化后发送给client,client收到reply后反序列化成对象,进行输出显示;

控制台输出结果:

username: "Yang"
age: 1
tel: "10000000000"

username: "Yang"
age: 2
tel: "10000000000"

username: "Yang"
age: 3
tel: "10000000000"

username: "Yang"
age: 4
tel: "10000000000"

username: "Yang"
age: 5
tel: "10000000000"

username: "Yang"
age: 6
tel: "10000000000"

username: "Yang"
age: 7
tel: "10000000000"

username: "Yang"
age: 8
tel: "10000000000"

username: "Yang"
age: 9
tel: "10000000000"

username: "Yang"
age: 10
tel: "10000000000"
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: