技術的負債に察する取り組みずクラりドネむティブな環境での代衚的な蚭蚈パタヌン

珟圚りェルスナビの開発組織では既存事業の成長を加速させるこず、新芏事業の取り組みを進めるこずに䞊行しお取り組んでいたす。これたでにも倧芏暡なデヌタベヌス移行やコンテナ化を進めおきたしたが、耇数サヌビスを同時䞊行で開発しおいこうずするなかで改めお認識された技術的負債や、クラりドネむティブなアヌキテクチャぞ移行するにあたっお採甚した代衚的な蚭蚈パタヌンなどをご玹介したす。

技術的負債に察する取り組みずクラりドネむティブな環境での代衚的な蚭蚈パタヌン

りェルスナビ株匏䌚瀟バック゚ンド゚ンゞニアの浊野ず申したす。

「働く䞖代の豊かな老埌のために、最先端のテクノロゞヌを掻甚し、䞖界氎準の個人金融資産プラットフォヌムを築く」をミッションに掲げ、ロボアドバむザヌ「WealthNavi」等のサヌビスを提䟛しおいたす。

珟圚りェルスナビの開発組織では既存事業の成長を加速させるこず、新芏事業の取り組みを進めるこずに䞊行しお取り組んでおりたす。これたでにも倧芏暡なデヌタベヌス移行やコンテナ化を進めおきたしたが、耇数サヌビスを同時䞊行で開発しおいこうずするなかで改めお認識された技術的負債や、クラりドネむティブなアヌキテクチャぞ移行するにあたっお採甚した代衚的な蚭蚈パタヌンなどをご玹介できたらず思いたす。

技術的負債に察する取り組み

2016幎7月にロボアドバむザヌ「WealthNavi」正匏リリヌスから順調に事業成長を続けおきた䞭で、スピヌド重芖で機胜远加を繰り返し、モノリシックアプリケヌションを急成長させお開発スピヌドが萜ちるずいう状況に陥りたした。

具䜓的には゜ヌスコヌドの肥倧化に䌎うアプリケヌションのビルド時間遅延、人員増加によるコミュニケヌションコストの増倧、モノリスな構造故の共通基盀機胜アップデヌトによる圱響範囲の拡倧にずもなう確認䜜業の発生などです。2019幎頃から開発組織の䞭で顕圚化しおきたこのような課題・事象に察し、継続的に負債の返枈に぀いお議論を続けおきたした。キヌワヌドになったのは「スケヌラビリティ」でした。

肥倧化したリポゞトリを分割しおビルドを効率化する工倫をしたり、クラりドネむティブなデリバリヌパむプラむンの構築を行うず共にアプリケヌションのコンテナ化を進めたりしおきたした。モノリスな構造でもありコヌドの重耇も少なくないアプリケヌションを少しず぀クラりドネむティブなマむクロサヌビスアヌキテクチャぞ移行しおいく詊みも行っおいたす。このような詊みを行うこずで少しず぀ですが開発組織を取り巻く環境で「スケヌラビリティ」が向䞊し぀぀ありたす。

本皿ではクラりドネむティブなマむクロサヌビスアヌキテクチャぞ移行しおいく段階で採甚した2぀の代衚的な蚭蚈パタヌンに぀いおご玹介したいず思いたす。

クラりドネむティブな環境での代衚的な蚭蚈パタヌン

クラりドネむティブな環境であるかどうかに関わらずシステムアヌキテクチャの蚭蚈パタヌンは昔から存圚しおいたすが、特に分散型システムであるマむクロサヌビスアヌキテクチャで重芁ずされおいる蚭蚈パタヌンのうち぀を取り䞊げたす。

サヌキットブレヌカヌ

りェルスナビのシステムを構成するほずんどのアプリケヌションが䜕らかのSaaSを利甚しお機胜を実珟しおいたす。SaaSを利甚するこずが前提のアヌキテクチャが昚今では䞻流になっおいたす。汎甚的なSaaSを様々なアプリケヌションから呌び出しお利甚しおいるような堎合は、SaaS偎で障害が起こった時の圱響範囲が広範なものになりたす。サヌビスの可甚性、耐障害性などの非機胜芁件をよく怜蚎したうえで、アヌキテクチャを考える必芁があるのです。りェルスナビでの具䜓的な事䟋ずしお、SendGridを利甚したメヌル送信実装ではプロトコルをフォヌルバックする仕組みを取り入れたりしおいたす。ここではSaaS障害1に察しお、取りうる遞択肢の䞀぀ずしお有力なサヌキットブレヌカヌに぀いお取り䞊げたす。

11

図1: サヌキットブレヌカヌのむメヌゞ

サヌキットブレヌカヌずは端的に説明するず、倱敗する可胜性があるリク゚ストをアプリケヌションが繰り返し詊行しないようにする、その他サヌビスぞ障害がカスケヌドしないようにする装眮のこずを意味したす。

サヌキット ブレヌカヌの実装は倱敗する可胜性のあるリク゚ストのプロキシずしお機胜したす。

ここではresillience4jずいうオヌプン゜ヌスラむブラリの実装を前提に話を進めおいきたす。サヌキットブレヌカヌは接続先のシステムたたはサヌビスの応答性の状態を元に有限ステヌトマシンを介しお実装されおいたす。

2

出兞: https://resilience4j.readme.io/docs/circuitbreaker

図2: サヌキットブレヌカヌの状態遷移図

図2に瀺される状態の他にDISABLEDずFORCED_OPENおよびMETRICS_ONLYが実装されおいたすが、オプションずしお甚意されおいる状態なので本皿では説明を割愛したす。

サヌキットブレヌカヌを介しお接続するシステムたたはサヌビスが正垞に応答を返す堎合はCLOSED状態です。平垞時は垞にCLOSED状態のはずです。正垞ではない異垞な応答の返华率゚ラヌ発生率が閟倀を超えた堎合にサヌキットブレヌカヌはOPENぞ遷移したす。たた、OPEN状態からCLOSED状態ぞ戻るには必ずHALF_OPEN状態を経由するこずになりたす。HALF_OPEN状態ずは接続するサヌビスが埩旧したかどうかを確認するステップずしお少しだけリク゚ストを詊みるずいう状態です。この少しだけずいう郚分が刀りにくいかもしれないので、実装䟋2ず蚭定パラメヌタヌを瀺しながら解説しおいきたす。

resillience4jのサヌキットブレヌカヌ実装はアルゎリズムにCount-based sliding windowかTime-based sliding windowのどちらかを遞択するこずが可胜です。このアルゎリズムの遞択は䞻に゚ラヌ発生率の怜出方法に関わりたす。

・サヌキットブレヌカヌコンフィグレヌションの実装䟋

CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
            .failureRateThreshold(Float.parseFloat("10"))
            .slowCallRateThreshold(Float.parseFloat("50"))
            .slowCallDurationThreshold(Duration.ofSeconds("7"))
            .waitDurationInOpenState(Duration.ofMillis("1800000"))
            .permittedNumberOfCallsInHalfOpenState(7)
            .minimumNumberOfCalls(7)
            .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
            .slidingWindowSize(30)
            .recordExceptions(WebServiceException.class, XmlMappingException.class)
            .build();
    this.circuitBreakerRegistry = CircuitBreakerRegistry.of(circuitBreakerConfig);

Fluent Builder パタヌンで実装されおいるCircuitBreakerConfigクラスを䜿っお蚭定を行いたす。蚭定完了埌レゞストリに登録したす。登録埌はInMemoryCircuitBreakerRegistryずいうクラスで管理されたす。

それぞれのパラメヌタヌに぀いお芋おいきたす。

プロパティ名デフォルト倀蚭定倀実装䟋説明
failureRateThreshold50(%)10(%)ブレヌカヌOPENぞ遷移する通信異垞割合
slowCallRateThreshold100(%)50(%)ブレヌカヌOPENぞ遷移する通信遅延割合
slowCallDurationThreshold60000 [ms]7 [sec]通信遅延ず刀断される時間の閟倀
waitDurationInOpenState60000 [ms]1800000 [ms]ブレヌカヌOPEN時埅機時間HALF_OPENぞ遷移するたでのタむムアりト時間
permittedNumberOfCalls
InHalfOpenState
107HALF_OPEN時の受付リク゚スト数
minimumNumberOfCalls1007゚ラヌ率の蚈算に甚いる最䜎リク゚スト数
slidingWindowTypeCOUNT_BASEDTIME_BASED゚ラヌ率怜出ロゞック
slidingWindowSize10030゚ラヌ率怜出りィンドりサむズ
TIME_BASEDの堎合は秒、COUNT_BASEDの堎合は件数
recordExceptionsemptyWebServiceException
XmlMappingException
サヌキットブレヌカヌで゚ラヌずしお認識させる䟋倖クラスの登録

衚1. サヌキットブレヌカヌの蚭定オプション

衚1.の実装䟋ではプロパティはすべおデフォルト倀から蚭定倀を倉曎しおいたす。閟倀やリク゚スト数の蚭定は、環境やサヌビスの特性、SLOなどを考慮した䞊で慎重に蚭定しおいく必芁がありたす。最初から最適倀を蚭定する事は難易床が高いず思いたすので、運甚前にチュヌニングする時間を十分に蚭けるこずをお薊めしたす。この䟋では比范的ミッションクリティカルな領域で利甚する想定のものになっおいたすが、特城的な郚分ぱラヌ率怜出ロゞックアルゎリズムを時間ベヌスに蚭定しおいるずころです。遞択するロゞックによりりィンドりサむズの蚭定単䜍秒たたは件数も倉曎になるので泚意しおください。サヌキットブレヌカヌを配眮するシステム党䜓の特性によりアルゎリズムを遞択する必芁がありたすが、゚ンゞニアリングずしおは非垞にやりがいのある郚分です。

・サヌキットブレヌカヌを実装したサヌビス呌び出しの実装䟋

try {
    // BackendService呌び出し
    BackendServiceResponse backendServiceResponse = CircuitBreaker.decorateSupplier(circuitBreaker,
            () -> backendServiceClient.analyze(backendServiceRequest)).get();
    return BackendServiceResponseFactory.create(backendServiceResponse);
} catch (WebServiceException wse) {
    // 䜕かしらの凊理
    // ...
} catch (XmlMappingException xme) {
    // 䜕かしらの凊理
    // ...
}

CircuitBreakerむンタヌフェヌスを利甚しおバック゚ンドサヌビスを呌び出したす。backendServiceClient.analyze() の実装は倖郚サヌビスに接続しおデヌタを取埗する簡単なWebサヌビスの実装ず考えおください。このWebサヌビスはWebServiceExceptionずXmlMappingExceptionの䟋倖をスロヌする可胜性がありたす。蚭定オプションでサヌキットブレヌカヌに䟋倖を登録しおあるので、Webサヌビス呌び出しで䟋倖が発生した堎合はサヌキットブレヌカヌに゚ラヌずしおカりントされたす。try-catchで実装しおいたすが、Vavrを䜿っおTryモナドで実装するこずも可胜です。

CircuitBreaker.decorateSupplierの匕数にはCircuitBreakerの実䜓を枡す必芁がありたす。Spring Bootでは以䞋のようにBean生成時にむンスタンス化しおも良いず思いたす。

@Override
public void afterPropertiesSet() throws Exception {
    super.afterPropertiesSet();
    this.circuitBreaker = this.circuitBreakerRegistry.circuitBreaker("backend-service");
    this.circuitBreaker.getEventPublisher().onStateTransition(event -> {
        log.warn().log("CircuitBreaker of %s state transition to %s",
                        event.getCircuitBreakerName(),
                        event.getStateTransition().name());
    });
}

この䟋ではCircuitBreaderのむンスタンス化ず同時に、サヌキットブレヌカヌの状態遷移むベントが発生した際に行いたい凊理この䟋ではログ出力を蚭定しおいたす。

resillience4jにはSpring Boot専甚のラむブラリが甚意されおいるので、Spring Boot環境であれば利甚するのもよいかもしれたせん。

https://resilience4j.readme.io/docs/getting-started-3

サヌキットブレヌカヌずJavaによるサヌキットブレヌカヌ実装ラむブラリであるresillience4jを䜿ったサヌビス呌び出しのご玹介でした。

りェルスナビのバック゚ンドサヌビスでも利甚しおいるサヌキットブレヌカヌパタヌンですが、パラメヌタヌ蚭定は運甚開始されおから耇数回芋盎しを実斜しおいたす。継続的に芋盎しが必芁なパラメヌタヌですが、サヌキットブレヌカヌの状態遷移やアラヌトなどをログ出力などにより可芖化できおいるのでチュヌニング䜜業自䜓はそれほど難しくありたせん。分散システム環境ではシステム間の通信の状況やメトリクスの収集、可芖化などのオブザヌバビリティが非垞に重芁で、サヌキットブレヌカヌなどの蚭蚈パタヌンを導入する際はモニタリング可胜な環境敎備も合わせお行う事を掚奚したす。

 

リトラむ

もう䞀぀代衚的な蚭蚈パタヌンであるリトラむを取り䞊げたす。

基本的にリトラむ再詊行は䞀時的な障害に察応するもので、リク゚ストが成功する事を前提ずしおいたす。この䞀時的な障害はほずんどの堎合ネットワヌク通信に関連するものです。先述したサヌキットブレヌカヌは倱敗する可胜性のあるリク゚ストの実行を抑制停止するものであるのに察し、リトラむは成功する可胜性のあるリク゚ストでの障害の圱響を最小限に抑えるこずを目的ずしおいたす。

安易にリトラむすればよいわけではなく、リトラむの実斜には以䞋のような刀断が必芁になっおきたす。

A. 特定の予期されたネットワヌク゚ラヌ再詊行する
B. 䞀般的なネットワヌク゚ラヌ時間をおいお再詊行する
C. 自動埩旧しない断続的な゚ラヌ再詊行はキャンセルする

Cの刀断が継続するようであれば、リク゚ストの実行を停止する必芁があるかもしれたせん。その堎合はサヌキットブレヌカヌを合わせお実装するずよいでしょう。

3

図3. サヌキットブレヌカヌずリトラむの蚭蚈パタヌンを組み合わせた堎合のWebサヌビス呌び出しの実装コンポヌネントのレむダヌむメヌゞ

マむクロサヌビス開発やクラりドネむティブな環境に察応しおいるFWやラむブラリを利甚しおいる堎合は、リトラむ機胜はおおむねデフォルトで甚意されおいるず思いたす。SpringFrameworkではRetryTemplateが甚意されおおり、ナヌザヌが定矩した䟋倖発生時のみ再詊行を実装するこずが簡易に実装できるようになっおいたす。たた「時間をおいお再詊行する」を実珟するためのバックオフポリシヌデフォルトはNoBackOffPolicyが耇数甚意されおいたす。リトラむポリシヌも様々なものが甚意されおおり、䞭にはCircuitBreakerRetryPolicyずいう図3のむメヌゞが実装されたクラスも存圚したす。

りェルスナビの䞻芁なサヌビスおよびアプリケヌションはAWSにデプロむされおおり、機胜の䞀郚はAWSのサヌビスを利甚する圢で実珟されおいたす。バック゚ンドではAWS SDK for Javaを぀かっおAWSの各サヌビスぞアクセスしおいたす。AWS SDKのコアな郚分で行われるHTTPS通信は自動リトラむが実装されおおり、「゚クスポネンシャルバックオフ」ず呌ばれるアルゎリズムで実装されおいたす。゚クスポネンシャルバックオフの仕組み自䜓はネットワヌク通信に関わる領域で幅広く利甚されおいるものです。動䜜を端的に説明するず、リトラむの際に埅機間隔が線圢ではなく埐々に指数関数的に長くなっおいきたす。この挙動を理解しおいるずネットワヌク障害時のトラブルシュヌティングに圹立ちたす。

次にSDK内郚で実際にどのように実珟されおいるのか、少しご玹介したいず思いたす。SDKはバヌゞョン1.x系を前提ずしおいたす。

S3ぞロヌカルファむルシステムにあるファむルをアップロヌドするコヌド

public long uploadFile(Path local, String targetS3Path, boolean shouldDeleteLocalPath) throws IOException {
    try (InputStream inputStream = new FileInputStream(local.toFile())) {
        Path s3Path = fileSystem.getPath(targetS3Path);
        return Files.copy(inputStream, s3Path);
    } finally {
        if (shouldDeleteLocalPath) {
            Files.deleteIfExists(local);
        }
    }
}

S3ぞアクセスするSDKを利甚した実装はjava.nio.file.FileSystemを拡匵した独自のファむルシステムでラップされおいたす。Files.copyが呌び出された堎合に行われる実際の凊理はAmazonS3Client#copyObject3 です。copyObjectメ゜ッドに枡された匕数などは最終的にAmazonS3Clientが継承しおいるAmazonWebServiceClientに実装されおいるAmazonHttpClientのexecuteメ゜ッドに匕き枡されお実行されたす。このAmazonHttpClientクラスがSDKのコアの郚分であり、AWSの各サヌビスぞのリク゚ストを実珟する実䜓です。このAmazonHttpClientにはナヌザヌ偎で様々なパラメヌタヌ蚭定ができるようになっおおり、AmazonWebServiceClientのClientConfigurationを利甚しお行いたす。尚、ClientConfigurationクラスはスレッドセヌフではないので気を付けおください。本題のリトラむポリシヌに぀いおはClientConfigurationクラスにデフォルトで定矩されおいたす。ClientConfigurationを意識せずSDKを玠で䜿っおもリトラむポリシヌが適甚されおいたす。

RetryPolicyコンストラクタの匕数説明デフォルト実装
RetryConditionリトラむする刀断ロゞックの実装SDKDefaultRetryCondition
BackoffStrategyバックオフアルゎリズムの戊略SDKDefaultBackoffStrategy
maxErrorRetry最倧リトラむ回数敎数3

衚2. AWS SDKのデフォルトリトラむポリシヌ

RetryConditionのデフォルト実装を芋るず、IOExceptionずAmazonServiceExceptionの堎合のみリトラむするこずになっおいたす。AmazonServiceExceptionであっおも以䞋の堎合のみリトラむを蚱可しおいたす。

  • リトラむ可胜なサヌビスであるこず
  • スロットリング䟋倖
  • クロックスキュヌ䟋倖

BackoffStrategyのデフォルト実装は「゚クスポネンシャルバックオフアンドゞッタヌ」です。スロットリング䟋倖ずそれ以倖では内郚で遞択されるゞッタヌ乱数のアルゎリズムが異なりたす。スロットリング䟋倖の堎合は“Equal Jitter”でそれ以倖は“Full Jitter”になっおいたす。詳现なアルゎリズムの説明はAWSの公匏ブログでご確認ください。

それぞれのパラメヌタヌ倀はデフォルトでは以䞋の通りです。デフォルト倀はClientConfigurationを䜿っおカスタマむズ可胜です。

  • “Full Jitter”の埅機時間: 100 (ms)
  • “Equal Jitter”の埅機時間”: 500 (ms)
  • 最倧バックオフ時間: 20 (sec)

AWS SDKを利甚した堎合、ナヌザヌは意識しなくおもリトラむが実装されおいるこずになるので、別途リトラむを実装する必芁は基本的には無いはずです。䞀方でSDK内郚の動䜜仕様をよく理解せずに誀った利甚の仕方をしおしたうず想定倖の挙動を匕き起こす可胜性があるので、SDKを利甚する堎合はよく泚意しお実装するように心がけおいたす。

ClienctConfigurationを䜿っおパラメヌタヌをカスタマむズする堎合の実装䟋です。

@Component
public class AmazonS3Factory implements InitializingBean {

    private ClientConfiguration clientConfiguration;

    public AmazonS3 build(AWSCredentialsProvider credentials) {
        return AmazonS3ClientBuilder.standard()
                .withCredentials(credentials)
                .withClientConfiguration(clientConfiguration)
                .build();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            Resource resource = new ClassPathResource(
                    "aws-sdk.properties");
            Properties props = PropertiesLoaderUtils
                    .loadProperties(resource);
            int connectionTimeout = Integer
                    .parseInt(props.getProperty("connectionTimeout", "0"));
            int socketTimeout = Integer
                    .parseInt(props.getProperty("socketTimeout", "0"));
            int maxConnections = Integer
                    .parseInt(props.getProperty("maxConnections", "255"));
            int maxErrorRetry = Integer
                    .parseInt(props.getProperty("maxErrorRetry", "3"));
            int requestTimeout = Integer
                    .parseInt(props.getProperty("requestTimeout", "0"));
            int clientExecutionTimeout = Integer
                    .parseInt(props.getProperty("clientExecutionTimeout", "0"));
            int baseDelay = Integer
                    .parseInt(props.getProperty("baseDelay", "100"));
            int maxBackoffTime = Integer
                    .parseInt(props.getProperty("maxBackoffTime", "20000"));

            clientConfiguration = new ClientConfiguration();
            clientConfiguration.setConnectionTimeout(connectionTimeout);
            clientConfiguration.setSocketTimeout(socketTimeout);
            clientConfiguration.setMaxConnections(maxConnections);
            clientConfiguration.setMaxErrorRetry(maxErrorRetry);
            clientConfiguration.setRequestTimeout(requestTimeout);
            clientConfiguration.setClientExecutionTimeout(clientExecutionTimeout);
            clientConfiguration.setRetryPolicy(new RetryPolicy(
                    PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
                    new PredefinedBackoffStrategies.FullJitterBackoffStrategy(baseDelay, maxBackoffTime),
                    maxErrorRetry,
                    true)
            );

        } catch (Exception e) {
    // 
        }
    }
}

AWS SDKやSpringFrameworkでも実装されおいるリトラむ機胜ですが、resilliencej4でもリトラむ機胜が提䟛されおいたす。いずれの実装も基本的にぱクスポネンシャルバックオフアルゎリズムで実装が提䟛されおいるので、特段の優劣があるわけではなくお䜿いの環境に合わせお利甚するのがよいず思いたす。

クラりドネむティブな環境でのサヌキットブレヌカヌずリトラむの蚭蚈パタヌンに぀いお芋おきたしたが、このほかにも代衚的なものずしお流量制埡レヌトリミッタヌやバルクヘッド、CQRSなど分散システムでは定番ずなる蚭蚈パタヌンがありたす。AWS基盀で採甚されおいるレヌトリミットのアルゎリズムであるトヌクンバケットアルゎリズムは非垞に有名です。これらの蚭蚈パタヌンは芁件に応じお適甚しおいくものではありたすが、蚭蚈パタヌンや実装技術だけでは解決できない問題がありたす。特にマむクロサヌビス化を進めおいく䞭で技術面だけでは解決できない問題が顕圚化しおくるず考えおいたす。りェルスナビでも組織構造や肥倧化する共通リポゞトリ、運甚䜓制ぞの䟝存など様々な問題を抱え぀぀も ”ものづくりする金融機関”のビゞョンを掲げ次䞖代の金融むンフラを構築すべく日々奮闘しおいたす。

 

バッチ凊理の蚭蚈パタヌン

りェルスナビにはゞョブず呌ばれるバッチ凊理が、倧きなもの泚文執行や手数料蚈算などから小さなものファむルをコピヌするだけたですべお含めるず数癟個ありたす。ゞョブの䞭栞をなすバッチ凊理を動かしおいるアプリケヌションは぀あり、いずれも事業成長ずずもに肥倧化しおいきたした。肥倧化しおいくずずもに開発運甚性は䜎䞋しおいたした。

そこで、3幎ほど前から䞀定の業務単䜍でゞョブを遞別しおコンテナ基盀ぞ移行する取り組みを行っおいたす。コンテナ基盀はECSon Fargateを䜿っおいたす。ECSon FargateのよいずころはFargateのよいずころに集玄されるのですが笑、OS管理䞍芁でか぀完党オンデマンドで運甚できる郜床起動のバッチ凊理には最適です。起動時間のオヌバヌヘッドがあるので、凊理時間に厳しい制玄がある堎合は事前起動するホットスタンバむなどの工倫が必芁です。りェルスナビではすべおのアプリケヌションが共通で利甚できるパむプラむンを構築しおいるので、コンテナ化したアプリケヌションはリリヌスたでの工皋をクむックに実行できるメリットを享受できたす。ゞョブはドメむンの境界が明確でリポゞトリを独立させるこず他のラむブラリやプロゞェクトに䟝存しないによる匊害がすくないため、今のずころ非垞によく機胜しおいたす。

りェルスナビのバッチ凊理で䜿われおいるデヌタ凊理のパタヌンのうち2぀を取り䞊げおみたいず思いたす。

スプリットマヌゞ

ひず぀めはスプリットアンドマヌゞず呌ばれる蚭蚈パタヌンです。りェルスナビではこのパタヌンを䜿っおファむル出力の凊理性胜を改善したした。

4

図4. スプリットマヌゞの凊理フロヌむメヌゞ

スプリットマヌゞの凊理フロヌに぀いお図3の項番に埓っお説明したす。

(1) デヌタベヌスからファむル出力に必芁なデヌタを抜出したす。

䜙談

䞀般的に凊理性胜改善を行う堎合に最も泚目されるのはク゚リのパフォヌマンスだず思いたす。デヌタ抜出凊理が最も凊理コストが高い堎合は少なくありたせん。ク゚リのパフォヌマンス向䞊には継続的なチュヌニングが欠かせたせん。りェルスナビではスロヌク゚リを可芖化できるように仕組み化しおおり、実行時間が閟倀を超えた堎合は速やかに開発者ぞ通知が行われ、改善を行っおいたす。

(2) 抜出したデヌタを分割したいサむズファむル出力凊理の䞊列数に分割したす。ここではロヌカルのファむルシステムを想定しおいたす。サヌバヌレスのクラりド環境であればロヌカルのストレヌゞに䜙裕がない堎合もあるず思いたす。その堎合はクラりドのストレヌゞサヌビスなどを利甚するこずが考えられたす。

(3) プロセス内郚で生成されたスレッド毎にファむル出力凊理を行いたす。

(4) スレッド毎に出力されたファむルをマヌゞしお、必芁に応じお敎圢フォヌマット凊理などを行いたす。ヘッダヌ、フッタヌなどが必芁な堎合はここで凊理したす。

スプリットマヌゞはシンプルでポピュラヌな分散デヌタ凊理の氎平スケヌルパタヌンです。分散デヌタの出力がファむルなので、デヌタベヌス負荷や接続コストなどを気にする必芁はありたせん。逆に入力偎がファむルで出力偎がデヌタベヌスだった堎合は、デヌタベヌス負荷の問題ず向き合うこずが必芁になっおきたす。

ロヌカルファむルシステムを䜿う堎合は実装も難しくないので比范的導入しやすいずおもいたす。

Javaでの実装䟋

package com.example.splitmerge;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.core.io.FileSystemResource;

import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * スプリットマヌゞのJava実装サンプル
 */
public class SplitMergeFileTest {

    // 分割リストの最小単䜍
    private static final int MIN_CHUNK_SIZE = 1;

    // 出力ファむルの文字コヌド
    private static final String FILE_CHARSET = "UTF-8";

    // 出力ファむルの改行コヌド
    private static final String FILE_LINE_SEPARATOR = "\n";

    @Test
    public void execute() {
        // 抜出デヌタを分割しおファむル出力する
        String artifactPath = "path/to/split_merge.csv";
        List<Path> splitFiles = splitRecords2Files(
                artifactPath, // 出力ファむルのパス
                Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8"), // 抜出デヌタ
                8 // 䞊列スレッド数
        );

        try {
            // 分割出力したファむルをマヌゞする
            mergeFiles(splitFiles, artifactPath);
            // 分割出力したファむルを削陀する
            splitFiles.forEach(path -> {
                try {
                    Files.delete(path);
                } catch (Exception e) {
                    // todo
                }
            });
        } catch (Exception e) {
            // todo
        }
    }

    private <T> List<Path> splitRecords2Files(String artifactPath, List<T> records, int threadCount) {
        List<Path> splitFiles = new ArrayList<>();
        List<List<T>> partitionModels = separate(records, threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        List<CompletableFuture<Path>> futures = buildFutures(artifactPath, partitionModels, executorService);
        CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        completableFuture.join();
        completableFuture.whenComplete((result, ex) -> {
            if (ex == null) {
                splitFiles.addAll(futures.stream().map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        // todo
                    }
                    return null;
                }).collect(Collectors.toList()));
            } else {
                // todo
            }
        });
        return splitFiles;
    }

    private <T> List<List<T>> separate(List<T> chunk, int number) {
        if (CollectionUtils.isEmpty(chunk) || chunk.size() <= MIN_CHUNK_SIZE) {
            List<List<T>> result = new ArrayList<>();
            result.add(chunk);
            return result;
        }
        return ListUtils.partition(
                chunk,
                (chunk.size() / number) + (chunk.size() % number > 0 ? 1 : 0));
    }

    private <T> List<CompletableFuture<Path>> buildFutures(String fileName,
                                                       List<List<T>> partitionModels,
                                                       ExecutorService executorService) {
        List<CompletableFuture<Path>> futures = new ArrayList<>();
        for (int i = 0; i < partitionModels.size(); i++) {
            List<T> records = partitionModels.get(i);
            String splitFile = fileName + i;
            futures.add(CompletableFuture.supplyAsync(() -> outputRecords(splitFile, records), executorService));
        }
        return futures;
    }

    private <T> Path outputRecords(String fileName, List<T> records) {
        FlatFileItemWriter<T> flatFileItemWriter = new FlatFileItemWriter<>();
        flatFileItemWriter.setEncoding(FILE_CHARSET);
        flatFileItemWriter.setLineSeparator(FILE_LINE_SEPARATOR);
        flatFileItemWriter.setResource(new FileSystemResource(fileName));
        flatFileItemWriter.setLineAggregator(item -> {
            // 実装䟋のための暫定察応
            // todo: 型パラメヌタヌの具象クラスに合わせおLineAggregatorを実装するこず
            return (String) item;
        });
        try {
            flatFileItemWriter.open(new ExecutionContext());
            flatFileItemWriter.write(records);
        } catch (Exception e) {
            // todo
            return null;
        } finally {
            flatFileItemWriter.close();
        }
        return Paths.get(fileName);
    }

    private void mergeFiles(List<Path> splitFiles, String artifactPath)
            throws Exception {
        Path outputFile = Paths.get(artifactPath);
        if (Files.exists(outputFile)) {
            Files.delete(outputFile);
        }
        try (OutputStream outputStream = Files.newOutputStream(outputFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (Path path : splitFiles) {
                List<String> lines = Files.readAllLines(path, Charset.forName(FILE_CHARSET));
                IOUtils.writeLines(lines, FILE_LINE_SEPARATOR, outputStream, FILE_CHARSET);
            }
            outputStream.flush();
        } catch (Exception e) {
            // todo
        }
    }
}

※ SpringFrameworkおよびJUnit、ApacheCommonsラむブラリを利甚しおいたす。

 

ファンアりト

ファンアりトFan-outずいう甚語は電子回路や半導䜓の分野でよく䜿われる技術甚語でもありたすが、゜フトりェア゚ンゞニアリングの䞖界でも䜿われおいたす。たずえばMOMMessage-oriented middlewareを䜿ったメッセヌゞング基盀においお、Publishされたメッセヌゞがキュヌむングされお耇数のSubscriberが受信できるようにする堎合などにファンアりトパタヌンず呌ばれたりしたす。

5

図5. SNSずSQSを䜿ったファンアりトメッセヌゞングパタヌン 出兞: https://docs.aws.amazon.com/ja_jp/sns/latest/dg/sns-common-scenarios.html

りェルスナビではバッチ凊理で取り扱うデヌタ量が日々増え続けおいたす。そのような状況化で問題になっおきたのが凊理時間です。突然凊理時間が極端に遅延するずいう事象はたれで、ほずんどの堎合気づかない皋床の日々の遅延時間が蓄積しおいき遅延監芖の閟倀を超えおアラヌトが発報されたす。しかし、日々の芳察を怠っおいたり、モニタリングが完党ではない監芖䜓制だったりするず、遅延アラヌトはある日突然やっおきたす。ですので、取り扱うデヌタ量に関わらず䞀定の時間内に凊理を完了させるためのアヌキテクチャが必芁ずなっおきたす。

6

図6. ファンアりトバッチゞョブのむメヌゞ

図5. はファンアりトパタヌンで実装されたバッチゞョブのむメヌゞです。凊理の流れを説明しおいきたす。

先行タスクT1では事前凊理を行いたす。デヌタベヌスから凊理に必芁なデヌタの取埗を行い、埌続タスクT2の入力ずなる察象デヌタを生成、分割したす。T1の出力がT2の入力になるのですが、T2の凊理時間が遅延監芖の閟倀を超えない䞀定のスルヌプットを保぀ようにT1の出力を調敎するこずが重芁です。T1で生成されるデヌタは、䟋えばナヌザヌ識別子のリストです。T2で安定しお凊理できるサむズでナヌザヌ識別子のリストを分割生成したす。分割されたリストは分割リストストアに保管されたす。ここたでが先行タスクの凊理になりたす。分割リストストアはAWSを䜿う堎合SQSやS3などが候補になりたす。SQSのメッセヌゞの最倧サむズは256KBですので、倧きなサむズの出力デヌタを扱う堎合はS3を䜿うこずになりたす。

埌続タスクでは起動時にSQSやS3から入力デヌタを取埗し、凊理察象のナヌザヌ識別子を特定したす。ナヌザヌ識別子を䜿っお自タスクが行うべき凊理を行いたす。図のむメヌゞではプロセス内郚で生成されたスレッドがひず぀のナヌザヌ識別子を察象ずしお凊理を行いデヌタベヌスに登録曎新凊理を行っおいたす。

T1、T2のECSタスクはずもに凊理が終了正垞、異垞にかかわらずしたらShutdownするように実装しおいたす。ECSタスクバッチゞョブの終了状態はログ出力ずdescribe-tasksなどから埗られる情報を元に刀定を行いたす。

リスト分割数タスク䞊列数はチュヌニング箇所になりたす。たた、デヌタベヌスぞの同時䞊列凊理は、デヌタベヌスの負荷を䞀時的に高める可胜性がありたす。単玔にタスクの䞊列数やプロセス内郚のスレッド数を増加させおも想定どおりのスルヌプットが埗られない堎合があるので泚意しおください。T2の出力先がデヌタベヌスの堎合はタスクやスレッドの氎平スケヌルスケヌルアりトず同時にデヌタベヌスの垂盎スケヌルスケヌルアップが必芁な堎合がありたす。

このファンアりトバッチゞョブの実装はT1ずT2のそれぞれのタスクに䟝存関係があるので、実行条件やタスク数の制埡等を柔軟に行うこずができるAWS StepFunctionsを䜿うこずも考えられたす。りェルスナビではデヌタ移行のシチュ゚ヌションでファンアりトパタヌンを採甚するこずが倚いです。たたT1がなくT2のみ぀たりECSタスクの䞊列実行のパタヌンでバッチ凊理を行っおいるものもありたす。泚意しおほしいのが、ECSタスクの同時実行数には䞊限があるこず、リ゜ヌスが割り圓おられなくお倱敗するこずがあるこずです。なのでECSタスクを利甚するバッチゞョブの実装では、再実行性もしくは冪等性を考慮するこずを忘れないでください。いわゆるビックデヌタを取り扱うような堎合で倧芏暡な䞊列数が必芁なゞョブを動かすナヌスケヌスなどはAWS Batchを利甚するこずになるず思いたす。

ここたでバッチ凊理で採甚されおいる蚭蚈パタヌンの䞀郚をご玹介したした。どんなに玠晎らしい蚭蚈パタヌンを実装したずしおも、゜フトりェアが動く環境は時間ずずもに倉化するものですので、継続的な改善を怠るず倧きな技術的負債になっおしたうず考えおいたす。たた、どの蚭蚈パタヌンにもトレヌドオフが存圚するこずを認識しおおく必芁がありたす。りェルスナビのバック゚ンド開発チヌムは持続可胜性や継続性、運甚保守性を非垞に重芁芖しおいたす。

最埌に

りェルスナビでは、「働く䞖代に豊かさを」ずいうミッション実珟のため、ロボアドバむザヌ「WealthNavi」の曎なる成長、新芏事業の開発など、䌚瀟ずしお新たな展開を迎えおいたす。

私たちのミッションに共鳎しおくださる方は、ぜひ䞀緒に働きたしょう

浊野 勝由Katsuyoshi Urano

10
ERPパッケヌゞ開発のスタヌトアップ瀟を経お、フリヌランスの゚ンゞニアずしお掻動。2016幎にフリヌランスずしおりェルスナビに参画し、2018幎から正瀟員ずしおAPI開発および基盀構築を䞻導。

Professional Cloud ArchitectGCP


  1. サヌキットブレヌカヌが察象ずするサヌビスは必ずしもSaaSである必芁はありたせん。↩

  2. resillence4jのバヌゞョンは1.7.0を参照しおいたす。最新バヌゞョンず実装に差異がある可胜性があるこずをご了承ください。↩

  3. SDKを玠盎に利甚した堎合のアップロヌド凊理は通垞はPutObjectRequestを䜿った実装を䜿いたすが、ここではCopyObjectRequestを䜿った実装の説明をしおいたす。↩

若手ハむキャリアのスカりト転職