Greetings to all Elixir enthusiasts!
I’m happy to present my take on implementing ES/CQRS systems with Phoenix/LiveView and EventStoreDB. I believe those pieces of technology grew from the same conceptual root and are a great match.
If you have ever felt joy from expressing a dimension of time with GenServer or satisfaction from LiveView’s stateful nature, you will feel right at home. For everyone else - welcome aboard, and let’s see where we can get by the end of the text.
This post is dedicated to all developers who have suffered from what starts as an innocuous status field and then gets split into a series of entities, one per state transition, bound together by an invisible spell passed on to newcomers with the words “Oh, that…”
For this tutorial, we will implement a loan processing system as presented by Tony Young in his webinar ‘An end-to-end example with EventStoreDB’. It covers the following requirements:
High-level overview of the flow:
Please note that there will be no conceptual gap between the diagram and the implementation: it will follow the event model to the letter, creating a powerful medium for stakeholder collaboration.
Full application source is available at https://github.com/lessless/lepret (from the French le prêt, the loan).
Disclaimer: The presented code serves the purpose of communicating ideas and can be further refined and improved upon.
Our first step is to place a Request Loan command with three pieces of data provided by a user:
For the system to be able to process the command, the data should meet the following criteria:
Those are application-level/input validations, not business-rules validations like “a person with the same national ID shouldn’t have more than one request opened at the same time”, for example.
That is a task of “validating a command to request a loan” rather than “validating a loan request”, meaning that we want to ensure that we have the data to express the intention of requesting a loan.
Now, for the completeness of the example, let’s also assume that there will be more than one channel to request a loan: via the browser and via a JSON API. In that case, we want to keep those validations in one module and reuse them across all channels.
We can call it the RequestLoan command and put it under the Commands namespace, i.e. Lepret.Commands.RequestLoan.
Here is what such a command can look like:
defmodule Lepret.Commands.RequestLoan do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, []}
embedded_schema do
field :name, :string
field :amount, :integer
field :national_id, :string
end
def build() do
%__MODULE__{}
end
def build(attrs) do
attrs
|> changeset()
|> apply_action(:build)
end
def changeset(attrs \\ %{}) do
%__MODULE__{}
|> cast(attrs, [:name, :amount, :national_id])
|> validate_required([:name, :amount, :national_id])
|> validate_length(:name, min: 2)
|> validate_number(:amount, greater_than: 0)
|> validate_format(:national_id, ~r/^\d{6}[A-Za-z]{2}$/)
end
end
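Before wiring it into the UI, a quick sanity check in iex shows the two outcomes we care about (output abbreviated and illustrative):
iex> Lepret.Commands.RequestLoan.build(%{"name" => "Yev", "amount" => "1000", "national_id" => "123456aa"})
{:ok, %Lepret.Commands.RequestLoan{name: "Yev", amount: 1000, national_id: "123456aa"}}
iex> Lepret.Commands.RequestLoan.build(%{"name" => "Y", "amount" => "-5", "national_id" => "nope"})
{:error, #Ecto.Changeset<action: :build, valid?: false, ...>}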
Correspondingly, it can be used in the slightly simplified loan request form component generated by mix phx.gen.live:
defmodule LepretWeb.LoanRequestLive.FormComponent do
use LepretWeb, :live_component
alias Lepret.Commands.RequestLoan, as: RequestLoanCommand
@impl true
def render(assigns) do
~H"""
<div>
<.header>
<%= @title %>
<:subtitle>Use this form to manage loan_request records in your database.</:subtitle>
</.header>
<.simple_form for={@form} id="loan_request-form" phx-target={@myself} phx-change="validate" phx-submit="save">
<.input field={@form[:name]} type="text" label="Name" />
<.input field={@form[:amount]} type="number" label="Amount" />
<.input field={@form[:national_id]} type="text" label="National ID (six numbers and two letters, e.g. 123456aa)" />
<:actions>
<.button phx-disable-with="Requesting...">Request Loan</.button>
</:actions>
</.simple_form>
</div>
"""
end
@impl true
def update(assigns, socket) do
changeset = RequestLoanCommand.changeset()
{:ok,
socket
|> assign(assigns)
|> assign_form(changeset)}
end
@impl true
def handle_event("validate", %{"request_loan" => request_loan_params}, socket) do
changeset =
request_loan_params
|> RequestLoanCommand.changeset()
|> Map.put(:action, :validate)
{:noreply, assign_form(socket, changeset)}
end
def handle_event("save", %{"request_loan" => request_loan_params}, socket) do
case RequestLoanCommand.build(request_loan_params) do
{:error, changeset} ->
{:noreply, assign_form(socket, Map.put(changeset, :action, :validate))}
{:ok, _command} ->
# we will place valid command on the next step
{:noreply, socket}
end
end
defp assign_form(socket, %Ecto.Changeset{} = changeset) do
assign(socket, :form, to_form(changeset))
end
end
See “Validate loan request” commit for full changes.
Let’s take a step back now and think about what’s ahead of us:
- publish a Loan Requested event
- react to the Loan Requested event to get a credit score
- publish a Credit Checked event with the applicant’s credit score
- publish one of Loan Request Approved, Loan Request Denied or Loan Approval Required
Consider what will happen in production: every node in the cluster will receive a Loan Requested message and go through the same flow; therefore, our app will request a credit check from the external system once per node in the cluster. So, how do we hit the external system just once?
This question cuts deep into message processing guarantees, which in my experience is the most challenging topic in distributed systems programming. Getting it right is always tied to the needs of a specific system and requires an understanding of risks and tradeoffs. Fortunately, with Elixir and EventStoreDB, we have a couple of simple options that might be just good enough starting points:
In our sample system we can cheat by asking users to cooperate with us and to try again in case of a failure. That primarily works when users are highly motivated to get the job done. In other cases, they may leave instead of waiting for a result.
But enough gloom. Let’s entertain ourselves with a technical “comfort food” by saying that we are changing the direction of coupling from events to commands: we will interact with the third-party system when processing a command instead of reacting to an event. That is also sometimes known as moving from choreography to orchestration.
From Balancing Choreography and Orchestration by Bernd Rücker
Here is the updated flow:
We will still publish multiple events: Loan Requested and one of Loan Request Approved, Loan Request Denied or Loan Approval Required, because that makes sense from the process semantics perspective. But it will happen whilst processing the RequestLoan command, which we will enrich with the credit_score (see ParameterObject).
Now that we have the Loan Request, we can place it against our system through the application layer. It’s not strictly necessary, but there is a good rationale behind doing so, detailed in the “Resources” section.
I added the credit_score field to the Request Loan command, and now it can carry the credit score we are about to obtain from the third-party service. I also updated the build function to generate the id:
defmodule Lepret.Commands.RequestLoan do
# same as before
+ alias Lepret.CreditScore
embedded_schema do
# same as before
+ field :credit_score, :map
end
def build(attrs) do
attrs
|> changeset()
+ |> put_change(:id, Ecto.UUID.generate())
|> apply_action(:build)
end
# same as before
+ def enrich_with(%__MODULE__{} = command, %CreditScore{} = credit_score) do
+ Map.put(command, :credit_score, credit_score)
+ end
end
The Lepret.CreditScore is a value object backed by an Elixir struct:
defmodule Lepret.CreditScore do
defstruct [:score, :national_id, :ts]
def new(national_id, score, ts) do
%__MODULE__{national_id: national_id, score: score, ts: ts}
end
end
Then I updated the loan request form to invoke the LoanRequest use case for a valid RequestLoan command:
defmodule LepretWeb.LoanRequestLive.FormComponent do
+ alias Lepret.UseCases
# same as before
def handle_event("save", %{"request_loan" => request_loan_params}, socket) do
case RequestLoanCommand.build(request_loan_params) do
{:error, changeset} ->
{:noreply, assign_form(socket, Map.put(changeset, :action, :validate))}
- {:ok, _command} ->
- # we will place valid command on the next step
- {:noreply, socket}
+ {:ok, command} ->
+ save_loan_request(socket, command)
end
end
+ defp save_loan_request(socket, command) do
+ case UseCases.LoanRequest.run(command) do
+ :ok ->
+ {:noreply,
+ socket
+ |> put_flash(:info, "Loan successfully requested")
+ |> push_patch(to: socket.assigns.patch)}
+ end
+ end
end
I used a fake credit score API client to enrich the RequestLoan command with the CreditScore in the LoanRequest use case:
defmodule Lepret.UseCases.LoanRequest do
alias Lepret.Commands.RequestLoan
alias Lepret.CreditScoreApiClient
def run(%RequestLoan{} = command) do
{:ok, credit_score} = CreditScoreApiClient.for(command.national_id)
_command = RequestLoan.enrich_with(command, credit_score)
:ok
end
end
That fake client returns a predefined credit score for a known national ID or a random credit score otherwise. Predefined scores fall into one of three buckets: automatic approval, rejection, and manual review.
defmodule Lepret.CreditScoreApiClient do
alias Lepret.CreditScore
def for(national_id) do
# simulate third-party latency
Process.sleep(100)
{:ok, CreditScore.new(national_id, predefined_credit_scores(national_id), DateTime.utc_now())}
end
defp predefined_credit_scores(national_id) do
case national_id do
"999999aa" -> 9
"666666aa" -> 6
"555555aa" -> 5
"111111aa" -> 1
_ -> Enum.random(1..10)
end
end
end
At the end of those changes, the enriched Request Loan command now carries a credit score and a client-controlled id:
%Lepret.Commands.RequestLoan{
id: "a1ac5824-177d-43e0-bcc0-8b03dad5e1a7",
name: "Yev",
amount: 1000,
national_id: "123456aa",
credit_score: %Lepret.CreditScore{
score: 7,
national_id: "123456aa",
ts: ~U[2024-04-24 14:40:04.880633Z]
}
}
See “Enrich RequestLoan command with credit score” commit for full changes.
As of April 2024, EventStoreDB is the ES/CQRS database: the only one that guarantees global and per-stream sequentiality of events. Whatever happens, your application can read events in the same order they were published, both from the global $all stream and from aggregate(entity)-level streams. Check out Data Modeling in an Event Centric World to learn more.
Fortunately, it’s a breeze to start with. For the purpose of this tutorial, you can run it via Docker, as detailed in the “Docker” section of EventStoreDB’s “Installation” guide:
docker run --name esdb-node -it -p 2113:2113 eventstore/eventstore:latest --insecure --run-projections=All --enable-atom-pub-over-http
Apple Silicon users must use the alpha image eventstore/eventstore:24.2.0-alpha-arm64v8.
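Once the container is up, you can sanity-check the node through its health endpoint (assuming the default 2113 port from the command above):
curl -i http://localhost:2113/health/live
A 2xx response means the node is alive; the admin UI is served on the same port.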
If you happen to host your applications on Fly.io, it takes about 30 minutes to create a setup that is good enough for development purposes. The only impediment to building a production-grade cluster is the lack of IPv6 support.
I’m a fan of Spear, a sharp and well-maintained client. Let’s plug it in and configure it to connect to the local EventStoreDB instance:
defmodule Lepret.MixProject do
# same as before
defp deps do
[
# same as before
{:spear, "~> 1.4"}
]
end
end
# in config/dev.exs
config :lepret, Lepret.EventStore.SpearClient, connection_string: "esdb://localhost:2113"
Later, we will sprinkle it with a few convenience functions to automatically derive stream names and deserialise events, so let’s put it under a wrapping module:
defmodule Lepret.EventStore do
defmodule SpearClient do
use Spear.Client, otp_app: :lepret
end
end
Finally, add Lepret.EventStore.SpearClient to the app’s supervision tree:
defmodule Lepret.Application do
# same as before
@impl true
def start(_type, _args) do
children = [
# same as before
Lepret.EventStore.SpearClient,
LepretWeb.Endpoint
]
# same as before
end
# same as before
end
With those changes in place, you should be able to fetch events from the $all stream:
iex(1)> Lepret.EventStore.SpearClient.stream!(:all) |> Enum.take(1)
[
%Spear.Event{
id: "6878a9de-cfd0-461b-8a4b-09f72415d396",
type: "$metadata",
body: %{
"$acl" => %{
"$d" => [],
"$mr" => [],
"$mw" => [],
"$r" => "$ops",
"$w" => []
},
"$maxAge" => 2592000
},
link: nil,
metadata: %{
content_type: "application/json",
created: ~U[2024-04-24 19:55:29.362346Z],
stream_name: "$$$scavenges",
prepare_position: 548,
commit_position: 548,
custom_metadata: "",
stream_revision: 0
}
}
]
See “Setup SpearClient” commit for full changes.
Event sourcing is a state management pattern in which the current state is derived from an immutable log of previous events by left-folding that stream of events.
Conceptually, the implementation amounts to the following:
stream_name = "Account:#{account_id}"
events = EventStore.load(stream_name)
state = Enum.reduce(events, _initial_state = %Account{}, fn event, state -> Account.apply(event, state) end)
changes = Account.handle(command, state)
EventStore.publish(stream_name, changes)
Handling a command normally produces a new set of changes in the form of events that are later published to the same stream, leading to event-based state transitions.
Alternatively, a system can reject a command, in which case nothing happens. In contrast, events should always be processed, as they describe something that has already happened.
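To make that asymmetry concrete, here is a minimal sketch in the conceptual Account model from above (Withdraw, MoneyWithdrawn and the balance rule are made up for illustration):
# Commands can be rejected: no event is produced, nothing happens.
def handle(%Withdraw{amount: amount}, %Account{balance: balance}) when amount > balance do
  {:error, :insufficient_funds}
end

def handle(%Withdraw{amount: amount}, %Account{id: id}) do
  [%MoneyWithdrawn{account_id: id, amount: amount}]
end

# Events are facts: applying one must always succeed.
def apply(%MoneyWithdrawn{amount: amount}, %Account{} = account) do
  %{account | balance: account.balance - amount}
end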
What happens when two different users want to change the same entity at the same time? We can address that with roughly the same mechanism that relational databases use under the hood: record the stream’s revision from the last known event and tell EventStoreDB that it’s the one we expect the stream to be at when publishing changes.
Such an explicit mechanism, plus the ability to understand the nature of change, gives greater control over data consistency: we can re-read the stream, get the conflicting event, and decide what to do.
# as before
EventStore.publish(stream_name, changes, List.last(events).revision)
EventStoreDB also provides a number of other options alongside the stream’s revision check to assert on its state, such as Any, NoStream, EmptyStream and StreamExists.
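In Spear’s terms, those expectations map onto the :expect option of append (a sketch; stream_name and events as in the snippets elsewhere in this post):
# an exact revision number: optimistic concurrency against the last event we saw
SpearClient.append(events, stream_name, expect: revision)

# :empty means the stream must not exist yet (first write wins)
SpearClient.append(events, stream_name, expect: :empty)

# :any skips the check entirely; :exists requires the stream to already exist
SpearClient.append(events, stream_name, expect: :any)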
Until recently, I preferred to return an entity that would hide changes, which made the specific state management approach more swappable:
defmodule Account do
use Ecto.Schema
@primary_key {:id, :binary_id, []}
embedded_schema do
field :balance, :integer
field :__changes__, :map, virtual: true
end
end
However, my colleagues practically convinced me to stick with the Decider Pattern, a purely functional state management pattern that emphasises the role of the events, as it should be, by returning only changes. In practice, it’s also rare to see the need to evolve the entity against the changes in order to operate on the future state.
That comes at the expense of an increased number of arguments in the following implementation of EventStore.publish. With events hidden inside an entity, that call could be just EventStore.publish(state), with the id and the expected version carried inside the state (in a __version__ property, say).
Now, all of those have to be passed explicitly: EventStore.publish(module, id, expected_revision, events), where the first two arguments are needed just to produce a stream name in the form of entity_name <> separator <> id, e.g. LoanRequest:123e4567-e89b-12d3-a456-426614174000.
There are numerous methods to tidy up that boilerplate. Regardless of your choice, keeping the event in the spotlight will pay off in conversations with non-technical stakeholders.
Starting lean, we will implement two of the four prescribed Decider functions, decide and evolve, and skip initial state and terminal state.
The initial state function returns the entity before the first event is applied, and it is the most convenient place to provide defaults. Think of the init callback in a GenServer or the new constructor in mainstream programming languages. But since we don’t have any use for it yet, we can go directly to deciding on a loan request:
defmodule Lepret.LoanRequest do
use Ecto.Schema
alias Lepret.Commands.RequestLoan
alias Lepret.Events.LoanRequested
alias Lepret.Events.AutoApproved
alias Lepret.Events.AutoDenied
alias Lepret.Events.ManualReviewRequired
@primary_key {:id, :binary_id, []}
embedded_schema do
field :name, :string
field :amount, :integer
field :national_id, :string
field :credit_score, :map
field :status, Ecto.Enum, values: [:auto_approved, :auto_denied, :manual_review_required]
end
def decide(%RequestLoan{} = command) do
loan_requested = %LoanRequested{
id: command.id,
name: command.name,
amount: command.amount,
national_id: command.national_id,
credit_score: command.credit_score
}
loan_request_review_decision =
cond do
command.credit_score.score >= 7 ->
%AutoApproved{id: command.id}
command.credit_score.score <= 4 ->
%AutoDenied{id: command.id}
true ->
%ManualReviewRequired{
id: command.id,
name: command.name,
amount: command.amount,
national_id: command.national_id,
credit_score: command.credit_score
}
end
[loan_requested, loan_request_review_decision]
end
end
This decide function doesn’t have to consult past state to make a decision; therefore, we have no use for a state argument.
Invoking that code from the use case will result in a list of two events that we can now append to the event stream:
defmodule Lepret.UseCases.LoanRequest do
alias Lepret.Commands.RequestLoan
alias Lepret.CreditScoreApiClient
+ alias Lepret.LoanRequest
+ alias Lepret.EventStore
def run(%RequestLoan{} = command) do
{:ok, credit_score} = CreditScoreApiClient.for(command.national_id)
- _command = RequestLoan.enrich_with(command, credit_score)
- :ok
+ command = RequestLoan.enrich_with(command, credit_score)
+ changes = LoanRequest.decide(command)
+ :ok = EventStore.publish(LoanRequest, command.id, :empty, changes)
end
end
I recognise that there are refactoring opportunities in EventStore.publish, as the first two arguments are used only to derive the stream name, but please bear with me for now:
defmodule Lepret.EventStore do
defmodule SpearClient do
use Spear.Client, otp_app: :lepret
end
def publish(decider, id, expected_revision, events) do
events
|> to_spear_events()
|> SpearClient.append(stream_name_for(decider, id), expect: expected_revision)
end
defp to_spear_events(events) do
Enum.map(events, fn event -> Spear.Event.new(module_name(event), event) end)
end
defp stream_name_for(decider, id) do
"#{module_name(decider)}:#{id}"
end
defp module_name(struct) when is_map(struct) do
struct
|> Map.get(:__struct__)
|> module_name()
end
defp module_name(module) when is_atom(module) do
module
|> Atom.to_string()
|> String.split(".")
|> List.last()
end
end
The biggest part of this code is responsible for producing stream names with a colon as a separator, e.g. LoanRequest:2e4a86e8-39e7-4348-9b2a-d6a01806f66d.
See “Add LoanRequest decider” commit for full changes.
Now you have a side-effect-free, completely functional domain model in about the length of one decide function, and a vocabulary rooted in domain events that you can use to communicate with non-technical stakeholders.
As an added bonus, the decision-making process in your system is entirely observable, as you can see in EventStoreDB’s Stream Browser.
If you want a stronger pitch for the potential benefits, I recommend “Event Modeling” by Adam Dymitruk. It’s a breath of fresh air in the world of typical business automation and worth every minute of your time.
Let’s move our domain model under lib/lepret/domain. It’s a bit awkward to see the UI reaching inside the domain, but I prefer that to keeping commands on the same level as use cases.
lib/lepret/
├── application.ex
├── domain
│ ├── commands
│ │ └── request_loan.ex
│ ├── credit_score.ex
│ ├── credit_score_api_client.ex
│ ├── event_store.ex
│ ├── events.ex
│ └── loan_request.ex
├── mailer.ex
├── repo.ex
└── use_cases
└── request_loan.ex
Working with Elixir, you know that message exchange between processes is async by nature, and the synchronous mechanics are an added convenience.
There are many merits to that, the most noticeable of which are scalability and resiliency. ES/CQRS works similarly by splitting the Read and Write sides of the system and catering to their needs independently. It’s remarkable how well these two models match each other and allow us to build software within the same message-passing paradigm from the moment a user sends a command until they see a pub/subbed update on the screen.
Let’s dive right in!
Walking down the Command and Query Responsibility Segregation lane, we will introduce a read model to display loan requests pending manual review on the underwriter’s dashboard.
We will subscribe to the global event stream, define a handler for the ManualReviewRequired event (the only one we are interested in so far), and start projecting the events as we read them into the relational database for quick and convenient access.
After processing each event, we will increment the consumer’s cursor position in the stream so that the read model can catch up from the last known position after a restart (during deployment, for example).
We will wrap event projection and cursor increment in the same transaction to be confident that the read model will eventually be consistent. At some point, it will catch up with the latest event in the stream, no matter what.
The projector itself is a GenServer receiving streaming updates from the EventStoreDB:
defmodule Lepret.ReadModel.UnderwriterDashboard.Subscription do
use GenServer
alias Lepret.Domain.EventStore
alias Lepret.Repo
def start_link(params) do
GenServer.start_link(__MODULE__, params)
end
@impl GenServer
def init(_) do
{:ok, _subscription} = EventStore.catchup_subscription(self(), checkpoint_name())
{:ok, nil}
end
defp checkpoint_name() do
Atom.to_string(__MODULE__)
end
end
The EventStore.catchup_subscription is a thin wrapper around Spear.subscribe with transparent knowledge of the current position:
defmodule Lepret.Domain.EventStore do
# same as before
def catchup_subscription(subscriber_pid, checkpoint_name) do
SpearClient.subscribe(
subscriber_pid,
:all,
from: Checkpoint.current(checkpoint_name),
filter: Spear.Filter.exclude_system_events()
)
end
end
The Checkpoint is an Ecto schema designated to store and retrieve checkpoints for read models:
defmodule Lepret.Domain.EventStore.Checkpoint do
use Ecto.Schema
alias Lepret.Repo
@primary_key {:name, :string, autogenerate: false}
schema "read_model_checkpoints" do
field :position, :integer
timestamps()
end
def current(name) do
checkpoint = get(name) || create(name)
%Spear.Filter.Checkpoint{commit_position: checkpoint.position, prepare_position: checkpoint.position}
end
defp get(name) do
Repo.get(__MODULE__, name)
end
def create(name) do
Repo.insert!(%__MODULE__{name: name, position: 0})
end
end
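The read_model_checkpoints table comes from an ordinary Ecto migration; here is a minimal sketch of what it could look like (the actual migration in the repo may differ):
defmodule Lepret.Repo.Migrations.CreateReadModelCheckpoints do
  use Ecto.Migration

  def change do
    create table(:read_model_checkpoints, primary_key: false) do
      # the checkpoint name doubles as the primary key
      add :name, :string, primary_key: true
      # commit positions in the $all stream can be large, so use bigint
      add :position, :bigint, null: false

      timestamps()
    end
  end
end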
Now, we are ready to start consuming events. The first step is to deserialise the raw event stored as JSON:
defmodule Lepret.ReadModel.UnderwriterDashboard.Subscription do
# as before
@impl GenServer
def handle_info(raw_event, state) do
if EventStore.event_type_in?(raw_event, [ManualReviewRequired]) do
Repo.transaction(fn ->
_event = EventStore.deserialise!(raw_event, Lepret.Domain.Event)
end)
end
{:noreply, state}
end
end
There is more than one way to restore an Elixir struct from a JSON object, but in our case, even crude introspection and Ecto.Changeset.cast should do:
defmodule Lepret.Domain.EventStore do
# as before
def event_type_in?(%Spear.Event{} = raw_event, events_of_interest) do
raw_event.type in Enum.map(events_of_interest, &module_name/1)
end
def event_type_in?(_checkpoint, _events_of_interest) do
false
end
def deserialise!(%Spear.Event{} = raw_event, event_namespace) do
module_name = Module.concat(event_namespace, raw_event.type)
module_name.deserialise!(raw_event.body)
end
end
defmodule Lepret.Domain.Event do
# as before
defmodule ManualReviewRequired do
use Ecto.Schema
import Ecto.Changeset
alias Lepret.Domain.CreditScore
@derive Jason.Encoder
@primary_key {:id, :binary_id, []}
embedded_schema do
field :name, :string
field :amount, :integer
field :national_id, :string
embeds_one :credit_score, CreditScore
end
def deserialise!(serialised_body) do
%__MODULE__{}
|> cast(serialised_body, [:id, :name, :amount, :national_id])
|> validate_required([:id, :name, :amount, :national_id])
|> cast_embed(:credit_score, required: true)
|> apply_action!(:deserialise)
end
end
end
I also elevated CreditScore to an Ecto schema so that the event will look the same on both ends, before publishing and after deserialisation.
With the original event in hand, we can start having fun representing the same data via another data model. In this case, I’m going to use a table in a relational database, but it can be any storage of your choice — from Neo4j to ElasticSearch.
You can have as many of those models as you want, in any format and place you want, and that’s a totally new degree of freedom.
defmodule Lepret.ReadModel.UnderwriterDashboard do
use Ecto.Schema
import Ecto.Changeset
alias Lepret.Domain.Event.ManualReviewRequired
alias Lepret.Repo
@primary_key {:id, :binary_id, []}
schema "underwriter_dashboard" do
field :name, :string
field :amount, :integer
field :national_id, :string
field :credit_score, :integer
field :decision, Ecto.Enum, values: [:approved, :denied]
timestamps()
end
def consume!(%ManualReviewRequired{} = event) do
event
|> Map.from_struct()
|> Map.put(:credit_score, event.credit_score.score)
|> create!()
end
defp create!(params) do
%__MODULE__{}
|> cast(params, [:id, :amount, :name, :national_id, :credit_score])
|> validate_required([:id, :amount, :name, :national_id, :credit_score])
|> Repo.insert!()
end
end
Now, we can wire it into the subscription alongside the checkpoint update.
defmodule Lepret.ReadModel.UnderwriterDashboard.Subscription do
# as before
@impl GenServer
def handle_info(raw_event, state) do
if EventStore.event_type_in?(raw_event, [ManualReviewRequired]) do
Repo.transaction(fn ->
event = EventStore.deserialise!(raw_event, Lepret.Domain.Event)
+ UnderwriterDashboard.consume!(event)
+ EventStore.update_checkpoint!(checkpoint_name(), raw_event)
end)
end
{:noreply, state}
end
end
The EventStore.update_checkpoint! is a dead simple function updating the subscriber’s cursor position in the stream:
defmodule Lepret.Domain.EventStore do
# same as before
def update_checkpoint!(checkpoint_name, %Spear.Event{} = raw_event) do
Checkpoint.update!(checkpoint_name, raw_event.metadata.commit_position)
end
end
defmodule Lepret.Domain.EventStore.Checkpoint do
# same as before
def update!(name, position) do
name
|> get()
|> change(%{position: position})
|> Repo.update!()
end
end
After ensuring reliable event consumption, we are just one step away from completing the dashboard, which will show the pending loan requests. We will start by querying our read model for all pending loan requests and putting them into a LiveView stream that we will render as a list.
defmodule LepretWeb.UnderwriterDashboardLive.Index do
use LepretWeb, :live_view
alias Lepret.ReadModel.UnderwriterDashboard
@impl true
def mount(_params, _session, socket) do
{:ok, stream(socket, :loan_requests, UnderwriterDashboard.list_loan_requests())}
end
@impl true
def handle_params(params, _url, socket) do
{:noreply, apply_action(socket, socket.assigns.live_action, params)}
end
defp apply_action(socket, :index, _params) do
assign(socket, :page_title, "Pending loan requests")
end
end
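The list_loan_requests/0 query isn’t shown in this post’s snippets; a minimal sketch on the read model could look like this (pending requests are simply the rows without a decision yet):
defmodule Lepret.ReadModel.UnderwriterDashboard do
  # same as before
  import Ecto.Query

  def list_loan_requests() do
    # rows with a decision have already been processed by an underwriter
    from(lr in __MODULE__, where: is_nil(lr.decision), order_by: [asc: lr.inserted_at])
    |> Repo.all()
  end
end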
And here is the template with a LiveView stream:
<.header>
Pending loan requests
</.header>
<.table id="loan_requests" rows={@streams.loan_requests}>
<:col :let={{_id, loan_request}} label="Name"><%= loan_request.name %></:col>
<:col :let={{_id, loan_request}} label="Amount"><%= loan_request.amount %></:col>
<:col :let={{_id, loan_request}} label="National ID"><%= loan_request.national_id %></:col>
</.table>
In many other technologies, that would be it—an underwriter would have to refresh the page to see a new loan request for review. But we can go one step further and give them a soft real-time experience by extending the event flow from the event store all the way down to the browser with just two easy steps.
Broadcast the update:
defmodule Lepret.ReadModel.UnderwriterDashboard do
# same as before
def consume!(%ManualReviewRequired{} = event) do
event
|> Map.from_struct()
|> Map.put(:credit_score, event.credit_score.score)
|> create!()
+ |> broadcast("new_loan_request_for_review")
end
# same as before
+ def topic() do
+ Atom.to_string(__MODULE__)
+ end
+
+ defp broadcast(payload, event) do
+ Endpoint.broadcast(topic(), event, payload)
+ end
end
Consume it in LiveView and add to the stream:
defmodule LepretWeb.UnderwriterDashboardLive.Index do
# same as before
@impl true
def handle_params(params, _url, socket) do
+ if connected?(socket) do
+ Endpoint.subscribe(UnderwriterDashboard.topic())
+ end
{:noreply, apply_action(socket, socket.assigns.live_action, params)}
end
+ @impl true
+ def handle_info(%{event: "new_loan_request_for_review", payload: new_loan_request}, socket) do
+ {:noreply, stream_insert(socket, :loan_requests, new_loan_request)}
+ end
end
And voila!
See “Add UnderwriterDashboard read model” commit for full changes.
Sometimes, it feels magical how well those two models fit each other!
You may notice a potential issue with that approach in production — your application will start writing to the same table from all nodes in the cluster. How do you avoid that?
The answer is: you don’t. You let it write to the same table in different databases. For example, you can use locally stored SQLite3 with no network latency to fetch your data no matter where the app is hosted. And that’s how you can have blazing-fast reads at the network’s edge, as close to your users as possible.
You might be interested in checking out Fly’s LiteFS (distributed SQLite) or Turso if you are on any other PaaS. Turso has a generous free plan with 9 GB of total storage over 500 databases.
It’s crucial to note that read models should be built only from the information presented in events and shouldn’t query third-party services. Adhering to that practice will enable you to effortlessly reshape the data into a form that will better fit the next product iteration.
Also, be careful when using this mechanism to send customer notifications.
Another significant bit is to measure the latency between publishing an event and a customer receiving the update. I’ve seen a situation where network latency led to an unacceptable delay, and it’s better to stay on top of that metric before customers get frustrated.
Otherwise, it’s a great productivity mechanism that breaks data coupling. In a purely technical sense, you may want to have a read model per screen and sometimes even separate read models for widgets with complex data requirements. In a sociotechnical sense, the events-based contract enables developers to work independently on the reading and writing sides without blocking each other.
The bottom line is that anyone can safely change any read model and know they won’t break another functionality reading from the same table.
I hope, by now, the “image that explains almost everything” makes more sense from the implementation point of view.
A typical system flow from the Introducing Event Storming book by Alberto Brandolini
We have just one topic left to cover in this blog post: folding an event stream to make a decision.
We made it to the very end without evolving the Loan Request decider’s state. Let’s fix that now by implementing loan denial.
Moving outside-in, we’ll start by adding a Deny button to the loan requests list:
<.table id="loan_requests" rows={@streams.loan_requests}>
<:col :let={{_id, loan_request}} label="Name"><%= loan_request.name %></:col>
<:col :let={{_id, loan_request}} label="Amount"><%= loan_request.amount %></:col>
<:col :let={{_id, loan_request}} label="National ID"><%= loan_request.national_id %></:col>
+ <:col :let={{id, loan_request}}>
+ <.button
+ phx-disable-with="Processing..."
+ phx-click="deny-loan"
+ phx-value-loan-request-id={loan_request.id}
+ phx-value-dom-id={id}
+ >
+ Deny
+ </.button>
+ </:col>
</.table>
And the corresponding click handler:
defmodule LepretWeb.UnderwriterDashboardLive.Index do
# same as before
+ alias Lepret.Domain.Commands.DenyLoan, as: DenyLoanCommand
+ alias Lepret.UseCases.DenyLoan
# same as before
+ @impl true
+ def handle_event("deny-loan", %{"loan-request-id" => loan_request_id, "dom-id" => dom_id}, socket) do
+ {:ok, command} = DenyLoanCommand.build(%{id: loan_request_id})
+ :ok = DenyLoan.run(command)
+ {:noreply, stream_delete_by_dom_id(socket, :loan_requests, dom_id)}
+ end
end
The DenyLoan command only carries an intent via its name and a single id property to identify the loan request:
defmodule Lepret.Domain.Commands.DenyLoan do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, []}
embedded_schema do
end
def build(attrs) do
%__MODULE__{}
|> cast(attrs, [:id])
|> validate_required([:id])
|> apply_action(:build)
end
end
Things are getting a bit more interesting in the use case, given the current direction of the EventStore implementation:
defmodule Lepret.UseCases.DenyLoan do
alias Lepret.Domain.Commands.DenyLoan
alias Lepret.Domain.LoanRequest
alias Lepret.Domain.EventStore
def run(%DenyLoan{} = command) do
loan_request_events = EventStore.load(LoanRequest, command.id)
loan_request = LoanRequest.evolve(loan_request_events)
changes = LoanRequest.decide(loan_request, command)
:ok = EventStore.publish(LoanRequest, loan_request.id, List.last(loan_request_events).metadata.stream_revision, changes)
end
end
We are leaking too many EventStore implementation details. We could overload EventStore.publish to derive the stream name for the decider’s state, as it already has the module name and ID, but that would still leave the question of the expected stream revision open. What if we instead return a tuple from EventStore.load that carries all that metadata?
defmodule Lepret.Domain.EventStore do
alias Lepret.Domain.EventStore.Checkpoint
alias Lepret.Domain.EventStore.StreamMetadata
alias Lepret.Domain.EventStore.ConversionTools
defmodule SpearClient do
use Spear.Client, otp_app: :lepret
end
@events_namespace Lepret.Domain.Event
def load(decider, id) do
stream_metadata = StreamMetadata.new(decider, id)
raw_events = read_all_events(stream_metadata.stream_name)
{Enum.map(raw_events, &deserialise!/1), StreamMetadata.current_revision(stream_metadata, raw_events)}
end
def publish(events, %StreamMetadata{} = stream_metadata) do
events
|> to_spear_events()
|> SpearClient.append(stream_metadata.stream_name, expect: stream_metadata.current_revision)
end
def publish(decider, id, expected_revision, events) do
publish(events, StreamMetadata.new(decider, id, expected_revision))
end
defp to_spear_events(events) do
Enum.map(events, fn event -> Spear.Event.new(ConversionTools.module_name(event), event) end)
end
def catchup_subscription(subscriber_pid, checkpoint_name) do
SpearClient.subscribe(
subscriber_pid,
:all,
from: Checkpoint.current(checkpoint_name),
filter: Spear.Filter.exclude_system_events()
)
end
def event_type_in?(%Spear.Event{} = raw_event, events_of_interest) do
raw_event.type in Enum.map(events_of_interest, &ConversionTools.module_name/1)
end
def event_type_in?(_checkpoint, _events_of_interest) do
false
end
def deserialise!(%Spear.Event{} = raw_event) do
module_name = Module.concat(@events_namespace, raw_event.type)
module_name.deserialise!(raw_event.body)
end
def update_checkpoint!(checkpoint_name, %Spear.Event{} = raw_event) do
Checkpoint.update!(checkpoint_name, raw_event.metadata.commit_position)
end
defp read_all_events(stream_name) do
stream_name
|> SpearClient.stream!()
|> Enum.into([])
end
end
defmodule Lepret.Domain.EventStore.StreamMetadata do
alias Lepret.Domain.EventStore.StreamMetadata
alias Lepret.Domain.EventStore.ConversionTools
@enforce_keys [:stream_name]
defstruct [:stream_name, :current_revision]
def new(decider, id) do
%__MODULE__{
stream_name: stream_name_for(decider, id)
}
end
def new(decider, id, current_revision) do
%__MODULE__{
stream_name: stream_name_for(decider, id),
current_revision: current_revision
}
end
defp stream_name_for(decider, id) do
"#{ConversionTools.module_name(decider)}:#{id}"
end
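# Note: current_revision/2 below assumes a non-empty stream; List.last/1 on an
# empty list would return nil, and the metadata access would crash.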
def current_revision(%StreamMetadata{} = stream_metadata, raw_events) when is_list(raw_events) do
Map.put(stream_metadata, :current_revision, List.last(raw_events).metadata.stream_revision)
end
end
defmodule Lepret.Domain.EventStore.ConversionTools do
def module_name(struct) when is_map(struct) do
struct
|> Map.get(:__struct__)
|> module_name()
end
def module_name(module) when is_atom(module) do
module
|> Atom.to_string()
|> String.split(".")
|> List.last()
end
end
I can’t say I got the abstractions right, but they’re not bad.
Now we have a saner and less leaky interface:
defmodule Lepret.UseCases.DenyLoan do
# same as before
def run(%DenyLoan{} = command) do
{events, stream_metadata} = EventStore.load(LoanRequest, command.id)
loan_request = LoanRequest.evolve(events)
changes = LoanRequest.decide(loan_request, command)
:ok = EventStore.publish(changes, stream_metadata)
end
end
However, I must admit, I’m still tempted to hide the revision and changes inside LoanRequest and evolve it inside EventStore.load :)
Okay, ladies and gentlemen, here is the moment we all came here for: derivation of the current state from an event stream.
defmodule Lepret.Domain.LoanRequest do
# same as before
def evolve(events) do
List.foldl(events, initial_state(), &evolve/2)
end
def evolve(%LoanRequested{} = event, %__MODULE__{id: nil, status: nil} = state) do
%{
state
| id: event.id,
name: event.name,
amount: event.amount,
national_id: event.national_id,
credit_score: event.credit_score
}
end
def evolve(%ManualReviewRequired{} = _event, %__MODULE__{status: nil} = state) do
%{state | status: :manual_review_required}
end
defp initial_state() do
%__MODULE__{}
end
end
which gives us the following state:
%Lepret.Domain.LoanRequest{
id: "40a08c7a-f4fd-45f1-bd6a-fdea1f79e2f7",
name: "Yevhenii Kurtov",
amount: 100,
national_id: "555555aa",
credit_score: %Lepret.Domain.CreditScore{
score: 5,
national_id: "555555aa",
ts: ~U[2024-04-29 20:27:00Z]
},
status: :manual_review_required
}
And here you are! With a completely introspectable, functional domain model where making decisions is separated from applying them.
I’m sorry if it looks simple, but it really is. Here is the complete decider, with the last decide function handling the manual loan denial:
defmodule Lepret.Domain.LoanRequest do
use Ecto.Schema
alias Lepret.Domain.Commands.RequestLoan
alias Lepret.Domain.Commands.DenyLoan
alias Lepret.Domain.Event.LoanRequested
alias Lepret.Domain.Event.AutoApproved
alias Lepret.Domain.Event.AutoDenied
alias Lepret.Domain.Event.ManualReviewRequired
alias Lepret.Domain.Event.ManuallyDenied
@primary_key {:id, :binary_id, []}
embedded_schema do
field :name, :string
field :amount, :integer
field :national_id, :string
field :credit_score, :map
field :status, Ecto.Enum, values: [:auto_approved, :auto_denied, :manual_review_required]
end
def decide(%RequestLoan{} = command) do
loan_requested = %LoanRequested{
id: command.id,
name: command.name,
amount: command.amount,
national_id: command.national_id,
credit_score: command.credit_score
}
loan_request_review_decision =
cond do
command.credit_score.score >= 7 ->
%AutoApproved{id: command.id}
command.credit_score.score <= 4 ->
%AutoDenied{id: command.id}
true ->
%ManualReviewRequired{
id: command.id,
name: command.name,
amount: command.amount,
national_id: command.national_id,
credit_score: command.credit_score
}
end
[loan_requested, loan_request_review_decision]
end
def decide(%__MODULE__{status: :manual_review_required}, %DenyLoan{} = command) do
[%ManuallyDenied{id: command.id}]
end
def evolve(events) do
List.foldl(events, initial_state(), &evolve/2)
end
def evolve(%LoanRequested{} = event, %__MODULE__{id: nil, status: nil} = state) do
%{
state
| id: event.id,
name: event.name,
amount: event.amount,
national_id: event.national_id,
credit_score: event.credit_score
}
end
def evolve(%ManualReviewRequired{} = _event, %__MODULE__{status: nil} = state) do
%{state | status: :manual_review_required}
end
defp initial_state() do
%__MODULE__{}
end
end
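One payoff of keeping the decider pure is that decisions are trivially testable: no database, no mocks, just data in and events out. A minimal ExUnit sketch, assuming the modules above (field values are illustrative):
defmodule Lepret.Domain.LoanRequestTest do
  use ExUnit.Case, async: true

  alias Lepret.Domain.LoanRequest
  alias Lepret.Domain.Commands.DenyLoan
  alias Lepret.Domain.Event.{LoanRequested, ManualReviewRequired, ManuallyDenied}

  test "a loan request pending manual review can be denied" do
    id = Ecto.UUID.generate()

    # rebuild the current state from past events...
    state =
      LoanRequest.evolve([
        %LoanRequested{id: id, name: "Yev", amount: 100, national_id: "555555aa"},
        %ManualReviewRequired{id: id, name: "Yev", amount: 100, national_id: "555555aa"}
      ])

    # ...then assert on the decision, which is just a list of events
    {:ok, command} = DenyLoan.build(%{id: id})
    assert [%ManuallyDenied{id: ^id}] = LoanRequest.decide(state, command)
  end
end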
See “Fold LoanRequest stream” commit for full changes.
Consider how much less mental mapping you must do to speak about the domain with a non-technical stakeholder, compared to other state management patterns.
Sure, ES/CQRS is not without its complex technical issues, like set validations, but no matter what, it’s a powerful tool for winning at typical business automation and one we can surely benefit from in complex domains.
Additionally, splitting writing from reading allows one to scale them independently, making a noticeable impact in industries with significant spikes, like betting. My experience shows that separating those two roles onto different servers makes the difference between an overloaded system and one that can handle Cheltenham without hiccups. Furthermore, it’s possible to split the system across servers in an even more fine-grained fashion and build a separate read model for a single sport, for example, football.
I have never seen ES/CQRS adoption go down well in a feature factory. It doesn’t matter whether management doesn’t want to distribute power or developers are not interested in the product. The upside is that this journey can be transformative for an organisation willing to change.
Even if your organisation is willing to pay the price of collaborative design, in the form of levelling up its social network so that it can accommodate more complex interactions, and people inside the organisation are willing to undergo this change, there is still a coordination and psychoemotional cost to that. That push is mostly justified for the core part of the system, the part that can make a difference in how the business performs on the market.
Rolling your own event store, especially on top of Kafka, is not worth it. There are already enough articles on the internet about why it’s a bad idea. Even Confluent’s marketing department stopped advertising it as such.
ES/CQRS thrives in knowledge-based workplaces. It moves focus back into the domain and forces people across the organization to co-design the software, solving both of the infamous “hardest problems” in software development on the spot:
The cache invalidation problem in most cases I’m familiar with is a post-effect of inadequate state management, and ES/CQRS gets that straight by design. Your read models are a cache of your state, and they change according to domain rules.
The problem of naming things ceases to exist because the fundamental building blocks of ES/CQRS systems, events, are in fact domain events discovered together with subject matter experts. You basically move that problem to the place where it should be solved.
And here is another take on why ES/CQRS and Elixir are a great match:
| Elixir | ES/CQRS |
| --- | --- |
| Technical scaling through immutable data | Knowledge scaling through immutable event logs that capture the decision-making context |
| Long-running, independent processes with GenServers | Long-running business processes designed according to domain rules |
| Communication between processes through asynchronous message passing | Asynchronous, eventually consistent systems working off event-based state transitions |
| Data and behaviour are decoupled from each other: structs don’t have behaviour | Data and behaviour are decoupled from each other: one can change logic to reinterpret events |
Many thanks to Stephen Tung for helping prepare this post and to Yves Reynhout for the thoughtful discussion on message delivery guarantees.
If you want to perform an “exactly-once” operation against an external system from ESDB, the operation on the external system must support idempotency. An external system that supports this usually allows you to pass some sort of deduplication or unique identification key so that it will simply ignore multiple requests with the same key. In ESDB, this will be the event ID that your process manager is handling. With this, it doesn’t matter whether a “catch-up” or “persistent” subscription is used; your external system will perform the dedup.
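To make that concrete, here is a sketch of the idea (hypothetical endpoint, payload and module name, assuming the Req HTTP client): the ESDB event ID travels as the deduplication key, so redelivered events collapse into a single side effect.
defmodule Lepret.CreditCheckDispatcher do
  # Hypothetical sketch: the third party deduplicates on the idempotency key,
  # so handling the same event twice triggers at most one credit check.
  def request_credit_check(%Spear.Event{} = event) do
    Req.post!("https://credit-bureau.example.com/checks",
      json: %{national_id: event.body["national_id"]},
      headers: [{"idempotency-key", event.id}]
    )
  end
end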
If you try to do the above with a catch-up subscription, you will certainly hit the problem you describe in your article. Namely, if you have multiple clients (for HA reasons) performing the same catch-up subscription, then they can all duplicate each other’s event handling efforts. That being said, you can still achieve “exactly-once” because the consumer is idempotent and can dedup any duplicated messages. But this is just not a very efficient use of resources when clients are stepping all over each other.
A way to make sure catch-up subscription clients don’t step on each other’s toes is to use distributed locks, supposedly popularised by Martin Kleppmann (I only recently learnt about this after talking to YvesR):
https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
In this approach, you can have multiple nodes with clients tasked to perform the same projection/reaction logic, but only one can actually perform it based on who acquires the lock/lease first. This lock can be acquired, for example, over a database.
I think this is the same idea as what you refer to as a “synchronisation mechanism through shared storage”.
On the other hand, if you try to do the above with a persistent subscription, you will get the competing-consumer capability out of the box, and ESDB will be the distributor that manages which node/client receives the message.
This is great and requires less code complexity on your part, but the danger is that with parking, your event handler can lose events, receive events out of order, and receive duplicated events (due to retries). If your operation doesn’t really care about this, then a persistent subscription is the best option for you. But if not, then you’ll need to write extra logic to handle these scenarios.
Exactly-once is only going to work when the third party actively participates. What if you call the third party and your calling process dies mid-execution? You have no idea whether the desired side effect happened. Even if you wait for an acknowledgement, if your process dies before receiving it, you have no way of knowing whether the target system sent it, unless you call it again (with a message identifier to allow the third party to deduplicate / be idempotent).