炉心入門

1.はじめに

Reactor Coreは、リアクティブプログラミングモデルを実装するJava8ライブラリです。これは、リアクティブアプリケーションを構築するための標準であるリアクティブストリーム仕様に基づいて構築されています。

非リアクティブJava開発の背景から、リアクティブになることは非常に急な学習曲線になる可能性があります。これは、Java 8 Stream APIと比較すると、同じ高レベルの抽象化であると誤解される可能性があるため、より困難になります。

この記事では、このパラダイムをわかりやすく説明します。リアクティブコードを作成する方法の図を作成するまで、Reactorを少しずつ進め、後のシリーズで登場するより高度な記事の基礎を築きます。

2.リアクティブストリームの仕様

Reactorを見る前に、ReactiveStreams仕様を見る必要があります。これはReactorが実装するものであり、ライブラリの基礎を築きます。

基本的に、ReactiveStreamsは非同期ストリーム処理の仕様です。

つまり、多くのイベントが非同期的に生成および消費されるシステムです。金融アプリケーションに入る毎秒数千の株式更新のストリームについて考えてみてください。そして、それらの更新にタイムリーに応答する必要があります。

これの主な目標の1つは、背圧の問題に対処することです。イベントを処理できるよりも速くコンシューマーにイベントを発行しているプロデューサーがいる場合、最終的にコンシューマーはイベントに圧倒され、システムリソースが不足します。

背圧とは、これを防ぐために、消費者が送信するデータの量をプロデューサーに伝えることができる必要があることを意味します。これは、仕様に規定されていることです。

3.Mavenの依存関係

始める前に、Mavenの依存関係を追加しましょう。

 io.projectreactor reactor-core 3.3.9.RELEASE   ch.qos.logback logback-classic 1.1.3 

Logbackも依存関係として追加しています。これは、データの流れをよりよく理解するために、Reactorの出力をログに記録するためです。

4.データストリームの生成

アプリケーションがリアクティブであるためには、最初に実行できる必要があるのは、データのストリームを生成することです。

これは、前に示した株式更新の例のようなものである可能性があります。このデータがなければ、反応するものは何もありません。これが論理的な最初のステップである理由です。

Reactive Coreは、これを可能にする2つのデータ型を提供します。

4.1。フラックス

これを行う最初の方法は、Fluxを使用することです。これは、0..n要素を放出できるストリームです。簡単なものを作成してみましょう:

Flux just = Flux.just(1, 2, 3, 4);

この場合、4つの要素の静的ストリームがあります。

4.2。単核症

これを行う2番目の方法は、0..1要素のストリームであるMonoを使用することです。1つをインスタンス化してみましょう:

Mono just = Mono.just(1);

これは、Fluxとほぼ同じように見え、動作しますが、今回は1つの要素のみに制限されています。

4.3。なぜフラックスだけではないのですか?

さらに実験する前に、これら2つのデータ型がある理由を強調する価値があります。

まず、両方のことに留意すべきであるフラックスモノラル反応ストリームの実装であるパブリッシャインターフェース。どちらのクラスも仕様に準拠しており、代わりにこのインターフェイスを使用できます。

Publisher just = Mono.just("foo");

しかし、実際には、このカーディナリティを知ることは役に立ちます。これは、いくつかの操作が2つのタイプのいずれかに対してのみ意味があり、より表現力が高いためです(リポジトリ内のfindOne()を想像してください)。

5.ストリームへのサブスクライブ

これで、データストリームを生成する方法の概要がわかりました。要素を出力するには、データストリームをサブスクライブする必要があります。

5.1。要素の収集

subscribe()メソッドを使用して、ストリーム内のすべての要素を収集しましょう。

List elements = new ArrayList(); Flux.just(1, 2, 3, 4) .log() .subscribe(elements::add); assertThat(elements).containsExactly(1, 2, 3, 4);

サブスクライブするまでデータは流れ始めません。ロギングも追加したことに注意してください。これは、舞台裏で何が起こっているかを確認するときに役立ちます。

5.2。要素の流れ

ログインを設定すると、それを使用して、データがストリームをどのように流れているかを視覚化できます。

20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

まず、すべてがメインスレッドで実行されています。この記事の後半で並行性についてさらに検討するため、これについては詳しく説明しません。ただし、すべてを順番に処理できるため、作業が簡単になります。

次に、ログに記録したシーケンスを1つずつ見ていきましょう。

  1. onSubscribe() –これはストリームをサブスクライブするときに呼び出されます
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There's actually a onError() as well, which would be called if there is an exception, but in this case, there isn't

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that's what's been instantiated behind the scenes in our call to onSubscribe(). It's a useful method, but to better understand what's happening let's provide a Subscriber interface directly:

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { elements.add(integer); } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that the Flux has provided us with a helper method to reduce this verbosity.

5.3. Comparison to Java 8 Streams

It still might appear that we have something synonymous to a Java 8 Stream doing collect:

List collected = Stream.of(1, 2, 3, 4) .collect(toList());

Only we don't.

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is a Streams terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams and apply backpressure, which we will cover next.

6. Backpressure

The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.

Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.

We can modify our Subscriber implementation to apply backpressure. Let's tell the upstream to only send two elements at a time by using request():

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { private Subscription s; int onNextAmount; @Override public void onSubscribe(Subscription s) { this.s = s; s.request(2); } @Override public void onNext(Integer integer) { elements.add(integer); onNextAmount++; if (onNextAmount % 2 == 0) { s.request(2); } } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

Now if we run our code again, we'll see the request(2) is called, followed by two onNext() calls, then request(2) again.

23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain amount of elements, and only when we are ready.

If we imagine we were being streamed tweets from twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there are no requests from the downstream, then the upstream could drop items, store them in a buffer, or some other strategy.

7. Operating on a Stream

We can also perform operations on the data in our stream, responding to events as we see fit.

7.1. Mapping Data in a Stream

A simple operation that we can perform is applying a transformation. In this case, let's just double all the numbers in our stream:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribe(elements::add);

map() will be applied when onNext() is called.

7.2. Combining Two Streams

We can then make things more interesting by combining another stream with this one. Let's try this by using zip() function:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .zipWith(Flux.range(0, Integer.MAX_VALUE), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) .subscribe(elements::add); assertThat(elements).containsExactly( "First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:

20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete() 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel() 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls are also alternated, so the index of each element in the stream will match when we apply the zip() function.

8. Hot Streams

Currently, we've focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.

For example, we could have a stream of mouse movements that constantly needs to be reacted to or a twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

8.1. Creating a ConnectableFlux

One way to create a hot stream is by converting a cold stream into one. Let's create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .publish();

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won't cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println); publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It's not until we call connect(), that the Flux will start emitting:

publish.connect();

8.2. Throttling

If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let's try getting around this with throttling:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .sample(ofSeconds(2)) .publish();

Here, we've introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.

Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.

9. Concurrency

All of our above examples have currently run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let's try subscribing to a different thread to main:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribeOn(Schedulers.parallel()) .subscribe(elements::add);

The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()

Concurrency get's more interesting than this, and it will be worth us exploring it in another article.

10. Conclusion

この記事では、ReactiveCoreの概要をエンドツーエンドで説明しました。ストリームのパブリッシュとサブスクライブ、バックプレッシャの適用、ストリームの操作、およびデータの非同期処理の方法について説明しました。これにより、リアクティブアプリケーションを作成するための基礎が築かれるはずです。

このシリーズの後半の記事では、より高度な並行性とその他の反応的な概念について説明します。Reactor withSpringをカバーする別の記事もあります。

アプリケーションのソースコードは、GitHubで入手できます。これは、そのまま実行できるはずのMavenプロジェクトです。