Java EE7バッチ処理

1.はじめに

給与明細の処理、利息の計算、請求書の生成などのタスクを手動で完了する必要があると想像してください。それは非常に退屈で、エラーが発生しやすく、手動タスクの終わりのないリストになります!

このチュートリアルでは、JakartaEEプラットフォームの一部であるJavaBatch Processing(JSR 352)と、このようなタスクを自動化するための優れた仕様について説明します。これは、アプリケーション開発者がビジネスロジックに集中できるように、堅牢なバッチ処理システムを開発するためのモデルを提供します。

2.Mavenの依存関係

JSR 352は単なる仕様であるため、jberetのようにAPIと実装を含める必要があります。

 javax.batch javax.batch-api 1.0.1   org.jberet jberet-core 1.0.2.Final   org.jberet jberet-support 1.0.2.Final   org.jberet jberet-se 1.0.2.Final 

また、インメモリデータベースを追加して、より現実的なシナリオを確認できるようにします。

3.重要な概念

JSR 352は、次のように見ることができるいくつかの概念を導入しています。

まず、各部分を定義しましょう。

  • 左側から、JobOperatorがあります。それは、このような起動、停止、および再起動などの処理ジョブのすべての側面を管理します
  • 次に、ジョブがあります。ジョブは、ステップの論理的なコレクションです。バッチプロセス全体をカプセル化します
  • ジョブには、1〜n個のステップが含まれます。各ステップは、独立した連続した作業単位です。ステップは、入力の読み取り、その入力の処理、および出力の書き込みで構成されます
  • 最後になりましたが、ジョブの実行情報を格納するJobRepositoryがあります。ジョブ、その状態、および完了結果を追跡するのに役立ちます

手順はこれよりも少し詳細なので、次にそれを見てみましょう。最初にチャンクステップを見てから、次にバッチレットを見ていきます。

4.チャンクの作成

前に述べたように、チャンクは一種のステップです。チャンクを使用して、アイテムのセットなど、何度も実行される操作を表現することがよくあります。これは、Javaストリームからの中間操作のようなものです。

チャンクを説明するときは、アイテムをどこから取得するか、どのように処理するか、後でどこに送信するかを表現する必要があります。

4.1。読書アイテム

アイテムを読み取るには、ItemReaderを実装する必要があります

この場合、1から10までの数字を単純に出力するリーダーを作成します。

@Named public class SimpleChunkItemReader extends AbstractItemReader { private Integer[] tokens; private Integer count; @Inject JobContext jobContext; @Override public Integer readItem() throws Exception { if (count >= tokens.length) { return null; } jobContext.setTransientUserData(count); return tokens[count++]; } @Override public void open(Serializable checkpoint) throws Exception { tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 }; count = 0; } }

ここでは、クラスの内部状態から読み取っています。ただし、もちろん、readItemはデータベース、ファイルシステム、またはその他の外部ソースから取得できます

JobContext#setTransientUserData()を使用して、この内部状態の一部を保存していることに注意してください。これは後で役立ちます。

また、チェックポイントパラメータにも注意してください。それもまた取り上げます。

4.2。アイテムの処理

もちろん、チャンク化する理由は、アイテムに対して何らかの操作を実行したいからです。

アイテムプロセッサからnullを返すときはいつでも、そのアイテムをバッチから削除します。

それで、ここで、偶数だけを保持したいとしましょう。nullを返すことで奇数を拒否するItemProcessorを使用できます

@Named public class SimpleChunkItemProcessor implements ItemProcessor { @Override public Integer processItem(Object t) { Integer item = (Integer) t; return item % 2 == 0 ? item : null; } }

processItemは、ItemReaderが発行するアイテムごとに1回呼び出されます。

4.3。アイテムを書く

最後に、ジョブはItemWriterを呼び出して、変換されたアイテムを記述できるようにします。

@Named public class SimpleChunkWriter extends AbstractItemWriter { List processed = new ArrayList(); @Override public void writeItems(List items) throws Exception { items.stream().map(Integer.class::cast).forEach(processed::add); } } 

アイテムの長さはどれくらいですか?すぐに、チャンクのサイズを定義します。これにより、writeItemsに送信されるリストのサイズが決まります。

4.4。ジョブでのチャンクの定義

ここで、JSLまたはジョブ仕様言語を使用してこれらすべてをXMLファイルにまとめます。リーダー、プロセッサー、チャンカー、およびチャンクサイズをリストすることに注意してください。

チャンクサイズは、チャンクの進行状況がジョブリポジトリにコミットされる頻度です。これは、システムの一部に障害が発生した場合に、完了を保証するために重要です。

このファイルをMETA-INF / batch-jobsforに配置する必要があります。jarファイルとでWEB-INF /クラス/ META-INF /バッチ・ジョブの.warファイル

ジョブに「simpleChunk」というIDを付けたので、単体テストで試してみましょう。

現在、ジョブは非同期で実行されるため、テストが難しくなります。サンプルでは、ジョブが完了するまでポーリングして待機するBatchTestHelperを確認してください。

@Test public void givenChunk_thenBatch_completesWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleChunk", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); } 

これがチャンクです。それでは、バッチレットを見てみましょう。

5.バッチレットの作成

すべてが反復モデ​​ルにうまく適合するわけではありません。たとえば、タスクを1回呼び出し、実行して完了し、終了ステータスを返すだけでよい場合があります

バッチレットの契約は非常に簡単です。

@Named public class SimpleBatchLet extends AbstractBatchlet { @Override public String process() throws Exception { return BatchStatus.COMPLETED.toString(); } }

JSLと同様に:

そして、以前と同じアプローチを使用してテストできます。

@Test public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

そのため、手順を実装するためのいくつかの異なる方法を検討しました。

次に、進行状況マークして保証するためのメカニズムを見てみましょう

6.カスタムチェックポイント

Failures are bound to happen in the middle of a job. Should we just start over the whole thing, or can we somehow start where we left off?

As the name suggests, checkpoints help us to periodically set a bookmark in case of failure.

By default, the end of chunk processing is a natural checkpoint.

However, we can customize it with our own CheckpointAlgorithm:

@Named public class CustomCheckPoint extends AbstractCheckpointAlgorithm { @Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint() throws Exception { int counterRead = (Integer) jobContext.getTransientUserData(); return counterRead % 5 == 0; } }

Remember the count that we placed in transient data earlier? Here, we can pull it out with JobContext#getTransientUserDatato state that we want to commit on every 5th number processed.

Without this, a commit would happen at the end of each chunk, or in our case, every 3rd number.

And then, we match that up with the checkout-algorithm directive in our XML underneath our chunk:

Let's test the code, again noting that some of the boilerplate steps are hidden away in BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId) .stream() .map(BatchTestHelper::getCommitCount) .forEach(count -> assertEquals(3L, count.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

So, we might be expecting a commit count of 2 since we have ten items and configured the commits to be every 5th item. But, the framework does one more final read commit at the end to ensure everything has been processed, which is what brings us up to 3.

Next, let's look at how to handle errors.

7. Exception Handling

By default, the job operator will mark our job as FAILED in case of an exception.

Let's change our item reader to make sure that it fails:

@Override public Integer readItem() throws Exception { if (tokens.hasMoreTokens()) { String tempTokenize = tokens.nextToken(); throw new RuntimeException(); } return null; }

And then test:

@Test public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception { // ... start job and wait for completion assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED); }

But, we can override this default behavior in a number of ways:

  • skip-limit specifies the number of exceptions this step will ignore before failing
  • retry-limit specifies the number of times the job operator should retry the step before failing
  • skippable-exception-class specifies a set of exceptions that chunk processing will ignore

So, we can edit our job so that it ignores RuntimeException, as well as a few others, just for illustration:

And now our code will pass:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId).stream() .map(BatchTestHelper::getProcessSkipCount) .forEach(skipCount -> assertEquals(1L, skipCount.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8. Executing Multiple Steps

We mentioned earlier that a job can have any number of steps, so let's see that now.

8.1. Firing the Next Step

By default, each step is the last step in the job.

In order to execute the next step within a batch job, we'll have to explicitly specify by using the next attribute within the step definition:

If we forget this attribute, then the next step in sequence will not get executed.

And we can see what this looks like in the API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(2 , jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.2. Flows

A sequence of steps can also be encapsulated into a flow. When the flow is finished, it is the entire flow that transitions to the execution element. Also, elements inside the flow can't transition to elements outside the flow.

We can, say, execute two steps inside a flow, and then have that flow transition to an isolated step:

And we can still see each step execution independently:

@Test public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(3, jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.3. Decisions

We also have if/else support in the form of decisions. Decisions provide a customized way of determining a sequence among steps, flows, and splits.

Like steps, it works on transition elements such as next which can direct or terminate job execution.

Let's see how the job can be configured:

Any decision element needs to be configured with a class that implements Decider. Its job is to return a decision as a String.

Each next inside decision is like a case in a switch statement.

8.4. Splits

Splits are handy since they allow us to execute flows concurrently:

Of course, this means that the order isn't guaranteed.

Let's confirm that they still all get run. The flow steps will be performed in an arbitrary order, but the isolated step will always be last:

@Test public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception { // ... start job and wait for completion List stepExecutions = jobOperator.getStepExecutions(executionId); assertEquals(3, stepExecutions.size()); assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

9. Partitioning a Job

We can also consume the batch properties within our Java code which have been defined in our job.

They can be scoped at three levels – the job, the step, and the batch-artifact.

Let's see some examples of how they consumed.

When we want to consume the properties at job level:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties(); ...

This can be consumed at a step level as well:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties(); ...

When we want to consume the properties at batch-artifact level:

@Inject @BatchProperty(name = "name") private String nameString;

This comes in handy with partitions.

See, with splits, we can run flows concurrently. But we can also partition a step into n sets of items or set separate inputs, allowing us another way to split up the work across multiple threads.

To comprehend the segment of work each partition should do, we can combine properties with partitions:

10. Stop and Restart

Now, that's it for defining jobs. Now let's talk for a minute about managing them.

We've already seen in our unit tests that we can get an instance of JobOperator from BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator();

And then, we can start the job:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

However, we can also stop the job:

jobOperator.stop(executionId);

And lastly, we can restart the job:

executionId = jobOperator.restart(executionId, new Properties());

Let's see how we can stop a running job:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobOperator.stop(executionId); jobExecution = BatchTestHelper.keepTestStopped(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); }

And if a batch is STOPPED, then we can restart it:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() { // ... start and stop the job assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties()); jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId)); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

11. Fetching Jobs

When a batch job is submitted then the batch runtime creates an instance of JobExecution to track it.

To obtain the JobExecution for an execution id, we can use the JobOperator#getJobExecution(executionId) method.

And, StepExecution provides helpful information for tracking a step's execution.

To obtain the StepExecution for an execution id, we can use the JobOperator#getStepExecutions(executionId) method.

And from that, we can get several metrics about the step via StepExecution#getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception { // ... start job and wait for completion assertTrue(jobOperator.getJobNames().contains("simpleChunk")); assertTrue(jobOperator.getParameters(executionId).isEmpty()); StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0); Map metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics()); assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue()); assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue()); // ... and many more! }

12. Disadvantages

JSR 352 is powerful, though it is lacking in a number of areas:

  • JSONなどの他の形式を処理できるリーダーとライターが不足しているようです
  • ジェネリックのサポートはありません
  • パーティショニングは単一のステップのみをサポートします
  • APIは、スケジューリングをサポートするものを何も提供していません(ただし、J2EEには別個のスケジューリングモジュールがあります)
  • 非同期であるため、テストは困難な場合があります
  • APIは非常に冗長です

13.結論

この記事では、JSR 352を見て、チャンク、バッチレット、スプリット、フローなどについて学びました。それでも、表面をほとんど傷つけていません。

いつものように、デモコードはGitHubにあります。