您的位置:首页 > 其它

Vert.x Core 中文使用手册(3.5版)(持续更新)

2017-10-18 00:00 1651 查看
摘要: 该博客是我阅读Vert.x Core Manual的笔记,有理解错误的敬请提出,文档地址:http://vertx.io/docs/vertx-core/java/#_in_the_beginning_there_was_vert_x

1. 开始使用

使用该框架第一步是创建Vertx对象,该框架的功能都依赖于Vertx,例如创建client,servers,获取event bus,设置timers等等操作。

简单创建Vertx

可以简单创建Vertx,代码如下:

Vertx vertx =Vertx.vertx()


创建时做某些配置

如果想配置一些信息,可以这么创建,代码如下:

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40))

更多配置信息请阅读VertxOptions文档。

创建集群的Vertx对象

后续补充。

注意:多数情况下只需要创建一个Vertx对象就可以了,不过也有些情况需要创建多个Vertx对象,例如两个总线需要隔离或者客户端服务器需要分组时。(具体解释后续补充)

2. 链式操作

Vertx的方法会返回自身对象的引用,所以你可以链式调用函数

链式调用代码:

request.response().putHeader("Content-Type", "text/plain").write("some text").end()

如果不喜欢链式调用,也可以拆开,代码如下:

HttpServerResponse response = request.response();
response.putHeader("Content-Type", "text/plain");
response.write("some text");
response.end();

上面两段代码没有区别。

3. 回调

Vertx APIs是事件驱动的,也就是当你感兴趣的事情发生后,Vertx会以发送events的方式通知你

下面是部分events:

定时器被触发

socket接收到了一些数据

数据已经从硬盘中读取

产生了exception

HTTP server 接到了请求

你可以通过向Vertx APIs注册处理函数(handler)的方式处理这些events,例如启动一个每秒触发一次的定时器,并注册一个处理这个事件的函数,代码如下:

vertx.setPeriodic(1000, id -> {
// This handler will get called every second
System.out.println("timer fired!");
});

又例如处理Http请求,代码如下:

server.requestHandler(request -> {
// This handler will be called every time an HTTP request is received at the server
request.response().end("hello world!");
});

注意:当事件发生时,Vertx会异步调用处理函数(handler)。由于是异步的,下面将介绍一些在Vertx中重要的概念。

4.拒绝阻塞

绝大部分Vertx APIs是不会阻塞当前线程的。(有非常少的会,例如后续补充)。

如果代码能够立即得到结果,可以在当前线程执行。如果不能,你通常应该注册一个处理函数(handler),等代码执行完后由Vertx调用处理函数(handler)做后续处理。

通常需要较长时间的操作会发生在以下场景:

从socket中读取数据

向硬盘写数据

发送数据并等待回复

许多其他的情况

如果在在当前线程执行以上操作,并等待操作执行完(期间该不能做其他事)再做后续的处理,这会很低效(这种方式我称为阻塞式)。

阻塞式的例子

/*
比如说,你以上面所说的方式实现了一个服务器。
现在每秒共有100个请求同时到达,并且其中50个请求需要读取硬盘数据(用时2秒),其余50个能立即完成。
那么10秒内共有1000个请求到达,
第一秒需要开启51个线程,其中50个在等待数据读取操作的完成,其中1个线程处理其余操作
第二秒开始,有1个线程的工作已经完成,50个线程在忙,所以需要再开50个线程处理需要读取硬盘数据的操作
第三秒开始~~~~~~~~~~
以此类推,阻塞式的处理方式需要开启大量线程才能使你的服务器不卡死
*/


非阻塞式

/*
和上面例子情况相关,不过当进行数据读取操作时,线程不会一直等待,而是继续处理其他请求
第一秒需要开启1个线程,其中50请求会开启数据读取操作,当开启后不阻塞,会继续处理其他请求
第二秒开始,有1个线程的工作已经完成,接着向第一秒一样处理请求
第三秒~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~
以此类推,非阻塞式的处理方式只需要少数线程就能让服务器不卡死
*/

补充说明:在计算机中,有专门的硬件处理硬盘读写操作,当操作完成后会发出中断通知对应程序,也就是硬盘读写不占用cpu。不只是硬盘读写,很多操作也是有专门的硬件处理的,不占用cpu。具体内容请看计算机组成原理(大概是这样,具体是什么我忘了,欢迎知道的补充完整,上面有错误的也欢迎指出)

线程占用的内存以及不同线程间的切换会带来开销。现在的应用的并发量会很大,阻塞式的处理方式难以满足处理大量并发请求的需求。

5.反应器和多个反应器

上面提到过,Vertx是事件驱动的,也就是当某个事件发生时,会调用相应的处理函数(handler)

大多数情况下,Vertx使用一个称为event loop的线程调用你的处理函数(handler)

这个event loop会在events到达时将其(指events)分派给不同的处理函数(handler),这样你的程序就不会阻塞。

因为不会阻塞,一个event loop可以在短时间内分发大量的events。例如单个event loop可以非常快的处理数以千计的HTTP 请求。

我们称这种模式为 Reactor Pattern(反应器模式)

补充:上面那个链接是维基百科的,应该需要翻墙才能看,这里给个国内博客的链接:Reactor(反应器)模式初探

标准的反应器模式实现是单线程的,也就是一个event loop循环监听events的到来,并将其分发到对应的处理函数(handler)。(换句话说,一个event loop处理全部的events)

这种实现方式的缺陷是,任何时候它只能运行在cpu的单核上,如果你想使用cpu的其他核,你必须启动和管理多个进程。下面给出原文,因为怕理解不对

The trouble with a single thread is it can only run on a single core at any one time, so if you want your single threaded reactor application (e.g. your Node.js application) to scale over your multi-core server you have to start up and manage many different processes.

Vertx的实现方式和上面的不同。每一个Vertx实例持有多个event loop而不是单个event loop。默认情况下会根据机器上可用核的数量决定event loop的数量,当然这个可以被重写。(意思是默认情况下Vertx框架会自动根据机器上cpu的核数决定event loop的数量,默认event loop的数量是cpu核数*2,下面给出原文)

Vert.x works differently here. Instead of a single event loop, each Vertx instance maintains several event loops. By default we choose the number based on the number of available cores on the machine, but this can be overridden.

为了和单线程的反应器模式区别,我们称这个模式为多反应器模式(Multi-Reactor Pattern)。

注意:虽然一个Vertx实例会持有多个event loop,但是任何一个处理函数(handler)不会被并发调用。在大多数情况(除了后续补充)下,处理函数会由固定的event loop调用。(换句话说,现在有处理函数A,B,C以及event loop a,b。通常不会发生a和b同时调用A的情况,一般是A固定由b调用,B固定由a调用)

6. 黄金法则 -不要阻塞Event Loop

上面提到过,Vertx APIs是非阻塞的并且不会阻塞event loop。但是如果你在处理函数内写了阻塞代码,event loop还是会被阻塞。

如果全部event loop被阻塞了,你的程序就会完全停止。所以别在处理函数内写阻塞代码!

下面给出一些阻塞代码例子:

Thread.sleep()

等待锁

做长时间的数据库操作或者等待结果

做复杂运算

大量循环(不知到理解对没,原文是Spinning in a loop)

多长时间的操作算阻塞?这个根据实际情况定。例如你希望你的程序每秒能处理10000个HTTP请求,那么单个请求处理时间不能超过0.1ms。也就是说执行时间超过0.1ms就是阻塞了。

如果检测到某个event loop有一定时间没有返回了,Vertx能自动以日志的形式发出提醒。如果你看到类似下面的日志,你就要注意是否在event loop的哪个地方发生阻塞。

Thread vertx-eventloop-thread-3 has been blocked for 20458 ms

Vertx也提供栈式追踪,从而精确定位发生阻塞的位置。

如果你想关闭提醒或者更改设置,可以在VertxOptions对象上做相应设置。注意:应该在创建Vertix对象前设置。

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40))


7.运行阻塞式代码

理想情况下全部APIs是异步的,然后实际上并不是。

在JVM技术圈,有许多异步的APIs也有许多阻塞式的APIs。一个很好的例子就是JDBC,它是天然同步的,即使Vertx再牛逼,也不能把它变成异步的。

Vertx不打算将全部都改写成异步的,而是提供一种安全运行阻塞代码的方式。

前面我们说过,不要在处理函数(handler)内写阻塞式代码,那如何执行阻塞代码呢?

答案是通过调用excuteBlocking来执行指定的阻塞式代码,并且当阻塞式代码执行完后,处理函数会被异步调用。例子如下:

/*
第一个参数是阻塞式代码,第二个参数是处理函数
*/
vertx.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});

默认情况下,如果executeBlocking在同一个上下文(context)中多次被调用(例如在同一个Vertix实例),不同的executeBlocking会被顺序执行(一个接着一个)。

如果你不在意执行顺序,你可以将参数ordered设为false。这样,任何executeBlocking都可能会在工作池中并行执行(一些名词解释后续补充)

<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
boolean ordered,
Handler<AsyncResult<T>> resultHandler)

另一种运行阻塞式代码的可选方式是使用worker verticleworker verticle总是由来自工作池的线程执行(A worker verticle is always executed with a thread from the worker pool.)。(具体情况后续补充)

默认情况下,阻塞代码式会在Vertx的工作池中执行,这个工作池由VertxOptions对象的setWorkerPoolSize配置。

你也可以创建新的工作池用来执行阻塞式代码(除了执行阻塞式代码,应该还有其余用途,后续补充)。示例代码如下:

/*
创建一个WorkerExecuter,对象持有新的线程池来执行代码?
具体情况后续补充
*/
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});

注意:新创建的工作池不再使用的时候必须关闭

executor.close();

如果多个同名的WorkerExcutor被创建,那么这些WorkerExcutor使用相同的线程池。当全部使用这个线程池的WorkerExcutor被关闭,该线程池会被销毁。

如果WorkerExcutor是在Verticle中创建的,那么当这个Verticle被unDeployed后,Vertx会自动帮你关闭对应的WorkerExcutor。

WorkerExcutors是可配置的,实例代码如下:

int poolSize = 10;

// 2 minutes
long maxExecuteTime = 120000;

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);


8.异步协作

可以使用Vertx的futures实现多个异步结果的协作。Vertx支持并行组合(并发运行数个异步操作)和顺序组合(链式异步操作)。(我的理解是支持并行和顺序两种方式整合异步操作的结果下面给出原文)

Coordination of multiple asynchronous results can be achieved with Vert.x futures. It supports concurrent composition (run several async operations in parallel) and sequential composition (chain async operations).

并行组合

CompositeFuture.all接收数个Future参数(最多6个),返回一个Future(如果前面接受的Future参数的状态都是succeed,返回的这个Future的状态也是succeed,如果前面接受的Future有一个或者多个的状态式failed,那么返回的这个Future的状态式failed),下面给出官方示例代码:

Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture.completer());

Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.completer());

CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
if (ar.succeeded()) {
// All servers started
} else {
// At least one server failed
}
});

补充说明:下面给出自己写的例子,辅助理解

/*
这段代码的作用是:
1.运行两段阻塞式的代码,代码根据当前时间设置状态为运行成功还是不成功
2.使用CompositeFuture.All判断结果
*/
//创建Vertx和两个Future
Future<String> oneFuture=Future.future();
Future<String> secondFuture=Future.future();
Vertx vertx=new Vertx();

//运行阻塞式代码
vertx.executeBlocking(future -> {
int i = 2;
while (i-- > 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//根据时间设置运行结果
if(new Date().getTime()%2==0)
future.complete("运行成功");
else
future.fail("运行失败");
},oneFuture.completer());

vertx.executeBlocking(future -> {
int i = 2;
while (i-- > 0) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(new Date().getTime()%2==0)
future.complete("运行成功");
else
future.fail("运行失败");
},secondFuture.completer());

Future rtFuture=CompositeFuture.all(oneFuture,secondFuture);
rtFuture.setHandler(ar->{
if(ar.succeeded())
System.out.println("全部代码运行成功");
else
System.out.println("至少有一个运行失败");
});

操作是并行执行的,在返回的Future中设置的处理函数(rtFuture.setHandler....)会在操作全部完成后调用。如果传入CompositeFuture.all的Future中有一个或者多个被标记为failed,那么返回的Future也会被标记为failed,若全部被标记为succeed,那返回的Future会被标记为succeed

简单总结一下,

创建几个Future,让阻塞式代码运行完后给对应的Future设置一下状态(succed or falied),然后这几个Future就可以表示对应阻塞代码是否运行成功。

因为阻塞代码需要较长时间运行而且结束时间不一定,为了协作结果,Vertx提供了一个API,这个API返回一个Future,并且这个Future在全部Future被设置了状态后会被设置成相应状态。

而且Future可以设置一个处理函数,这个处理函数(handler)会在设置了状态后执行。这样就达到了协作异步操作结果的效果。

CompositeFuture.all还有许多重载方法,具体内容请看文档。

并行组合的方法除了all外,还有any(CompositeFuture.all)和join(CompositeFuture.join)。这三者用法相同(当然方法名称是不相同的,相同的是传入的参数和返回的参数)。为了方便说明,假设传入了三个Future,分别记为a,b,c,返回的Future记为rt。三者区别如下:

对于all。如果a,b,c全被设为succeed,那么rt被设为succed。如果出现一个参数被设为failed,那么rt立即被设为failed(立即的意思就是不管其他参数有没被设置,只有有一个被设为failed,就立即给rt设置failed状态)

对于any。有一个参数被设置为succeed,rt就立即被设置为succeed。如果全部a,b,c都被设为failed,rt就设为failed

对于join。需要全部a,b,c都被设置了状态,rt才会被设置状态。如果a,b,c都为succeed,rt为succeed。如果a,b,c中有一个为failed,rt就被设为failed。

顺序组合

顺序组合的方法是compose。下面给出官方示例代码,下文有说明。

FileSystem fs = vertx.fileSystem();
Future<Void> startFuture = Future.future();

Future<Void> fut1 = Future.future();
fs.createFile("/foo", fut1.completer());

fut1.compose(v -> {
// When the file is created (fut1), execute this:
Future<Void> fut2 = Future.future();
fs.writeFile("/foo", Buffer.buffer(), fut2.completer());
return fut2;
}).compose(v -> {
// When the file is written (fut2), execute this:
fs.move("/foo", "/bar", startFuture.completer());
},
// mark startFuture it as failed if any step fails.
startFuture);

在这个例子中,有三个操作链式执行。

创建一个文件(fut1)

向文件写入一些数据(fut2)

移动文件(startFuture)

当这三个操作都成功时,最后一个Future(startFuture)会被设置为succeed。若有一个操作不成功,那最后一个Future会被设置为failed。

简单说明compose的用法(compose是Future的方法,下面所说的future都是指compose所属的Future对象):

先给出方法原型:

default <U> Future<U> compose(java.util.function.Function<T,Future<U>> mapper)

default <U> Future<U> compose(Handler<T> handler, Future<U> next)


一个参数的compose:当future被设置状态succeed,传入compose的处理函数会被调用。这个处理函数会返回一个Future(不会设置返回的Future的状态)。如果future被设置为failed,处理函数(handler)不会被调用,但处理函数(handler)返回的Future会被设置为failed。

两个参数的compose:当future被设置为succeed,传入compose的处理函数会被调用。在这个处理函数(handler)中应该设置next的状态。当future被设置为failed,处理函数(handler)不会被调用,但next会自动设置为failed

补充说明:下面给出源码

default <U> Future<U> compose(Function<T, Future<U>> mapper) {
if (mapper == null) {
throw new NullPointerException();
}
Future<U> ret = Future.future();
setHandler(ar -> {
if (ar.succeeded()) {
Future<U> apply;
try {
apply = mapper.apply(ar.result());
} catch (Throwable e) {
ret.fail(e);
return;
}
apply.setHandler(ret);
} else {
ret.fail(ar.cause());
}
});
return ret;
}

default <U> Future<U> compose(Handler<T> handler, Future<U> next) {
setHandler(ar -> {
if (ar.succeeded()) {
try {
handler.handle(ar.result());
} catch (Throwable err) {
if (next.isComplete()) {
throw err;
}
next.fail(err);
}
} else {
next.fail(ar.cause());
}
});
return next;
}

简单的说,如果future被设置为fail,或者处理函数(handler)抛出异常,这个通过compose方法关联的future(例如第一种compose返回的future,第二种方法中的第二个参数)也会被设置为fail,如果future被设置为success,并且处理函数(handler)没有抛出异常,关联的future不会被设置任何状态,你需要在处理函数(handler),或者其他什么地方设置关联的future的状态。

9. Verticles

Vertx带有一个简单,可扩展,actor-like部署(原文是actor-like deployment)和并发的开箱即用模型。你可以用它来协助你实现自己的功能。

这个模型式是完全可选的。如果你不喜欢它,Vertx不会强迫你使用这种方式创建你的应用。

这个模型不是标准的角色模型(actor-model)的实现。它在并发,扩展和部署这些方面和角色模型(actor-model)相似。

使用这个模型, 你的代码就像是verticles(不理解没关系,看下文对这个概念的解释)的集合。

verticles是一个由Vertx运行和部署的代码块。verticles类似于角色模型(actor-model)中的actor。

一个由Vertx实现的应用一般是由许多同时运行在同一个Vertx的verticle实例组成。不同的verticle实例通过在event bus发送消息的方式通信。

编写Verticles

verticle类必须实现Verticle接口。你可以直接 implement Verticle接口,不过通常我们会继承抽象类AbstractVerticle

下面给出官方示例代码:

public class MyVerticle extends AbstractVerticle {

// Called when verticle is deployed
public void start() {
}

// Optional - called when verticle is undeployed
public void stop() {
}

}

一般你需要重写start方法。当Vertx部署这个verticle时候,它会调用start方法。当start方法执行完后,这个verticle就被认为已经启动了。

你可以根据需要重写stop方法。stop方法会在这个verticle卸载(undeployed)的时候调用。当stop方法执行完,这个verticle就被认为是停止的。

异步Verticle的启动和停止

你可能想要在start函数内执行耗时比较长的操作或者你想当start函数执行完后,并且某个事情已经发生了(例如等待其他的verticle启动),这个verticle才被认为是启动的。

上文说过,你不能阻塞线程。那么应该怎么做?

答案是实现异步的start方法。异步的start方法需要一个Future参数。当异步的start方法执行完后,verticle不会被认为是已经启动的。等你已经执行完阻塞代码(耗时比较长的),通过设置传入的Future参数的状态表明你已经执行完。

下面给出官方示例代码:

public class MyVerticle extends AbstractVerticle {

public void start(Future<Void> startFuture) {
// Now deploy some other verticle:

vertx.deployVerticle("com.foo.OtherVerticle", res -> {
if (res.succeeded()) {
startFuture.complete();
} else {
startFuture.fail(res.cause());
}
});
}
}

类似的,也有一个异步的stop方法。

public class MyVerticle extends AbstractVerticle {

public void start() {
// Do something
}

public void stop(Future<Void> stopFuture) {
obj.doSomethingThatTakesTime(res -> {
if (res.succeeded()) {
stopFuture.complete();
} else {
stopFuture.fail();
}
});
}
}


提示:你不需要手动卸载由某个verticle(记为A)启动的verticle(记为As)。当你卸载了A,As会由Vertx自动卸载。

Verticle的类型

有三种类型的verticle:

1. 标准Verticle(standard verticles)

这是最常见和有用的类型。它通常使用event loop线程执行。具体内容在下一部分讲。

2. 工作者Verticle(worker verticles)

这种类型使用工作线程执行,一个实例不会被多个线程并发运行。

3. 多线程工作者Verticle(multi-threaded worker verticles)

这种类型使用工作线程执行,一个实例可以被多个线程并发执行。

标准Verticle(Standard verticles)

当标准的verticle被创建时,它会被分配给一个event loop 线程。verticle的启动方法(void start() )由这个event loop调用。当你在event loop中调用在vertx core API中可以接受一个处理函数(handler)的任何方法时,vertx会保证这些处理函数(handler)将由相同的event loop执行。

在event loop中调用在vertx core API.....

这句话我的理解是:event loop(记为A)执行了一个方法,这个方法调用了一个接受处理函数(handler)的方法。那么以后某个时刻这些处理函数(handler)被调用时,vertx保证是由A调用的。

下面给出原文:

Standard verticles are assigned an event loop thread when they are created and the start method is called with that event loop. When you call any other methods that takes a handler on a core API from an event loop then Vert.x will guarantee that those handlers, when called, will be executed on the same event loop.

这意味着我们可以保证verticle实例中的全部代码总是由相同的event loop 执行的(当然,前提是你没有创建自己的线程并调用它)。

也意味着你可以把你的应用当做单线程写,至于线程的扩展、并发等问题就交由vertx处理。你不用再担心同步和稳定性的问题,你也可以避免条件竞争(race conditions)和死锁(deadlock)等问题,而这些问题在编写传统的多线程应用时是很常见的。

工作者verticles(Worker verticles)

工作者verticles(worker verticles)和标准verticles(standard verticles)是类似的。不同的是,工作者verticles(worker verticles)不是由event loop执行,而是用一个来自vertx工作者线程池( Vert.x worker thread pool)的线程执行。

工作者verticles(worker verticles)是为调用阻塞代码而设计的,所以它不会阻塞任何event loops。

如果你不想使用工作者verticles(worker verticles)运行阻塞代码,你也可以在event loop中直接运行内联的阻塞代码。(内联的阻塞代码就是指上文所说的使用excuteBlocking运行阻塞代码)

如果你想把一个verticle部署为工作者verticle(worker verticle),你可以使用setWorker

DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

工作者verticle(worker verticle)实例是从不被多于一个线程并发的执行,不过它可以由不同的线程在不同时刻执行。

多线程工作者verticles(multi-threaded worker verticles)

一个多线程工作verticle (multi-threaded worker verticle)和普通的工作者verticle(worker verticle)类似,不过它可以被不同的线程并发执行。

警告:多线程工作verticles(multi-threaded worker verticle)是一个高级的特性。大多数应用是不需要使用它的。因为使用标准的java技术开放多线程程序时,为了保持verticles(多线程工作者verticles)状态的一致性,你必须特别小心在这些verticles(多线程工作者verticles)中的并发

以编程的方式部署verticles

你可以使用deployVerticle方法部署一个verticle。使用这类方法你需要指定verticle的名称或者可以传递一个你已经创建好的verticle实例。

注意:仅可以在java中以传递verticle实例的方式部署verticle(这个框架还有其他语言版的,具体可以看官网:Vert‘x官方网

Verticle myVerticle = new MyVerticle();
vertx.deployVerticle(myVerticle);

你也可以指定verticle名称的方式部署verticle。

verticle的名称是用于查找特定的VerticleFactory(这个东西用于实例化实际的verticle实例)。

不同的verticle工厂(verticle factory)用于实例化不同语言版本的verticle,这么做的原因有很多,例如为了在运行时加载、获取来自maven的services或者verticles。

这允许你部署使用任何语言编写的verticle(当然,前提是vertix支持这种语言)。

这有一个部署一些不同类型的verticle的例子:

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");

// Deploy a JavaScript verticle
vertx.deployVerticle("verticles/myverticle.js");

// Deploy a Ruby verticle verticle
vertx.deployVerticle("verticles/my_verticle.rb");


从verticle名字到verticle 工厂(verticle factory)的映射规则

在“使用verticle的名字部署verticle“这种方式中,verticle的名字是用于选择实例化verticle的verticle工厂(verticle factory)。

verticle的名字可以有一个前缀,这个前缀是一个后面跟着一个冒号的字符串。如果添加这个前缀,vertic将根据这个前缀选择实例化verticle的verticle工厂(verticle factory)。

js:foo.js // 使用 JavaScript verticle factory groovy:com.mycompany.SomeGroovyCompiledVerticle // 使用 Groovy verticle factory service:com.mycompany:myorderservice // 使用 service verticle factory

如果没有添加前缀,vertx 将根据后缀选择实例化verticle的verticle工厂(verticle factory)。

foo.js // Will also use the JavaScript verticle factory

SomeScript.groovy // Will use the Groovy verticle factory

如果既没有前缀也没有后缀,vertix 将假定这个名字是java的类的全名(包名加类名?)并尝试实例化它。

verticle 工厂(verticle factory)是怎么定位的?

大多数verticle 工厂是从类路径加载并在vertic启动的时候注册。

如果你愿意,你也可以使用registerVerticleFactoryunregisterVerticleFactory注册和注销verticle工厂(verticle factory)。

等待部署的完成

verticle 的部署是异步的,也许会在调用的deploy方法已经return后才完成(完成部署)。

如果你想在部署完成的时候通知你,你可以注册一个处理函数(handler):

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
if (res.succeeded()) {
System.out.println("Deployment id is: " + res.result());
} else {
System.out.println("Deployment failed!");
}
});

如果部署成果,这个处理函数(handler)会被传递一个deployment ID字符串。

如果你以后想卸载这个verticle,可以使用对应的deployment ID进行卸载操作。

卸载verticle

可以使用undeploy卸载已部署的verticle。

卸载也是异步的,所有如果你想卸载完成的时候通知你,你可以指定一个处理函数(handler):

vertx.undeploy(deploymentID, res -> {
if (res.succeeded()) {
System.out.println("Undeployed ok");
} else {
System.out.println("Undeploy failed!");
}
});


指定verticle实例的数量

当使用verticle的名字部署时,你可以指定你想要部署的verticle的数量。

DeploymentOptions options = new DeploymentOptions().setInstances(16);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

这对于在多核的机器上扩展是很有用的。比如说,你可能有一个web-server verticle要部署,并且你的服务器是多核的, 很自然的你会想部署多个实例从而充分利用全部核心(cpu的核)。

设置verticle的配置

可以在部署的时候将JSON格式的配置传给verticle

JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
DeploymentOptions options = new DeploymentOptions().setConfig(config);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

这个配置可以通过Context对象或者直接使用config方法获取。

配置会以JSON对象的形式返回,因此你可以像下面这么获取配置数据

System.out.println("Configuration: " + config().getString("name"));


在verticle中访问环境变量

使用java API获取环境变量和系统属性是很简单的

System.getProperty("prop");
System.getenv("HOME");


verticle 隔离组

默认情况下,vertx有一个扁平的路径(flat classpath)。例如,当vertx部署verticle时,它使用现在的类加载器(classloader)而不是创建一个新的。在大多数情况下,这是最简单,清晰,明了的做法。

然而,在一些情况下,你也许想部署一个verticle。在你的应用中,这个verticle的class是和其他的分离的。(我理解的意思是:你写了一个verticleA,部署了两个verticleA实例,你想再部署一个verticleA实例,并且这个实例对应的class和另外两个的class不一样。这里涉及java虚拟机的一些东西详细解释后续补充)。

还有一个例子,例如,如果你想在同一个vertix实例中部署两个类名相同版本不同的verticle,或者你有两个使用同一个类库的不同版本的verticle。

当使用一个隔离组时,你需要使用setIsolatedClasses设置你想隔离的类名。类名可以是完整的全限定名,例如:com.mycompany.myproject.engine.MyClass或者是带有通配符的类名(这种会匹配包和子包中任何类),例如com.mycompany.myproject.*会匹配到包com.mycompany.myproject以及其子包下的任何类。

需要注意的是,只有匹配到的类会被隔离,其余不被隔离的类将被当前的类加载器(class loader)加载。

如果你想加载不在主类路径上的类或者资源,你可以使用setExtraClasspath设置额外的类路径。

警告:需要谨慎的使用这个功能。类加载器可能是个蠕虫罐子(装有许多虫子(bug)),它可能会使debug十分困难。

这里有一个使用隔离组去隔离verticle部署的例子:

DeploymentOptions options = new DeploymentOptions().setIsolationGroup("mygroup");
options.setIsolatedClasses(Arrays.asList("com.mycompany.myverticle.*",
"com.mycompany.somepkg.SomeClass", "org.somelibrary.*"));
vertx.deployVerticle("com.mycompany.myverticle.VerticleClass", options);


高可用性

verticle可以以高可用性(High Availability)的方式部署。在这种方式中,当一个vertx实例中的verticle突然挂了,集群上的其他vertx实例会重新部署这个verticle。

使用高可以性方式运行verticle,只需要在后面加上 -ha

vertx run my-verticle.js -ha

当启用高可用性,不再需要加上 -cluster

更多关于高可用性功能和配置的细节在高可用性和故障转移部分.

在命令行中运行verticle

你可以通过“添加对vertx core 类库的依赖“这种一般的方式在你的项目中直接使用vertx。

然而,你也可以使用命令行直接运行vertx 的verticle。

为了做到这些,你需要下载和安装vertx distribution,并且把bin目录地址添加到系统环境变量PATH中。当然,你也需要安装java 8 JDK。

注意:为了支持编译java代码,JDK是必须的。

安装完以上所说的东西后,你就可以使用vertx run命令运行verticle了。这里有一些例子:

# Run a JavaScript verticle
vertx run my_verticle.js

# Run a Ruby verticle
vertx run a_n_other_verticle.rb

# Run a Groovy script verticle, clustered
vertx run FooVerticle.groovy -cluster

你甚至可以先不编译,直接运行verticle的java源码。

vertx run SomeJavaSourceFile.java

vertx 会在运行它前,动态的编译这个java源码文件。这是对于快速建立原型和部署demo来说是非常有用的。

关于执行vertx命令时的全部可选值的资料,可以通过在命令行中输入vertx获取。

退出vertx

持有vertx实例的线程不是守护线程,所有它会阻止JVM退出。(涉及一些JVM的知识,后续补充。)

如果你正以嵌入的方式使用vertx,并且已经使用完了,你可以调用close来结束它。

这将会关闭全部内部线程池,关闭其他资源和允许JVM退出。

上下文对象(Context object)

当vertx传递一个事件(event)给一个处理方法(handler)或者调用verticle的start,stop方法时,这些方法会接受一个Context。通常一个context是一个event-loop context并且它捆绑一个特定的event loop 线程。所有关于context的操作总是发生在相同的event loop 线程。至于工作者verticle和运行内联阻塞代码,会有一个worker context与之关联,这些操作都由worker线程池里的线程运行。

可以使用getOrCreateContext获取context

Context context = vertx.getOrCreateContext();

如果当前线程已经和一个context关联,该线程会重用这个context对象。如果一个新的context实例被创建,你可以测试你所接收到的context的类型。

Context context = vertx.getOrCreateContext();
if (context.isEventLoopContext()) {
System.out.println("Context attached to Event Loop");
} else if (context.isWorkerContext()) {
System.out.println("Context attached to Worker Thread");
} else if (context.isMultiThreadedWorkerContext()) {
System.out.println("Context attached to Worker Thread - multi threaded worker");
} else if (! Context.isOnVertxThread()) {
System.out.println("Context not attached to a thread managed by vert.x");
}

当你已经接收到这个context对象, 你可以在这个context中异步运行代码。换句话说,你提交一个任务(task),这个任务将在相同的context中运行。

vertx.getOrCreateContext().runOnContext( (v) -> {
System.out.println("This will be executed asynchronously in the same context");
});

有时候在相同的context中运行的处理函数(handler)可能想共享数据。这个context对象提供存储和获取数据(数据应该是在这个context对象中分享的,也就是无法操作在其他context对象分享的数据)的方法。例如,可以传递数据给通过runOnContext运行的操作。

final Context context = vertx.getOrCreateContext();
context.put("data", "hello");
context.runOnContext((v) -> {
String hello = context.get("data");
});

你可以使用context对象的config方法获得verticle configuration。更多内容请看传递configuration 给一个verticle(Passing configuration to a verticle)章节。

执行定期和延迟操作

在vertx中,想定期或者延迟执行一些操作是很常见的。

在标准verticle(standard verticle)中,你不能简单的通过休眠线程来到达延迟执行的目的,因为这样会阻塞event loop 线程。

不过你可以使用vertx 的定时器(timer)。定时器(timer)可以是一次性(one-shot)或者是周期性的(periodic)。我们将会对两者进行详细说明。

一次性的定时器(one-shot Timer)

一个一次性定时器(one-shot Timer)会在将来某个时间调用处理函数(handler)。

可以使用setTimer方法设置一个一次性的定时器(one-shot Timer)。这个方法接受两个参数,分别是延迟时间(以毫秒为单位)和处理函数(handler)。

long timerID = vertx.setTimer(1000, id -> {
System.out.println("And one second later this is printed");
});

System.out.println("First this is printed");

setTimer方法返回的long值是这个定时器(Timer)的id,这个id是唯一的。你可以使用这个id来取消定时器(Timer)。设置定时器(Timer)时,传入的处理函数(Handler)也会接收一个long参数,这个参数就是定时器的id。

定期定时器(Periodic Timer)

你可以使用setPeriodic来设置一个定期定时器(Periodic Timer)。

setPeriodic方法同样会放回一个long值,该值的含义和用法与setTimer相同。

setPeriodic接收的参数和setTimer和大致相同,不同的是接收的参数“延迟时间(以毫秒为单位)“代表隔多久执行一次处理函数(Handler)。

需要注意的是,这种定时器(指定期定时器)是定期执行的。如果定期执行的操作持续时间较长,定时器可能会持续运行(例如设置一个隔一小时执行一次的定时器,而定时执行的操作恰好需要一小时完成)。还有更糟糕的情况是,定时器可能会叠加运行(例如,以一小时为周期的定时器,定时执行的操作需要耗时2小时)。

在这种情况,你应该考虑使用setTimer。当操作完成后,再设置下一个定时器。

long timerID = vertx.setPeriodic(1000, id -> {
System.out.println("And every second this is printed");
});

System.out.println("First this is printed");


取消定时器

可以调用cancelTimer取消定时器,这个方法需要传入定时器id。

vertx.cancelTimer(timerID)


自动清理verticle中的定时器(Timer)

如果你在verticle内创建了定时器,那么当这个verticle被卸载时,在这个verticle内创建的定时器也会被清理掉。

verticle的工作线程池(Verticle worker pool)

verticle使用vertx的工作线程池执行阻塞操作,例如executeBlocking和工作者verticle(worker verticle)。
可以在部署选项中指定不同的工作池:
vertx.deployVerticle("the-verticle", new DeploymentOptions().setWorkerPoolName("the-specific-pool"));


9.事件总线(Event Bus)

事件总线(event bus)是vert.x的神经系统。

每个vertx实例都拥有一个单独的事件总线实例,你可以使用eventBus方法获得它。

事件总线允许你应用程序的不同部分相互通信,即使不同部分的开发语言不同,所处的vertx实例不同,也能够相互通信。

它甚至可以被桥接,从而允许运行在浏览器的javaScript代码在相同的事件总线(event bus)上通信。

事件总线(event bus)形成了分布式的跨越多个服务器端点和多个浏览器的对等消息系统。

事件总线支持发布/订阅,端对端,请求-响应式的消息。(The event bus supports publish/subscribe, point to point, and request-response messaging.)

事件总线(event bus)的API是非常简单的。它主要上涉及注册、注销处理函数(Handler),发生和发布消息等操作。

首先介绍一些概念。

概念部分

1.地址

在事件总线中,消息将被送到一个地址。

vertx没有选用复杂的选址方案,在vertx中,一个地址就是一个简单的字符串(String)。任何字符串都可以被定义为地址。不够,使用某种格式命名地址是个明智的选择。例如使用限定名命名地址。

一些有效的地址命名例子:news.feed1,acme,games.pacman,sausages,X.

2.处理函数(handler)

消息是由处理函数(handler)接收。你可以在一个地址注册了一个处理函数(handler)。

一个地址可以注册多个处理函数(handler)。一个处理函数可以在多个地址上注册。

3.发布/订阅消息

事件总线支持发布消息。消息会被发布到一个地址。“发布”意味着传递消息到注册在对应地址上的所有处理函数(handler)。

4.端对端和请求-响应式(request-response)的消息

事件总线也支持端对端的消息。

这种模式下,消息会被发送到一个地址。vertx接着会把消息路由到这个地址上的其中一个处理函数(handler)上。

如果在这个地址注册了多个处理函数(handler),vertx将会使用非严格的轮询(non-strict round-robin)算法选择其中的一个。

在端对端的模式下, 可以在发送消息时,可以设置一个处理函数(handler)来处理回复(若A发消息给B,B可能会对该消息进行回复,也就B接收并处理消息后向A发一条回复消息)。

当一个消息已经被接收并且处理,接受者可以决定是否回复这个消息。如果接收者回复了消息,发送者设置的处理函数(handler)将被调用。

在上述情景下,如果发送者接收到了回复消息,发送者也能选择是否进行回复。发送者和接收者的对话可以无限延续,并且对话可以在两个不同的verticle中进行。

这是一种普通的消息模式,这种模式被称为请求-响应(request-response)模式。

5.最大努力交付(Best-effort delivery)

vertx会尽量发送消息,不会主动的丢弃消息。这种行为被称为最大努力交付(best-effor delivery)。

然而,在事件总线全部或者部分的失败例子中,存在消息遗失的可能。

如果你的应用在乎消息的丢失,你应该确保你编写的处理函数(handler)具有幂等性(idempotent)和回复和重试发送。(幂等性资料

6.消息的类型

开箱即用的vertx允许消息是任何原始和简单的类型,例如String 或者 buffers。

不过,在vertx中以json形式发送消息是方便和常见的。

在所有vertx支持的语言中,JSON是非常容易创建,读取和解析的。所以JSON已经成为一种vertx的通用语(lingua franca)。

当然,如果你不想用JSON,vertx不会强迫你用。

事件总线是非常灵活的,它也支持发送抽象任意对象。要做到发送任意对象,你需要为你想发送的对象定义codec

事件总线(Event Bus)API

1.获取事件总线(event bus)

你可以像下面这样获取事件总线(event bus)引用

EventBus eb = vertx.eventBus();

这是一个vertx实例的唯一的事件总线的实例。

2.注册处理函数

注册处理函数(handler)的最简单方式是使用consumer。这有个例子:

EventBus eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {
System.out.println("I have received a message: " + message.body());
});

当一个消息到达,你的处理函数(handler)会被调用,并且被传入这个消息。

调用consumer()返回的对象是一个MessageConsumer实例。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Vert.x 中文文档