エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

Axon Framework で簡単にEventSourcing+CQRSなアプリケーションを作る

この記事はエムスリーAdvent Calendar 2023の13日目の記事です。

こんにちは、製薬企業向けプラットフォームチームエンジニアの桑原です。

前回のJJUG CCC の登壇についてのブログで Axon Framework について軽く触れました。今回はAxon Frameworkがどのようなもので、どういった使い方をするかを紹介したいと思います。

背景:CommandとQueryに最適なモデルが異なる

上述のリンクで紹介したメッセージ配信システムの設計では、アプリケーション特性の違いからメッセージ配信を担うCommandとメッセージ閲覧を担うQueryに分けました。 結果的にCQRSのようになりましたが、最初からCQRSを目指していたわけではなく、データモデルは1つのものを考えていました。

CommandとEvent追記型との相性は良かった

データモデリングはイミュータブルモデルの考えを参考に、リソースとイベントに分けて考えました。イベントは更新や上書きせずに追記する考え方はCommandの実装ととても相性が良かったです。

QueryがEvent追記型との相性は良くなかった

メッセージには 送信,開封,削除 などのイベントが紐付けられ、1つのメッセージに対して複数の開封イベントが発生することもあります。

メッセージリソースとイベントの関係

そして以下のようなテーブルの状態になってる場合を想定します。

message
message_id title body
1
hello axon
this is message body
2
not expected to be read
this message is not expected to be read
message_read
message_id read_datetime
1
2023-12-13T10:00:00+09:00
2
2023-12-13T11:00:00+09:00
1
2023-12-13T12:00:00+09:00
message_read_canceled
message_id canceled_datetime
2
2023-12-13T12:00:00+09:00

message_id:1 に対しては開封レコードが2件入ってます。
message_id:2 に対しては開封レコードが入っているもののその後開封取り消しされています。

この状態で、メッセージ一覧画面などで現在有効な未読一覧を検索したい場合、一筋縄ではいかなくなります。

  • SQLで表現可能?
    message , message_read, message_read_canceled を結合し、message_read にレコードがなければ未読。 または message_read_canceled の最新日が message_read の最新日付よりも未来日であれば未読。
    などをSQLで表現するのはとても骨が折れそうです。

  • アプリで現在状態の導出?
    またはアプリでイベントを取得し直して再計算すれば実現はできそうですが性能が満足に出るとは到底思えません。

苦肉の解決策

そのままだとクエリに耐えられないことは容易に想像できたので、メッセージ配信システムではイベントを保存すると同時にクエリ用のステータスも更新することにしております。

message
message_id title body status
1
hello axon
this is message body
READ
2
not expected to be read
this message is not expected to be read
UNREAD

せっかくイベントの追記してるのに、状態の更新もしないといけないのは二度手間感も少々ありますね。

事実を記録するという観点でイベントをそのまま追記していくのはとても有効な設計なのですが、やはりそのままだとクエリで厳しくなり、データモデルにステータスなどをもたせるなどの対応が必要になってしまいそうです。かといってクエリ用のリードモデルの作成となると一気に考えることが増えそうです。

Axon Framework

前回の JJUG CCC 2023 Fall で似た内容でこの悩みを解決されていた事例がありました。
実践Pub/Subマイクロサービス――SpringとAxonで作る疎結合でスケーラブルなシステム

このセッションでAxon Framework が使用されており、とても面白そうだったので最近私も家で遊び始めてます。

ざっくりアーキテクチャ

Axon Framework はDDDとCQRSをベースとしたマイクロサービス用のフレームワークです。Axon Frameworkとは別にAxon Serverも必要になり、Axon Framework は Axon Server とのやり取りを隠蔽してくれる立ち位置にいます。ざっくりとしたアーキテクチャは Architecture Overview の図が参考になると思います。

https://docs.axoniq.io/reference-guide/architecture-overview から

開発者が意識すべきは主に3点です。

  • Commandを受け取って(生成して)Axon Server に渡す。
  • Axon Server から通知されるEventを受け取り、read用のモデルReadModelを生成する。
  • QueryはReadModelから欲しい情報を取得する。

イベントの保存と送受信を全部Axon Frameworkがやってくれるのでビジネスロジックの開発に注力して高機能なCQRSを開発することが可能になります。更に、Spring Boot Integration を使用すれば Axon Server へのデータ送信のためのコンポーネントやイベント検知のためのアノテーションが提供されるため、開発者は外部システムの存在を意識することなくメソッドを呼び出すだけで済んでしまいます。

実際に Axon Framework を使って、先程の Command/Query 用のモデルをどのように構築していくかを確かめていきます。

Command

Eventの登録

  • Axon ServerへCommandを渡す
@RestController
public class MessageRestEndpoint {

    private final CommandGateway commandGateway;

    @PostMapping("/create-message")
    public CompletableFuture<Void> createMessage(@RequestBody MessageCreate messageCreate) {
        String messageId = UUID.randomUUID().toString();
        return commandGateway.send(new CreateMessageCommand(messageId, messageCreate.title, messageCreate.body));
    }

    @PostMapping("/{messageId}/read")
    public CompletableFuture<Void> readMessage(@PathVariable String messageId) {
        return commandGateway.send(new ReadMessageCommand(messageId));
    }
}

public class CreateMessageCommand {
    @TargetAggregateIdentifier
    private final String messageId;
    private final String title;
    private final String body;

    // getter,setter,equals,hashCode,toString...
}

Commandの発行自体は CommandGateway を経由するだけです。

  • Axon Server から CommandHandlerを呼び出し

Commandを発行するとAxon Framework により @CommandHandler 付与されたメソッドを呼び出され、ここでCommandからEventを生成して発火します。

@Aggregate(snapshotTriggerDefinition = "messageAggregateSnapshotTriggerDefinition")
public class MessageAggregate {

    @CommandHandler
    public MessageAggregate(CreateMessageCommand command) {
        apply(new MessageCreatedEvent(command.getMessageId(), command.getTitle(), command.getBody()));
    }
}

public class MessageCreatedEvent {
    private final String messageId;
    private final String title;
    private final String body;

    // getter,setter,equals,hashCode,toString...
}

ここで apply された MessageCreatedEvent はAxon Server へ送信され、イベントジャーナルとして永続化されます。図の右上Event Storeに該当します。

永続化されたイベント

Command側のイベントの登録は以上です。Axon Frameworkにより永続化層のことは何も触れることなく、Eventが保存されています。また、永続化先はRDBやKafkaなどに対応しているため、要件に沿った製品の選定も柔軟に選択できます。

EventからReadModelへのマッピング

readmodelへのマッピング

  • EventHandlerの呼び出し

Eventが永続化されると、Axon Framework から @EventHandler付与したメソッドを呼び出されます。ここでEventからQuery側に都合のいいReadModelの作成を行えます。

@Service
@ProcessingGroup("messages")
public class InMemoryMessagesEventHandler implements MessagesEventHandler {

    private final Map<String, Message> messages = new HashMap<>();

    public InMemoryMessagesEventHandler(QueryUpdateEmitter emitter) {
        this.emitter = emitter;
    }

    @EventHandler
    public void on(MessageCreatedEvent event) {
        String messageId = event.getMessageId();
        messages.put(messageId, new Message(messageId, event.getTitle(), event.getBody()));
    }

    @EventHandler
    public void on(MessageReadEvent event) {
        messages.computeIfPresent(event.getMessageId(), (messageId, message) -> {
            message.setMessageStatusRead();
            message.setLatestReadDateTime(event.getReadDateTime());
            emitUpdate(message);
            return message;
        });
    }
}

MessageReadEvent のイベントハンドラ内で、 message.setMessageStatusRead() を呼び出しています。このタイミングでイベントを元に、更に必要があれば現在や過去の状態を元にReadModelに対して検索用のステータスを導出、保持させることができます。Eventと検索用のReadModelが分離されてスッキリします。

ここでは簡素化のため private final Map<String, Message> messages = new HashMap<>(); に対してReadModelの構築をしていますが、永続化先はRDBやNoSQLも選択可能です。自由に記述できるのでここでも要件に沿った選定が可能です。

Query

ReadModelからの読み込み

Queryではすでに特化したReadModelが作成されているので、取得するだけです。

まとめ

とても簡単にEventSourcing + CQRSのアプリケーションが作成できてしまいました! Axon Frameworkを活用することでビジネスロジックの開発に注力しても高機能なCQRS環境が作成できてしまうので、とても強力なフレームワークだと感じています。ここでは紹介しきれていませんが新たなReadModelを作成したい場合はEventのReplay機能など魅力的な機能がまだまだ他にもあるようですので、どんどん触ってみたいと思います。

参考記事

We are Hiring!

エムスリーはまだまだJavaやKotlinなどのJVM系言語によるシステム開発も行っております! 興味を持たれたらぜひこちらから!

jobs.m3.com