Vert.x-Core(一)- 基础篇

本文是我在学习Vert.x过程中的一些笔记,作为记录。
因为是初学,对Vert.x的理解还不够透彻,如有错误之处我们可以在评论中一起讨论呦。

Vert.x core模块是vertx的根基

是基于netty的一个工具包,提供tcp、http、websocket、dns、eventbus等基础功能封装

Vertx对象

Vertx对象是Vert.x的控制中心,是做一切事情的基础

直接创建该对象:Vertx vertx = Vertx.vertx();

或者在创建时使用相关配置属性:Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

VertxOptions的具体属性参数参考

在Vertx中我们需要时刻保持eventLoop的畅通,当创建集群模式下的Vertx对象时,就不能用单机模式的方式了,因为让不同的 Vert.x 实例组成一个集群需要一些时间(也许是几秒钟)。在这段时间内,我们不想去阻塞调用线程,所以我们通过异步的方式来获取Vertx对象。

1
2
3
4
5
6
7
8
Vertx.clusteredVertx(new VertxOptions(), res -> {
if (res.succeeded()) {
Vertx vertx = res.result(); // 获取到了集群模式下的 Vertx 对象
// .....
} else {
// 获取失败,可能是集群管理器出现了问题
}
});

Vertx是事件驱动

当Vertx有一个事件要传递给某一个Hander去处理时,他会异步的去调用这个Hander。

Vertx中的大部分api都是不会阻塞线程的

传统的阻塞式的api,例如spring开发中,往往会有以下场景

线程a调用线程b,线程b执行逻辑,执行完毕后返回结果到线程a,线程a处理返回结果,线程a执行完毕

在这种场景下,线程a调用了线程b后就一直处于阻塞状态,如果此时有大量请求涌入,很可能造成灾难性的后果。

而如果使用Vertx来处理这种场景,则变成了如下的逻辑

线程a调用线程b,并告知线程b执行完毕后的通知线程c,线程a执行完毕。线程b开始执行执行完毕后通知线程c,线程c处理返回结果。

因为Vert.x API不会阻塞线程,所以通过Vert.x您可以只使用少量的线程来处理大量的并发。

EventLoop

Vert.x的api保证无阻塞的情况下,Vert.x使用 Event Loop 来调用您的处理器。Event Loop 可以在事件到达时快速地分发到不同的处理器中。由于没有阻塞,Event Loop 可在短时间内分发大量的事件。例如,一个单独的 Event Loop 可以非常迅速地处理数千个 HTTP 请求。这种方式被称为反应器(Reactor)模式,所以呢,在Vertx中有一条黄金法则:不要阻塞EventLoop

处理阻塞式代码

虽然Vertx的大部分api是无阻塞的,但仍然存在一些阻塞式的代码。比如数据库操作,如果这种方法或者线程运行在EventLoop上,势必会造成阻塞,这种情况下内,Vertx提供了专门为阻塞式代码执行和处理回调的方法。

  • executeBlocking
1
2
3
4
5
6
7
8
vertx.executeBlocking(future -> {
// 调用一些需要耗费很长时间返回结果的阻塞式API
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
//处理结果
System.out.println("The result is: " + res.result());
});
默认情况下,如果 executeBlocking 在同一个上下文环境中(如:同一个 Verticle 实例)被调用了多次,
那么这些不同的 executeBlocking 代码块会 顺序执行(一个接一个)。

若您不需要关心您调用 executeBlocking 的顺序,
可以将 ordered 参数的值设为 false。这样任何 executeBlocking 都会在 Worker Pool 中并行执行。
  • Worker Verticle
1
2
3
4
5
6
7
8
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
// 调用一些需要耗费显著执行时间返回结果的阻塞式API
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});

Worker Executor 在不需要的时候必须被关闭:

executor.close();

异步协调

在Vertx中,Future可以用来协调多个异步线程的操作结果,Future支持两种组合方式:并发组合、顺序组合

并发组合

static <T1,T2> CompositeFuture all(Future<T1> f1,Future<T2> f2)
static CompositeFuture all(List<Future> futures)

该方法接受多个 Future 对象作为参数(最多6个,或者传入 List)。当所有的 Future 都成功完成,该方法将返回一个 成功的 Future;当任一个 Future 执行失败,则返回一个 失败的 Future,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
Future<HttpServer> httpServerFuture = Future.future();
Future<NetServer> netServerFuture = Future.future();

httpServer.listen(httpServerFuture.completer());
netServer.listen(netServerFuture.completer());

CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
if (ar.succeeded()) {
// 所有服务器启动完成
} else {
// 有一个服务器启动失败
}
});

当组合的处理操作完成时,该方法返回的 Future 上绑定的处理器(Handler)会被调用。当一个操作失败(其中的某一个 Future 的状态被标记成失败),则返回的 Future 会被标记为失败。当所有的操作都成功时,返回的 Future 将会成功完成。

static <T1,T2> CompositeFuture any(Future<T1> f1,Future<T2> f2)
static CompositeFuture any(List<Future> futures)

该方法的合并会等待第一个成功执行的Future。CompositeFuture.any 方法接受多个 Future 作为参数(最多6个,或传入 List)。当任意一个 Future 成功得到结果,则该 Future 成功;当所有的 Future 都执行失败,则该 Future 失败。

1
2
3
4
5
6
7
CompositeFuture.any(future1, future2).setHandler(ar -> {
if (ar.succeeded()) {
// 至少一个成功
} else {
// 所有的都失败
}
});

static <T1,T2> CompositeFuture join(Future<T1> f1,Future<T2> f2)
static CompositeFuture join(List<Future> futures)

join方法的合并会等待所有的 Future 完成,无论成败。CompositeFuture.join 方法接受多个 Future 作为参数(最多6个),并将结果归并成一个 Future 。当全部 Future 成功执行完成,得到的 Future 是成功状态的;当至少一个 Future 执行失败时,得到的 Future 是失败状态的。

1
2
3
4
5
6
7
CompositeFuture.join(future1, future2, future3).setHandler(ar -> {
if (ar.succeeded()) {
// 所有都成功
} else {
// 至少一个失败
}
});

顺序合并
和 all 、join以及 any 实现的并发组合不同,compose 方法作用于顺序组合 Future。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

Future<Void> startFuture = Future.future();
Future<Void> fut1 = Future.future();
FileSystem fs = vertx.fileSystem();

fs.createFile("/foo", fut1.completer());

fut1.compose(v -> {
// fut1中文件创建完成后执行
Future<Void> fut2 = Future.future();
fs.writeFile("/foo", Buffer.buffer(), fut2.completer());
return fut2;
}).compose(v -> {
// fut2文件写入完成后执行
System.out.println("--------------------");
fs.copy("/foo", "/foo", startFuture.completer());
},
// 如果任何一步失败,将startFuture标记成failed
startFuture)
.setHandler(a -> {
if (startFuture.succeeded()) {
System.out.println("success...");
} else {
System.out.println("error...");
}
});

这里例子中,有三个操作被串起来了:

  • 一个文件被创建(fut1)
  • 一些东西被写入到文件(fut2)
  • 文件被移走(startFuture)

如果这三个步骤全部成功,则最终的 Future(startFuture)会是成功的;其中任何一步失败,则最终 Future 就是失败的。

Verticle

Verticle 是由 Vert.x 部署和运行的代码块。一个应用程序通常是由在同一个 Vert.x 实例中同时运行的许多 Verticle 实例组合而成。不同的 Verticle 实例通过向 Event Bus 上发送消息来相互通信。

Verticle 的实现类必须实现 Verticle 接口。

如果您喜欢的话,可以直接实现该接口,但是通常直接从抽象类 AbstractVerticle 继承更简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyVerticle extends AbstractVerticle {

// Called when verticle is deployed
// Verticle部署时调用
public void start() {
}

// Optional - called when verticle is undeployed
// 可选 - Verticle撤销时调用
public void stop() {
}

}

Verticle 种类

  • Stardand Verticle:这是最常用的一类 Verticle —— 它们永远运行在 Event Loop 线程上。

    • 当 Standard Verticle 被创建时,它会被分派给一个 Event Loop 线程,并在这个 Event Loop 中执行它的 start 方法。当您在一个 Event Loop 上调用了 Core API 中的方法并传入了处理器时,Vert.x 将保证用与调用该方法时相同的 Event Loop 来执行这些处理器。
    • 这意味着我们可以保证您的 Verticle 实例中 所有的代码都是在相同Event Loop中执行(只要您不创建自己的线程并调用它!)
    • 同样意味着您可以将您的应用中的所有代码用单线程方式编写,让 Vert.x 去考虑线程和扩展问题。您不用再考虑 synchronized 和 volatile 的问题,也可以避免传统的多线程应用经常会遇到的竞态条件和死锁的问题。
  • Worker Verticle:这类 Verticle 会运行在 Worker Pool 中的线程上。一个实例绝对不会被多个线程同时执行。

    • 不是由一个 Event Loop 来执行,而是由Vert.x中的 Worker Pool 中的线程执行。
    • Worker Verticle 被设计来调用阻塞式代码,它不会阻塞任何 Event Loop。
    • 将 Verticle 部署成一个 Worker Verticle,通过 如下方法来设置:
    1
    2
    DeploymentOptions options = new DeploymentOptions().setWorker(true);
    vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);
  • Multi-Threaded Worker Verticle:这类 Verticle 也会运行在 Worker Pool 中的线程上。一个实例可以由多个线程同时执行(因此需要开发者自己确保线程安全)。

Verticle部署

deployVerticle方法可用来部署Verticle,具体怎么部署可以看看源码中提供的方法:
package io.vertx.core.Vertx;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@GenIgnore({"permitted-type"})
void deployVerticle(Verticle var1);

@GenIgnore({"permitted-type"})
void deployVerticle(Verticle var1, Handler<AsyncResult<String>> var2);

@GenIgnore({"permitted-type"})
void deployVerticle(Verticle var1, DeploymentOptions var2);

@GenIgnore({"permitted-type"})
void deployVerticle(Verticle var1, DeploymentOptions var2, Handler<AsyncResult<String>> var3);

@GenIgnore
void deployVerticle(Class<? extends Verticle> var1, DeploymentOptions var2);

@GenIgnore
void deployVerticle(Class<? extends Verticle> var1, DeploymentOptions var2, Handler<AsyncResult<String>> var3);

@GenIgnore({"permitted-type"})
void deployVerticle(Supplier<Verticle> var1, DeploymentOptions var2);

@GenIgnore({"permitted-type"})
void deployVerticle(Supplier<Verticle> var1, DeploymentOptions var2, Handler<AsyncResult<String>> var3);

void deployVerticle(String var1);

void deployVerticle(String var1, Handler<AsyncResult<String>> var2);

void deployVerticle(String var1, DeploymentOptions var2);

void deployVerticle(String var1, DeploymentOptions var2, Handler<AsyncResult<String>> var3);

阅读源码可以看出,部署方式大概有两类

  • 实例部署
    • vertx.deployVerticle(new MyFirstVerticle());
    • vertx.deployVerticle(MyFirstVerticle.class,new DeploymentOptions());
    • vertx.deployVerticle(MyFirstVerticle::new,new DeploymentOptions());
  • 类名部署
    • vertx.deployVerticle(“com.zhengql.vertx.MyFirstVerticle”);

Verticle的部署是异步的,当我们调用deployVerticle方法后,部署结果不是立即返回的,我们可以同步绑定处理异步返回结果的处理器:
void deployVerticle(String var1, Handler<AsyncResult<String>> var2);

1
2
3
4
5
6
7
8
vertx.deployVerticle("com.zhengql.vertx.MyFirstVerticle", res -> {
if (res.succeeded()) {
//如果部署成功,这个完成处理器的结果中将会包含部署ID的字符串。这个部署 ID可以在之后您想要撤销它时使用。
System.out.println("Deployment id is: " + res.result());
} else {
System.out.println("Deployment failed!");
}
});

撤销Verticle

我们可以通过 undeploy 方法来撤销部署好的 Verticle。

撤销操作也是异步的,因此若您想要在撤销完成过后收到通知则可以指定另一个完成处理器:

1
2
3
4
5
6
7
vertx.undeploy(deploymentID, res -> {
if (res.succeeded()) {
System.out.println("Undeployed ok");
} else {
System.out.println("Undeploy failed!");
}
});

DeploymentOptions

在上边的部署方法api中有一个参数是DeploymentOptions ,可以通过配置自定义的配置来部署Verticle。

指定Verticle的实例数量。

1
2
DeploymentOptions options = new DeploymentOptions().setInstances(2);
vertx.deployVerticle("com.zhengql.vertx.MyFirstVerticle", options);

部署时传给 Verticle 一个 JSON 格式的配置,该配置中的值可以在Verticle的start()中通过config().getString()方法来获取。

1
2
3
JsonObject config = new JsonObject().put("name", "zhengql").put("age", 18);
DeploymentOptions options = new DeploymentOptions().setConfig(config);
vertx.deployVerticle("com.zhengql.vertx.MyFirstVerticle", options);

定时任务

在Vertx中我们要求了Verticle不可以阻塞EventLoop,所以我们不能在Verticle中使用线程调度方法sleep、wait等,好在Vert.x为我们提供了专用的定时器

一次性定时器 setTimer
一次性计时器会在一定延迟后调用 Event Handler如下
延迟5s,打印字符串

1
2
3
4
//5000代表延迟时间,单位毫秒
long timerID = vertx.setTimer(5000, id -> {
System.out.println("hello xiaogege");
});

周期性定时器 setPeriodic
周期性触发的定时器setPeriodic,在任务第一次触发前也是需要延时的,demo如下
每5s打印一次字符串,注:第一次不是立即触发,时间单位毫秒

1
2
3
long timerID = vertx.setPeriodic(5000, id -> {
System.out.println("hello xiaojiejie");
});

取消定时任务 cancelTimer

首先先来看看定时任务的api

long setTimer(long var1, Handler<Long> var3);

long setPeriodic(long var1, Handler<Long> var3);

boolean cancelTimer(long var1);

从api中可以看出,设置定时任务都有一个long型的返回值,取消定时任务需要一个long型的参数,可能你已经猜到了两者之间的关系,那么这个long型的返回值是什么呢?

1
2
3
4
5
6
7
8
9
setTimer
long setTimer(long delay,
Handler<Long> handler)
Set a one-shot timer to fire after delay milliseconds, at which point handler will be called with the id of the timer.
Parameters:
delay - the delay in milliseconds, after which the timer will fire
handler - the handler that will be called with the timer ID when the timer fires
Returns:
the unique ID of the timer

这个返回值是定时器的唯一id,当定时器触发调用处理器也是通过这个唯一id。

撤销定时器直接调用cancelTimer即可

资料参考

https://vertx.io/docs/

https://vertx.io/docs/vertx-core/java/

---------- 😏本文结束  感谢您的阅读😏 ----------
评论