Javaでのリングバッファの実装

1。概要

このチュートリアルでは、Javaでリングバッファを実装する方法を学習します。

2.リングバッファ

リングバッファ(または循環バッファ)は、2つ以上のスレッド間でデータをバッファリングするために使用される制限付き循環データ構造です。リングバッファに書き込みを続けると、最後に到達するとラップアラウンドします。

2.1。使い方

リングバッファは、境界でラップアラウンドする固定サイズの配列を使用して実装されます。

アレイとは別に、次の3つのことを追跡します。

  • 要素を挿入するためにバッファ内で次に使用可能なスロット、
  • バッファ内の次の未読要素、
  • 配列の終わり–バッファが配列の先頭にラップアラウンドするポイント

リングバッファがこれらの要件を処理する方法の仕組みは、実装によって異なります。たとえば、この件に関するWikipediaのエントリには、4つのポインタを使用する方法が示されています。

シーケンスを使用したリングバッファのDisruptorの実装からアプローチを借ります。

最初に知っておく必要があるのは、容量、つまりバッファーの固定最大サイズです。次に、2つの単調に増加するシーケンスを使用します

  • 書き込みシーケンス:-1から始まり、要素を挿入すると1ずつ増加します
  • 読み取りシーケンス:0から始まり、要素を消費するにつれて1ずつ増加します

mod演算を使用して、シーケンスを配列内のインデックスにマップできます。

arrayIndex = sequence % capacity 

MOD操作は、バッファ内のスロットを導出する境界線の周りに配列をラップします

要素を挿入する方法を見てみましょう。

buffer[++writeSequence % capacity] = element 

要素を挿入する前に、シーケンスを事前にインクリメントしています。

要素を消費するために、ポストインクリメントを行います。

element = buffer[readSequence++ % capacity] 

この場合、シーケンスに対してポストインクリメントを実行します。要素を消費しても、その要素はバッファから削除されません上書きされるまで、配列内にとどまります

2.2。空のバッファと完全なバッファ

配列をラップアラウンドすると、バッファ内のデータの上書きが開始されます。バッファがいっぱいの場合、リーダーがデータを消費したかどうかに関係なく、最も古いデータを上書きするか、読み取られていないデータの上書きを防ぐかを選択できます

読者が中間値または古い値(株価のティッカーなど)を見逃す余裕がある場合は、データが消費されるのを待たずにデータを上書きできます。一方、リーダーがすべての値を消費する必要がある場合(eコマーストランザクションの場合など)、バッファーに使用可能なスロットができるまで待機(ブロック/ビジーウェイト)する必要があります。

バッファのサイズがその容量等しい場合、バッファはいっぱいです。ここで、そのサイズは未読要素の数に等しくなります。

size = (writeSequence - readSequence) + 1 isFull = (size == capacity) 

書き込みシーケンスが読み取りシーケンスより遅れている場合、バッファは空です:

isEmpty = writeSequence < readSequence 

バッファが空の場合、バッファはnull値を返します。

2.2。長所と短所

リングバッファは効率的なFIFOバッファです。事前に事前に割り当てることができる固定サイズの配列を使用し、効率的なメモリアクセスパターンを可能にします。すべてのバッファ操作は、要素のシフトを必要としないため、要素の消費を含め、一定時間O(1)です。

反対に、リングバッファの正しいサイズを決定することは重要です。たとえば、バッファのサイズが小さく、読み取りが遅い場合、書き込み操作が長時間ブロックされることがあります。動的なサイズ設定を使用できますが、データを移動する必要があり、上記の利点のほとんどを見逃してしまいます。

3.Javaでの実装

リングバッファがどのように機能するかを理解したので、Javaでの実装に進みましょう。

3.1。初期化

まず、事前定義された容量でバッファーを初期化するコンストラクターを定義しましょう。

public CircularBuffer(int capacity) { this.capacity = capacity; this.data = (E[]) new Object[capacity]; this.readSequence = 0; this.writeSequence = -1; } 

これにより、前のセクションで説明したように、空のバッファが作成され、シーケンスフィールドが初期化されます。

3.3。提供

次に、次に使用可能なスロットのバッファーに要素を挿入し、成功するとtrueを返すoffer操作を実装します。バッファが空のスロットを見つけられない場合、つまり未読の値を上書きできない場合は、falseを返します

Javaでofferメソッドを実装しましょう。

public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data[nextWriteSeq % capacity] = element; writeSequence++; return true; } return false; } 

そのため、書き込みシーケンスをインクリメントし、次に使用可能なスロットの配列内のインデックスを計算しています。次に、データをバッファに書き込み、更新された書き込みシーケンスを保存します。

それを試してみましょう:

@Test public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size()); } 

3.4。投票

最後に、次の未読要素を取得して削除するポーリング操作を実装します。ポーリング操作は、要素を削除するが、読み出しシーケンスをインクリメントしません

それを実装しましょう:

public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data[readSequence % capacity]; readSequence++; return nextValue; } return null; } 

ここでは、配列内のインデックスを計算することにより、現在の読み取りシーケンスでデータを読み取っています。次に、バッファが空でない場合は、シーケンスをインクリメントして値を返します。

それをテストしてみましょう:

@Test public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape); } 

4.生産者/消費者問題

2つ以上のスレッド間でデータを交換するためのリングバッファーの使用について説明しました。これは、生産者/消費者問題と呼ばれる同期問題の例です。Javaでは、セマフォ、制限付きキュー、リングバッファなどを使用して、さまざまな方法で生産者/消費者問題を解決できます。

リングバッファに基づくソリューションを実装しましょう。

4.1。揮発性シーケンスフィールド

リングバッファの実装はスレッドセーフではありません。単純な単一のプロデューサーと単一のコンシューマーの場合にスレッドセーフにしましょう。

プロデューサーはデータをバッファーに書き込み、writeSequenceをインクリメントしますが、コンシューマーはバッファーから読み取り、readSequenceをインクリメントするだけです。したがって、バッキングアレイは競合がなく、同期なしで回避できます。

ただし、コンシューマーがwriteSequenceフィールドの最新の値(可視性)を確認できること、およびデータが実際にバッファーで使用可能になる前にwriteSequenceが更新されないこと(順序付け)を確認する必要があります。

We can make the ring buffer concurrent and lock-free in this case by making the sequence fields volatile:

private volatile int writeSequence = -1, readSequence = 0; 

In the offer method, a write to the volatile field writeSequence guarantees that the writes to the buffer happen before updating the sequence. At the same time, the volatile visibility guarantee ensures that the consumer will always see the latest value of writeSequence.

4.2. Producer

Let's implement a simple producer Runnable that writes to the ring buffer:

public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items[i])) { System.out.println("Produced: " + items[i]); i++; } } } 

The producer thread would wait for an empty slot in a loop (busy-waiting).

4.3. Consumer

We'll implement a consumer Callable that reads from the buffer:

public T[] call() { T[] items = (T[]) new Object[expectedCount]; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items[i++] = item; System.out.println("Consumed: " + item); } } return items; } 

コンシューマースレッドは、バッファーからnull値を受け取った場合、印刷せずに続行します

ドライバーコードを書いてみましょう。

executorService.submit(new Thread(new Producer(buffer))); executorService.submit(new Thread(new Consumer(buffer))); 

生産者/消費者プログラムを実行すると、次のような出力が生成されます。

Produced: Circle Produced: Triangle Consumed: Circle Produced: Rectangle Consumed: Triangle Consumed: Rectangle Produced: Square Produced: Rhombus Consumed: Square Produced: Trapezoid Consumed: Rhombus Consumed: Trapezoid Produced: Pentagon Produced: Pentagram Produced: Hexagon Consumed: Pentagon Consumed: Pentagram Produced: Hexagram Consumed: Hexagon Consumed: Hexagram 

5.結論

このチュートリアルでは、リングバッファーを実装する方法を学び、それを使用して生産者/消費者問題を解決する方法を探りました。

いつものように、すべての例のソースコードはGitHubで入手できます。