Akka-CQRS(11)- akka-http for http-web-service: Marshalling-数据序列化
前面几篇讨论了关于gRPC方式的前后端连接集成方式。gRPC也是一个开放的标准,但讲到普及性就远远不及基于http/1.1协议的web-service了。特别是gRPC的前端编程还是有一定的门槛,所以作为一种开放的网络大平台还是必须考虑用web-service方式的集成。平台服务api可以有两样选择:一种是传统web-service方式,新的一种是rest api款式。rest api比较适合数据库表的crud操作。在2017年我曾经写了一系列博客介绍akka-http,这里就不再叙述它的细节了。这篇我们只聚焦在解决当前问题上。在POS控制平台例子里不会涉及到POST操作,应该全部是GET类型的,如:
http://192.168.11.189:2588/pos/logon?opr=1010 http://192.168.11.189:2588/pos/logoff http://192.168.11.189:2588/pos/logsales?acct=001&dpt=01&code=978111&qty=3&price=1200 http://192.168.11.189:2588/pos/subtotal?level=0 http://192.168.11.189:2588/pos/discount?disctype=2&grouped=true&code=481&percent=20
可以看到,请求部分只是带参数的uri,不含entity数据部分,数据通过querystring提供。但返回会有几种数据类型:POSResponse,TxnsItems,vchState,这些都曾经在Protobuffer用IDL定义过:
message PBVchState { //单据状态 string opr = 1; //收款员 int64 jseq = 2; //begin journal sequence for read-side replay int32 num = 3; //当前单号 int32 seq = 4; //当前序号 bool void = 5; //取消模式 bool refd = 6; //退款模式 bool susp = 7; //挂单 bool canc = 8; //废单 bool due = 9; //当前余额 string su = 10; //主管编号 string mbr = 11; //会员号 int32 mode = 12; //当前操作流程:0=logOff, 1=LogOn, 2=Payment } message PBTxnItem { //交易记录 string txndate = 1; //交易日期 string txntime = 2; //录入时间 string opr = 3; //操作员 int32 num = 4; //销售单号 int32 seq = 5; //交易序号 int32 txntype = 6; //交易类型 int32 salestype = 7; //销售类型 int32 qty = 8; //交易数量 int32 price = 9; //单价(分) int32 amount = 10; //码洋(分) int32 disc = 11; //折扣率 (%) int32 dscamt = 12; //折扣额:负值 net实洋 = amount + dscamt string member = 13; //会员卡号 string code = 14; //编号(商品、卡号...) string acct = 15; //账号 string dpt = 16; //部类 } message PBPOSResponse { int32 sts = 1; string msg = 2; PBVchState voucher = 3; repeated PBTxnItem txnitems = 4; }
那么概括我们现在的主要工作包括:Uri解析,HttpResponse实例的构建和传输。
首先,用akka-http搭建一个http server框架:
import akka.actor._ import akka.stream._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ object HttpServerDemo extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val route = path("hello") { complete {"hello, http server "} } val (port, host) = (8011,"192.168.11.189") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) /* bindingFuture.foreach(s => println(s.localAddress.getHostString)) bindingFuture.foreach(_.unbind()) bindingFuture.onComplete { case Success(value) => value.unbind() } */ }
用akka-http的server api很快就完成了一个简单的http-server。下一步研究一下如何构建返回的HttpResponse:httpresponse是从server端传送到client端的。这个过程包括把HttpResponse Entity里的数据从某种类型转换成通讯用的二进制数据流、到了客户端再转换成目标类型。akka-http的数据转换机制Marshaller/Unmarshaller是通过类型转换的隐式实例来实现的,akka-http提供了多个标准类型数据转换的隐式实例,如StringMarshaller:
implicit val ByteArrayMarshaller: ToEntityMarshaller[Array[Byte]] = byteArrayMarshaller(`application/octet-stream`) def byteArrayMarshaller(contentType: ContentType): ToEntityMarshaller[Array[Byte]] = Marshaller.withFixedContentType(contentType) { bytes => HttpEntity(contentType, bytes) } implicit val ByteStringMarshaller: ToEntityMarshaller[ByteString] = byteStringMarshaller(`application/octet-stream`) def byteStringMarshaller(contentType: ContentType): ToEntityMarshaller[ByteString] = Marshaller.withFixedContentType(contentType) { bytes => HttpEntity(contentType, bytes) } implicit val StringMarshaller: ToEntityMarshaller[String] = stringMarshaller(`text/plain`) def stringMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[String] = Marshaller.withOpenCharset(mediaType) { (s, cs) => HttpEntity(mediaType withCharset cs, s) } def stringMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[String] = Marshaller.withFixedContentType(mediaType) { s => HttpEntity(mediaType, s) } ...
因为akka-http提供了implicit val StringMarshaller,所以在上面的例子里我可以直接写成: complete("hello world!"),然后系统自动构建一个含字符类型数据entity的HttpResponse。Entity.dataBytes中的数据类型是由Entity.contentType指明的:
object ContentTypes { val `application/json` = ContentType(MediaTypes.`application/json`) val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`) val `application/x-www-form-urlencoded` = ContentType(MediaTypes.`application/x-www-form-urlencoded`) val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8` val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8` val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8` val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8` val `application/grpc+proto` = ContentType(MediaTypes.`application/grpc+proto`) // used for explicitly suppressing the rendering of Content-Type headers on requests and responses val NoContentType = ContentType(MediaTypes.NoMediaType) }
客户端收到HttpResponse后把收到的二进制数据流转换成MediaTypes指定的类型。当然,最基本的数据类型就是String了。所有客户端都提供String类型的反序列化deserialization。理论上来讲,我们可以用字符形式来描述任何类型数据,这样我们可以把一个特殊类型实例转成String,然后发送给客户端。客户端再按照协议好的类型转换规则把字符转换成目标类型:
case class TextMessage(msg: String) val helloMsg: String = TextMessage("hello string message converter").toString val route = path("hello") { complete {helloMsg} }
不过,这种情况只适用于内部系统的数据交换,因为数据类型转换的规则方式都是内部私有的。xml,json是开放平台系统数据交换的标准数据类型描述语言,本身是字符String形式的,只是它用String描述类型的语法是行业标准的。客户端可以按行业标准从一个xml/json文件里提取里面的数据类型和实例。所以,自定义类型的数据转换主要包括 类型->jsonstring->bytestring->jsonstring->类型。换句话说我们只要有隐式JsonMarshaller实例就可以完成大部分的数据交换工作了。
spray-json是akka-http自带默认的一个json工具库,它提供了通用的针对任何类型T的Marshaller/Unmarshaller:
ToEntityMarshaller[T] 和 FromEntityUnmarshaller[T]。使用spay-json很简单,如下:
import akka.http.scaladsl.marshallers.sprayjson._ import spray.json._ object JsonMarshaller extends SprayJsonSupport with DefaultJsonProtocol { //domain models case class Person(name:String, age: Int) case class Location(province: String, city: String, zipcode: Int) case class Employee(person: Person, loccation: Location) //collect your json format instances implicit val fmtPerson = jsonFormat2(Person.apply) implicit val fmtLocation = jsonFormat3(Location.apply) implicit val fmtEmployee = jsonFormat2(Employee.apply) }
使用Marshaller时只要import JsonMarshaller._ 把几个类型的隐式转换实例带进可视域即可,如下:
import JsonMarshaller._ val person = Person("Jonh Doe", 23) val location = Location("GuangDong","ShenZhen",10223) val employee = Employee(person,location) val route = path("json") { complete {employee} }
就这么简单,试试看:
http://192.168.11.189:8011/json {"loccation":{"city":"ShenZhen","province":"GuangDong","zipcode":10223},"person":{"age":23,"name":"Jonh Doe"}}
没错,客户端的确收到正确的json数据。还有一项需求是在Reponse里返回一个数据流(多条数据),如当前交易项目清单。这个也比较容易:akka-http本身支持json-streaming。具体使用方法如下:
import akka.http.scaladsl.common.EntityStreamingSupport import akka.stream.scaladsl._ implicit val jsonStreamingSupport = EntityStreamingSupport.json() .withParallelMarshalling(parallelism = 4, unordered = false) val persons = List(person,Person("Peter Kung",28), Person("Ketty Wang",16)) val personDataSource: Source[Person,Any] = Source.fromIterator(() => persons.iterator) val route = path("json") { complete {employee} } ~ path("stream") { complete(personDataSource) }
在客户端browser上测试:
http://192.168.11.189:8011/stream [{"age":23,"name":"Jonh Doe"},{"age":28,"name":"Peter Kung"},{"age":16,"name":"Ketty Wang"}]
也没问题。下面是本次示范中使用的依赖和它们的版本:
libraryDependencies ++= Seq( "de.heikoseeberger" %% "akka-http-json4s" % "1.26.0", "org.json4s" %% "json4s-jackson" % "3.6.6", "org.json4s" %% "json4s-ext" % "3.6.6", "com.typesafe.akka" %% "akka-http" % "10.1.8" , "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8", "com.typesafe.akka" %% "akka-stream" % "2.5.23" )
- Akka(40): Http:Marshalling reviewed - 传输数据序列化重温
- Akka(40): Http:Marshalling reviewed - 传输数据序列化重温
- HttpService & WebService For Flex Develop
- The WinHTTP Web Proxy Auto-Discovery Service has been idle for 15 minutes,it will be shut down.
- HttpService & WebService For Flex Develop
- Develop an Apache HttpClient client for Android to a JAX-RS web service
- C# WebService XmlSerializer反序列化失败:时间字符串不是有效的AllXsd值
- use webHttpBinding to implement WCF Rest Service
- 使用httpwebrequest Post数据到网站
- 面向 Java 开发人员的 Ajax: 结合 Direct Web Remoting 使用 Ajax----数据序列化不可能比这更简单了!
- 【请教】org.springframework.web.servlet.PageNotFound - No mapping found for HTTP req
- 使用httpwebrequest发送数据到网站
- 纯vc sdk实现http post 方式上传数据到web服务器
- flex和后端的数据交互(一)--XML和HTTPService
- 使用httpclient实现上传下载(javaWeb系统数据传输http实现)
- 运用HttpWebRequest与HttpWebResponse获取Web页数据
- C# webservice 名称以无效字符开头.处理http://localhost/webservice/test.aspx是出错
- Android与服务器端数据交互(基于SOAP协议整合android+webservice)
- HTTP Status 401 - Servlet.service() for servlet jsp threw exception
- vs2008运行WEB程序时提示无法启动程序"http://localhost:12345/index.aspx".有更多数据可用