JavaでのCountDownLatchのガイド

1.はじめに

この記事では、CountDownLatchクラスのガイドを提供し、いくつかの実用的な例でどのように使用できるかを示します。

基本的に、CountDownLatchを使用することで、他のスレッドが特定のタスクを完了するまでスレッドをブロックさせることができます。

2.並行プログラミングでの使用法

簡単に言うと、CountDownLatchにはカウンターフィールドがあり、必要に応じてデクリメントできます。次に、それを使用して、ゼロにカウントダウンされるまで呼び出しスレッドをブロックできます。

並列処理を行っている場合は、処理するスレッドの数と同じカウンターの値を使用してCountDownLatchをインスタンス化できます。次に、各スレッドが終了した後にcountdown()を呼び出すだけで、await()を呼び出す依存スレッドがワーカースレッドが終了するまでブロックされることが保証されます。

3.スレッドのプールが完了するのを待つ

ワーカーを作成し、CountDownLatchフィールドを使用して、完了したことを通知することにより、このパターンを試してみましょう。

public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }

次に、CountDownLatchを取得してWorkerインスタンスが完了するのを待機できることを証明するためのテストを作成しましょう。

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }

当然、「ラッチリリース」は常に最後の出力になります。これは、CountDownLatchリリースに依存しているためです。

await()を呼び出さなかった場合、スレッドの実行順序を保証できないため、テストはランダムに失敗することに注意してください。

4.開始を待機しているスレッドのプール

前の例を取り上げたが、今回は5つではなく数千のスレッドを開始した場合、後のスレッドでstart()を呼び出す前に、前のスレッドの多くが処理を終了している可能性があります。これにより、すべてのスレッドを並行して実行することができないため、並行性の問題を再現することが困難になる可能性があります。

これを回避するために、CountdownLatchが前の例とは異なる動作をするようにしましょう。一部の子スレッドが終了するまで親スレッドをブロックする代わりに、他のすべてのスレッドが開始するまで各子スレッドをブロックできます。

run()メソッドを変更して、処理前にブロックするようにします。

public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }

それでは、すべてのワーカーが開始するまでブロックし、ワーカーのブロックを解除してから、ワーカーが終了するまでブロックするようにテストを変更しましょう。

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }

このパターンは、数千のスレッドにいくつかのロジックを並行して実行させるために使用できるため、同時実行のバグを再現しようとする場合に非常に役立ちます。

5.カウントダウンラッチを早期に終了する

場合によっては、CountDownLatchをカウントダウンする前にワーカーが誤って終了する状況に遭遇することがあります。これにより、ゼロに到達せず、await()が終了しない可能性があります。

@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }

await()が永久にブロックする方法を示すために、BrokenWorkerを使用するように以前のテストを変更してみましょう。

@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }

明らかに、これは私たちが望む動作ではありません。アプリケーションが無限にブロックするよりも継続する方がはるかに優れています。

これを回避するために、await()の呼び出しにtimeout引数を追加しましょう

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();

ご覧のとおり、テストは最終的にタイムアウトになり、await()falseを返します

6.結論

このクイックガイドでは、CountDownLatchを使用して、他のスレッドが処理を完了するまでスレッドをブロックする方法を示しました。

また、スレッドが並行して実行されるようにすることで、並行性の問題をデバッグするためにどのように使用できるかを示しました。

これらの例の実装はGitHubにあります。これはMavenベースのプロジェクトであるため、そのまま実行するのは簡単です。