Javaのリアクティブシステム

1.はじめに

このチュートリアルでは、Springやその他のツールやフレームワークを使用してJavaでリアクティブシステムを作成するための基本を理解します。

その過程で、リアクティブプログラミングがリアクティブシステムを作成するための単なる推進力である方法について説明します。これは、リアクティブシステムを作成する理由と、その過程でインスピレーションを得たさまざまな仕様、ライブラリ、および標準を理解するのに役立ちます。

2.リアクティブシステムとは何ですか?

過去数十年にわたって、テクノロジーの展望は、テクノロジーの価値の見方に完全な変革をもたらしたいくつかの混乱を経験してきました。インターネットの前のコンピューティングの世界は、それが私たちの現在を変える方法と手段を想像することはできませんでした。

インターネットが大衆に届き、それが約束する絶え間なく進化する経験により、アプリケーションアーキテクトは彼らの需要を満たすために気を配る必要があります。

基本的に、これは、以前の方法でアプリケーションを設計することは決してできないことを意味します。応答性の高いアプリケーションは、もはや贅沢が、必要ではありません

それもまた、ランダムな障害と予測できない負荷に直面しています。時間の必要性は、正しい結果を得るだけでなく、それを速く得ることです!私たちが提供することを約束する素晴らしいユーザーエクスペリエンスを推進することは非常に重要です。

これが、リアクティブシステムを提供できるアーキテクチャスタイルの必要性を生み出すものです。

2.1。反応性マニフェスト

2013年に、Jonas Bonerが率いる開発者チームが集まり、ReactiveManifestoと呼ばれるドキュメントで一連のコア原則を定義しました。これが、リアクティブシステムを作成するためのアーキテクチャスタイルの基礎を築いたものです。それ以来、このマニフェストは開発者コミュニティから多くの関心を集めています。

基本的に、このドキュメントでは、リアクティブシステムが柔軟で、疎結合で、スケーラブルであるためのレシピを規定しています。これにより、このようなシステムの開発が容易になり、障害に耐え、最も重要なことに応答性が高くなり、すばらしいユーザーエクスペリエンスの基盤となります。

それで、この秘密のレシピは何ですか?まあ、それはほとんど秘密ではありません!マニフェストは、リアクティブシステムの基本的な特性または原則を定義します。

  • 応答性:反応性システムは、迅速で一貫した応答時間を提供し、したがって一貫したサービス品質を提供する必要があります
  • 回復力:レプリケーションと分離によるランダムな障害が発生した場合でも、リアクティブシステムは応答性を維持する必要があります
  • 弾力性:このようなシステムは、費用効果の高いスケーラビリティを通じて、予測できないワークロードの下でも応答性を維持する必要があります
  • メッセージ駆動型:システムコンポーネント間で受け渡される非同期メッセージに依存する必要があります

これらの原則は単純で賢明に聞こえますが、複雑なエンタープライズアーキテクチャに実装するのが常に簡単であるとは限りません。このチュートリアルでは、これらの原則を念頭に置いて、Javaでサンプルシステムを開発します。

3.リアクティブプログラミングとは何ですか?

先に進む前に、リアクティブプログラミングとリアクティブシステムの違いを理解することが重要です。私たちはこれらの用語を頻繁に使用し、一方を他方と誤解しがちです。前に見たように、リアクティブシステムは特定のアーキテクチャスタイルの結果です。

対照的に、リアクティブプログラミングは、非同期コンポーネントと非ブロッキングコンポーネントの開発に重点を置いたプログラミングパラダイムです。リアクティブプログラミングの中核は、私たちが観察して反応することができ、背圧をかけることさえできるデータストリームです。これにより、非ブロッキング実行が可能になり、実行スレッドが少なくなり、スケーラビリティが向上します。

さて、これはリアクティブシステムとリアクティブプログラミングが相互に排他的であることを意味するものではありません。実際、リアクティブプログラミングはリアクティブシステムを実現するための重要なステップですが、それだけではありません。

3.1。リアクティブストリーム

Reactive Streamsは、非ブロッキングバックプレッシャを使用した非同期ストリーム処理の標準提供するために2013年に開始されたコミュニティイニシアチブです。ここでの目的は、必要な操作とエンティティを記述できる一連のインターフェイス、メソッド、およびプロトコルを定義することでした。

それ以来、リアクティブストリーム仕様に準拠する複数のプログラミング言語でのいくつかの実装が登場しました。これらには、いくつか例を挙げると、Akka Streams、Ratpack、Vert.xが含まれます。

3.2。Java用のリアクティブライブラリ

リアクティブストリームの背後にある当初の目的の1つは、最終的には公式のJava標準ライブラリとして含まれることでした。その結果、リアクティブストリームの仕様は、Java9で導入されたJavaFlowライブラリと意味的に同等です。

それとは別に、Javaでリアクティブプログラミングを実装するためのいくつかの一般的な選択肢があります。

  • Reactive Extensions:一般にReactiveXとして知られ、監視可能なストリームを使用した非同期プログラミング用のAPIを提供します。これらは、RxJavaとして知られているJavaを含む、複数のプログラミング言語およびプラットフォームで使用できます。
  • Project Reactor:これは別のリアクティブライブラリであり、リアクティブストリームの仕様に基づいており、JVMで非アプリケーションを構築することを目的としています。また、Springエコシステムのリアクティブスタックの基盤でもあります。

4.シンプルなアプリケーション

このチュートリアルの目的のために、最小限のフロントエンドを備えたマイクロサービスアーキテクチャに基づく単純なアプリケーションを開発します。アプリケーションアーキテクチャには、リアクティブシステムを作成するのに十分な要素が必要です。

このアプリケーションでは、エンドツーエンドのリアクティブプログラミングやその他のパタ​​ーンとツールを採用して、リアクティブシステムの基本的な特性を実現します。

4.1。建築

まず、リアクティブシステムの特性を必ずしも示すとは限らない単純なアプリケーションアーキテクチャを定義することから始めます。今後は、これらの特性を1つずつ実現するために必要な変更を加えていきます。

したがって、最初に、単純なアーキテクチャを定義することから始めましょう。

これは非常に単純なアーキテクチャであり、注文を行うことができるコマースのユースケースを容易にするための多数のマイクロサービスがあります。また、ユーザーエクスペリエンスのフロントエンドがあり、すべての通信はREST overHTTPとして行われます。さらに、すべてのマイクロサービスは、個々のデータベースでデータを管理します。これは、サービスごとのデータベースと呼ばれる手法です。

次のサブセクションでは、この単純なアプリケーションを作成します。これは、このアーキテクチャの誤謬と、これをリアクティブシステムに変換できるように、原則と実践を採用する方法と手段を理解するための基盤になります。

4.3。在庫マイクロサービス

在庫マイクロサービスは、製品のリストとその現在の在庫を管理する責任があります。また、注文の処理時に在庫を変更することもできます。このサービスの開発には、MongoDBでSpringBootを使用します。

いくつかのエンドポイントを公開するコントローラーを定義することから始めましょう。

@GetMapping public List getAllProducts() { return productService.getProducts(); } @PostMapping public Order processOrder(@RequestBody Order order) { return productService.handleOrder(order); } @DeleteMapping public Order revertOrder(@RequestBody Order order) { return productService.revertOrder(order); }

およびビジネスロジックをカプセル化するサービス:

@Transactional public Order handleOrder(Order order) { order.getLineItems() .forEach(l -> { Product> p = productRepository.findById(l.getProductId()) .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId())); if (p.getStock() >= l.getQuantity()) { p.setStock(p.getStock() - l.getQuantity()); productRepository.save(p); } else { throw new RuntimeException("Product is out of stock: " + l.getProductId()); } }); return order.setOrderStatus(OrderStatus.SUCCESS); } @Transactional public Order revertOrder(Order order) { order.getLineItems() .forEach(l -> { Product p = productRepository.findById(l.getProductId()) .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId())); p.setStock(p.getStock() + l.getQuantity()); productRepository.save(p); }); return order.setOrderStatus(OrderStatus.SUCCESS); }

トランザクション内でエンティティを永続化することに注意してください。これにより、例外が発生した場合に一貫性のない状態が発生することはありません。

これらとは別に、ドメインエンティティ、リポジトリインターフェイス、およびすべてが正しく機能するために必要な一連の構成クラスも定義する必要があります。

But since these are mostly boilerplate, we'll avoid going through them, and they can be referred to in the GitHub repository provided in the last section of this article.

4.4. Shipping Microservice

The shipping microservice will not be very different either. This will be responsible for checking if a shipment can be generated for the order and create one if possible.

As before we'll define a controller to expose our endpoints, in fact just a single endpoint:

@PostMapping public Order process(@RequestBody Order order) { return shippingService.handleOrder(order); }

and a service to encapsulate the business logic related to order shipment:

public Order handleOrder(Order order) { LocalDate shippingDate = null; if (LocalTime.now().isAfter(LocalTime.parse("10:00")) && LocalTime.now().isBefore(LocalTime.parse("18:00"))) { shippingDate = LocalDate.now().plusDays(1); } else { throw new RuntimeException("The current time is off the limits to place order."); } shipmentRepository.save(new Shipment() .setAddress(order.getShippingAddress()) .setShippingDate(shippingDate)); return order.setShippingDate(shippingDate) .setOrderStatus(OrderStatus.SUCCESS); }

Our simple shipping service is just checking the valid time window to place orders. We'll avoid discussing the rest of the boilerplate code as before.

4.5. Order Microservice

Finally, we'll define an order microservice which will be responsible for creating a new order apart from other things. Interestingly, it'll also play as an orchestrator service where it will communicate with the inventory service and the shipping service for the order.

Let's define our controller with the required endpoints:

@PostMapping public Order create(@RequestBody Order order) { Order processedOrder = orderService.createOrder(order); if (OrderStatus.FAILURE.equals(processedOrder.getOrderStatus())) { throw new RuntimeException("Order processing failed, please try again later."); } return processedOrder; } @GetMapping public List getAll() { return orderService.getOrders(); }

And, a service to encapsulate the business logic related to orders:

public Order createOrder(Order order) { boolean success = true; Order savedOrder = orderRepository.save(order); Order inventoryResponse = null; try { inventoryResponse = restTemplate.postForObject( inventoryServiceUrl, order, Order.class); } catch (Exception ex) { success = false; } Order shippingResponse = null; try { shippingResponse = restTemplate.postForObject( shippingServiceUrl, order, Order.class); } catch (Exception ex) { success = false; HttpEntity deleteRequest = new HttpEntity(order); ResponseEntity deleteResponse = restTemplate.exchange( inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class); } if (success) { savedOrder.setOrderStatus(OrderStatus.SUCCESS); savedOrder.setShippingDate(shippingResponse.getShippingDate()); } else { savedOrder.setOrderStatus(OrderStatus.FAILURE); } return orderRepository.save(savedOrder); } public List getOrders() { return orderRepository.findAll(); }

The handling of orders where we're orchestrating calls to inventory and shipping services is far from ideal. Distributed transactions with multiple microservices is a complex topic in itself and beyond the scope of this tutorial.

However, we'll see later in this tutorial how a reactive system can avoid the need for distributed transactions to a certain extent.

As before, we'll not go through the rest of the boilerplate code. However, this can be referenced in the GitHub repo.

4.6. Front-end

Let's also add a user interface to make the discussion complete. The user interface will be based on Angular and will be a simple single-page application.

We'll need to create a simple component in Angular to handle create and fetch orders. Of specific importance is the part where we call our API to create the order:

createOrder() { let headers = new HttpHeaders({'Content-Type': 'application/json'}); let options = {headers: headers} this.http.post('//localhost:8080/api/orders', this.form.value, options) .subscribe( (response) => { this.response = response }, (error) => { this.error = error } ) }

The above code snippet expects order data to be captured in a form and available within the scope of the component. Angular offers fantastic support for creating simple to complex forms using reactive and template-driven forms.

Also important is the part where we get previously created orders:

getOrders() { this.previousOrders = this.http.get(''//localhost:8080/api/orders'') }

Please note that the Angular HTTP module is asynchronous in nature and hence returns RxJS Observables. We can handle the response in our view by passing them through an async pipe:

Your orders placed so far:

  • Order ID: {{ order.id }}, Order Status: {{order.orderStatus}}, Order Message: {{order.responseMessage}}

Of course, Angular will require templates, styles, and configurations to work, but these can be referred to in the GitHub repository. Please note that we have bundled everything in a single component here, which is ideally not something we should do.

But, for this tutorial, those concerns are not in scope.

4.7. Deploying the Application

Now that we've created all individual parts of the application, how should we go about deploying them? Well, we can always do this manually. But we should be careful that it can soon become tedious.

For this tutorial, we'll use Docker Compose to build and deploy our application on a Docker Machine. This will require us to add a standard Dockerfile in each service and create a Docker Compose file for the entire application.

Let's see how this docker-compose.yml file looks:

version: '3' services: frontend: build: ./frontend ports: - "80:80" order-service: build: ./order-service ports: - "8080:8080" inventory-service: build: ./inventory-service ports: - "8081:8081" shipping-service: build: ./shipping-service ports: - "8082:8082"

This is a fairly standard definition of services in Docker Compose and does not require any special attention.

4.8. Problems With This Architecture

Now that we have a simple application in place with multiple services interacting with each other, we can discuss the problems in this architecture. There are what we'll try to address in the following sections and eventually get to the state where we would have transformed our application into a reactive system!

While this application is far from a production-grade software and there are several issues, we'll focus on the issues that pertain to the motivations for reactive systems:

  • Failure in either inventory service or shipping service can have a cascading effect
  • The calls to external systems and database are all blocking in nature
  • The deployment cannot handle failures and fluctuating loads automatically

5. Reactive Programming

Blocking calls in any program often result in critical resources just waiting for things to happen. These include database calls, calls to web services, and file system calls. If we can free up threads of execution from this waiting and provide a mechanism to circle back once results are available, it will yield much better resource utilization.

This is what adopting the reactive programming paradigm does for us. While it's possible to switch over to a reactive library for many of these calls, it may not be possible for everything. For us, fortunately, Spring makes it much easier to use reactive programming with MongoDB and REST APIs:

Spring Data Mongo has support for reactive access through the MongoDB Reactive Streams Java Driver. It provides ReactiveMongoTemplate and ReactiveMongoRepository, both of which have extensive mapping functionality.

Spring WebFlux provides the reactive-stack web framework for Spring, enabling non-blocking code and Reactive Streams backpressure. It leverages the Reactor as its reactive library. Further, it provides WebClient for performing HTTP requests with Reactive Streams backpressure. It uses Reactor Netty as the HTTP client library.

5.1. Inventory Service

We'll begin by changing our endpoints to emit reactive publishers:

@GetMapping public Flux getAllProducts() { return productService.getProducts(); }
@PostMapping public Mono processOrder(@RequestBody Order order) { return productService.handleOrder(order); } @DeleteMapping public Mono revertOrder(@RequestBody Order order) { return productService.revertOrder(order); }

Obviously, we'll have to make necessary changes to the service as well:

@Transactional public Mono handleOrder(Order order) { return Flux.fromIterable(order.getLineItems()) .flatMap(l -> productRepository.findById(l.getProductId())) .flatMap(p -> { int q = order.getLineItems().stream() .filter(l -> l.getProductId().equals(p.getId())) .findAny().get() .getQuantity(); if (p.getStock() >= q) { p.setStock(p.getStock() - q); return productRepository.save(p); } else { return Mono.error(new RuntimeException("Product is out of stock: " + p.getId())); } }) .then(Mono.just(order.setOrderStatus("SUCCESS"))); } @Transactional public Mono revertOrder(Order order) { return Flux.fromIterable(order.getLineItems()) .flatMap(l -> productRepository.findById(l.getProductId())) .flatMap(p -> { int q = order.getLineItems().stream() .filter(l -> l.getProductId().equals(p.getId())) .findAny().get() .getQuantity(); p.setStock(p.getStock() + q); return productRepository.save(p); }) .then(Mono.just(order.setOrderStatus("SUCCESS"))); }

5.2. Shipping Service

Similarly, we'll change the endpoint of our shipping service:

@PostMapping public Mono process(@RequestBody Order order) { return shippingService.handleOrder(order); }

And, corresponding changes in the service to leverage reactive programming:

public Mono handleOrder(Order order) { return Mono.just(order) .flatMap(o -> { LocalDate shippingDate = null; if (LocalTime.now().isAfter(LocalTime.parse("10:00")) && LocalTime.now().isBefore(LocalTime.parse("18:00"))) { shippingDate = LocalDate.now().plusDays(1); } else { return Mono.error(new RuntimeException("The current time is off the limits to place order.")); } return shipmentRepository.save(new Shipment() .setAddress(order.getShippingAddress()) .setShippingDate(shippingDate)); }) .map(s -> order.setShippingDate(s.getShippingDate()) .setOrderStatus(OrderStatus.SUCCESS)); }

5.3. Order Service

We'll have to make similar changes in the endpoints of the order service:

@PostMapping public Mono create(@RequestBody Order order) { return orderService.createOrder(order) .flatMap(o -> { if (OrderStatus.FAILURE.equals(o.getOrderStatus())) { return Mono.error(new RuntimeException("Order processing failed, please try again later. " + o.getResponseMessage())); } else { return Mono.just(o); } }); } @GetMapping public Flux getAll() { return orderService.getOrders(); }

The changes to service will be more involved as we'll have to make use of Spring WebClient to invoke the inventory and shipping reactive endpoints:

public Mono createOrder(Order order) { return Mono.just(order) .flatMap(orderRepository::save) .flatMap(o -> { return webClient.method(HttpMethod.POST) .uri(inventoryServiceUrl) .body(BodyInserters.fromValue(o)) .exchange(); }) .onErrorResume(err -> { return Mono.just(order.setOrderStatus(OrderStatus.FAILURE) .setResponseMessage(err.getMessage())); }) .flatMap(o -> { if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) { return webClient.method(HttpMethod.POST) .uri(shippingServiceUrl) .body(BodyInserters.fromValue(o)) .exchange(); } else { return Mono.just(o); } }) .onErrorResume(err -> { return webClient.method(HttpMethod.POST) .uri(inventoryServiceUrl) .body(BodyInserters.fromValue(order)) .retrieve() .bodyToMono(Order.class) .map(o -> o.setOrderStatus(OrderStatus.FAILURE) .setResponseMessage(err.getMessage())); }) .map(o -> { if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) { return order.setShippingDate(o.getShippingDate()) .setOrderStatus(OrderStatus.SUCCESS); } else { return order.setOrderStatus(OrderStatus.FAILURE) .setResponseMessage(o.getResponseMessage()); } }) .flatMap(orderRepository::save); } public Flux getOrders() { return orderRepository.findAll(); }

This kind of orchestration with reactive APIs is no easy exercise and often error-prone as well as hard to debug. We'll see how this can be simplified in the next section.

5.4. Front-end

Now, that our APIs are capable of streaming events as they occur, it's quite natural that we should be able to leverage that in our front-end as well. Fortunately, Angular supports EventSource, the interface for Server-Sent Events.

Let's see how can we pull and process all our previous orders as a stream of events:

getOrderStream() { return Observable.create((observer) => { let eventSource = new EventSource('//localhost:8080/api/orders') eventSource.onmessage = (event) => { let json = JSON.parse(event.data) this.orders.push(json) this._zone.run(() => { observer.next(this.orders) }) } eventSource.onerror = (error) => { if(eventSource.readyState === 0) { eventSource.close() this._zone.run(() => { observer.complete() }) } else { this._zone.run(() => { observer.error('EventSource error: ' + error) }) } } }) }

6. Message-Driven Architecture

The first problem we're going to address is related to service-to-service communication. Right now, these communications are synchronous, which presents several problems. These include cascading failures, complex orchestration, and distributed transactions to name a few.

An obvious way to solve this problem is to make these communications asynchronous. A message broker for facilitating all service-to-service communication can do the trick for us. We'll use Kafka as our message broker and Spring for Kafka to produce and consume messages:

We'll use a single topic to produce and consume order messages with different order statuses for services to react.

Let's see how each service needs to change.

6.1. Inventory Service

Let's begin by defining the message producer for our inventory service:

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(Order order) { this.kafkaTemplate.send("orders", order); }

Next, we'll have to define a message consumer for inventory service to react to different messages on the topic:

@KafkaListener(topics = "orders", groupId = "inventory") public void consume(Order order) throws IOException { if (OrderStatus.RESERVE_INVENTORY.equals(order.getOrderStatus())) { productService.handleOrder(order) .doOnSuccess(o -> { orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_SUCCESS)); }) .doOnError(e -> { orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_FAILURE) .setResponseMessage(e.getMessage())); }).subscribe(); } else if (OrderStatus.REVERT_INVENTORY.equals(order.getOrderStatus())) { productService.revertOrder(order) .doOnSuccess(o -> { orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_SUCCESS)); }) .doOnError(e -> { orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_FAILURE) .setResponseMessage(e.getMessage())); }).subscribe(); } }

This also means that we can safely drop some of the redundant endpoints from our controller now. These changes are sufficient to achieve asynchronous communication in our application.

6.2. Shipping Service

The changes in shipping service are relatively similar to what we did earlier with the inventory service. The message producer is the same, and the message consumer is specific to shipping logic:

@KafkaListener(topics = "orders", groupId = "shipping") public void consume(Order order) throws IOException { if (OrderStatus.PREPARE_SHIPPING.equals(order.getOrderStatus())) { shippingService.handleOrder(order) .doOnSuccess(o -> { orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_SUCCESS) .setShippingDate(o.getShippingDate())); }) .doOnError(e -> { orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_FAILURE) .setResponseMessage(e.getMessage())); }).subscribe(); } }

We can safely drop all the endpoints in our controller now as we no longer need them.

6.3. Order Service

The changes in order service will be a little more involved as this is where we were doing all the orchestration earlier.

Nevertheless, the message producer remains unchanged, and message consumer takes on order service-specific logic:

@KafkaListener(topics = "orders", groupId = "orders") public void consume(Order order) throws IOException { if (OrderStatus.INITIATION_SUCCESS.equals(order.getOrderStatus())) { orderRepository.findById(order.getId()) .map(o -> { orderProducer.sendMessage(o.setOrderStatus(OrderStatus.RESERVE_INVENTORY)); return o.setOrderStatus(order.getOrderStatus()) .setResponseMessage(order.getResponseMessage()); }) .flatMap(orderRepository::save) .subscribe(); } else if ("INVENTORY-SUCCESS".equals(order.getOrderStatus())) { orderRepository.findById(order.getId()) .map(o -> { orderProducer.sendMessage(o.setOrderStatus(OrderStatus.PREPARE_SHIPPING)); return o.setOrderStatus(order.getOrderStatus()) .setResponseMessage(order.getResponseMessage()); }) .flatMap(orderRepository::save) .subscribe(); } else if ("SHIPPING-FAILURE".equals(order.getOrderStatus())) { orderRepository.findById(order.getId()) .map(o -> { orderProducer.sendMessage(o.setOrderStatus(OrderStatus.REVERT_INVENTORY)); return o.setOrderStatus(order.getOrderStatus()) .setResponseMessage(order.getResponseMessage()); }) .flatMap(orderRepository::save) .subscribe(); } else { orderRepository.findById(order.getId()) .map(o -> { return o.setOrderStatus(order.getOrderStatus()) .setResponseMessage(order.getResponseMessage()); }) .flatMap(orderRepository::save) .subscribe(); } }

The consumer here is merely reacting to order messages with different order statuses. This is what gives us the choreography between different services.

Lastly, our order service will also have to change to support this choreography:

public Mono createOrder(Order order) { return Mono.just(order) .flatMap(orderRepository::save) .map(o -> { orderProducer.sendMessage(o.setOrderStatus(OrderStatus.INITIATION_SUCCESS)); return o; }) .onErrorResume(err -> { return Mono.just(order.setOrderStatus(OrderStatus.FAILURE) .setResponseMessage(err.getMessage())); }) .flatMap(orderRepository::save); }

Note that this is far simpler than the service we had to write with reactive endpoints in the last section. Asynchronous choreography often results in far simpler code, although it does come at the cost of eventual consistency and complex debugging and monitoring. As we may guess, our front-end will no longer get the final status of the order immediately.

7. Container Orchestration Service

The last piece of the puzzle that we want to solve is related to deployment.

What we want in the application is ample redundancy and a tendency to scale up or down depending upon the need automatically.

We've already achieved containerization of services through Docker and are managing dependencies between them through Docker Compose. While these are fantastic tools in their own right, they do not help us to achieve what we want.

Hence, we need a container orchestration service that can take care of redundancy and scalability in our application. While there are several options, one of the popular ones includes Kubernetes. Kubernetes provides us with a cloud vendor-agnostic way to achieve highly scalable deployments of containerized workloads.

Kubernetes wraps containers like Docker into Pods, which are the smallest unit of deployment. Further, we can use Deployment to describe the desired state declaratively.

Deployment creates ReplicaSets, which internally is responsible for bringing up the pods. We can describe a minimum number of identical pods that should be running at any point in time. This provides redundancy and hence high availability.

Let's see how can we define a Kubernetes deployment for our applications:

apiVersion: apps/v1 kind: Deployment metadata: name: inventory-deployment spec: replicas: 3 selector: matchLabels: name: inventory-deployment template: metadata: labels: name: inventory-deployment spec: containers: - name: inventory image: inventory-service-async:latest ports: - containerPort: 8081 --- apiVersion: apps/v1 kind: Deployment metadata: name: shipping-deployment spec: replicas: 3 selector: matchLabels: name: shipping-deployment template: metadata: labels: name: shipping-deployment spec: containers: - name: shipping image: shipping-service-async:latest ports: - containerPort: 8082 --- apiVersion: apps/v1 kind: Deployment metadata: name: order-deployment spec: replicas: 3 selector: matchLabels: name: order-deployment template: metadata: labels: name: order-deployment spec: containers: - name: order image: order-service-async:latest ports: - containerPort: 8080

Here we're declaring our deployment to maintain three identical replicas of pods at any time. While this is a good way to add redundancy, it may not be sufficient for varying loads. Kubernetes provides another resource known as the Horizontal Pod Autoscaler which can scale the number of pods in a deployment based on observed metrics like CPU utilization.

Please note that we have just covered the scalability aspects of the application hosted on a Kubernetes cluster. This does not necessarily imply that the underlying cluster itself is scalable. Creating a high availability Kubernetes cluster is a non-trivial task and beyond the scope of this tutorial.

8. Resulting Reactive System

Now that we've made several improvements in our architecture, it's perhaps time to evaluate this against the definition of a Reactive System. We'll keep the evaluation against the four characteristics of a Reactive Systems we discussed earlier in the tutorial:

  • Responsive: The adoption of the reactive programming paradigm should help us achieve end-to-end non-blocking and hence a responsive application
  • Resilient: Kubernetes deployment with ReplicaSet of the desired number of pods should provide resilience against random failures
  • Elastic: Kubernetes cluster and resources should provide us the necessary support to be elastic in the face of unpredictable loads
  • Message-Driven: Having all service-to-service communication handled asynchronously through a Kafka broker should help us here

While this looks quite promising, it's far from over. To be honest, the quest for a truly reactive system should be a continuous exercise of improvements. We can never preempt all that can fail in a highly complex infrastructure, where our application is just a small part.

A reactive system thus will demand reliability from every part that makes the whole. Right from the physical network to infrastructure services like DNS, they all should fall in line to help us achieve the end goal.

Often, it may not be possible for us to manage and provide the necessary guarantees for all these parts. And this is where a managed cloud infrastructure helps alleviate our pain. We can choose from a host of services like IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service), and PaaS (Platform-as-a-Service) to delegate the responsibilities to external parties. This leaves us with the responsibility of our application as far as possible.

9. Conclusion

In this tutorial, we went through the basics of reactive systems and how does it compare with reactive programming. We created a simple application with multiple microservices and highlighted the problems we intend to solve with a reactive system.

さらに、リアクティブプログラミング、メッセージベースのアーキテクチャ、およびアーキテクチャにコンテナオーケストレーションサービスを導入して、リアクティブシステムを実現しました。

最後に、結果として得られるアーキテクチャと、それがリアクティブシステムへの道のりであり続ける方法について説明しました。このチュートリアルでは、リアクティブシステムの作成に役立つすべてのツール、フレームワーク、またはパターンを紹介しているわけではありませんが、その道のりを紹介しています。

いつものように、この記事のソースコードはGitHubにあります。