Jun 11, 2024

An end-to-end ES/CQRS example with EventStoreDB and Phoenix/LiveView

Yevhenii Kurtov

Greetings to all Elixir enthusiasts!

I’m happy to present my take on implementing ES/CQRS systems with Phoenix/LiveView and EventStoreDB. I believe those pieces of technology grew from the same conceptual root and are a great match.

If you have ever felt joy from expressing a dimension of time with GenServer or satisfaction from LiveView’s stateful nature, you will feel right at home. For everyone else - welcome aboard, and let’s see where we can get by the end of the text.

This post is dedicated to all developers who have suffered from what starts as an innocuous status field and then gets sequenced into a series of entities, one per state transition, bound together by an invisible spell passed on to newcomers with the words “Oh, that…”

Context

For this tutorial, we will implement a loan processing system as presented by Tony Young in his webinar ‘An end-to-end example with EventStoreDB’. It covers the following requirements:

  • Consumers can apply for loans
  • To protect a consumer’s privacy, we collect as little information as possible. We request their National ID number, name, address, and the amount of the loan they want
  • We use their National ID number to check their credit score with a credit clearing house
  • If a loan application has a credit score
    • >= 7, approve automatically
    • <=4, deny automatically
    • Otherwise, send it to an underwriter to manually decide if it should be approved

High-level overview of the flow:

Event Flow

Note that there will be no conceptual gap between the diagram and the implementation: it will follow the event model to the letter, creating a powerful medium for stakeholder collaboration.

Full application source is available at https://github.com/lessless/lepret (from French le prêt - the loan)

Disclaimer: The presented code serves the purpose of communicating ideas and can be further refined and improved upon.

Commanding system to request a loan

Our first step is to place a Request Loan command with three pieces of data provided by a user:

  • name
  • national id
  • amount

loan request flow

For the system to be able to process the command, the data should meet the following criteria:

  • all three fields must be present
  • the amount must be an integer greater than zero
  • the national ID must conform to a specific format.

Those are application-level/input validations, not business-rules validations like “a person with the same national ID shouldn’t have more than one request opened at the same time”, for example.

That is the task of “validating a command to request a loan” rather than “validating a loan request”, meaning that we want to ensure that we have the data to express the intention of requesting a loan.

💡
Commands express intent, are named in the present tense and can be rejected. Events represent observable facts that have already happened, are named in the past tense, and are expected to be always processable.
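
To make the distinction concrete, here is how the naming convention plays out with the module names used later in this post:

```elixir
# A command: intent, present tense, may be rejected.
%Lepret.Commands.RequestLoan{name: "Yev", amount: 1000, national_id: "123456aa"}

# An event: an observable fact, past tense, always processable.
%Lepret.Events.LoanRequested{name: "Yev", amount: 1000, national_id: "123456aa"}
```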

Now, for the completeness of the example, let’s also assume that there will be more than one channel to request a loan: a browser and a JSON API. In that case, we want to keep those validations in one module and reuse them across all channels.

We can call it the RequestLoan command and put it under the Commands namespace, i.e. Lepret.Commands.RequestLoan.

Here is how such a command can look:

defmodule Lepret.Commands.RequestLoan do
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, []}

  embedded_schema do
    field :name, :string
    field :amount, :integer
    field :national_id, :string
  end

  def build() do
    %__MODULE__{}
  end

  def build(attrs) do
    attrs
    |> changeset()
    |> apply_action(:build)
  end

  def changeset(attrs \\ %{}) do
    %__MODULE__{}
    |> cast(attrs, [:name, :amount, :national_id])
    |> validate_required([:name, :amount, :national_id])
    |> validate_length(:name, min: 2)
    |> validate_number(:amount, greater_than: 0)
    |> validate_format(:national_id, ~r/^\d{6}[A-Za-z]{2}$/)
  end
end

and correspondingly it can be used in the slightly simplified loan request form component generated by mix phx.gen.live:

defmodule LepretWeb.LoanRequestLive.FormComponent do
  use LepretWeb, :live_component

  alias Lepret.Commands.RequestLoan, as: RequestLoanCommand

  @impl true
  def render(assigns) do
    ~H"""
    <div>
      <.header>
        <%= @title %>
        <:subtitle>Use this form to manage loan_request records in your database.</:subtitle>
      </.header>

      <.simple_form for={@form} id="loan_request-form" phx-target={@myself} phx-change="validate" phx-submit="save">
        <.input field={@form[:name]} type="text" label="Name" />
        <.input field={@form[:amount]} type="number" label="Amount" />
        <.input field={@form[:national_id]} type="text" label="National ID (six numbers and two letters, e.g. 123456aa)" />
        <:actions>
          <.button phx-disable-with="Requesting...">Request Loan</.button>
        </:actions>
      </.simple_form>
    </div>
    """
  end

  @impl true
  def update(assigns, socket) do
    changeset = RequestLoanCommand.changeset()

    {:ok,
     socket
     |> assign(assigns)
     |> assign_form(changeset)}
  end

  @impl true
  def handle_event("validate", %{"request_loan" => request_loan_params}, socket) do
    changeset =
      request_loan_params
      |> RequestLoanCommand.changeset()
      |> Map.put(:action, :validate)

    {:noreply, assign_form(socket, changeset)}
  end

  def handle_event("save", %{"request_loan" => request_loan_params}, socket) do
    case RequestLoanCommand.build(request_loan_params) do
      {:error, changeset} ->
        {:noreply, assign_form(socket, Map.put(changeset, :action, :validate))}

      {:ok, _command} ->
        # we will place valid command on the next step
        {:noreply, socket}
    end
  end

  defp assign_form(socket, %Ecto.Changeset{} = changeset) do
    assign(socket, :form, to_form(changeset))
  end
end

See “Validate loan request” commit for full changes.

Message processing guarantees

Let’s take a step back now and think about what’s ahead of us:

  • we want to kick off a new loan request process with the Loan Requested event
  • react to the Loan Requested event to get the credit score
  • publish a Credit Checked event with the applicant’s credit score
  • make a loan request decision by publishing one of Loan Request Approved, Loan Request Denied or Loan Approval Required

Consider what will happen in production: every node in the cluster will receive a Loan Requested message and go through the same flow, and therefore our app will request a credit check from the external system once per node in the cluster. So, how do we hit the external system just once?

This question cuts deep into the message processing guarantees, which is the most challenging topic of distributed systems programming in my experience. Getting it right is always tied to the needs of a specific system and requires an understanding of risks and tradeoffs. Fortunately, with Elixir and EventStoreDB, we have a couple of simple options that might be just good enough starting points:

  • A distributed lock through shared storage: an advisory lock in PostgreSQL or a globally unique job in Oban. I have not tried the former, but the latter comes with the necessity to accept the risk of out-of-order message processing: your consumer can process the following message faster than the job runner processes the scheduled background job.
  • A persistent subscription in EventStoreDB. That comes with stronger ordering guarantees but requires additional effort to achieve “exactly once” guarantees when message processing fails too many times and the message is moved to the dead letter queue (is parked, in ESDB nomenclature)
  • A Webhook connector. This functionality is currently in the preview stage and comes with the abovementioned out-of-order caveats.

You might be tempted to implement a Single Global Process. Go for it. Maybe you're one of those who can get it right. Please remember to publish the book on how you did that afterwards. I'll be among the first to preorder.
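
As an illustration of the first option, here is a sketch of a globally unique Oban job, assuming Oban is installed in the app; the worker, queue, and argument names are hypothetical:

```elixir
defmodule Lepret.Workers.CheckCredit do
  # A sketch, assuming Oban is set up for this app. The `unique` option makes
  # Oban discard duplicate inserts of the same loan request, so only one
  # node's insert wins and the credit check runs once per request.
  use Oban.Worker,
    queue: :credit_checks,
    unique: [fields: [:args], keys: [:loan_request_id], period: :infinity]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"loan_request_id" => _id, "national_id" => national_id}}) do
    {:ok, _credit_score} = Lepret.CreditScoreApiClient.for(national_id)
    # ...publish a Credit Checked event here...
    :ok
  end
end

# Every node can insert the job; Oban's uniqueness check deduplicates:
#
#   %{loan_request_id: id, national_id: national_id}
#   |> Lepret.Workers.CheckCredit.new()
#   |> Oban.insert()
```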

In our sample system we can cheat by asking users to cooperate with us and to try again in case of a failure. That primarily works when users are highly motivated to get the job done. In other cases, they may leave instead of waiting for a result.

But enough gloom. Let’s entertain ourselves with a technical “comfort food” by saying that we are changing the direction of coupling from events to commands: we will interact with a third-party system when processing a command instead of reacting to an event. That is also sometimes known as moving from choreography to orchestration.

event-based vs command-based coupling illustration From Balancing Choreography and Orchestration by Bernd Rücker

Here is the updated flow:

from choreography to orchestration

We will still publish multiple events: Loan Requested and one of Loan Request Approved, Loan Request Denied or Loan Approval Required, because it makes sense from the process semantics perspective. But that will happen whilst processing the RequestLoan command, which we will enrich with the credit_score (see ParameterObject).

📚
Stephen Tung and Yves Reynhout, during the discussion on the ESDB discord server, both pointed out that "exactly once" guarantees require an external system accepting an idempotency key. Please see Appendix A for details.
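
To sketch what that might look like, here is a hypothetical variant of the credit score client that forwards an idempotency key; the provider, header name, and two-argument signature are all assumptions for illustration:

```elixir
defmodule Lepret.CreditScoreApiClient do
  alias Lepret.CreditScore

  # A sketch, assuming a hypothetical provider that honours an
  # `Idempotency-Key` header: a retried call with the same key returns the
  # original result instead of performing a second credit check.
  def for(national_id, idempotency_key) do
    _headers = [{"idempotency-key", idempotency_key}]
    # ...issue the HTTP request with those headers to the real provider...
    {:ok, CreditScore.new(national_id, 7, DateTime.utc_now())}
  end
end
```

Reusing the client-generated command id as the key would be a natural choice here.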

Application layer

Now that we have the RequestLoan command, we can place it against our system through the application layer. It’s not strictly necessary, but there is a good rationale behind doing so, detailed in the “Resources” section.

Enriching command

I added the credit_score field to the RequestLoan command, and now it can carry the credit score we are about to obtain from the third-party service. I also updated the build function to generate the id:

defmodule Lepret.Commands.RequestLoan do
   # same as before
+  alias Lepret.CreditScore

   embedded_schema do
     # same as before
+    field :credit_score, :map
   end

   def build(attrs) do
     attrs
     |> changeset()
+    |> put_change(:id, Ecto.UUID.generate())
     |> apply_action(:build)
   end

   # same as before
+  def enrich_with(%__MODULE__{} = command, %CreditScore{} = credit_score) do
+    Map.put(command, :credit_score, credit_score)
+  end
end

The Lepret.CreditScore is a value object backed by the Elixir struct

defmodule Lepret.CreditScore do
  defstruct [:score, :national_id, :ts]

  def new(national_id, score, ts) do
    %__MODULE__{national_id: national_id, score: score, ts: ts}
  end
end

Then I updated the loan request form to invoke the LoanRequest use case for a valid RequestLoan command:

defmodule LepretWeb.LoanRequestLive.FormComponent do
+  alias Lepret.UseCases
   # same as before

   def handle_event("save", %{"request_loan" => request_loan_params}, socket) do
     case RequestLoanCommand.build(request_loan_params) do
       {:error, changeset} ->
         {:noreply, assign_form(socket, Map.put(changeset, :action, :validate))}

-      {:ok, _command} ->
-        # we will place valid command on the next step
-        {:noreply, socket}
+      {:ok, command} ->
+        save_loan_request(socket, command)
     end
   end

+  defp save_loan_request(socket, command) do
+    case UseCases.LoanRequest.run(command) do
+      :ok ->
+        {:noreply,
+         socket
+         |> put_flash(:info, "Loan successfully requested")
+         |> push_patch(to: socket.assigns.patch)}
+    end
+  end
end

I used a fake credit score API client to enrich the RequestLoan command with the CreditScore in the LoanRequest use case:

defmodule Lepret.UseCases.LoanRequest do
  alias Lepret.Commands.RequestLoan
  alias Lepret.CreditScoreApiClient

  def run(%RequestLoan{} = command) do
    {:ok, credit_score} = CreditScoreApiClient.for(command.national_id)
    _command = RequestLoan.enrich_with(command, credit_score)
    :ok
  end
end

That fake client returns a predefined credit score for a known national ID or a random one otherwise. Predefined scores fall into one of three buckets: automatic approval, rejection, and manual review:

defmodule Lepret.CreditScoreApiClient do
  alias Lepret.CreditScore

  def for(national_id) do
    _latency = Process.sleep(100)
    {:ok, CreditScore.new(national_id, predefined_credit_scores(national_id), DateTime.utc_now())}
  end

  defp predefined_credit_scores(national_id) do
    case national_id do
      "999999aa" -> 9
      "666666aa" -> 6
      "555555aa" -> 5
      "111111aa" -> 1
      _ -> Enum.random(1..10)
    end
  end
end

At the end of those changes, the enriched RequestLoan command carries a credit score and a client-controlled id:

%Lepret.Commands.RequestLoan{
  id: "a1ac5824-177d-43e0-bcc0-8b03dad5e1a7",
  name: "Yev",
  amount: 1000,
  national_id: "123456aa",
  credit_score: %Lepret.CreditScore{
    score: 7,
    national_id: "123456aa",
    ts: ~U[2024-04-24 14:40:04.880633Z]
  }
}

See “Enrich RequestLoan command with credit score” commit for full changes.

Setting up EventStoreDB

As of April 2024, EventStoreDB is the only ES/CQRS database that guarantees both global and per-stream ordering of events. Whatever happens, your application can read events in the same order they were published, both from the global $all stream and from aggregate(entity)-level streams. Check out Data Modeling in an Event Centric World to learn more.

Fortunately, it’s a breeze to start with. For the purpose of this tutorial, you can run it via Docker, as detailed in the “Docker” section of EventStoreDB’s “Installation” guide:

docker run --name esdb-node -it -p 2113:2113 eventstore/eventstore:latest --insecure --run-projections=All --enable-atom-pub-over-http

Apple Silicon users must use the alpha image eventstore/eventstore:24.2.0-alpha-arm64v8

If you happen to host your applications on Fly.io, it takes about 30 minutes to create a setup that is good enough for development purposes. The only impediment to building a production-grade cluster is the lack of IPv6 support.

Connecting to the EventStoreDB from an Elixir project

I’m a fan of Spear, a sharp and well-looked-after client. Let’s plug it in and configure it to connect to the local EventStoreDB instance:

defmodule Lepret.MixProject do
  # same as before
  defp deps do
    [
      # same as before
      {:spear, "~> 1.4"}
    ]
  end
end
# in config/dev.exs
config :lepret, Lepret.EventStore.SpearClient, connection_string: "esdb://localhost:2113"

Later, we will sprinkle it with a few convenience functions to automatically derive stream names and deserialise events, so let’s put it under a wrapping module:

defmodule Lepret.EventStore do
  defmodule SpearClient do
    use Spear.Client, otp_app: :lepret
  end
end

Finally, add Lepret.EventStore.SpearClient to the app’s supervision tree:

defmodule Lepret.Application do
  # same as before

  @impl true
  def start(_type, _args) do
    children = [
     # same as before
      Lepret.EventStore.SpearClient,
      LepretWeb.Endpoint
    ]

   # same as before
  end

  # same as before
end

With those changes in place, you should be able to fetch events from the $all stream:

iex(1)> Lepret.EventStore.SpearClient.stream!(:all) |> Enum.take(1)
[
  %Spear.Event{
    id: "6878a9de-cfd0-461b-8a4b-09f72415d396",
    type: "$metadata",
    body: %{
      "$acl" => %{
        "$d" => [],
        "$mr" => [],
        "$mw" => [],
        "$r" => "$ops",
        "$w" => []
      },
      "$maxAge" => 2592000
    },
    link: nil,
    metadata: %{
      content_type: "application/json",
      created: ~U[2024-04-24 19:55:29.362346Z],
      stream_name: "$$$scavenges",
      prepare_position: 548,
      commit_position: 548,
      custom_metadata: "",
      stream_revision: 0
    }
  }
]

See “Setup SpearClient” commit for full changes.

Decisions resting on streams

Event sourcing is a state management pattern in which the current state is derived from an immutable log of previous events by folding over that stream of events.

Conceptually, the implementation amounts to the following:

stream_name = "#{Account}:#{account_id}"
events = EventStore.load(stream_name)
state = Enum.reduce(events, _initial_state = %Account{}, fn event, state -> Account.apply(event, state) end)
changes = Account.handle(command, state)
EventStore.publish(stream_name, changes)

Handling a command normally produces a new set of changes in the form of events that are later published to the same stream, leading to the event-based state transitions.

Alternatively, a system can reject a command, in which case nothing happens. In contrast, events should always be processed as they describe something that has already happened.

Optimistic concurrency

What happens when two different users want to change the same entity at the same time? We can address that with roughly the same mechanism that relational databases use under the hood: record the stream’s revision from the last known event and tell EventStoreDB that it’s the revision we expect the stream to be at when publishing changes.

Such an explicit mechanism, plus the ability to understand the nature of change, gives greater control over data consistency: we can re-read the stream, get the conflicting event, and decide what to do.

# as before
EventStore.publish(stream_name, changes, List.last(events).revision)

EventStoreDB also provides a number of other options, alongside the exact revision check, to assert on a stream’s state, such as Any, NoStream, EmptyStream and StreamExists.
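
For illustration, here is a sketch of detecting and reacting to a conflicting write with Spear, assuming the SpearClient wrapper introduced earlier; the recovery policy shown is just one option:

```elixir
# A sketch: `events` are Spear events and `expected_revision` is the revision
# of the last event we read before deciding.
case SpearClient.append(events, stream_name, expect: expected_revision) do
  :ok ->
    :ok

  {:error, %Spear.ExpectationViolation{current: current}} ->
    # Someone published first: re-read the events we missed, inspect the
    # conflict and decide whether to retry, merge or reject the command.
    missed =
      stream_name
      |> SpearClient.stream!(from: expected_revision)
      |> Enum.to_list()

    {:conflict, current, missed}
end
```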

Return changes or hide them inside an entity?

Until recently, I preferred to return an entity that would hide changes, which made the specific state management approach more swappable:

defmodule Account do
  @primary_key {:id, :binary_id, []}
  embedded_schema do
    field :balance, :integer
    field :__changes__, :map, virtual: true
  end
end

However, my colleagues practically convinced me to stick with the Decider Pattern, a purely functional state management pattern that gives events the emphasis they deserve by returning only the changes. Besides, in practice it’s rare to need to evolve the entity against the changes in order to operate on the future state.

That comes at the expense of an increased number of arguments in the following implementation of EventStore.publish. With events hidden inside an entity, it could be just EventStore.publish(state), and inside that state there could be the id and the expected version in the __version__ property.

Now, all of those have to be passed explicitly: EventStore.publish(module, id, expected_revision, events), with the first two arguments needed just to produce a stream name in the form of entity_name <> separator <> id, e.g. RequestLoan:123e4567-e89b-12d3-a456-426614174000.

There are numerous methods to tidy up that boilerplate. Regardless of your choice, keeping the event in the spotlight will pay off in conversations with non-technical stakeholders.

Loan Request Decider

Starting lean, we will implement two of the four prescribed Decider functions, evolve and decide, and skip initial state and terminal state.

The initial state function returns the entity before the first event is applied, and it is the most convenient way to provide defaults. Think of the init callback in the GenServer or the new constructor in the mainstream programming languages. But since we don’t have any use for it, we can go directly into deciding on a loan request application:

defmodule Lepret.LoanRequest do
  use Ecto.Schema
  alias Lepret.Commands.RequestLoan
  alias Lepret.Events.LoanRequested
  alias Lepret.Events.AutoApproved
  alias Lepret.Events.AutoDenied
  alias Lepret.Events.ManualReviewRequired

  @primary_key {:id, :binary_id, []}
  embedded_schema do
    field :name, :string
    field :amount, :integer
    field :national_id, :string
    field :credit_score, :map
    field :status, Ecto.Enum, values: [:auto_approved, :auto_denied, :manual_review_required]
  end

  def decide(%RequestLoan{} = command) do
    loan_requested = %LoanRequested{
      id: command.id,
      name: command.name,
      amount: command.amount,
      national_id: command.national_id,
      credit_score: command.credit_score
    }

    loan_request_review_decision =
      cond do
        command.credit_score.score >= 7 ->
          %AutoApproved{id: command.id}

        command.credit_score.score <= 4 ->
          %AutoDenied{id: command.id}

        true ->
          %ManualReviewRequired{
            id: command.id,
            amount: command.amount,
            national_id: command.national_id,
            credit_score: command.credit_score
          }
      end

    [loan_requested, loan_request_review_decision]
  end
end

This decide function doesn’t have to consult past state to make a decision; therefore, we don’t have a use for the state argument.
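
When a decision does depend on past state, the Decider’s evolve function folds each event into the state. A sketch for this entity, using the events defined above (our decide doesn’t need it yet):

```elixir
defmodule Lepret.LoanRequest do
  # A sketch of the `evolve` Decider function: fold one event into the state.
  # Field access mirrors the events and schema defined earlier in the post.
  def evolve(%__MODULE__{} = state, %Lepret.Events.LoanRequested{} = event) do
    %{
      state
      | id: event.id,
        name: event.name,
        amount: event.amount,
        national_id: event.national_id,
        credit_score: event.credit_score
    }
  end

  def evolve(%__MODULE__{} = state, %Lepret.Events.AutoApproved{}) do
    %{state | status: :auto_approved}
  end

  def evolve(%__MODULE__{} = state, %Lepret.Events.AutoDenied{}) do
    %{state | status: :auto_denied}
  end

  def evolve(%__MODULE__{} = state, %Lepret.Events.ManualReviewRequired{}) do
    %{state | status: :manual_review_required}
  end
end
```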

Invoking that code from the use case will result in the list composed of two events that we can now append to the event stream:

defmodule Lepret.UseCases.LoanRequest do
  alias Lepret.Commands.RequestLoan
  alias Lepret.CreditScoreApiClient
+  alias Lepret.LoanRequest
+  alias Lepret.EventStore

  def run(%RequestLoan{} = command) do
    {:ok, credit_score} = CreditScoreApiClient.for(command.national_id)
-    _command = RequestLoan.enrich_with(command, credit_score)
-    :ok
+    command = RequestLoan.enrich_with(command, credit_score)
+    changes = LoanRequest.decide(command)
+    :ok = EventStore.publish(LoanRequest, command.id, :empty, changes)
  end
end

I recognise that there are refactoring opportunities in the EventStore.publish as the first two arguments are used only to derive the stream name, but please bear with me for now:

defmodule Lepret.EventStore do
  defmodule SpearClient do
    use Spear.Client, otp_app: :lepret
  end

  def publish(decider, id, expected_revision, events) do
    events
    |> to_spear_events()
    |> SpearClient.append(stream_name_for(decider, id), expect: expected_revision)
  end

  defp to_spear_events(events) do
    Enum.map(events, fn event -> Spear.Event.new(module_name(event), event) end)
  end

  defp stream_name_for(decider, id) do
    "#{module_name(decider)}:#{id}"
  end

  defp module_name(struct) when is_map(struct) do
    struct
    |> Map.get(:__struct__)
    |> module_name()
  end

  defp module_name(module) when is_atom(module) do
    module
    |> Atom.to_string()
    |> String.split(".")
    |> List.last()
  end
end

The biggest part of this code is responsible for producing stream names with a colon as a separator, e.g. LoanRequest:2e4a86e8-39e7-4348-9b2a-d6a01806f66d.

See “Add LoanRequest decider” commit for full changes.

Now you have a side-effect-free, purely functional domain model in about the length of one decide function, and a vocabulary rooted in domain events that you can share with non-technical stakeholders.

As an added bonus, the decision-making process in your system is entirely observable, as you can see in the EventStoreDB’s Stream Browser

Published loan request events

If you want a stronger pitch for potential benefits, I recommend “Event Modeling by Adam Dymitruk” . It’s a breath of fresh air in the world of typical business automation and worth every minute of your time.

Making a more sensible structure

Let’s move our domain model under lib/lepret/domain. It’s a bit awkward to see the UI reaching inside the domain, but I prefer that to keeping commands on the same level as use cases.

lib/lepret/
├── application.ex
├── domain
│   ├── commands
│   │   └── request_loan.ex
│   ├── credit_score.ex
│   ├── credit_score_api_client.ex
│   ├── event_store.ex
│   ├── events.ex
│   └── loan_request.ex
├── mailer.ex
├── repo.ex
└── use_cases
    └── request_loan.ex

Eventual consistency and async updates

Working with Elixir, you know that message exchange between processes is async by nature and that the synchronous mechanic is an added convenience.

There are many merits to that, the most noticeable of which are scalability and resiliency. ES/CQRS works similarly by splitting the Read and Write sides of the system and catering to their needs independently. It’s remarkable how well these two models match each other and allow us to build software within the same message-passing paradigm from the moment a user sends a command until they see a pub/subbed update on a screen.

EventStoreDB & Elixir in philosophical alignment

Let’s dive right in!

Overview

Walking down the Command and Query Responsibility Segregation lane, we will introduce a read model to display loan requests pending manual review on the underwriter’s dashboard.

We will subscribe to the global event stream, define a handler for the ManualReviewRequired event (the only one we are interested in so far) and start projecting the events as we read them into the relational database for quick and convenient access.

After processing each event, we will increment the consumer’s cursor position in the stream so that the read model can catch up from the last known position after a restart (during deployment, for example).

We will wrap event projection and cursor increment in the same transaction to be confident that the read model will eventually be consistent. At some point, it will catch up with the latest event in the stream, no matter what.

Establishing subscription

The projector itself is a GenServer receiving streaming updates from EventStoreDB:

defmodule Lepret.ReadModel.UnderwriterDashboard.Subscription do
  use GenServer
  alias Lepret.Domain.EventStore
  alias Lepret.Repo

  def start_link(params) do
    GenServer.start_link(__MODULE__, params)
  end

  @impl GenServer
  def init(_) do
    {:ok, _subscription} = EventStore.catchup_subscription(self(), checkpoint_name())

    {:ok, nil}
  end

  defp checkpoint_name() do
    Atom.to_string(__MODULE__)
  end
end

The EventStore.catchup_subscription is a thin wrapper around Spear.subscribe that transparently keeps track of the current position:


defmodule Lepret.Domain.EventStore do
  # same as before

  def catchup_subscription(subscriber_pid, checkpoint_name) do
    SpearClient.subscribe(
      subscriber_pid,
      :all,
      from: Checkpoint.current(checkpoint_name),
      filter: Spear.Filter.exclude_system_events()
    )
  end
end

The Checkpoint is an Ecto Schema designated to store and retrieve checkpoints for read models:

defmodule Lepret.Domain.EventStore.Checkpoint do
  use Ecto.Schema
  alias Lepret.Repo

  @primary_key {:name, :string, autogenerate: false}
  schema "read_model_checkpoints" do
    field :position, :integer

    timestamps()
  end

  def current(name) do
    checkpoint = get(name) || create(name)

    %Spear.Filter.Checkpoint{commit_position: checkpoint.position, prepare_position: checkpoint.position}
  end

  defp get(name) do
    Repo.get(__MODULE__, name)
  end

  def create(name) do
    Repo.insert!(%__MODULE__{name: name, position: 0})
  end
end

Deserialising events

Now, we are ready to start consuming events. The first step is to deserialise the raw event, stored as JSON:

defmodule Lepret.ReadModel.UnderwriterDashboard.Subscription do
  # as before

  @impl GenServer
  def handle_info(raw_event, state) do
    if EventStore.event_type_in?(raw_event, [ManualReviewRequired]) do
      Repo.transaction(fn ->
        _event = EventStore.deserialise!(raw_event, Lepret.Domain.Event)
      end)
    end

    {:noreply, state}
  end
end

There is more than one way to restore an Elixir struct from the JSON object, but in our case, even crude introspection and Ecto.Changeset.cast should do:


defmodule Lepret.Domain.EventStore do
  # as before

  def event_type_in?(%Spear.Event{} = raw_event, events_of_interest) do
    raw_event.type in Enum.map(events_of_interest, &module_name/1)
  end

  def event_type_in?(_checkpoint, _events_of_interest) do
    false
  end

  def deserialise!(%Spear.Event{} = raw_event, event_namespace) do
    module_name = Module.concat(event_namespace, raw_event.type)
    module_name.deserialise!(raw_event.body)
  end
end

defmodule Lepret.Domain.Event do
 # as before

  defmodule ManualReviewRequired do
    use Ecto.Schema
    import Ecto.Changeset
    alias Lepret.Domain.CreditScore

    @derive Jason.Encoder
    @primary_key {:id, :binary_id, []}
    embedded_schema do
      field :name, :string
      field :amount, :integer
      field :national_id, :string
      embeds_one :credit_score, CreditScore
    end

    def deserialise!(serialised_body) do
      %__MODULE__{}
      |> cast(serialised_body, [:id, :name, :amount, :national_id])
      |> validate_required([:id, :name, :amount, :national_id])
      |> cast_embed(:credit_score, required: true)
      |> apply_action!(:deserialise)
    end
  end
end

I also elevated CreditScore to an Ecto schema so that the event looks the same on both ends: before publishing and after deserialisation.

Projecting events

With the original event in hand, we can start having fun representing the same data via another data model. In this case, I’m going to use a table in a relational database, but it can be any storage of your choice — from Neo4j to ElasticSearch.

You can have as many of those models as you want, in any format and place you want, and that’s a totally new degree of freedom.

defmodule Lepret.ReadModel.UnderwriterDashboard do
  use Ecto.Schema
  import Ecto.Changeset
  alias Lepret.Domain.Event.ManualReviewRequired
  alias Lepret.Repo

  @primary_key {:id, :binary_id, []}
  schema "underwriter_dashboard" do
    field :name, :string
    field :amount, :integer
    field :national_id, :string
    field :credit_score, :integer
    field :decision, Ecto.Enum, values: [:approved, :denied]

    timestamps()
  end

  def consume!(%ManualReviewRequired{} = event) do
    event
    |> Map.from_struct()
    |> Map.put(:credit_score, event.credit_score.score)
    |> create!()
  end

  defp create!(params) do
    %__MODULE__{}
    |> cast(params, [:id, :amount, :name, :national_id, :credit_score])
    |> validate_required([:id, :amount, :name, :national_id, :credit_score])
    |> Repo.insert!()
  end
end

Now, we can wire it into the subscription alongside the checkpoint update.

defmodule Lepret.ReadModel.UnderwriterDashboard.Subscription do
 # as before
 #
  @impl GenServer
  def handle_info(raw_event, state) do
    if EventStore.event_type_in?(raw_event, [ManualReviewRequired]) do
      Repo.transaction(fn ->
        event = EventStore.deserialise!(raw_event, Lepret.Domain.Event)
+       UnderwriterDashboard.consume!(event)
+       EventStore.update_checkpoint!(checkpoint_name(), raw_event)
      end)
    end

    {:noreply, state}
  end
end

The EventStore.update_checkpoint! is a dead-simple function updating the subscriber’s cursor position in the stream:

defmodule Lepret.Domain.EventStore do
 # same as before

  def update_checkpoint!(checkpoint_name, %Spear.Event{} = raw_event) do
    Checkpoint.update!(checkpoint_name, raw_event.metadata.commit_position)
  end
end

defmodule Lepret.Domain.EventStore.Checkpoint do
 # same as before

  def update!(name, position) do
    name
    |> get()
    |> change(%{position: position})
    |> Repo.update!()
  end
end

After ensuring reliable event consumption, we are just one step away from completing the dashboard that shows pending loan requests. We will start by querying our read model for all pending loan requests and putting them into a LiveView stream that we will render as a list.
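
The UnderwriterDashboard.list_loan_requests/0 function used by the LiveView below is not shown in this post’s snippets; a minimal sketch, assuming pending requests are those without a decision, could be:

```elixir
defmodule Lepret.ReadModel.UnderwriterDashboard do
  # A sketch of the query function added to the read model module defined
  # above; requests without a decision are the ones pending manual review.
  import Ecto.Query
  alias Lepret.Repo

  def list_loan_requests() do
    __MODULE__
    |> where([r], is_nil(r.decision))
    |> order_by([r], asc: r.inserted_at)
    |> Repo.all()
  end
end
```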

defmodule LepretWeb.UnderwriterDashboardLive.Index do
  use LepretWeb, :live_view

  alias Lepret.ReadModel.UnderwriterDashboard

  @impl true
  def mount(_params, _session, socket) do
    {:ok, stream(socket, :loan_requests, UnderwriterDashboard.list_loan_requests())}
  end

  @impl true
  def handle_params(params, _url, socket) do
    {:noreply, apply_action(socket, socket.assigns.live_action, params)}
  end

  defp apply_action(socket, :index, _params) do
    assign(socket, :page_title, "Pending loan requests")
  end
end

and the template with a LiveView stream:

<.header>
  Pending loan requests
</.header>

<.table id="loan_requests" rows={@streams.loan_requests}>
  <:col :let={{_id, loan_request}} label="Name"><%= loan_request.name %></:col>
  <:col :let={{_id, loan_request}} label="Amount"><%= loan_request.amount %></:col>
  <:col :let={{_id, loan_request}} label="National ID"><%= loan_request.national_id %></:col>
</.table>

Live updates with Pub/Sub

In many other technologies, that would be it—an underwriter would have to refresh the page to see a new loan request for review. But we can go one step further and give them a soft real-time experience by extending the event flow from the event store all the way down to the browser with just two easy steps.

Broadcast the update:

defmodule Lepret.ReadModel.UnderwriterDashboard do
  # same as before

  def consume!(%ManualReviewRequired{} = event) do
    event
    |> Map.from_struct()
    |> Map.put(:credit_score, event.credit_score.score)
    |> create!()
+   |> broadcast("new_loan_request_for_review")
  end

  # same as before

+  def topic() do
+    Atom.to_string(__MODULE__)
+  end
+
+  defp broadcast(payload, event) do
+    Endpoint.broadcast(topic(), event, payload)
+  end
end
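One small gotcha worth noting about the topic above: `Atom.to_string/1` keeps the `Elixir.` prefix, so the Pub/Sub topic is the fully qualified module name. A standalone check:

```elixir
# `Atom.to_string/1` on a module atom retains the "Elixir." prefix, so this
# is the exact topic string that both broadcast/3 and subscribe/1 will use.
topic = Atom.to_string(Lepret.ReadModel.UnderwriterDashboard)
IO.puts(topic)
# => Elixir.Lepret.ReadModel.UnderwriterDashboard
```

That's fine as long as both sides derive the topic from the same `topic/0` function, which is exactly what the code above does.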

Consume it in LiveView and add to the stream:

defmodule LepretWeb.UnderwriterDashboardLive.Index do
 # same as before

    @impl true
    def handle_params(params, _url, socket) do
+     if connected?(socket) do
+       Endpoint.subscribe(UnderwriterDashboard.topic())
+     end

      {:noreply, apply_action(socket, socket.assigns.live_action, params)}
    end

+  @impl true
+  def handle_info(%{event: "new_loan_request_for_review", payload: new_loan_request}, socket) do
+    {:noreply, stream_insert(socket, :loan_requests, new_loan_request)}
+  end
end

And voilà!

See “Add UnderwriterDashboard read model” commit for full changes.

Sometimes, it feels magical how well those two models fit each other!

Okay, but what about production?

You may notice a potential issue with that approach in production — your application will start writing to the same table from all nodes in the cluster. How do you avoid that?

The answer is: you don’t. You let each node write to the same table in its own database. For example, you can use a locally stored SQLite3 database, with no network latency to fetch your data no matter where the app is hosted. That’s how you get blazing-fast reads at the network’s edge, as close to your users as possible.

You might be interested in checking out Fly’s LiteFS (distributed SQLite) or Turso if you are on any other PaaS. Turso has a generous free plan with 9 GB of total storage across 500 databases.
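As a sketch of what the per-node setup could look like with the ecto_sqlite3 adapter (the repo name, env variable, and path here are assumptions, not part of the example app):

```elixir
# config/runtime.exs (sketch): a per-node SQLite database for read models,
# while the write side keeps talking to EventStoreDB as before.
import Config

config :lepret, Lepret.ReadModelRepo,
  adapter: Ecto.Adapters.SQLite3,
  database: System.get_env("READ_MODEL_DB_PATH", "/data/read_models.db")
```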

Read models with SQLite

Important implementation notes

It’s crucial to note that read models should be built only from the information presented in events and shouldn’t query third-party services. Adhering to that practice will enable you to effortlessly reshape the data into a form that will better fit the next product iteration.

Also, be careful when using this mechanism to send customer notifications: rebuilding a read model replays the events, which would trigger the notifications again.

Another significant bit is measuring the latency between publishing an event and a customer receiving the update. I have seen a situation where network latency led to an unacceptable delay, and it’s better to stay on top of that metric before customers get frustrated.
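A minimal way to track that metric is to compare the event’s creation timestamp with the wall clock at consumption time (a sketch; the module name is made up, and wiring the number into your metrics system is left out):

```elixir
defmodule LatencyProbe do
  # Milliseconds between the event being created and the read model
  # consuming it; report this number to your metrics system of choice.
  def lag_ms(%DateTime{} = created_at, now \\ DateTime.utc_now()) do
    DateTime.diff(now, created_at, :millisecond)
  end
end
```

Calling it with the event’s timestamp inside the subscription’s `handle_info` would give the end-to-end lag per event.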

Otherwise, it’s a great productivity mechanism that breaks data coupling. In a purely technical sense, you may want to have a read model per screen and sometimes even separate read models for widgets with complex data requirements. In a sociotechnical sense, the events-based contract enables developers to work independently on the reading and writing sides without blocking each other.

The bottom line is that anyone can safely change any read model and know they won’t break another functionality reading from the same table.

I hope that, by now, the “image that explains almost everything” makes more sense from an implementation point of view.

A typical system flow, from the Introducing Event Storming book by Alberto Brandolini

We have just one topic left to cover in this blog post: folding the event stream to make a decision.

Deriving the current state from the event stream

We made it to the very end without evolving the Loan Request decider’s state. Let’s fix that now by implementing loan denial.

Moving outside-in, we’ll start by adding a Deny button to the loan requests list:

<.table id="loan_requests" rows={@streams.loan_requests}>
  <:col :let={{_id, loan_request}} label="Name"><%= loan_request.name %></:col>
  <:col :let={{_id, loan_request}} label="Amount"><%= loan_request.amount %></:col>
  <:col :let={{_id, loan_request}} label="National ID"><%= loan_request.national_id %></:col>
+  <:col :let={{id, loan_request}}>
+    <.button
+      phx-disable-with="Processing..."
+      phx-click="deny-loan"
+      phx-value-loan-request-id={loan_request.id}
+      phx-value-dom-id={id}
+    >
+      Deny
+    </.button>
+  </:col>
</.table>

And the corresponding click handler:

defmodule LepretWeb.UnderwriterDashboardLive.Index do
  # same as before
+  alias Lepret.Domain.Commands.DenyLoan, as: DenyLoanCommand
+  alias Lepret.UseCases.DenyLoan

  # same as before

+  @impl true
+  def handle_event("deny-loan", %{"loan-request-id" => loan_request_id, "dom-id" => dom_id}, socket) do
+    {:ok, command} = DenyLoanCommand.build(%{id: loan_request_id})
+    :ok = DenyLoan.run(command)
+    {:noreply, stream_delete_by_dom_id(socket, :loan_requests, dom_id)}
+  end
end

The DenyLoan command only carries an intent via its name and a single id property to identify the loan request:

defmodule Lepret.Domain.Commands.DenyLoan do
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :binary_id, []}
  embedded_schema do
  end

  def build(attrs) do
    %__MODULE__{}
    |> cast(attrs, [:id])
    |> validate_required([:id])
    |> apply_action(:build)
  end
end

Adding EventStore.load

Things get a bit more interesting in the use case, given the current direction of the EventStore implementation.

defmodule Lepret.UseCases.DenyLoan do
  alias Lepret.Domain.Commands.DenyLoan
  alias Lepret.Domain.LoanRequest
  alias Lepret.Domain.EventStore

  def run(%DenyLoan{} = command) do
    loan_request_events = EventStore.load(LoanRequest, command.id)
    loan_request = LoanRequest.evolve(loan_request_events)

    changes = LoanRequest.decide(loan_request, command)
    :ok = EventStore.publish(LoanRequest, loan_request.id, List.last(loan_request_events).metadata.stream_revision, changes)
  end
end

We are leaking too many EventStore implementation details. We could overload EventStore.publish to derive the stream name for the decider’s state, since it already has the module name and ID, but that would still leave the question of the expected stream revision open. What if we instead return a tuple from EventStore.load that carries all that metadata?

defmodule Lepret.Domain.EventStore do
  alias Lepret.Domain.EventStore.Checkpoint
  alias Lepret.Domain.EventStore.StreamMetadata
  alias Lepret.Domain.EventStore.ConversionTools

  defmodule SpearClient do
    use Spear.Client, otp_app: :lepret
  end

  @events_namespace Lepret.Domain.Event

  def load(decider, id) do
    stream_metadata = StreamMetadata.new(decider, id)
    raw_events = read_all_events(stream_metadata.stream_name)
    {Enum.map(raw_events, &deserialise!/1), StreamMetadata.current_revision(stream_metadata, raw_events)}
  end

  def publish(events, %StreamMetadata{} = stream_metadata) do
    events
    |> to_spear_events()
    |> SpearClient.append(stream_metadata.stream_name, expect: stream_metadata.current_revision)
  end

  def publish(decider, id, expected_revision, events) do
    publish(events, StreamMetadata.new(decider, id, expected_revision))
  end

  defp to_spear_events(events) do
    Enum.map(events, fn event -> Spear.Event.new(ConversionTools.module_name(event), event) end)
  end

  def catchup_subscription(subscriber_pid, checkpoint_name) do
    SpearClient.subscribe(
      subscriber_pid,
      :all,
      from: Checkpoint.current(checkpoint_name),
      filter: Spear.Filter.exclude_system_events()
    )
  end

  def event_type_in?(%Spear.Event{} = raw_event, events_of_interest) do
    raw_event.type in Enum.map(events_of_interest, &ConversionTools.module_name/1)
  end

  def event_type_in?(_checkpoint, _events_of_interest) do
    false
  end

  def deserialise!(%Spear.Event{} = raw_event) do
    module_name = Module.concat(@events_namespace, raw_event.type)
    module_name.deserialise!(raw_event.body)
  end

  def update_checkpoint!(checkpoint_name, %Spear.Event{} = raw_event) do
    Checkpoint.update!(checkpoint_name, raw_event.metadata.commit_position)
  end

  defp read_all_events(stream_name) do
    stream_name
    |> SpearClient.stream!()
    |> Enum.into([])
  end
end

defmodule Lepret.Domain.EventStore.StreamMetadata do
  alias Lepret.Domain.EventStore.StreamMetadata
  alias Lepret.Domain.EventStore.ConversionTools
  @enforce_keys [:stream_name]
  defstruct [:stream_name, :current_revision]

  def new(decider, id) do
    %__MODULE__{
      stream_name: stream_name_for(decider, id)
    }
  end

  def new(decider, id, current_revision) do
    %__MODULE__{
      stream_name: stream_name_for(decider, id),
      current_revision: current_revision
    }
  end

  defp stream_name_for(decider, id) do
    "#{ConversionTools.module_name(decider)}:#{id}"
  end

  def current_revision(%StreamMetadata{} = stream_metadata, []) do
    # A brand-new stream has no events yet; Spear's `expect: :empty` covers that case
    Map.put(stream_metadata, :current_revision, :empty)
  end

  def current_revision(%StreamMetadata{} = stream_metadata, raw_events) when is_list(raw_events) do
    Map.put(stream_metadata, :current_revision, List.last(raw_events).metadata.stream_revision)
  end
end

defmodule Lepret.Domain.EventStore.ConversionTools do
  def module_name(struct) when is_map(struct) do
    struct
    |> Map.get(:__struct__)
    |> module_name()
  end

  def module_name(module) when is_atom(module) do
    module
    |> Atom.to_string()
    |> String.split(".")
    |> List.last()
  end
end
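To make the stream naming concrete, here is module_name/1 restated so it runs standalone (the Demo namespace is made up for illustration):

```elixir
defmodule Demo.ConversionTools do
  # Same logic as ConversionTools.module_name/1 above: keep only the
  # last segment of the fully qualified module name.
  def module_name(module) when is_atom(module) do
    module
    |> Atom.to_string()
    |> String.split(".")
    |> List.last()
  end
end

# So the stream for our decider instance is named "LoanRequest:<id>":
IO.puts("#{Demo.ConversionTools.module_name(Lepret.Domain.LoanRequest)}:40a08c7a-f4fd-45f1-bd6a-fdea1f79e2f7")
```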

I can’t say I got the abstractions right, but they’re not bad.

Now we have a saner and less leaky interface:

defmodule Lepret.UseCases.DenyLoan do
  # same as before

  def run(%DenyLoan{} = command) do
    {events, stream_metadata} = EventStore.load(LoanRequest, command.id)
    loan_request = LoanRequest.evolve(events)

    changes = LoanRequest.decide(loan_request, command)
    :ok = EventStore.publish(changes, stream_metadata)
  end
end

However, I must admit, I’m still tempted to hide the revision and changes inside LoanRequest and evolve it inside EventStore.load :)

Deriving the state

Okay, ladies and gentlemen, here is the moment we all came here for: deriving the current state from an event stream.

defmodule Lepret.Domain.LoanRequest do
  # same as before

  def evolve(events) do
    List.foldl(events, initial_state(), &evolve/2)
  end

  def evolve(%LoanRequested{} = event, %__MODULE__{id: nil, status: nil} = state) do
    %{
      state
      | id: event.id,
        name: event.name,
        amount: event.amount,
        national_id: event.national_id,
        credit_score: event.credit_score
    }
  end

  def evolve(%ManualReviewRequired{} = _event, %__MODULE__{status: nil} = state) do
    %{state | status: :manual_review_required}
  end

  defp initial_state() do
    %__MODULE__{}
  end
end

That gives us the following state:

%Lepret.Domain.LoanRequest{
 id: "40a08c7a-f4fd-45f1-bd6a-fdea1f79e2f7",
 name: "Yevhenii Kurtov",
 amount: 100,
 national_id: "555555aa",
 credit_score: %Lepret.Domain.CreditScore{
   score: 5,
   national_id: "555555aa",
   ts: ~U[2024-04-29 20:27:00Z]
 },
 status: :manual_review_required
}

And here you are! With a completely introspectable, functional domain model where making decisions is separated from applying them.

loan request event stream with manual loan denial

I’m sorry if it looks simple, but it really is. Here is the complete decider with the last decide function over the manual loan denial:

defmodule Lepret.Domain.LoanRequest do
  use Ecto.Schema
  alias Lepret.Domain.Command.RequestLoan
  alias Lepret.Domain.Command.DenyLoan
  alias Lepret.Domain.Event.LoanRequested
  alias Lepret.Domain.Event.AutoApproved
  alias Lepret.Domain.Event.AutoDenied
  alias Lepret.Domain.Event.ManualReviewRequired
  alias Lepret.Domain.Event.ManuallyDenied

  @primary_key {:id, :binary_id, []}
  embedded_schema do
    field :name, :string
    field :amount, :integer
    field :national_id, :string
    field :credit_score, :map
    field :status, Ecto.Enum, values: [:auto_approved, :auto_denied, :manual_review_required]
  end

  def decide(%RequestLoan{} = command) do
    loan_requested = %LoanRequested{
      id: command.id,
      name: command.name,
      amount: command.amount,
      national_id: command.national_id,
      credit_score: command.credit_score
    }

    loan_request_review_decision =
      cond do
        command.credit_score.score >= 7 ->
          %AutoApproved{id: command.id}

        command.credit_score.score <= 4 ->
          %AutoDenied{id: command.id}

        true ->
          %ManualReviewRequired{
            id: command.id,
            name: command.name,
            amount: command.amount,
            national_id: command.national_id,
            credit_score: command.credit_score
          }
      end

    [loan_requested, loan_request_review_decision]
  end

  def decide(%__MODULE__{status: :manual_review_required}, %DenyLoan{} = command) do
    [%ManuallyDenied{id: command.id}]
  end

  def evolve(events) do
    List.foldl(events, initial_state(), &evolve/2)
  end

  def evolve(%LoanRequested{} = event, %__MODULE__{id: nil, status: nil} = state) do
    %{
      state
      | id: event.id,
        name: event.name,
        amount: event.amount,
        national_id: event.national_id,
        credit_score: event.credit_score
    }
  end

  def evolve(%ManualReviewRequired{} = _event, %__MODULE__{status: nil} = state) do
    %{state | status: :manual_review_required}
  end

  defp initial_state() do
    %__MODULE__{}
  end
end

See “Fold LoanRequest stream” commit for full changes.
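The decide/evolve cycle above can also be exercised end to end with a dependency-free miniature (plain tuples instead of the Ecto-backed event structs; all names here are illustrative):

```elixir
defmodule Mini.LoanRequest do
  # Deciding is only allowed when the folded state says the request
  # is pending manual review, mirroring the decider above.
  def decide(%{status: :manual_review_required}, {:deny_loan, id}) do
    [{:manually_denied, id}]
  end

  # Fold the event list into the current state.
  def evolve(events), do: List.foldl(events, %{id: nil, status: nil}, &evolve/2)

  def evolve({:loan_requested, id}, %{id: nil} = state), do: %{state | id: id}
  def evolve({:manual_review_required, _id}, state), do: %{state | status: :manual_review_required}
end

events = [{:loan_requested, "abc"}, {:manual_review_required, "abc"}]
state = Mini.LoanRequest.evolve(events)
# state.status is :manual_review_required, so the deny decision is allowed:
[{:manually_denied, "abc"}] = Mini.LoanRequest.decide(state, {:deny_loan, "abc"})
```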

Consider how much less mental mapping you must do to discuss the domain with a non-technical stakeholder, compared to other state management patterns.

Sure, ES/CQRS is not without complex technical issues, like set validation, but it is nonetheless a powerful tool for typical business automation, and one we can surely benefit from in complex domains.

Additionally, splitting writes from reads allows one to scale them independently, which makes a noticeable impact in industries with significant spikes, like betting. My experience shows that separating those two roles onto different servers makes the difference between an overloaded system and one that can handle Cheltenham without hiccups. Furthermore, it’s possible to split the system across servers in an even more fine-grained fashion and build a separate read model for a single sport, for example, football.

When this may not work

  1. I have never seen ES/CQRS adoption go down well in a feature factory. It doesn’t matter whether management doesn’t want to distribute power or developers are not interested in the product. The upside is that this journey can be transformative for an organisation willing to change.

  2. Even if your organisation is willing to pay the price of collaborative design (levelling up its social network so that it can accommodate more complex interactions) and the people inside it are willing to undergo that change, there is still a coordination and psychoemotional cost. That push is mostly justified for the core part of the system, where it can make a difference in how the business performs on the market.

  3. Rolling your own event store, especially on top of Kafka, is not worth it. There are already enough articles on the internet about why it’s a bad idea. Even Confluent’s marketing department has stopped advertising it as such.

ES/CQRS thrives in knowledge-based workplaces. It moves the focus back onto the domain and pushes people across the organisation to co-design the software, solving both of the infamous “hardest problems” in software development on the spot:

  • The cache invalidation problem, in most cases I’m familiar with, is an after-effect of inadequate state management, and ES/CQRS gets that straight by design: your read models are a cache of your state, and they change according to domain rules.

  • The problem of naming things ceases to exist because the fundamental building blocks of ES/CQRS systems, events, are in fact domain events discovered with subject matter experts. You basically move the problem to the place where it should be solved.

The epiphany

And here is one more reason why ES/CQRS and Elixir are a great match:

| Elixir | ES/CQRS |
| ------ | ------- |
| Technical scaling through immutable data | Knowledge scaling through immutable event logs that capture the decision-making context |
| Long-running, independent processes with GenServers | Long-running business processes designed according to domain rules |
| Communication between processes through asynchronous message passing | Asynchronous, eventually consistent systems working off event-based state transitions |
| Data and behaviour are decoupled from each other: structs don’t have behaviour | Data and behaviour are decoupled from each other: one can change logic to reinterpret events |

Acknowledgments

Many thanks to Stephen Tung for helping prepare this post and to Yves Reynhout for the thoughtful discussion on message delivery guarantees.

Appendix A: message delivery guarantees

  • If you want to perform an “exactly-once” operation in ESDB against an external system, the operation on the external system must support idempotency. An external system that supports this usually allows you to pass some sort of deduplication or unique identification key, so that it will simply ignore multiple requests with the same key. In ESDB, this will be the event ID that your process manager is handling. With this, it doesn’t matter whether a “catch-up” or “persistent” subscription is used. Your external system will perform the dedup.

  • If you try to do the above with a catch-up subscription, you will certainly hit the problem you describe in your article. Namely, if you have multiple clients (for HA reasons) performing the same catch-up subscription, they can all duplicate each other’s event handling efforts. That being said, you can still achieve “exactly-once” because the consumer is idempotent and can dedup any duplicated messages. But it is just not a very efficient use of resources when clients are stepping all over each other.

A way to make sure catch-up subscription clients don’t step on each other’s toes is to use distributed locks, supposedly popularized by Martin Kleppmann (I only recently learnt about this after talking to YvesR):

https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

In this approach, you can have multiple nodes with clients tasked to perform the same projection/reaction logic, but only one can actually perform it based on who acquires the lock/lease first. This lock can be acquired, for example, over a database.

I think this is the same idea as what you refer to as a “synchronisation mechanism through shared storage”.
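For illustration, the same single-writer idea can be sketched with Erlang’s built-in :global locks instead of a database lock (a different mechanism than the one described above; the module name and lock id are made up):

```elixir
defmodule SingleWriter do
  @resource :underwriter_dashboard_projection

  # Runs `fun` only if no other process on any connected node currently
  # holds the lock; returns :aborted otherwise (0 retries, so no waiting).
  def run_exclusively(fun) do
    :global.trans({@resource, self()}, fun, [node() | Node.list()], 0)
  end
end

# Only one node in the cluster will get to run the projection logic:
SingleWriter.run_exclusively(fn -> :projection_ran end)
```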

  • On the other hand, if you try to do the above with a persistent subscription, you will get the competing-consumer capability out of the box, and ESDB will be the distributor that manages which node/client receives the message.

This is great and requires less code complexity on your part, but the danger is that, with parking, your event handler can lose events, receive events out of order, and receive duplicated events (due to retries). If your operation doesn’t really care about this, then a persistent subscription is the best option for you. But if not, you’ll need to write extra logic to handle these scenarios.

Stephen Tung

Exactly-once is only going to work when the third party actively participates. What if you call the third party and your calling process dies mid-execution? You have no idea whether the desired side effect happened. Even if you wait for an acknowledgement, if your process dies before receiving it, you have no way of knowing whether the target system sent it, unless you call it again (with a message identifier that allows the third party to deduplicate / be idempotent).

Yves Reynhout
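The deduplication both correspondents describe boils down to remembering processed event ids. An in-memory sketch (the module is made up; a real system would use a durable store, e.g. a unique index in the read model’s database):

```elixir
defmodule DedupHandler do
  # Runs `fun` only if `event_id` hasn't been seen before, and returns
  # the updated set of seen ids, making redeliveries harmless.
  def handle(event_id, seen, fun) do
    if MapSet.member?(seen, event_id) do
      {:duplicate, seen}
    else
      fun.()
      {:processed, MapSet.put(seen, event_id)}
    end
  end
end

{:processed, seen} = DedupHandler.handle("evt-1", MapSet.new(), fn -> :side_effect end)
# A redelivery of the same event id is skipped:
{:duplicate, _} = DedupHandler.handle("evt-1", seen, fn -> raise "must not run twice" end)
```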

