Reactor 3 参考文档阅读笔记

本文将记录笔者
Reactor 3 文档中文翻译 的阅读笔记。

快速上手

Flux 用于 N 个元素, Mono用于 0/1 个元素。

响应式编程

阻塞是对资源的浪费

  1. 并行化:使用更多的线程和硬件资源
  2. 基于现有的资源来提高效率

异步可以解决问题吗?

两种异步编程方式

  1. 回调 (回调地狱)
  2. Futures (对于多个处理的组合不好用,get()方法仍然会阻塞且缺乏对多个值以及更进一步对错误的处理。)

反正这里就是说:Reactor一级棒

从命令式编程到响应式编程

Reactor 这样的响应式编程库就是要弥补上述“经典”的 JVM 异步方式所带来的不足。

  1. 可编排性(Composability) 以及 可读性(Readability)

  2. 使用丰富的 操作符 来处理形如 流 的数据

  3. 在 订阅(subscribe) 之前什么都不会发生

  4. 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力

  5. 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果

可编排性与可读性

可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。

Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

就像装配流水线

你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。

原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

这两段比喻可以说时很形象了,从代码角度.两下就完事了。

操作符

在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。

subscribe()之前什么都不会发生

在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。

当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。

背压

向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。

热 vs 冷

在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:

  1. 一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。

  2. 一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。

Reactor核心特性

Reactor 项目的主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

简单的创建和订阅Flux或Mono的方法

Flux 和 Mono 提供的工厂方法

1
2
3
4
5
6
7
8
9
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

基于lambda的对Flux的订阅

1
subscribe();

订阅并触发序列

1
subscribe(Consumer<? super T> consumer);

对每个生成的元素进行消费

1
2
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);

对正常元素进行消费,也对错误进行响应。

1
2
3
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);

对正常元素和错误均有响应,还定义了序列正常完成后的回调

1
2
3
4
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

对正常元素、错误和完成信号均有响应,同时也定义了对该subscribe方法返回的Subscription执行的回调

以上方法会返回一个 Subscription 的引用,如果不再需要更多元素你可以通过它来取消订阅。 取消订阅时, 源头会停止生成新的数据,并清理相关资源。取消和清理的操作在 Reactor 中是在 接口 Disposable 中定义的。

subscribe方法示例

1
Flux<Integer> ints = Flux.range(1, 3);

配置一个在订阅的时候产生3个值得Flux。

1
ints.subscribe();

最简单得订阅方式。

第二行代码没有任何输出,但是它确实执行了。Flux 产生了3个值。如果我们传入一个 lambda, 我们就可以看到这几个值,如下一个列子:

1
Flux<Integer> ints = Flux.range(1, 3);

配置一个在订阅时会产生3个值得Flux。

1
ints.subscribe(i -> System.out.println(i));

订阅它并打印值。

== 待续 ==