您的位置:首页 > 理论基础 > 计算机网络

使用Scala的Akka HTTP,Akka Stream和Reactive Mongo建立REST服务

2015-11-07 15:44 483 查看



  该教程演示了如何使用Akka Http(Spray)创建一个简单的Web服务器,展示如何从Spray.io路由转发到Akka。这篇文章包括以下步骤:

将一些虚拟数据放入mongoDB用于测试.
使用Akka Http创建服务器,使用简单的异步来处理请求
使用订制的流程图来创建一个服务器以处理传入的请求
使用Akka-Http创建的http客户端测试这两个服务器

加载数据到MongoDB

  首先我们使用一些数据样本

wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip
在终端启动MongoDB:

mongod --dbpath ./data/

使用mongoimport导入数据:

unzip -c stocks.zip | mongoimport --db akka --collection stocks

使用下面查询确认数据导入成功:

jos@Joss-MacBook-Pro.local:~$ mongo akka

MongoDB shell version: 2.4.8

connecting to: akka

> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )

{

"_id" : ObjectId("52853800bb1177ca391c17ff"),

"Ticker" : "A",

"Country" : "USA",

"Company" : "Agilent Technologies Inc."

}

>

使用简单异步处理创建一个服务器

  为了能让Akka Http正常工作和可以访问Mongo中数据,我们需要一些附加的库包,如下sbt配置:

import com.typesafe.sbt.SbtAspectj._

name := "http-akka"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(

"com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",

"org.reactivemongo" %% "reactivemongo" % "0.10.5.0.akka23",

"org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23",

"com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",

"ch.qos.logback" % "logback-classic" % "1.1.2"

)

resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"

resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/"

mainClass in (Compile, run) := Some("Boot")

从上面配置中你可以看到以下依赖库:

akka-http-core-experimental 包含了所有Http服务器和客户端的库包,这个库包依赖akka-stream。我们也能从系统的class path获得它。
reactiemongo 允许我们以reactive方式连接mongo
加入play2-reactivemongo 和play-json能够让从MongoDB获得BSON转为JSON更容易些。
logback是日志记录

下面我们看看你如何运行服务器代码,使用这段代码如何查询MongDB,创建一个辅助对象Database:

import reactivemongo.api._

import reactivemongo.api.collections.default.BSONCollection

import reactivemongo.bson.BSONDocument

import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future

object Database {

val collection = connect()

def connect(): BSONCollection = {

val driver = new MongoDriver

val connection = driver.connection(List("localhost"))

val db = connection("akka")

db.collection("stocks")

}

def findAllTickers(): Future[List[BSONDocument]] = {

val query = BSONDocument()

val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)

// which results in a Future[List[BSONDocument]]

Database.collection

.find(query, filter)

.cursor[BSONDocument]

.collect[List]()

}

def findTicker(ticker: String) : Future[Option[BSONDocument]] = {

val query = BSONDocument("Ticker" -> ticker)

Database.collection

.find(query)

.one

}

}

请注意,这里find函数返回的是一个future,所以调用这个函数不会阻塞,现在我们已经有了一些基本方法,让我们看看使用异步处理器实现的第一个Http服务器:

/**

* Simple Object that starts an HTTP server using akka-http. All requests are handled

* through an Akka flow.

*/

object Boot extends App {

// the actor system to use. Required for flowmaterializer and HTTP.

// passed in implicit

implicit val system = ActorSystem("Streams")

implicit val materializer = FlowMaterializer()

// start the server on the specified interface and port.

val serverBinding2 = Http().bind(interface = "localhost", port = 8091)

serverBinding2.connections.foreach { connection =>

connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))

}

}

在这段代码中,我们创建一个http服务器监听端口是8091,我们使用asyncHandler处理每个连接,这个处理器应该返回一个Future[HttpResponse],看看这个处理器函数的内容:

// With an async handler, we use futures. Threads aren't blocked.

def asyncHandler(request: HttpRequest): Future[HttpResponse] = {

// we match the request, and some simple path checking

request match {

// match specific path. Returns all the avaiable tickers

case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {

// make a db call, which returns a future.

// use for comprehension to flatmap this into

// a Future[HttpResponse]

for {

input <- Database.findAllTickers

} yield {

HttpResponse(entity = convertToString(input))

}

}

// match GET pat. Return a single ticker

case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {

// next we match on the query paramter

request.uri.query.get("ticker") match {

// if we find the query parameter

case Some(queryParameter) => {

// query the database

val ticker = Database.findTicker(queryParameter)

// use a simple for comprehension, to make

// working with futures easier.

for {

t <- ticker

} yield {

t match {

case Some(bson) => HttpResponse(entity = convertToString(bson))

case None => HttpResponse(status = StatusCodes.OK)

}

}

}

// if the query parameter isn't there

case None => Future(HttpResponse(status = StatusCodes.OK))

}

}

// Simple case that matches everything, just return a not found

case HttpRequest(_, _, _, _, _) => {

Future[HttpResponse] {

HttpResponse(status = StatusCodes.NotFound)

}

}

}

}

这段代码很简单,我们使用模式匹配来匹配一个用户调用的URL,使用Database对象来查询mongo,注意到这个调用是convertToString,这是使用play库包将BSON转换到JSON的帮助工具:

def convertToString(input: List[BSONDocument]) : String = {

input

.map(f => convertToString(f))

.mkString("[", ",", "]")

}

def convertToString(input: BSONDocument) : String = {

Json.stringify(BSONFormats.toJSON(input))

}

启动服务器用浏览器访问:http://localhost:8091/get?ticker=ABCB

使用定制流程图处理请求的服务器

  Akka-http内部使用akka-stream处理http连接,这意味着我们很容易通过一个reactive方式使用akka-stream,对于一个线性流程,我们能够使用akka提供的标准流程api,使用akka-stream提供的DSL,你很容易创建一个复杂的流程事件并行处理图。

我们在8090端口建立一个新的服务器绑定:

object Boot extends App {

// the actor system to use. Required for flowmaterializer and HTTP.

// passed in implicit

implicit val system = ActorSystem("Streams")

implicit val materializer = FlowMaterializer()

// start the server on the specified interface and port.

val serverBinding1 = Http().bind(interface = "localhost", port = 8090)

serverBinding1.connections.foreach { connection =>

connection.handleWith(broadCastMergeFlow)

}

}

这个服务器绑定是和之前案例差不多方式,主要区别是这次我们没有将请求处理传入一个处理器,而是,而是指定了一个名为broadCastMergeFlow的流程实例,其内部流程merge如下:

val bCast = Broadcast[HttpRequest]

// some basic steps that each retrieve a different ticket value (as a future)

val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))

val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))

val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))

// We'll use the source and output provided by the http endpoint

val in = UndefinedSource[HttpRequest]

val out = UndefinedSink[HttpResponse]

// when an element is available on one of the inputs, take

// that one, igore the rest

val merge = Merge[String]

// since merge doesn't output a HttpResponse add an additional map step.

val mapToResponse = Flow[String].map[HttpResponse](

(inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)

)

// define another flow. This uses the merge function which

// takes the first available response

val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {

implicit builder =>

bCast ~> step1 ~> merge

in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out

bCast ~> step3 ~> merge

(in, out)

}

最重要的是代码最后几行,我们定义了一个图,指明服务器如何处理请求,首先,我们将进来的请求广播到三个并行刘处理,然后调用我们的数据库获得一个ticket,然后将结果merge在一起,创建一个响应,我们获得的ticket将可能是 GOOG, AAPL or MSFT等数据,前后顺序取决于哪个步骤最快了,为了看到结果,我们加入一个sleep到getTickerHandler:

def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {

// query the database

val ticker = Database.findTicker(tickName)

Thread.sleep(Math.random() * 1000 toInt)

// use a simple for comprehension, to make

// working with futures easier.

for {

t <- ticker

} yield {

t match {

case Some(bson) => convertToString(bson)

case None => ""

}

}

}

Akka-stream提供了许多疾病构建块,你能使用它们创建流程,这里我们要压缩所有步骤在一起,因此创建流程如下:

// waits for events on the three inputs and returns a response

val zip = ZipWith[String, String, String, HttpResponse] (

(inp1, inp2, inp3) => new HttpResponse(status = StatusCodes.OK,entity = inp1 + inp2 + inp3)

// define a flow which broadcasts the request to the three

// steps, and uses the zipWith to combine the elements before

val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {

implicit builder =>

bCast ~> step1 ~> zip.input1

in ~> bCast ~> step2 ~> zip.input2 ~> out

bCast ~> step3 ~> zip.input3

(in, out)

}

当调用10次,结果输出如下:

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217

{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217

{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217

{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217

{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217

{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217

{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

使用Akka-http建立的http客户端测试这两个服务器

Akka-http提供了易于在http客户端使用stream/flow处理消息流,下面是完整代码:

import akka.actor.ActorSystem

import akka.http.Http

import akka.stream.FlowMaterializer

import akka.http.model._

import akka.stream.scaladsl._

import akka.stream.scaladsl.Source

import akka.stream.scaladsl.FlowGraphImplicits._

import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future

/**

* Simple HTTP client created with akka-http

*/

object Client extends App {

// the actor system to use. Required for flowmaterializer and HTTP.

// passed in implicit

implicit val system = ActorSystem("ServerTest")

implicit val materializer = FlowMaterializer()

val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow

val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow

// define a sink that will process the answer

// we could also process this as a flow

val printChunksConsumer = Sink.foreach[HttpResponse] { res =>

if(res.status == StatusCodes.OK) {

println("Recieved response : " + res);

res.entity.getDataBytes().map {

chunk =>

System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))

}.to(Sink.ignore).run()

} else

println(res.status)

}

// we need to set allow cycles since internally the httpclient

// has some cyclic flows (apparently)

// we construct a sink, to which we connect a later to define source.

val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>

b.allowCycles()

val source = UndefinedSource[HttpRequest]

val bcast = Broadcast[HttpRequest]

val concat = Concat[HttpResponse]

// simple graph. Duplicate the request, send twice.

// concat the result.

bcast ~> httpClient1 ~> concat.first

source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer

source

}

// make two calls, both return futures, first one shows direct linked sinks and

// sources. Second one makes yse if our graph.

// make number of calls

val res = 1 to 5 map( i => {

Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)

})

val f = Future.sequence(res)

// make some calls with filled in request URI

val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)

val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)

val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)

for {

f2Result <- f

f2Result <- f3

f2Result <- f4

f2Result <- f5

} yield ({

println("All calls done")

system.shutdown()

system.awaitTermination()

}

)

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: