Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用
2016-10-02 00:59
357 查看
前言
在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。
本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。
akka是什么?
akka的作用
akka的名字是action kernel的回文。根据官方定义:akka用于resilient elastic distributed real-time transaction processing。个人理解是:
resilient:是指对需求和安全性等方面(来自于外部的)的一种适应力(弹性)。
elastic:是指对资源利用方面的弹性。
因此,akka是一个满足需求弹性、资源分配弹性的分布式实时事务处理系统。
akka只是一个类库,一个工具,并没有提供一个平台。
akka的运行模式和用例
akka有两种运行模式:As a library: 一个使用于web应用,把akka作为一个普通的jar包放到classpath或者
WEB-INF/lib。
As an application: 也称为micro system。
akka的用例
akka的用例很多,可以参照Examples of use-cases for Akka.
本文中的用例
在本文中,一个Spark + akka的环境里,akka被用于as an application模式下。
我们会创建一个akka工程,含有两个应用:
akka host application
建立一个actor system, 定义了所有的任务(actors)。等待客户端的请求。
部分actor使用了spark的云计算功能。
这是一个spark的应用。
akka client application
调用host application上特定的actor。
我们看出,这里我们把akka作为一个任务处理器,并通过spark来完成任务。
项目结构和文件说明
说明
这个工程包含了两个应用。一个Consumer应用:CusomerApp:实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。
一个Producer应用:ProducerApp:实现了向Kafka集群发消息的功能。
文件结构
AkkaSampleApp # 项目目录 |-- build.bat # build文件 |-- src |-- main |-- resources |-- application.conf # Akka Server应用的配置文件 |-- client.conf # Akka Client应用的配置文件 |-- scala |-- ClientActor.scala # Akka Client的Actor:提供了一种调用Server Actor的方式。 |-- ClientApp.scala # Akka Client应用 |-- ProductionReaper.scala # Akka Shutdown pattern的实现者 |-- Reaper.scala # Akka Shutdown pattern的Reaper抽象类 |-- ServerActor.scala # Akka Server的Actor,提供一个求1到n的MapReduce计算。使用了Spark。 |-- ServerApp.scala # Akka Server应用
构建工程目录
可以运行:mkdir AkkaSampleApp mkdir -p /AkkaSampleApp/src/main/resources mkdir -p /AkkaSampleApp/src/main/scala
代码
build.sbt
name := "akka-sample-app" version := "1.0" scalaVersion := "2.11.8" scalacOptions += "-feature" scalacOptions += "-deprecation" scalacOptions += "-language:postfixOps" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.4.10", "com.typesafe.akka" %% "akka-remote" % "2.4.10", "org.apache.spark" %% "spark-core" % "2.0.0" ) resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"
application.conf
akka { #loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } #log-sent-messages = on #log-received-messages = on } }
cient.conf
akka { #loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 0 } #log-sent-messages = on #log-received-messages = on } }
注:
port = 0表示这个端口号会自动生成一个。
ClientActor.scala
import akka.actor._ import akka.event.Logging class ClientActor(serverPath: String) extends Actor { val log = Logging(context.system, this) val serverActor = context.actorSelection(serverPath) def receive = { case msg: String => log.info(s"ClientActor received message '$msg'") serverActor ! 10000L } }
ClientApp.scala
import com.typesafe.config.ConfigFactory import akka.actor._ import akka.remote.RemoteScope import akka.util._ import java.util.concurrent.TimeUnit import scala.concurrent._ import scala.concurrent.duration._ object ClientApp { def main(args: Array[String]): Unit = { val system = ActorSystem("LocalSystem", ConfigFactory.load("client")) // get the remote actor via the server actor system's address val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552") val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress)))) // invoke the remote actor via a client actor. // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor" // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor") buildReaper(system, actor) // tell actor ! 10000L waitShutdown(system, actor) } private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = { import Reaper._ val reaper = system.actorOf(Props(classOf[ProductionReaper])) // Watch the action reaper ! WatchMe(actor) } private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = { // trigger the shutdown operation in ProductionReaper system.stop(actor) // wait to shutdown Await.result(system.whenTerminated, 60.seconds) } }
ProductionReaper.scala
当所有的Actor停止后,终止Actor System。class ProductionReaper extends Reaper { // Shutdown def allSoulsReaped(): Unit = { context.system.terminate() } }
Reaper.scala
import akka.actor.{Actor, ActorRef, Terminated} import scala.collection.mutable.ArrayBuffer object Reaper { // Used by others to register an Actor for watching case class WatchMe(ref: ActorRef) } abstract class Reaper extends Actor { import Reaper._ // Keep track of what we're watching val watched = ArrayBuffer.empty[ActorRef] // Derivations need to implement this method. It's the // hook that's called when everything's dead def allSoulsReaped(): Unit // Watch and check for termination final def receive = { case WatchMe(ref) => context.watch(ref) watched += ref case Terminated(ref) => watched -= ref if (watched.isEmpty) allSoulsReaped() } }
ServerActor.scala
提供一个求1到n平方和的MapReduce计算。import akka.actor.Actor import akka.actor.Props import akka.event.Logging import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf class ServerActor extends Actor { val log = Logging(context.system, this) def receive = { case n: Long => squareSum(n) } private def squareSum(n: Long): Long = { val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val squareSum = sc.parallelize(1L until n).map { i => i * i }.reduce(_ + _) log.info(s"============== The square sum of $n is $squareSum. ==============") squareSum } }
ServerApp.scala
import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem import akka.actor.Props object ServerApp { def main(args: Array[String]): Unit = { val system = ActorSystem("ServerActorSystem") val actor = system.actorOf(Props[ServerActor], name = "serverActor") } }
构建工程
进入目录AkkaSampleApp。运行:sbt package
第一次运行时间会比较长。
测试应用
启动Spark服务
启动spark集群master server$SPARK_HOME/sbin/start-master.sh
master服务,默认会使用
7077这个端口。可以通过其日志文件查看实际的端口号。
启动spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
启动Akka Server应用
运行:$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
如果出现java.lang.NoClassDefFoundError错误,
请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境,
确保akka的包在Spark中设置好了。
注:可以使用Ctrl+C来中断这个Server应用。
启动Akka Client应用
新启动一个终端,运行:java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp # or # $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
然后:看看Server应用是否开始处理了。
总结
Server应用需要Spark的技术,因此,是在Spark环境中运行。Clinet应用,可以是一个普通的Java应用。
下面请看
至此,我们已经写好了一个spark集群+akka+scala的应用。下一步请看:Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用
参照
akka documentElasticity (cloud computing)
Resilient control systems
akka 2.4.10 code samples
akka office samples
A simple Akka (actors) remote example
Shutdown Patterns in AKKA 2
相关文章推荐
- Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用
- Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用
- Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境
- Scala-IDE Eclipse(Windows)中开发Spark应用程序,在Ubuntu Spark集群上运行
- Spark入门--基于Intellij IDEA开发Spark应用并在集群上运行
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 云计算
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。
- 第94讲, 使用Scala开发集群运行的Spark 实现在线黑名单过滤程序
- eclipse编写scala应用运行在spark集群上
- 开发系列:02、使用Scala和SBT开发Spark应用
- 第95讲:使用Scala开发集群运行的Spark来实现在线热搜索词获取
- maven环境下使用java、scala混合开发spark应用
- python scala kafka 集成一个流程项目 spark
- JDK8+Scala2.11+spark-2.0.0+Intellij2017.3.4开发wordcount程序并在集群中运行
- Spark RDD编程(Python和Scala版本)----Spark中的RDD就是一个不可变的分布式对象集合,是一种具有兼容性的基于内存的集群计算抽象方法,Spark则是这个方法的抽象。 Spa
- Spark架构开发 大数据视频教程 SQL Streaming Scala Akka Hadoop
- 使用Intellij IDEA开发并提交Spark应用到远程Spark集群
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 虚拟化
- 【译】谷歌正在建立一个叫做Spark的Chrome应用开发环境
- 搭建symbian应用开发环境的一个注意点