akka入门-基于信道进行消息可靠传输
2015-05-17 21:28
260 查看
程序的演示场景是:处理器发送命令,接收者接收到消息后进行处理并且对发送方发送消息确认表明已经成功收到消息。如果没有发送确认则表明该消息没有被接收并正确处理。失败消息会到达死信箱,系统下次启动时后继续发送死信箱中的发送失败的消息。
1.创建信道回复命令对象
4.测试类
5.输出结果
1.创建信道回复命令对象
import com.center.akka.simple.command.Command; public class ChannelReply { private Command command; private long sequenceNbr ; public ChannelReply(Command command, long sequenceNbr ) { this. command = command; this. sequenceNbr = sequenceNbr ; } public Command getCommand() { return command; } public long getSequenceNbr() { return sequenceNbr; } @Override public String toString() { return "ChannelReply{" + "command=" + command + ", sequenceNbr=" + sequenceNbr + '}'; } }2.编写接收者对象
import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.persistence.ConfirmablePersistent; import com.center.akka.persistent_channel.command.ChannelReply; import com.center.akka.simple.command.Command; public class Receiver extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override public void onReceive(Object msg ) throws Exception { if (msg instanceof ConfirmablePersistent) { ConfirmablePersistent confirmablePersistent = (ConfirmablePersistent) msg; log.info("Incoming Paylod: " + confirmablePersistent.payload() + " #: " + confirmablePersistent.sequenceNr()); getSender().tell( new ChannelReply((Command) confirmablePersistent.payload(), confirmablePersistent.sequenceNr()), null); confirmablePersistent. confirm(); } } }3.编写处理器
import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.persistence.Deliver; import akka.persistence.Persistent; import akka.persistence.PersistentChannel; import akka.persistence.PersistentChannelSettings; import com.center.akka.persistent_channel.command.ChannelReply; public class BaseProcessor extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); private ActorRef receiver; private ActorRef channel; public BaseProcessor(ActorRef receiver) { this. receiver = receiver; this. channel = getContext().actorOf( PersistentChannel.props(PersistentChannelSettings .create().withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS )) .withPendingConfirmationsMax(10000) // max # of pending confirmations. suspend // delivery until < .withPendingConfirmationsMin(2000) // min # of pending confirmation. suspend // delivery until > .withReplyPersistent( true) // ack .withRedeliverMax(15)) , "channel" ); } @Override public void onReceive(Object msg ) { if (msg instanceof Persistent) { log.info("Send to Channel: " + (( Persistent) msg).payload()); channel.tell(Deliver.create (((Persistent) msg).withPayload(((Persistent ) msg ).payload()), receiver.path()), getSelf()); } else if (msg instanceof ChannelReply) { log.info(msg.toString()); } } }
4.测试类
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.persistence.Persistent; import com.center.akka.persistent_channel.actor.BaseProcessor; import com.center.akka.persistent_channel.actor.Receiver; import com.center.akka.simple.command.Command; public class System { public static final Logger log = LoggerFactory.getLogger(System.class); public static void main(String... args) throws Exception { final ActorSystem actorSystem = ActorSystem.create("channel-system"); Thread.sleep(2000); final ActorRef receiver = actorSystem.actorOf(Props.create(Receiver. class)); final ActorRef processor = actorSystem.actorOf(Props.create(BaseProcessor. class, receiver), "channel-processor" ); for ( int i = 0; i < 10; i++) { processor.tell(Persistent.create (new Command("CMD " + i )), null); } Thread.sleep(2000); log.debug( "Actor System Shutdown Starting..." ); actorSystem.shutdown(); } }
5.输出结果
[INFO] [05/17/2015 18:29:46.699] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 0'} [INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 1'} [INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 2'} [INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 3'} [INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 4'} [INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 5'} [INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 6'} [INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 7'} [INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 8'} [INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 9'} [INFO] [05/17/2015 18:29:46.965] [channel-system-akka.actor.default-dispatcher-2] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 0'} #: 1 [INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 0'}, sequenceNbr=1} [INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-2] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 1'} #: 2 [INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 1'}, sequenceNbr=2} [INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 2'} #: 3 [INFO] [05/17/2015 18:29:46.987] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 2'}, sequenceNbr=3} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 3'} #: 4 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 3'}, sequenceNbr=4} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 4'} #: 5 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 5'} #: 6 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 4'}, sequenceNbr=5} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 6'} #: 7 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 5'}, sequenceNbr=6} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 6'}, sequenceNbr=7} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 7'} #: 8 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-4] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 7'}, sequenceNbr=8} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 8'} #: 9 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-4] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 8'}, sequenceNbr=9} [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 9'} #: 10 [INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 9'}, sequenceNbr=10} 18:29:48.699 [main] DEBUG c.c.a.persistent_channel.app.System - Actor System Shutdown Starting...
相关文章推荐
- ActiveMQ源码解析(四):聊聊消息的可靠传输机制和事务控制
- 一个基于UDP的可靠数据传输库
- UDT协议-基于UDP的可靠数据传输协议
- 分享:基于UDP协议实现可靠的数据传输
- Akka在运行时对消息进行实时切换处理的特性: become和unbecome
- Scala 深入浅出实战经典 第90讲:基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验
- 基于http在互联网传输敏感数据的消息摘要、签名与加密方案
- UDT协议-基于UDP的可靠数据传输协议
- 基于UDP可靠传输协议UDT----报文协议详解
- 初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,
- 用 Amazon Web Services 进行云计算,第 4 部分: 用 SQS 进行可靠的消息传递
- 基于web-msg-sender进行消息推送
- UDT协议-基于UDP的可靠数据传输协议
- 转:Java 开发 2.0: 使用 Amazon SQS 进行基于云计算的消息传送
- UDT:基于UDP的可靠传输协议
- 要做个P2P应用,先收集点相关基于UDP可靠传输的资料
- 基于MQTT协议的消息传输
- udp可靠文件传输实现(基于滑动窗口机制)
- 利用 WAS V6.1 开发安全可靠的 Web Services,第 2 部分:实现可靠的消息传输
- UDT协议-基于UDP的可靠数据传输协议