技術的負債に対する取り組みとクラウドネイティブな環境での代表的な設計パターン
現在ウェルスナビの開発組織では既存事業の成長を加速させること、新規事業の取り組みを進めることに並行して取り組んでいます。これまでにも大規模なデータベース移行やコンテナ化を進めてきましたが、複数サービスを同時並行で開発していこうとするなかで改めて認識された技術的負債や、クラウドネイティブなアーキテクチャへ移行するにあたって採用した代表的な設計パターンなどをご紹介します。
ウェルスナビ株式会社バックエンドエンジニアの浦野と申します。
「働く世代の豊かな老後のために、最先端のテクノロジーを活用し、世界水準の個人金融資産プラットフォームを築く」をミッションに掲げ、ロボアドバイザー「WealthNavi」等のサービスを提供しています。
現在ウェルスナビの開発組織では既存事業の成長を加速させること、新規事業の取り組みを進めることに並行して取り組んでおります。これまでにも大規模なデータベース移行やコンテナ化を進めてきましたが、複数サービスを同時並行で開発していこうとするなかで改めて認識された技術的負債や、クラウドネイティブなアーキテクチャへ移行するにあたって採用した代表的な設計パターンなどをご紹介できたらと思います。
技術的負債に対する取り組み
2016年7月にロボアドバイザー「WealthNavi」正式リリースから順調に事業成長を続けてきた中で、スピード重視で機能追加を繰り返し、モノリシックアプリケーションを急成長させて開発スピードが落ちるという状況に陥りました。
具体的にはソースコードの肥大化に伴うアプリケーションのビルド時間遅延、人員増加によるコミュニケーションコストの増大、モノリスな構造故の共通基盤(機能)アップデートによる影響範囲の拡大にともなう確認作業の発生などです。2019年頃から開発組織の中で顕在化してきたこのような課題・事象に対し、継続的に負債の返済について議論を続けてきました。キーワードになったのは「スケーラビリティ」でした。
肥大化したリポジトリを分割してビルドを効率化する工夫をしたり、クラウドネイティブなデリバリーパイプラインの構築を行うと共にアプリケーションのコンテナ化を進めたりしてきました。モノリスな構造でもありコードの重複も少なくないアプリケーションを少しずつクラウドネイティブなマイクロサービスアーキテクチャへ移行していく試みも行っています。このような試みを行うことで少しずつですが開発組織を取り巻く環境で「スケーラビリティ」が向上しつつあります。
本稿ではクラウドネイティブなマイクロサービスアーキテクチャへ移行していく段階で採用した2つの代表的な設計パターンについてご紹介したいと思います。
クラウドネイティブな環境での代表的な設計パターン
クラウドネイティブな環境であるかどうかに関わらずシステムアーキテクチャの設計パターンは昔から存在していますが、特に分散型システムであるマイクロサービスアーキテクチャで重要とされている設計パターンのうち2つを取り上げます。
サーキットブレーカー
ウェルスナビのシステムを構成するほとんどのアプリケーションが何らかのSaaSを利用して機能を実現しています。SaaSを利用することが前提のアーキテクチャが昨今では主流になっています。汎用的なSaaSを様々なアプリケーションから呼び出して利用しているような場合は、SaaS側で障害が起こった時の影響範囲が広範なものになります。サービスの可用性、耐障害性などの非機能要件をよく検討したうえで、アーキテクチャを考える必要があるのです。ウェルスナビでの具体的な事例として、SendGridを利用したメール送信実装ではプロトコルをフォールバックする仕組みを取り入れたりしています。ここではSaaS障害1に対して、取りうる選択肢の一つとして有力なサーキットブレーカーについて取り上げます。
図1: サーキットブレーカーのイメージ
サーキットブレーカーとは端的に説明すると、失敗する可能性があるリクエストをアプリケーションが繰り返し試行しないようにする、その他サービスへ障害がカスケードしないようにする装置のことを意味します。
サーキット ブレーカーの実装は失敗する可能性のあるリクエストのプロキシとして機能します。
ここではresillience4jというオープンソースライブラリの実装を前提に話を進めていきます。サーキットブレーカーは接続先のシステムまたはサービスの応答性の状態を元に有限ステートマシンを介して実装されています。
出典: https://resilience4j.readme.io/docs/circuitbreaker
図2: サーキットブレーカーの状態遷移図
図2に示される状態の他にDISABLEDとFORCED_OPENおよびMETRICS_ONLYが実装されていますが、オプションとして用意されている状態なので本稿では説明を割愛します。
サーキットブレーカーを介して接続するシステムまたはサービスが正常に応答を返す場合はCLOSED状態です。平常時は常にCLOSED状態のはずです。正常ではない(異常な)応答の返却率(エラー発生率)が閾値を超えた場合にサーキットブレーカーはOPENへ遷移します。また、OPEN状態からCLOSED状態へ戻るには必ずHALF_OPEN状態を経由することになります。HALF_OPEN状態とは接続するサービスが復旧したかどうかを確認するステップとして少しだけリクエストを試みるという状態です。この少しだけという部分が判りにくいかもしれないので、実装例2と設定パラメーターを示しながら解説していきます。
resillience4jのサーキットブレーカー実装はアルゴリズムにCount-based sliding windowかTime-based sliding windowのどちらかを選択することが可能です。このアルゴリズムの選択は主にエラー発生率の検出方法に関わります。
・サーキットブレーカーコンフィグレーションの実装例
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() .failureRateThreshold(Float.parseFloat("10")) .slowCallRateThreshold(Float.parseFloat("50")) .slowCallDurationThreshold(Duration.ofSeconds("7")) .waitDurationInOpenState(Duration.ofMillis("1800000")) .permittedNumberOfCallsInHalfOpenState(7) .minimumNumberOfCalls(7) .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED) .slidingWindowSize(30) .recordExceptions(WebServiceException.class, XmlMappingException.class) .build(); this.circuitBreakerRegistry = CircuitBreakerRegistry.of(circuitBreakerConfig);
Fluent Builder パターンで実装されているCircuitBreakerConfigクラスを使って設定を行います。設定完了後レジストリに登録します。登録後はInMemoryCircuitBreakerRegistryというクラスで管理されます。
それぞれのパラメーターについて見ていきます。
プロパティ名 | デフォルト値 | 設定値(実装例) | 説明 |
failureRateThreshold | 50(%) | 10(%) | ブレーカーOPENへ遷移する通信異常割合 |
slowCallRateThreshold | 100(%) | 50(%) | ブレーカーOPENへ遷移する通信遅延割合 |
slowCallDurationThreshold | 60000 [ms] | 7 [sec] | 通信遅延(と判断される)時間の閾値 |
waitDurationInOpenState | 60000 [ms] | 1800000 [ms] | ブレーカーOPEN時待機時間(HALF_OPENへ遷移するまでのタイムアウト時間) |
permittedNumberOfCalls InHalfOpenState | 10 | 7 | HALF_OPEN時の受付リクエスト数 |
minimumNumberOfCalls | 100 | 7 | エラー率の計算に用いる最低リクエスト数 |
slidingWindowType | COUNT_BASED | TIME_BASED | エラー率検出ロジック |
slidingWindowSize | 100 | 30 | エラー率検出ウィンドウサイズ TIME_BASEDの場合は秒、COUNT_BASEDの場合は件数 |
recordExceptions | empty | WebServiceException XmlMappingException | サーキットブレーカーでエラーとして認識させる例外クラスの登録 |
表1. サーキットブレーカーの設定オプション
表1.の実装例ではプロパティはすべてデフォルト値から設定値を変更しています。閾値やリクエスト数の設定は、環境やサービスの特性、SLOなどを考慮した上で慎重に設定していく必要があります。最初から最適値を設定する事は難易度が高いと思いますので、運用前にチューニングする時間を十分に設けることをお薦めします。この例では比較的ミッションクリティカルな領域で利用する想定のものになっていますが、特徴的な部分はエラー率検出ロジック(アルゴリズム)を時間ベースに設定しているところです。選択するロジックによりウィンドウサイズの設定単位(秒または件数)も変更になるので注意してください。サーキットブレーカーを配置するシステム全体の特性によりアルゴリズムを選択する必要がありますが、エンジニアリングとしては非常にやりがいのある部分です。
・サーキットブレーカーを実装したサービス呼び出しの実装例
try { // BackendService呼び出し BackendServiceResponse backendServiceResponse = CircuitBreaker.decorateSupplier(circuitBreaker, () -> backendServiceClient.analyze(backendServiceRequest)).get(); return BackendServiceResponseFactory.create(backendServiceResponse); } catch (WebServiceException wse) { // 何かしらの処理 // ... } catch (XmlMappingException xme) { // 何かしらの処理 // ... }
CircuitBreakerインターフェースを利用してバックエンドサービスを呼び出します。backendServiceClient.analyze() の実装は外部サービスに接続してデータを取得する簡単なWebサービスの実装と考えてください。このWebサービスはWebServiceExceptionとXmlMappingExceptionの例外をスローする可能性があります。設定オプションでサーキットブレーカーに例外を登録してあるので、Webサービス呼び出しで例外が発生した場合はサーキットブレーカーにエラーとしてカウントされます。try-catchで実装していますが、Vavrを使ってTryモナドで実装することも可能です。
CircuitBreaker.decorateSupplierの引数にはCircuitBreakerの実体を渡す必要があります。Spring Bootでは以下のようにBean生成時にインスタンス化しても良いと思います。
@Override public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); this.circuitBreaker = this.circuitBreakerRegistry.circuitBreaker("backend-service"); this.circuitBreaker.getEventPublisher().onStateTransition(event -> { log.warn().log("CircuitBreaker of %s state transition to %s", event.getCircuitBreakerName(), event.getStateTransition().name()); }); }
この例ではCircuitBreaderのインスタンス化と同時に、サーキットブレーカーの状態遷移イベントが発生した際に行いたい処理(この例ではログ出力)を設定しています。
resillience4jにはSpring Boot専用のライブラリが用意されているので、Spring Boot環境であれば利用するのもよいかもしれません。
https://resilience4j.readme.io/docs/getting-started-3
サーキットブレーカーとJavaによるサーキットブレーカー実装ライブラリであるresillience4jを使ったサービス呼び出しのご紹介でした。
ウェルスナビのバックエンドサービスでも利用しているサーキットブレーカーパターンですが、パラメーター設定は運用開始されてから複数回見直しを実施しています。継続的に見直しが必要なパラメーターですが、サーキットブレーカーの状態遷移やアラートなどをログ出力などにより可視化できているのでチューニング作業自体はそれほど難しくありません。分散システム環境ではシステム間の通信の状況やメトリクスの収集、可視化などのオブザーバビリティが非常に重要で、サーキットブレーカーなどの設計パターンを導入する際はモニタリング可能な環境整備も合わせて行う事を推奨します。
リトライ
もう一つ代表的な設計パターンであるリトライを取り上げます。
基本的にリトライ(再試行)は一時的な障害に対応するもので、リクエストが成功する事を前提としています。この一時的な障害はほとんどの場合ネットワーク通信に関連するものです。先述したサーキットブレーカーは失敗する可能性のあるリクエストの実行を抑制(停止)するものであるのに対し、リトライは成功する可能性のあるリクエストでの障害の影響を最小限に抑えることを目的としています。
安易にリトライすればよいわけではなく、リトライの実施には以下のような判断が必要になってきます。
A. 特定の予期されたネットワークエラー(再試行する)
B. 一般的なネットワークエラー(時間をおいて再試行する)
C. 自動復旧しない断続的なエラー(再試行はキャンセルする)
Cの判断が継続するようであれば、リクエストの実行を停止する必要があるかもしれません。その場合はサーキットブレーカーを合わせて実装するとよいでしょう。
図3. サーキットブレーカーとリトライの設計パターンを組み合わせた場合のWebサービス呼び出しの実装コンポーネントのレイヤーイメージ
マイクロサービス開発やクラウドネイティブな環境に対応しているFWやライブラリを利用している場合は、リトライ機能はおおむねデフォルトで用意されていると思います。SpringFrameworkではRetryTemplateが用意されており、ユーザーが定義した例外発生時のみ再試行を実装することが簡易に実装できるようになっています。また「時間をおいて再試行する」を実現するためのバックオフポリシー(デフォルトはNoBackOffPolicy)が複数用意されています。リトライポリシーも様々なものが用意されており、中にはCircuitBreakerRetryPolicyという図3のイメージが実装されたクラスも存在します。
ウェルスナビの主要なサービスおよびアプリケーションはAWSにデプロイされており、機能の一部はAWSのサービスを利用する形で実現されています。バックエンドではAWS SDK for JavaをつかってAWSの各サービスへアクセスしています。AWS SDKのコアな部分で行われるHTTPS通信は自動リトライが実装されており、「エクスポネンシャルバックオフ」と呼ばれるアルゴリズムで実装されています。エクスポネンシャルバックオフの仕組み自体はネットワーク通信に関わる領域で幅広く利用されているものです。動作を端的に説明すると、リトライの際に待機間隔が線形ではなく徐々に(指数関数的に)長くなっていきます。この挙動を理解しているとネットワーク障害時のトラブルシューティングに役立ちます。
次にSDK内部で実際にどのように実現されているのか、少しご紹介したいと思います。SDKはバージョン1.x系を前提としています。
S3へローカルファイルシステムにあるファイルをアップロードするコード
public long uploadFile(Path local, String targetS3Path, boolean shouldDeleteLocalPath) throws IOException { try (InputStream inputStream = new FileInputStream(local.toFile())) { Path s3Path = fileSystem.getPath(targetS3Path); return Files.copy(inputStream, s3Path); } finally { if (shouldDeleteLocalPath) { Files.deleteIfExists(local); } } }
S3へアクセスするSDKを利用した実装はjava.nio.file.FileSystemを拡張した独自のファイルシステムでラップされています。Files.copyが呼び出された場合に行われる実際の処理はAmazonS3Client#copyObject3 です。copyObjectメソッドに渡された引数などは最終的にAmazonS3Clientが継承しているAmazonWebServiceClientに実装されているAmazonHttpClientのexecuteメソッドに引き渡されて実行されます。このAmazonHttpClientクラスがSDKのコアの部分であり、AWSの各サービスへのリクエストを実現する実体です。このAmazonHttpClientにはユーザー側で様々なパラメーター設定ができるようになっており、AmazonWebServiceClientのClientConfigurationを利用して行います。尚、ClientConfigurationクラスはスレッドセーフではないので気を付けてください。本題のリトライポリシーについてはClientConfigurationクラスにデフォルトで定義されています。ClientConfigurationを意識せずSDKを素で使ってもリトライポリシーが適用されています。
RetryPolicyコンストラクタの引数 | 説明 | デフォルト実装 |
RetryCondition | リトライする判断ロジックの実装 | SDKDefaultRetryCondition |
BackoffStrategy | バックオフアルゴリズムの戦略 | SDKDefaultBackoffStrategy |
maxErrorRetry | 最大リトライ回数(整数) | 3 |
表2. AWS SDKのデフォルトリトライポリシー
RetryConditionのデフォルト実装を見ると、IOExceptionとAmazonServiceExceptionの場合のみリトライすることになっています。AmazonServiceExceptionであっても以下の場合のみリトライを許可しています。
- リトライ可能なサービスであること
- スロットリング例外
- クロックスキュー例外
BackoffStrategyのデフォルト実装は「エクスポネンシャルバックオフアンドジッター」です。スロットリング例外とそれ以外では内部で選択されるジッター(乱数)のアルゴリズムが異なります。スロットリング例外の場合は“Equal Jitter”でそれ以外は“Full Jitter”になっています。詳細なアルゴリズムの説明はAWSの公式ブログでご確認ください。
それぞれのパラメーター値はデフォルトでは以下の通りです。デフォルト値はClientConfigurationを使ってカスタマイズ可能です。
- “Full Jitter”の待機時間: 100 (ms)
- “Equal Jitter”の待機時間”: 500 (ms)
- 最大バックオフ時間: 20 (sec)
AWS SDKを利用した場合、ユーザーは意識しなくてもリトライが実装されていることになるので、別途リトライを実装する必要は基本的には無いはずです。一方でSDK内部の動作仕様をよく理解せずに誤った利用の仕方をしてしまうと想定外の挙動を引き起こす可能性があるので、SDKを利用する場合はよく注意して実装するように心がけています。
ClienctConfigurationを使ってパラメーターをカスタマイズする場合の実装例です。
@Component public class AmazonS3Factory implements InitializingBean { private ClientConfiguration clientConfiguration; public AmazonS3 build(AWSCredentialsProvider credentials) { return AmazonS3ClientBuilder.standard() .withCredentials(credentials) .withClientConfiguration(clientConfiguration) .build(); } @Override public void afterPropertiesSet() throws Exception { try { Resource resource = new ClassPathResource( "aws-sdk.properties"); Properties props = PropertiesLoaderUtils .loadProperties(resource); int connectionTimeout = Integer .parseInt(props.getProperty("connectionTimeout", "0")); int socketTimeout = Integer .parseInt(props.getProperty("socketTimeout", "0")); int maxConnections = Integer .parseInt(props.getProperty("maxConnections", "255")); int maxErrorRetry = Integer .parseInt(props.getProperty("maxErrorRetry", "3")); int requestTimeout = Integer .parseInt(props.getProperty("requestTimeout", "0")); int clientExecutionTimeout = Integer .parseInt(props.getProperty("clientExecutionTimeout", "0")); int baseDelay = Integer .parseInt(props.getProperty("baseDelay", "100")); int maxBackoffTime = Integer .parseInt(props.getProperty("maxBackoffTime", "20000")); clientConfiguration = new ClientConfiguration(); clientConfiguration.setConnectionTimeout(connectionTimeout); clientConfiguration.setSocketTimeout(socketTimeout); clientConfiguration.setMaxConnections(maxConnections); clientConfiguration.setMaxErrorRetry(maxErrorRetry); clientConfiguration.setRequestTimeout(requestTimeout); clientConfiguration.setClientExecutionTimeout(clientExecutionTimeout); clientConfiguration.setRetryPolicy(new RetryPolicy( PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, new PredefinedBackoffStrategies.FullJitterBackoffStrategy(baseDelay, maxBackoffTime), maxErrorRetry, true) ); } catch (Exception e) { // } } }
AWS SDKやSpringFrameworkでも実装されているリトライ機能ですが、resilliencej4でもリトライ機能が提供されています。いずれの実装も基本的にはエクスポネンシャルバックオフアルゴリズムで実装が提供されているので、特段の優劣があるわけではなくお使いの環境に合わせて利用するのがよいと思います。
クラウドネイティブな環境でのサーキットブレーカーとリトライの設計パターンについて見てきましたが、このほかにも代表的なものとして流量制御(レートリミッター)やバルクヘッド、CQRSなど分散システムでは定番となる設計パターンがあります。AWS基盤で採用されているレートリミットのアルゴリズムであるトークンバケットアルゴリズムは非常に有名です。これらの設計パターンは要件に応じて適用していくものではありますが、設計パターンや実装技術だけでは解決できない問題があります。特にマイクロサービス化を進めていく中で技術面だけでは解決できない問題が顕在化してくると考えています。ウェルスナビでも組織構造や肥大化する共通リポジトリ、運用体制への依存など様々な問題を抱えつつも ”ものづくりする金融機関”のビジョンを掲げ次世代の金融インフラを構築すべく日々奮闘しています。
バッチ処理の設計パターン
ウェルスナビにはジョブと呼ばれるバッチ処理が、大きなもの(注文執行や手数料計算など)から小さなもの(ファイルをコピーするだけ)まですべて含めると数百個あります。ジョブの中核をなすバッチ処理を動かしているアプリケーションは2つあり、いずれも事業成長とともに肥大化していきました。肥大化していくとともに開発運用性は低下していました。
そこで、3年ほど前から一定の業務単位でジョブを選別してコンテナ基盤へ移行する取り組みを行っています。コンテナ基盤はECS(on Fargate)を使っています。ECS(on Fargate)のよいところはFargateのよいところに集約されるのですが(笑)、OS管理不要でかつ完全オンデマンドで運用できる都度起動のバッチ処理には最適です。起動時間のオーバーヘッドがあるので、処理時間に厳しい制約がある場合は事前起動する(ホットスタンバイ)などの工夫が必要です。ウェルスナビではすべてのアプリケーションが共通で利用できるパイプラインを構築しているので、コンテナ化したアプリケーションはリリースまでの工程をクイックに実行できるメリットを享受できます。ジョブはドメインの境界が明確でリポジトリを独立させること(他のライブラリやプロジェクトに依存しない)による弊害がすくないため、今のところ非常によく機能しています。
ウェルスナビのバッチ処理で使われているデータ処理のパターンのうち2つを取り上げてみたいと思います。
スプリットマージ
ひとつめはスプリット(アンド)マージと呼ばれる設計パターンです。ウェルスナビではこのパターンを使ってファイル出力の処理性能を改善しました。
図4. スプリットマージの処理フローイメージ
スプリットマージの処理フローについて図3の項番に従って説明します。
(1) データベースからファイル出力に必要なデータを抽出します。
余談:
一般的に処理性能改善を行う場合に最も注目されるのはクエリのパフォーマンスだと思います。データ抽出処理が最も処理コストが高い場合は少なくありません。クエリのパフォーマンス向上には継続的なチューニングが欠かせません。ウェルスナビではスロークエリを可視化できるように仕組み化しており、実行時間が閾値を超えた場合は速やかに開発者へ通知が行われ、改善を行っています。
(2) 抽出したデータを分割したいサイズ(ファイル出力処理の並列数)に分割します。ここではローカルのファイルシステムを想定しています。サーバーレスのクラウド環境であればローカルのストレージに余裕がない場合もあると思います。その場合はクラウドのストレージサービスなどを利用することが考えられます。
(3) プロセス内部で生成されたスレッド毎にファイル出力処理を行います。
(4) スレッド毎に出力されたファイルをマージして、必要に応じて整形(フォーマット)処理などを行います。ヘッダー、フッターなどが必要な場合はここで処理します。
スプリットマージはシンプルでポピュラーな分散データ処理の水平スケールパターンです。分散データの出力がファイルなので、データベース負荷や接続コストなどを気にする必要はありません。逆に入力側がファイルで出力側がデータベースだった場合は、データベース負荷の問題と向き合うことが必要になってきます。
ローカルファイルシステムを使う場合は実装も難しくないので比較的導入しやすいとおもいます。
Javaでの実装例
package com.example.splitmerge; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Test; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.file.FlatFileItemWriter; import org.springframework.core.io.FileSystemResource; import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; /** * スプリットマージのJava実装サンプル */ public class SplitMergeFileTest { // 分割リストの最小単位 private static final int MIN_CHUNK_SIZE = 1; // 出力ファイルの文字コード private static final String FILE_CHARSET = "UTF-8"; // 出力ファイルの改行コード private static final String FILE_LINE_SEPARATOR = "\n"; @Test public void execute() { // 抽出データを分割してファイル出力する String artifactPath = "path/to/split_merge.csv"; List<Path> splitFiles = splitRecords2Files( artifactPath, // 出力ファイルのパス Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8"), // 抽出データ 8 // 並列スレッド数 ); try { // 分割出力したファイルをマージする mergeFiles(splitFiles, artifactPath); // 分割出力したファイルを削除する splitFiles.forEach(path -> { try { Files.delete(path); } catch (Exception e) { // todo } }); } catch (Exception e) { // todo } } private <T> List<Path> splitRecords2Files(String artifactPath, List<T> records, int threadCount) { List<Path> splitFiles = new ArrayList<>(); List<List<T>> partitionModels = separate(records, threadCount); ExecutorService executorService = Executors.newFixedThreadPool(threadCount); List<CompletableFuture<Path>> futures = buildFutures(artifactPath, partitionModels, executorService); CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); completableFuture.join(); completableFuture.whenComplete((result, ex) -> { if (ex == null) { splitFiles.addAll(futures.stream().map(future -> { try { return future.get(); } catch (Exception e) { // todo } return null; }).collect(Collectors.toList())); } else { // todo } }); return splitFiles; } private <T> List<List<T>> separate(List<T> chunk, int number) { if (CollectionUtils.isEmpty(chunk) || chunk.size() <= MIN_CHUNK_SIZE) { List<List<T>> result = new ArrayList<>(); result.add(chunk); return result; } return ListUtils.partition( chunk, (chunk.size() / number) + (chunk.size() % number > 0 ? 1 : 0)); } private <T> List<CompletableFuture<Path>> buildFutures(String fileName, List<List<T>> partitionModels, ExecutorService executorService) { List<CompletableFuture<Path>> futures = new ArrayList<>(); for (int i = 0; i < partitionModels.size(); i++) { List<T> records = partitionModels.get(i); String splitFile = fileName + i; futures.add(CompletableFuture.supplyAsync(() -> outputRecords(splitFile, records), executorService)); } return futures; } private <T> Path outputRecords(String fileName, List<T> records) { FlatFileItemWriter<T> flatFileItemWriter = new FlatFileItemWriter<>(); flatFileItemWriter.setEncoding(FILE_CHARSET); flatFileItemWriter.setLineSeparator(FILE_LINE_SEPARATOR); flatFileItemWriter.setResource(new FileSystemResource(fileName)); flatFileItemWriter.setLineAggregator(item -> { // 実装例のための暫定対応 // todo: 型パラメーターの具象クラスに合わせてLineAggregatorを実装すること return (String) item; }); try { flatFileItemWriter.open(new ExecutionContext()); flatFileItemWriter.write(records); } catch (Exception e) { // todo return null; } finally { flatFileItemWriter.close(); } return Paths.get(fileName); } private void mergeFiles(List<Path> splitFiles, String artifactPath) throws Exception { Path outputFile = Paths.get(artifactPath); if (Files.exists(outputFile)) { Files.delete(outputFile); } try (OutputStream outputStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { for (Path path : splitFiles) { List<String> lines = Files.readAllLines(path, Charset.forName(FILE_CHARSET)); IOUtils.writeLines(lines, FILE_LINE_SEPARATOR, outputStream, FILE_CHARSET); } outputStream.flush(); } catch (Exception e) { // todo } } }
※ SpringFrameworkおよびJUnit、ApacheCommonsライブラリを利用しています。
ファンアウト
ファンアウト(Fan-out)という用語は電子回路や半導体の分野でよく使われる技術用語でもありますが、ソフトウェアエンジニアリングの世界でも使われています。たとえばMOM(Message-oriented middleware)を使ったメッセージング基盤において、Publishされたメッセージがキューイングされて複数のSubscriberが受信できるようにする場合などにファンアウトパターンと呼ばれたりします。
図5. SNSとSQSを使ったファンアウトメッセージングパターン 出典: https://docs.aws.amazon.com/ja_jp/sns/latest/dg/sns-common-scenarios.html
ウェルスナビではバッチ処理で取り扱うデータ量が日々増え続けています。そのような状況化で問題になってきたのが処理時間です。突然処理時間が極端に遅延するという事象はまれで、ほとんどの場合気づかない程度の日々の遅延時間が蓄積していき遅延監視の閾値を超えてアラートが発報されます。しかし、日々の観察を怠っていたり、モニタリングが完全ではない監視体制だったりすると、遅延アラートはある日突然やってきます。ですので、取り扱うデータ量に関わらず一定の時間内に処理を完了させるためのアーキテクチャが必要となってきます。
図6. ファンアウトバッチジョブのイメージ
図5. はファンアウトパターンで実装されたバッチジョブのイメージです。処理の流れを説明していきます。
先行タスク(T1)では事前処理を行います。データベースから処理に必要なデータの取得を行い、後続タスク(T2)の入力となる対象データを生成、分割します。T1の出力がT2の入力になるのですが、T2の処理時間が遅延監視の閾値を超えない一定のスループットを保つようにT1の出力を調整することが重要です。T1で生成されるデータは、例えばユーザー識別子のリストです。T2で安定して処理できるサイズでユーザー識別子のリストを分割生成します。分割されたリストは分割リストストアに保管されます。ここまでが先行タスクの処理になります。分割リストストアはAWSを使う場合SQSやS3などが候補になります。SQSのメッセージの最大サイズは256KBですので、大きなサイズの出力データを扱う場合はS3を使うことになります。
後続タスクでは起動時にSQSやS3から入力データを取得し、処理対象のユーザー識別子を特定します。ユーザー識別子を使って自タスクが行うべき処理を行います。図のイメージではプロセス内部で生成されたスレッドがひとつのユーザー識別子を対象として処理を行いデータベースに登録/更新処理を行っています。
T1、T2のECSタスクはともに処理が終了(正常、異常にかかわらず)したらShutdownするように実装しています。ECSタスク(バッチジョブ)の終了状態はログ出力とdescribe-tasksなどから得られる情報を元に判定を行います。
リスト分割数(=タスク並列数)はチューニング箇所になります。また、データベースへの同時並列処理は、データベースの負荷を一時的に高める可能性があります。単純にタスクの並列数やプロセス内部のスレッド数を増加させても想定どおりのスループットが得られない場合があるので注意してください。T2の出力先がデータベースの場合はタスクやスレッドの水平スケール(スケールアウト)と同時にデータベースの垂直スケール(スケールアップ)が必要な場合があります。
このファンアウトバッチジョブの実装はT1とT2のそれぞれのタスクに依存関係があるので、実行条件やタスク数の制御等を柔軟に行うことができるAWS StepFunctionsを使うことも考えられます。ウェルスナビではデータ移行のシチュエーションでファンアウトパターンを採用することが多いです。またT1がなくT2のみ(つまりECSタスクの並列実行)のパターンでバッチ処理を行っているものもあります。注意してほしいのが、ECSタスクの同時実行数には上限があること、リソースが割り当てられなくて失敗することがあることです。なのでECSタスクを利用するバッチジョブの実装では、再実行性(もしくは冪等性)を考慮することを忘れないでください。いわゆるビックデータを取り扱うような場合で大規模な並列数が必要なジョブを動かすユースケースなどはAWS Batchを利用することになると思います。
ここまでバッチ処理で採用されている設計パターンの一部をご紹介しました。どんなに素晴らしい設計パターンを実装したとしても、ソフトウェアが動く環境は時間とともに変化するものですので、継続的な改善を怠ると大きな技術的負債になってしまうと考えています。また、どの設計パターンにもトレードオフが存在することを認識しておく必要があります。ウェルスナビのバックエンド開発チームは持続可能性や継続性、運用保守性を非常に重要視しています。
最後に
ウェルスナビでは、「働く世代に豊かさを」というミッション実現のため、ロボアドバイザー「WealthNavi」の更なる成長、新規事業の開発など、会社として新たな展開を迎えています。
私たちのミッションに共鳴してくださる方は、ぜひ一緒に働きましょう!
浦野 勝由(Katsuyoshi Urano)