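  This tutorial shows how to build a simple web server with Akka HTTP (the successor to Spray) and how requests are routed from Spray.io-style routing into Akka. The article covers the following steps: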

Create a server with Akka HTTP and handle requests with a simple asynchronous handler.



wget http://jsonstudio.com/wp-content/uploads/2014/02/stocks.zip

mongod --dbpath ./data/


unzip -c stocks.zip | mongoimport --db akka --collection stocks


jos@Joss-MacBook-Pro.local:~$ mongo akka
MongoDB shell version: 2.4.8
connecting to: akka
> db.stocks.findOne({},{Company: 1, Country: 1, Ticker:1 } )
{
        "_id" : ObjectId("52853800bb1177ca391c17ff"),
        "Ticker" : "A",
        "Country" : "USA",
        "Company" : "Agilent Technologies Inc."
}



  To get Akka HTTP working and to read the data stored in Mongo, we need a few additional libraries. The sbt configuration looks like this:

import com.typesafe.sbt.SbtAspectj._

name := "http-akka"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0-M2",
  "org.reactivemongo" %% "reactivemongo" % "",
  "org.reactivemongo" %% "play2-reactivemongo" % "",
  "com.typesafe.play" % "play-json_2.11" % "2.4.0-M2",
  "ch.qos.logback" % "logback-classic" % "1.1.2"
)

resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"

resolvers += "Typesafe" at "https://repo.typesafe.com/typesafe/releases/"

mainClass in (Compile, run) := Some("Boot")


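akka-http-core-experimental contains everything needed for the HTTP server and client. It depends on akka-stream, so akka-stream ends up on the classpath as well.
reactivemongo lets us connect to Mongo in a reactive (non-blocking) way.
play2-reactivemongo and play-json make it easier to convert the BSON returned by MongoDB into JSON.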


import reactivemongo.api._
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object Database {

  val collection = connect()

  def connect(): BSONCollection = {
    val driver = new MongoDriver
    val connection = driver.connection(List("localhost"))
    val db = connection("akka")
    // assumed: the collection name matches the one used in the mongoimport call
    db.collection("stocks")
  }

  def findAllTickers(): Future[List[BSONDocument]] = {
    val query = BSONDocument()
    val filter = BSONDocument("Company" -> 1, "Country" -> 1, "Ticker" -> 1)

    // which results in a Future[List[BSONDocument]]
    // (the cursor/collect calls are an assumed completion of the lost lines)
    collection
      .find(query, filter)
      .cursor[BSONDocument]
      .collect[List]()
  }

  def findTicker(ticker: String) : Future[Option[BSONDocument]] = {
    val query = BSONDocument("Ticker" -> ticker)

    // assumed completion: return the first matching document, if any
    collection
      .find(query)
      .one[BSONDocument]
  }
}








import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model._
import akka.http.model.HttpMethods._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import play.api.libs.json.Json
import play.modules.reactivemongo.json.BSONFormats
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * Simple Object that starts an HTTP server using akka-http. All requests are handled
 * through an Akka flow.
 */
object Boot extends App {

  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()

  // start the server on the specified interface and port.
  val serverBinding2 = Http().bind(interface = "localhost", port = 8091)
  serverBinding2.connections.foreach { connection =>
    // assumed completion of the lost line: handle each request with asyncHandler
    connection.handleWith(Flow[HttpRequest].mapAsync(asyncHandler))
  }

  // With an async handler, we use futures. Threads aren't blocked.
  def asyncHandler(request: HttpRequest): Future[HttpResponse] = {

    // we match the request, and some simple path checking
    request match {

      // match specific path. Returns all the available tickers
      case HttpRequest(GET, Uri.Path("/getAllTickers"), _, _, _) => {
        // make a db call, which returns a future.
        // use for comprehension to flatmap this into
        // a Future[HttpResponse]
        for {
          input <- Database.findAllTickers
        } yield {
          HttpResponse(entity = convertToString(input))
        }
      }

      // match GET path. Return a single ticker
      case HttpRequest(GET, Uri.Path("/get"), _, _, _) => {
        // next we match on the query parameter
        request.uri.query.get("ticker") match {

          // if we find the query parameter
          case Some(queryParameter) => {
            // query the database
            val ticker = Database.findTicker(queryParameter)

            // use a simple for comprehension, to make
            // working with futures easier.
            for {
              t <- ticker
            } yield {
              t match {
                case Some(bson) => HttpResponse(entity = convertToString(bson))
                case None => HttpResponse(status = StatusCodes.OK)
              }
            }
          }

          // if the query parameter isn't there
          case None => Future(HttpResponse(status = StatusCodes.OK))
        }
      }

      // Simple case that matches everything, just return a not found
      case HttpRequest(_, _, _, _, _) => {
        Future[HttpResponse] {
          HttpResponse(status = StatusCodes.NotFound)
        }
      }
    }
  }

  def convertToString(input: List[BSONDocument]) : String = {
    input
      .map(f => convertToString(f))
      .mkString("[", ",", "]")
  }

  def convertToString(input: BSONDocument) : String = {
    // assumed completion: convert BSON to JSON with play2-reactivemongo, then stringify
    Json.stringify(BSONFormats.toJSON(input))
  }
}
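
The shape of the asynchronous handler above (look something up, get a Future back, map the optional result to a response without blocking) can be tried out without Akka HTTP or Mongo. The following is only a minimal sketch under assumptions: AsyncHandlerSketch, its in-memory tickers map, and the (status, body) tuple standing in for HttpResponse are all hypothetical names invented for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncHandlerSketch {

  // Hypothetical in-memory stand-in for Database.findTicker.
  private val tickers = Map("GOOG" -> """{"Ticker":"GOOG"}""")

  def findTicker(ticker: String): Future[Option[String]] =
    Future(tickers.get(ticker))

  // A (status code, body) pair stands in for HttpResponse here.
  // The for comprehension is just a map over the future: no thread
  // waits for the lookup to finish.
  def handle(ticker: String): Future[(Int, String)] =
    for {
      result <- findTicker(ticker)
    } yield {
      result match {
        case Some(json) => (200, json)
        case None       => (200, "") // mirrors the handler: OK with an empty body
      }
    }

  // Blocking is only used here so the result can be inspected.
  def demo(ticker: String): (Int, String) =
    Await.result(handle(ticker), 5.seconds)
}
```

Calling AsyncHandlerSketch.demo("GOOG") yields the status 200 together with the stored JSON string, while an unknown ticker yields 200 with an empty body.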







object Boot extends App {

  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("Streams")
  implicit val materializer = FlowMaterializer()

  // start the server on the specified interface and port.
  val serverBinding1 = Http().bind(interface = "localhost", port = 8090)
  serverBinding1.connections.foreach { connection =>
    // assumed completion of the lost line: hand each connection to the flow defined below
    connection.handleWith(broadCastMergeFlow)
  }
}

val bCast = Broadcast[HttpRequest]

// some basic steps that each retrieve a different ticker value (as a future)
val step1 = Flow[HttpRequest].mapAsync[String](getTickerHandler("GOOG"))
val step2 = Flow[HttpRequest].mapAsync[String](getTickerHandler("AAPL"))
val step3 = Flow[HttpRequest].mapAsync[String](getTickerHandler("MSFT"))

// We'll use the source and output provided by the http endpoint
val in = UndefinedSource[HttpRequest]
val out = UndefinedSink[HttpResponse]

// when an element is available on one of the inputs, take
// that one, ignore the rest
val merge = Merge[String]

// since merge doesn't output a HttpResponse add an additional map step.
val mapToResponse = Flow[String].map[HttpResponse](
  (inp:String) => HttpResponse(status = StatusCodes.OK, entity = inp)
)

// define another flow. This uses the merge function which
// takes the first available response
val broadCastMergeFlow = Flow[HttpRequest, HttpResponse]() {
  implicit builder =>

          bCast ~> step1 ~> merge
    in ~> bCast ~> step2 ~> merge ~> mapToResponse ~> out
          bCast ~> step3 ~> merge

    (in, out)
}

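The most important part is the last few lines of the code, where we define a graph that describes how the server processes a request. First, the incoming request is broadcast to three parallel flows. Each flow calls the database to fetch one ticker, and the results are then merged and turned into a response. The ticker we get back may be GOOG, AAPL, or MSFT, depending on which step finishes first. To make this visible, we add a sleep to getTickerHandler: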

def getTickerHandler(tickName: String)(request: HttpRequest): Future[String] = {
  // query the database
  val ticker = Database.findTicker(tickName)

  // random delay of up to one second, so the three steps finish in varying order
  Thread.sleep((Math.random() * 1000).toInt)

  // use a simple for comprehension, to make
  // working with futures easier.
  for {
    t <- ticker
  } yield {
    t match {
      case Some(bson) => convertToString(bson)
      case None => ""
    }
  }
}
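
The "whichever step finishes first wins" behavior described for the merge flow can be illustrated outside Akka Streams with plain Scala futures. This is a minimal sketch, not the article's implementation: it uses Future.firstCompletedOf from the standard library instead of a Merge junction, and the FirstResponseSketch object and the promises standing in for database calls are hypothetical.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstResponseSketch {

  // Keep whichever lookup completes first; later results are ignored.
  def firstOf(lookups: Seq[Future[String]]): Future[String] =
    Future.firstCompletedOf(lookups)

  def demo(): String = {
    // Promises stand in for three database calls that are still in flight.
    val goog = Promise[String]()
    val aapl = Promise[String]()
    val msft = Promise[String]()

    val first = firstOf(Seq(goog.future, aapl.future, msft.future))

    // Simulate AAPL answering before the other two.
    aapl.success("AAPL")
    val winner = Await.result(first, 5.seconds)

    // The slower lookups finish afterwards and do not change the result.
    goog.success("GOOG")
    msft.success("MSFT")
    winner
  }
}
```

Promises are used instead of sleeps so that the completion order is controlled explicitly rather than depending on thread scheduling.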





// waits for events on the three inputs and returns a response
val zip = ZipWith[String, String, String, HttpResponse] (
  (inp1, inp2, inp3) => HttpResponse(status = StatusCodes.OK, entity = inp1 + inp2 + inp3)
)

// define a flow which broadcasts the request to the three
// steps, and uses the zipWith to combine the elements before
// the response is returned
val broadCastZipFlow = Flow[HttpRequest, HttpResponse]() {
  implicit builder =>

          bCast ~> step1 ~> zip.input1
    in ~> bCast ~> step2 ~> zip.input2 ~> out
          bCast ~> step3 ~> zip.input3

    (in, out)
}
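
In contrast to the merge variant, the zip variant waits for all three inputs and always combines them in the same order. The sketch below shows the same idea with plain Scala futures, under assumptions: it does not use Akka Streams' ZipWith, and the ZipAllSketch object and the promises standing in for the three steps are hypothetical.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipAllSketch {

  // Completes only when all three inputs have completed; the result
  // order is fixed by position, not by which input arrived first.
  def combine(a: Future[String], b: Future[String], c: Future[String]): Future[String] =
    for {
      x <- a
      y <- b
      z <- c
    } yield x + y + z

  def demo(): String = {
    val goog = Promise[String]()
    val aapl = Promise[String]()
    val msft = Promise[String]()

    val combined = combine(goog.future, aapl.future, msft.future)

    // Complete the inputs in reverse order on purpose.
    msft.success("MSFT")
    aapl.success("AAPL")
    goog.success("GOOG")

    Await.result(combined, 5.seconds)
  }
}
```

Even though the inputs complete in reverse order, the combined string starts with the first input, which matches the fixed GOOG, AAPL, MSFT ordering of the zipped responses.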



{"_id":{"$oid":"52853804bb1177ca391c2221"},"Ticker":"GOOG","Profit Margin":0.217

{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217

{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217

{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282

{"_id":{"$oid":"52853800bb1177ca391c1809"},"Ticker":"AAPL","Profit Margin":0.217

{"_id":{"$oid":"52853807bb1177ca391c2781"},"Ticker":"MSFT","Profit Margin":0.282



import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.model._
import akka.stream.scaladsl._
import akka.stream.scaladsl.FlowGraphImplicits._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * Simple HTTP client created with akka-http
 */
object Client extends App {

  // the actor system to use. Required for flowmaterializer and HTTP.
  // passed in implicit
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = FlowMaterializer()

  val httpClient1 = Http(system).outgoingConnection("localhost", 8090).flow
  val httpClient2 = Http(system).outgoingConnection("localhost", 8091).flow

  // define a sink that will process the answer
  // we could also process this as a flow
  val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
    if(res.status == StatusCodes.OK) {
      println("Received response : " + res)
      res.entity.getDataBytes().map {
        chunk =>
          System.out.println("Chunk: " + chunk.decodeString(HttpCharsets.`UTF-8`.value).substring(0, 80))
      }.to(Sink.ignore).run() // assumed completion: the mapped source must be run to print anything
    } else
      println(res.status) // assumed completion of the lost else branch
  }

  // we need to set allow cycles since internally the httpclient
  // has some cyclic flows (apparently)
  // we construct a sink, to which we later connect a source.
  val reqFlow2: Sink[HttpRequest] = Sink[HttpRequest]() { implicit b =>
    b.allowCycles() // assumed completion, based on the comment above
    val source = UndefinedSource[HttpRequest]
    val bcast = Broadcast[HttpRequest]
    val concat = Concat[HttpResponse]

    // simple graph. Duplicate the request, send twice.
    // concat the result.
              bcast ~> httpClient1 ~> concat.first
    source ~> bcast ~> httpClient1 ~> concat.second ~> printChunksConsumer
    source // assumed completion: expose the undefined source as the sink's input
  }

  // make two calls, both return futures, first one shows direct linked sinks and
  // sources. Second one makes use of our graph.

  // make number of calls
  val res = 1 to 5 map( i => {
    // assumed completion of the lost line: send one request through the graph
    Source.single(HttpRequest()).to(reqFlow2).run().get(printChunksConsumer)
  })

  val f = Future.sequence(res)

  // make some calls with filled in request URI
  val f3 = Source.single(HttpRequest(uri = Uri("/getAllTickers"))).via(httpClient2).runWith(printChunksConsumer)
  val f4 = Source.single(HttpRequest(uri = Uri("/get?ticker=ADAT"))).via(httpClient2).runWith(printChunksConsumer)
  val f5 = Source.single(HttpRequest(uri = Uri("/get?tikcer=FNB"))).via(httpClient2).runWith(printChunksConsumer)

  for {
    _ <- f
    _ <- f3
    _ <- f4
    _ <- f5
  } yield {
    println("All calls done")
  }
}





