RxJavaの紹介

1。概要

この記事では、JavaでReactive Extensions(Rx)を使用して、データのシーケンスを作成および使用することに焦点を当てます。

一見すると、APIはJava 8 Streamsに似ているように見えますが、実際には、はるかに柔軟で流暢であり、強力なプログラミングパラダイムになっています。

RxJavaについて詳しく知りたい場合は、この記事をご覧ください。

2.セットアップ

MavenプロジェクトでRxJavaを使用するには、pom.xmlに次の依存関係を追加する必要があります

 io.reactivex rxjava ${rx.java.version} 

または、Gradleプロジェクトの場合:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3.関数型リアクティブコンセプト

一方で関数型プログラミングは、純粋関数を構成し、共有状態、可変データ、および副作用を回避することによってソフトウェアを構築するプロセスです。

一方、リアクティブプログラミングは、データストリームと変更の伝播に関係する非同期プログラミングパラダイムです。

一緒に、官能性反応プログラミングは、それが入るような値その経時変化や場所消費者が反応データへと-イベント駆動型プログラミングにエレガントなアプローチを表すことができる機能的反応技術の組み合わせを形成します。

このテクノロジーは、そのコア原則のさまざまな実装をまとめたもので、一部の作成者は、新しいタイプのアプリケーションを説明するための一般的な語彙を定義するドキュメントを作成しました。

3.1。反応性マニフェスト

Reactive Manifestoは、ソフトウェア開発業界内のアプリケーションの高水準を示すオンラインドキュメントです。簡単に言えば、リアクティブシステムは次のとおりです。

  • 応答性–システムはタイムリーに応答する必要があります
  • メッセージ駆動型–システムは、コンポーネント間の非同期メッセージパッシングを使用して、疎結合を確保する必要があります
  • 弾力性–システムは高負荷下でも応答性を維持する必要があります
  • 回復力–一部のコンポーネントに障害が発生しても、システムは応答性を維持する必要があります

4.オブザーバブル

Rxを使用するときに理解する2つの重要なタイプがあります

  • Observableは、データソースからデータを取得でき、他のオブジェクトが関心を登録する方法で状態が関心を持つ可能性があるオブジェクトを表します。
  • 観察者は通知されることを希望する任意のオブジェクトである場合、別のオブジェクトの状態が変化

観察者はに加入して観察可能なシーケンス。シーケンスは、アイテムを一度に1つずつオブザーバーに送信します。

観察者は、次のいずれかを処理する前に、それぞれを処理します。多くのイベントが非同期で発生する場合は、キューに保存するか、ドロップする必要があります。

ではRxの観察者は順不同でアイテムに呼び出されていないか、コールバックは、前の項目に戻った前に呼び出されることはないだろう。

4.1。オブザーバブルの種類

2つのタイプがあります:

  • ノンブロッキング–非同期実行がサポートされており、イベントストリームの任意の時点でサブスクライブを解除できます。この記事では、主にこの種のタイプに焦点を当てます
  • ブロッキング–すべてのonNextオブザーバー呼び出しは同期され、イベントストリームの途中でサブスクライブを解除することはできません。toBlockingメソッドを使用して、いつでもObservableBlockingObservableに変換できます
BlockingObservable blockingObservable = observable.toBlocking();

4.2。演算子

オペレータは、 1取る関数であるOのbservable最初の引数及び戻り別として(ソース)を観測(宛先)。次に、ソースobservableが発行するすべてのアイテムについて、そのアイテムに関数を適用し、宛先Observableで結果を発行します。

演算子をチェーン化して、特定の基準に基づいてイベントをフィルタリングする複雑なデータフローを作成できます。複数の演算子を同じオブザーバブルに適用できます。

Observableオペレーターまたはオブザーバーがアイテムを消費するよりも速くアイテムを放出している状況に入るのは難しくありません。背圧について詳しくは、こちらをご覧ください。

4.3。Observableを作成する

基本的な演算子は、完了する前に単一の汎用インスタンスである文字列「Hello」を発行するObservableを生成するだけです。Observableから情報を取得する場合は、オブザーバーインターフェイスを実装してから、目的のObservableでsubscribeを呼び出します

Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));

4.4。OnNext、OnError、およびOnCompleted

オブザーバーインターフェイスには、次の3つの方法があります。

  1. OnNextは、添付のObservableに新しいイベントが公開されるたびにオブザーバーで呼び出されます。これは、各イベントに対して何らかのアクションを実行する方法です。
  2. OnCompleted is called when the sequence of events associated with an Observable is complete, indicating that we should not expect any more onNext calls on our observer
  3. OnError is called when an unhandled exception is thrown during the RxJava framework code or our event handling code

The return value for the Observablessubscribe method is a subscribe interface:

String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));

5. Observable Transformations and Conditional Operators

5.1. Map

The map operator transforms items emitted by an Observable by applying a function to each item.

Let's assume there is a declared array of strings that contains some letters from the alphabet and we want to print them in capital mode:

Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));

The flatMap can be used to flatten Observables whenever we end up with nested Observables.

More details about the difference between map and flatMap can be found here.

Assuming we have a method that returns an Observable from a list of strings. Now we'll be printing for each string from a new Observable the list of titles based on what Subscriber sees:

Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));

5.2. Scan

The scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value.

It allows us to carry forward state from event to event:

String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));

5.3. GroupBy

Group by operator allows us to classify the events in the input Observable into output categories.

Let's assume that we created an array of integers from 0 to 10, then apply group by that will divide them into the categories even and odd:

Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));

5.4. Filter

The operator filter emits only those items from an observable that pass a predicate test.

So let's filter in an integer array for the odd numbers:

Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));

5.5. Conditional Operators

DefaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty:

Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));

The following code emits the first letter of the alphabet ‘a' because the array letters is not empty and this is what it contains in the first position:

Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));

TakeWhile operator discards items emitted by an Observable after a specified condition becomes false:

Observable.from(numbers) .takeWhile(i -> i  sum[0] += s); assertTrue(sum[0] == 10);

Of course, there more others operators that could cover our needs like Contain, SkipWhile, SkipUntil, TakeUntil, etc.

6. Connectable Observables

A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.

In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:

String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));

7. Single

Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));

8. Subjects

A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.

And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.

In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:

Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)

9. Resource Management

Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.

ここでは、この目標を達成するために必要な手順と実装の例を解説で示しています。

String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));

10.結論

この記事では、RxJavaライブラリの使用方法と、その最も重要な機能を調べる方法について説明しました。

ここで使用されているすべてのコードサンプルを含むプロジェクトの完全なソースコードは、Githubにあります。