您的位置:首页 > 理论基础 > 计算机网络

微服务框架Finagle介绍 Part2: 在Finagle中开发基于Http协议的应用

2017-05-18 11:20 471 查看
原文地址:http://skaka.me/blog/2016/05/01/finagle2/

上篇文章中我介绍了Finagle中的Future/Service/Filter.
这篇文章里, 我们将构建一个基于Http协议的echo服务端和客户端, 下篇文章将构建一个基于thrift协议的客户端和服务端. 这两篇文章对应的源代码地址在Github.
代码中有Java和Scala版本两套版本的实现, 但是这里我只会介绍Java版本.

首先来看echo应用的Server端代码, 打开
java-finagle-example/src/main/java/com/akkafun/finagle/Server.java
:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

public class Server extends Service<Request, Response> {                             //1

@Override
public Future<Response> apply(Request request) {                                 //2
System.out.println("request: " + request.getContentString());
Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok());
response.setContentString(request.getContentString());
return Future.value(response);
}

public static void main(String[] args) throws Exception {
Server service = new Server();

ListeningServer server = Http.server().                                      //3
withLabel("echo-server").
withTracer(ZipkinTracer.mk("192.168.99.100",
9410, DefaultStatsReceiver$.MODULE$, 1.0f)).
serve(new InetSocketAddress(8081), service);

Await.result(server);
}
}

在Finagle中, 实现一个RPC服务非常简单. 只需要继承Service抽象类, 实现它的apply方法. Service抽象类有两个类型参数, 第一个类型参数代表的是请求对象, 第二个类型参数代表的是返回对象. 这两个对象的具体类型与Service实现类使用的具体协议有关. 例如我们在echo服务中使用Http协议, 对应的Request类就是
com.twitter.finagle.http.Request
,
对应的Response类是
com.twitter.finagle.http.Response
.
如果是thrift协议, 则这两个类型参数在Service实现类中都是
scala.Array<scala.Byte>
(Array和Byte都是scala中的类,
对应Java中的数组与byte).

apply方法中, 我们首先使用Response的工厂方法构造一个Response对象. 然后将Request中的请求内容原封不动的设置到Response中, 再将Response设置到Future中返回. 需要最后一步的原因是apply方法的返回值类型是
Future<Response>
,
但是我们在这个方法中不需要进行异步操作, 所以可以直接使用
Future.value(response)
将对象包装成Future返回.
另外, 细心的你应该发现了一行比较碍眼的代码: 
Response.apply(Version.Http11$.MODULE$,
Status.Ok())
, 其中Version的用法很古怪. 这是Java调用Scala伴生对象的副作用, Scala有一些语法和特性在Java中没有对应的概念, 这种情况下Java调用Scala的代码就会比较晦涩.

为了启动Service实例, 我们需要构造一个
com.twitter.finagle.ListeningServer
withLabel
设置服务名称, 
withTracer
设置监控信息,
这个等后面介绍zipkin的时候在解释. 最后指定端口启动服务.

现在来看echo应用的Client端代码, 打开
java-finagle-example/src/main/java/com/akkafun/finagle/Client.java
:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2223
24
25
26
27
28
29
30
3132
33
34
35

import static scala.compat.java8.JFunction.*;

public class Client {

public static void main(String[] args) throws TimeoutException, InterruptedException {
Service<Request, Response> service = Http.client().                             //1
withLabel("echo-client").
withTracer(ZipkinTracer.mk("192.168.99.100",
9410, DefaultStatsReceiver$.MODULE$, 1.0f)).
newService("127.0.0.1:8081");

//create a "Greetings!" request.
Reader data = Reader$.MODULE$.fromStream(                                       //2
new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8)));
Request request = Request.apply(Version.Http11$.MODULE$,
Method.Post$.MODULE$, "/", data);

Future<Response> responseFuture = Await.ready(service.apply(request));          //3
responseFuture.onSuccess(func(response -> {                                     //4
System.out.println(String.format("response status: %s, response string: %s",
response.status().toString(), response.contentString()));
return BoxedUnit.UNIT;
}));
responseFuture.onFailure(func(e -> {
System.out.println("error: " + e.toString());
return BoxedUnit.UNIT;
}));
responseFuture.ensure(func(() -> {
service.close();
//IDE may complain here, just ignore
return BoxedUnit.UNIT;
}));

}
}

这部分代码和我们之前的Server类代码很像. 在Server类中, 我们创建了一个Service实例并监听了8081端口, 现在客户端通过newService创建了一个Service的stub.

这部分代码用来构造一个消息内容为Greetings的Http请求.

service.apply(request)
就是一次客户端到服务端的RPC调用.
这个调用的返回值是
Future<Response>
.

service.apply(request)
是一个异步操作,
主线程调用这个方法并不会阻塞, 有可能主线程退出了实际调用还没有完成. 所以这里就要用到
Await.ready
了. 
Await.ready
的作用是等待一个Future执行完成再返回,
是一个同步操作. 通过调用
Await.ready
我们就能将一个异步操作转化成一个同步操作.

接下来我们在Future上注册请求成功与失败的回调函数. 请求成功的回调函数中只是简单的打印出响应的消息内容.

这里有个细节需要说明一下. Future的onSuccess方法需要传入一个Scala的函数特质: 
scala.Function1[Response,
BoxedUnit]
. 如果是Java6或7, 我们可以这样实现这个特质:

1
2
3
4
5
6
7
8

responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){
@Override
public BoxedUnit apply(Response response) {
System.out.println(String.format("response status: %s, response string: %s",
response.status().toString(), response.contentString()));
return BoxedUnit.UNIT;
}
});

在Java8中, 这种匿名类我们一般会使用Lambda代替, 理想情况下写法是这样:

1
2
3
4
5

responseFuture.onSuccess(response -> {
System.out.println(String.format("response status: %s, response string: %s",
response.status().toString(), response.contentString()));
return BoxedUnit.UNIT;
});

可惜的是这种写法编译不会通过, 因为只有符合
FunctionalInterface
定义的接口才能使用Lambda表达式(什么是
FunctionalInterface
,
请参考Javadoc),
而在Scala2.11中, 
scala.Function1
不是一个
FunctionalInterface
(Scala2.12会兼容Java8).
为了在这里使用Lambda, 我们使用了scala-java8-compat这个库,
调用
scala.compat.java8.JFunction.func
方法将一个
FunctionalInterface
转化成
scala.Function1
.

可以看出, 在Java中调用Finagle的API不是很方便. 所以Finagle适合以Scala为主, Java为辅的项目. 如果项目全是Java, 则值得为Finagle主要的API写一层Java的适配层, 来屏蔽Java调用Scala代码会出现的一些晦涩代码.

现在我们启动服务端和客户端来看看运行结果. 首先启动Server类, 然后启动Client. Client运行完毕自动结束, 你应该能在Client的控制台看到如下输出:

1

response status: Status(200), response string: Greetings!

Server控制台的输出:

1

request: Greetings!

Http协议比较适合用于对外提供服务, 并且一般会使用REST. 在Finagle中使用REST可以使用Finch库.
这个库轻量小巧, API简单, 提供了一套很方便的对Http消息进行操作的DSL. 如果是内网服务调用, 一般推荐使用结构紧凑, 传输效率高的协议. 比如protocol buffer, thrift或Avro. Finagle对thrift有很好的支持, 下篇文章我将介绍在Finagle中如何开发thrift应用.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: