読者です 読者をやめる 読者になる 読者になる

CQRS+ESについて細かい実装や考察をまとめてみた

前提

私は「エリック・エヴァンスドメイン駆動設計」を読んだのみで、「実践ドメイン入門」は未読(とても欲しい)の状態で書いています。 (「実践ドメイン入門」にはもっと深い洞察が書いてあるのだと思います…) また「エリック・エヴァンスドメイン駆動設計」と共に、参考リンク先をまず読んだことがある前提で書いている部分がありますのでご注意下さい。今後もっと良いアイデアが浮かんだり、実際に実装するにあたって浮かび上がった課題があれば随時追記・記事にできればいいなと思います。

参考

CQRS+ES

DDDから更にData Flowの概念を取り込んだarchitecture。 コマンドクエリ分離原則(CQS/Command Query Separation)に基づく。 CQRSは Command Query Responsibility Segregation の略。 ESはEvent Sourcingの略。

それぞれCommand側・Query側についてまとめていく。

Command (Write)側

Command側のフロー

まずCommand側のフローとしては以下のようになっている

  1. ユーザーは任意のアクションを実行する
  2. クライアントがアクションをCommandとしてサーバーへ投げる。(例:名前の変更/ChangeName)
  3. サーバーがCommandを受け取る
  4. ドメインモデルがCommandを解釈し、ドメインに基づいた処理を実行する(例:名前を変更する)
  5. ドメインモデルは結果となるEventを生成する(例:名前が変更された/ChangedName)
  6. Eventを保存する

Command受付からドメインモデルでの処理へ

Command と Event

Commandは指示、Eventは指示の結果として起きた出来事。意図がまったく異なるので混同しないように注意が必要。クラスとしても分けるべき。命名にもルールがあって、Commandは常に命令形で定義(例: Change Name)し、Eventは常に過去形で定義する(例: Changed Name)

タスクベースのUIで考える

コマンドとして扱うにはタスクベースのUIにするのが良い。

よく見かけるのはCRUDや管理画面等で表の状態を表示し、「保存」ボタンがあり、各formとデータが関連して定義されたDTO(DataTransferObject)が存在し、「保存」アクションで関連付けられたDTOをそのまま送信する、などのパターンだが、このようなものとは異なる。

例えば名前を変更するときは、名前を「保存/更新する」のではなく「名前の変更」というタスクとして切り分け、DTOをそのまま送信しない。「名前の変更」コマンドの中身は、変更者のユーザーID、コマンド名、変更後の名前のみとなる。

UIのためのデータ生成はコストが大きい

ドメインオブジェクトをDTOに変換するのはDataMapperを必要とするうえ、カプセル化して守られているはずの中身をさらけ出す行為に等しい。すべてのモデルから、dumpするためのメソッドを用意するか、このためだけにgetterを生やすことになってしまう。もっとキレイに書くこともできるかもしれないが、いずれにしてもカプセル化し閉じておくべきデータを外に出す入り口を作ってしまう。

[DomainObject] => data mapping => [Data Transfer Object] => client

CommandとQueryを分け、Query側には薄いDataLayerから直接DTOを生成すれば、ドメインオブジェクトはCommand側でのみ使われることになり、カプセル化を破ってまでDTOを生成する必要はなくなる。

EventStoreへの保存からSnapShot作成へ

CommandをEventに変換してEventStoreに保存してからの振る舞い、実際の実装方法においては詰めたい点が残る。CQRSについて述べているものを見ても、以下のように分かれている。

  • EventStoreを保存して、そのEventStoreから読み出してSnapshotを作る(By Greg)
  • EventStoreに保存すると同時にEventHandlerにEventを送る(By Nijhof)

両者とも理屈はわかる。まずGreg側はトランザクションに守られないならばflowは分岐せず1つであるべきという意見。この実装においてはどのEventをHandlingしたか、が分かるようにしなければならない。Gregによると通し番号(sequence)を付けておくのがよい、と書かれている。

対して、NijhofはRead側をRDBMSを想定しているのでインピーダンスミスマッチは起こるが吸収できれば問題なさそう。

EventStore Schema

Event Table, Aggregate Table, SnapShot Table が必要になりそう。

Event Table

発生したイベントを保存するテーブル。 INSERTで追記するのみで、UPDATE/DELETEはしない。 イベントはこの後でEventHandlerに渡すので、ドメイン処理した環境とは 別の環境でも読み取れるよう、JSONなどの汎用フォーマットにしておくとよさそう。

Name Data type Content
Sequence Number Big Int イベント通し番号 (Auto Increment) [PK]
Aggregate ID String 集約ルート ID (UUID)
Data Blob シリアライズされたEventデータ
Version Int 集約ルートデータバージョン
Created At Timestamp イベント発生日時

Aggregate Table

イベント対象となる集約ルートの最新バージョンを保持する。

Name Data type Content
Aggregate ID String 集約ルート ID (UUID) [PK]
Type VarChar 集約ルート名
Version Int 集約ルートデータバージョン
Updated At Timestamp 更新日時

Snapshot Table

イベントをversionの順に適用していった結果の集約ルート状態を保存しておくテーブル。 ドメイン処理した集約ルート状態を再現したインスタンスシリアライズしたものを入れておく。

Name Data type Content
Aggregate ID String 集約ルート ID (UUID) [PK]
Sequence Number Big Int イベント通し番号 (Auto Increment)
Serialized Data Blob シリアライズされた集約ルートのスナップショットデータ
Version Int 集約ルートデータバージョン
Updated At Timestamp 更新日時

格納手順

まずAggregateテーブルからAggregateIDでSELECTする

SELECT version
  FROM aggregates
 WHERE aggregate_id = [aggregateId]

空の場合はaggregate_idとversionを0にして結果セットとする

得たaggregate結果セットrowのversionをwhere句に追加し、一致したときにINSERTするようにする。 後続のeventのrowも続けて入れていく。

aggregatesの最新バージョンを得ておく

SELECT version
  FROM aggregates
 WHERE aggregate_id = [aggregate_id]
   FOR UPDATE

トランザクションを開始する

BEGIN

イベントを追加していくが、追加されるイベント数分のversionを上げて更新する

UPDATE aggregates
   SET version = version + [after_version]
 WHERE aggregate_id = [aggregate_id]
   AND version = [expected_before_version]

ただしく成功されたか(影響行数が1になっているか)確認して続ける。 FOR UPDATE 文なので大丈夫なはずだが、万一失敗したらここでrollbackする

個々のeventをinsertしていく

INSERT INTO events(aggregate_id, data, version) VALUES([aggregate_id], [serialized_event_data_1], [version_1]);
INSERT INTO events(aggregate_id, data, version) VALUES([aggregate_id], [serialized_event_data_2], [version_2]);
INSERT INTO events(aggregate_id, data, version) VALUES([aggregate_id], [serialized_event_data_3], [version_3]);
INSERT INTO events(aggregate_id, data, version) VALUES([aggregate_id], [serialized_event_data_4], [version_4]);

最後にcommitする

COMMIT

ここまでで Commandとしてのフローは完了するので、Clientにレスポンスを返していく。

SnapShot生成

backgroundのプロセスがeventテーブルを監視しておき、前回処理した最終のsequence_numberからすべてを取得していく

SELECT sequence_number, aggregate_id, data, version
  FROM events
 WHERE sequence_number > [last_processed_sequence_number]
 ORDER BY sequence_number ASC;

これを順次処理していく。取り出したaggregate_idから前回までのsnapshotを取り出す

BEGIN
SELECT aggregate_id, sequence_number, serialized_data, version
  FROM snapshots
 WHERE aggregate_id = [aggregate_id]
   FOR UPDATE

取り出したデータをunserializeして集約ルートを復元し、Eventを順次適用していく 最後のEventまで適用が終わったらupdateしていく

UPDATE snapshots
   SET sequence_number = [sequence_number],
       serialized_data = [serialized_data],
       version = [version]
 WHERE aggregate_id = [aggregate_id]

完了したらcommitする

COMMIT

SnapShotからの復元

まず最初は同様にAggregateテーブルからAggregateIDでSELECTする

SELECT version
  FROM aggregates
 WHERE aggregate_id = [aggregateId]

snapshotをAggregateIDから取得

SELECT aggregate_id, sequence_number, serialized_data, version
  FROM snapshots
 WHERE aggregate_id = [aggregate_id]

eventsからsnapshotからの差分を取得する。 得たeventを適用(replay)していき、集約ルートを構築していく

SELECT sequence_number, data, version
  FROM events
 WHERE aggregate_id = [aggregate_id]
 ORDER BY sequence_number ASC

データの流れで問題になりそうなところ

Command(Write)側では複雑なQueryは使えない

Gregも言っているとおりEventSourcingの場合、Command(Write)側で扱えるQueryはPrimaryKeyに基づくものだけになる。 このパターンはユーザーIDで紐づくのはUserの集約になるので、ほぼユーザーデータをまるごとになり、大きめのデータを扱うことになる。 Command側ではこの制約下でも問題ないように作る必要がある。

Eventを記録するときは状態を変化させない

これが一番理解に時間がかかった点ですが、納得してみれば確かにな、と思った箇所。

集約ルートがEventを受け取ったらEventStoreに登録(Apply)するだけにとどめ、集約ルート自身の状態を変化させない。 これまでの過去の結果(snapshot+events)である集合ルートに対して、Eventを生成し、EventStoreに登録するのみ。

これはつまり、ある集合ルートはnameプロパティを持ち、fooという文字列が設定されているとした場合、この集合ルートのnameをbarに変更するコマンドChangeNameCommandを渡したとすると、 nameをfooからbarに変更するイベントChangedNameをEventStoreに登録するが、実際にnameプロパティは変更せず、fooのままにしておくということです。

これは、集合ルートに適用したとしてもその結果を返すことはしないので、状態を変化させることに意味はないからだと考えます。 Snapshotを作る時には過去の結果(snapshot+events)に今回のEventを適用した状態をSnapshotにするので問題にはならないし、Snapshotを作る前であっても過去の結果(snapshot+events)の中に入ってくるのでこれも問題にはならない。

Push型か、Pull型か

クライアントからのCommandを実行し、Eventを登録するまでは同じですが、EventHandlerをkickする(push型)かpolling(pull型)かで処理の流れが異なる。

タイプ 特長
Push型 Commandの処理を行う際に、EventHandlerにEventを流す
Pull型 Commandの処理を行う際に、Eventの登録まで行いPollingして検出する

EventSchema(Write)側のデータから、ReadModel(Read)側へデータが流れるとき Read側がEventSourcingでなく、RDBMSで状態を表す伝統的な仕組みの場合は一度実行されたイベントが再度実行されてはならない。

Push型

EventHandlerにPushするならば、同期的にすべて行うとパフォーマンスに影響が出るので非同期的に行うほうが良い。ただ処理結果は待たずに返さないと結局処理を待つことになるので、指示だけを送って早々にレスポンスを返す事になりそう。

Pull型

Pull型のもっともシンプルな流れでは、下記の3段階でReadModelに適用が行われると思われる

  1. EventのQueueからの取り出し
  2. QueueからReadModelへの適用
  3. EventのQueueからの削除

このとき、2までは成功したが3に失敗したとき、2をrollbackできなくてはいけない。 QueueとReadModelにまたがってトランザクション処理が行えれば良いが、将来的なスケーラビリティを確保するためにも分離されていたほうが良いと思われる。

それぞれトランザクションを用意する

時系列でこんなかんじ

| => begin
o => commit
d => dequeue
p => published
@ => insert/update

Queue   ---|----d--------------p------o-------------------->
RDBMS   -----------|-----@----------------o---------------->

便宜上RDBMSのcommitはqueueを先にRDBMSを後にしたが、どちらが先でも構わない。 大事なのは、QueueもRDBMSも処理を終え、成功を確認したらcommitするということ。

EventHandlerもそれなりに大変

Eventを生成することもDDDドメインモデルの振る舞いが大切になるが、 EventをHandlingしてReadModelに落とす場面も注意深く実装しなくてはいけない。

Read側もEventSourceを格納するのか、正規化してRDBMSに落とし込むのか

からなずしもEventSourcing形式でなくても良いが、RDBMS側はしっかり考えて設計を行う必要がある。なんらかのCommandで、関連する集合を引く必要がある時に、EventStoreデータだけでは引けないことがある。この場合、ReadModelから引くがAggregateIDがないと集合ルートを発見できないので、RDBMSにもAggregateIDを関連させて保存しておく必要がある。

RDBMSに正規化して保存するなら、インピーダンスミスマッチを考慮しつつ、「読み込みやすいこと」「パフォーマンスが出ること」を優先して保存していく。場合によっては非正規化したViewを用意することも考慮に入れていく。

最初は1プロセスでRDBMS保存まで行ってもいい

最初からMicroservice化してしまうとコストが回収できない場合があるので、最初からやりすぎないことが大事。

Query (Read)側

ここまでで非常によく練った状態でRDBMSに格納していったので、Query側は取り出すだけでよい。というかそのようにする。パフォーマンス、スケーラビリティを考慮してなるべくシンプルに保つと良さそう。

まとめ

DDDの弱点であるコストが大きくかかる点であったり、DTOへの変換がOOP原則に照らして辛い部分などをクリアする、とてもよく考えられた設計だなと感じました。これまでのいわゆるMVCなどのアーキテクチャに慣れていた人(自分)には結構なマインド変換が必要なものだなと。 まだ実際には稼働させていないのですべて机上の考察なので、もっと深い考察が出て来るでしょうが、楽しくなりそうです。