Skip to main content

Simplifying Event Sourcing and CQRS in Elixir: A Practical Approach with Phoenix PubSub

Introduction
Event Sourcing and CQRS (Command Query Responsibility Segregation) are powerful architectural patterns that can help build scalable and maintainable applications. While these concepts might seem complex, they can be implemented effectively using Elixir and Phoenix PubSub. This post will explain these patterns and demonstrate their implementation through a practical example of a library inventory management system.

Understanding Event Sourcing and CQRS
What is Event Sourcing?
Event Sourcing is a pattern where changes to an application's state are stored as a sequence of events. Instead of maintaining the current state directly, the application records each change as an event and reconstructs the current state by replaying these events. This approach provides a complete history of changes and facilitates powerful features such as audit logging and temporal queries.

What is CQRS?
CQRS stands for Command Query Responsibility Segregation. It separates the responsibilities for handling commands (which modify the state) and queries (which read the state). By using distinct models for read and write operations, CQRS allows each model to be optimized for its specific task, improving performance and scalability.

Why Use Event Sourcing and CQRS?
  • Scalability: Independent scaling of read and write services allows the system to handle high loads efficiently.
  • Optimized Performance: The write model can focus on efficient state changes, while the read model can be tailored for fast queries.
  • Auditability: Storing every change as an event provides a full history, which is useful for auditing and debugging.
  • Flexibility: The architecture is adaptable to changes, allowing new features to be added without disrupting existing functionality.
Practical Example: Library Inventory Management System
We will implement a library inventory management system using Event Sourcing and CQRS. The system will consist of two separate services: a write service for handling commands and a read service for handling queries. Phoenix PubSub will be used to propagate events between the services.

Project Setup
Create two Elixir applications:
  • library_write_service for command handling.
  • library_read_service for query handling.

Write Service

Folder Structure

We will start by setting up the folder structure for the write service.


library_write_service/
├── lib/
│ ├── library_write_service/
│ │ ├── application.ex
│ │ ├── command_handler.ex
│ │ ├── commands/
│ │ │ ├── add_book.ex
│ │ │ ├── borrow_book.ex
│ │ │ └── remove_book.ex
│ │ ├── event_store.ex
│ │ ├── events/
│ │ │ ├── book_added.ex
│ │ │ ├── book_borrowed.ex
│ │ │ └── book_removed.ex
│ └── library_write_service.ex
├── config/
│ └── config.exs
├── mix.exs
└── README.md

This structure organizes the commands and events into separate directories for clarity and maintainability.

Application Module

Next, we define the application module which starts the necessary processes.


defmodule LibraryWriteService.Application do
use Application

def start(_type, _args) do
children = [
{Phoenix.PubSub, name: LibraryWriteService.PubSub},
{LibraryWriteService.EventStore, []}
]

opts = [strategy: :one_for_one, name: LibraryWriteService.Supervisor]
Supervisor.start_link(children, opts)
end
end

This module starts the Phoenix PubSub and the EventStore, ensuring they are available when the application runs.

Command Handler

We now implement the command handler that processes incoming commands and generates corresponding events.


defmodule LibraryWriteService.CommandHandler do
alias LibraryWriteService.{Events, EventStore}

def handle(%LibraryWriteService.Commands.AddBook{} = command) do
event = %Events.BookAdded{
isbn: command.isbn,
title: command.title,
author: command.author,
quantity: command.quantity,
timestamp: DateTime.utc_now()
}
EventStore.append_event(event)
Phoenix.PubSub.broadcast(LibraryWriteService.PubSub, "events", event)
end

def handle(%LibraryWriteService.Commands.RemoveBook{} = command) do
event = %Events.BookRemoved{
isbn: command.isbn,
timestamp: DateTime.utc_now()
}
EventStore.append_event(event)
Phoenix.PubSub.broadcast(LibraryWriteService.PubSub, "events", event)
end

def handle(%LibraryWriteService.Commands.BorrowBook{} = command) do
event = %Events.BookBorrowed{
isbn: command.isbn,
user_id: command.user_id,
timestamp: DateTime.utc_now()
}
EventStore.append_event(event)
Phoenix.PubSub.broadcast(LibraryWriteService.PubSub, "events", event)
end
end

Here, each command generates an event, which is then appended to the EventStore and broadcasted using Phoenix PubSub.

Event Store

The EventStore stores all events and provides functions to append and retrieve events.


defmodule LibraryWriteService.EventStore do
use GenServer

# Client API
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def append_event(event) do
GenServer.call(__MODULE__, {:append_event, event})
end

def get_events do
GenServer.call(__MODULE__, :get_events)
end

# Server Callbacks
@impl true
def init(_) do
{:ok, []}
end

@impl true
def handle_call({:append_event, event}, _from, state) do
{:reply, :ok, [event | state]}
end

@impl true
def handle_call(:get_events, _from, state) do
{:reply, Enum.reverse(state), state}
end
end


This module uses a GenServer to manage an in-memory list of events. In a production system, you would likely use a database to persist events.

Read Service

Folder Structure

Next, we set up the folder structure for the read service.


library_read_service/
├── lib/
│ ├── library_read_service/
│ │ ├── application.ex
│ │ ├── event_handler.ex
│ │ ├── query_model.ex
│ └── library_read_service.ex
├── config/
│ └── config.exs
├── mix.exs
└── README.md



Application Module

The application module for the read service starts the necessary processes.


defmodule LibraryReadService.Application do
use Application

def start(_type, _args) do
children = [
{Phoenix.PubSub, name: LibraryReadService.PubSub},
{LibraryReadService.QueryModel, []},
{LibraryReadService.EventHandler, []}
]

opts = [strategy: :one_for_one, name: LibraryReadService.Supervisor]
Supervisor.start_link(children, opts)
end
end


This module starts the Phoenix PubSub, QueryModel, and EventHandler processes.

Event Handler

The EventHandler subscribes to events and updates the read model accordingly.


defmodule LibraryReadService.EventHandler do
use GenServer
alias LibraryReadService.QueryModel
alias LibraryReadService.Events.{BookAdded, BookRemoved, BookBorrowed}

def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

def init(state) do
Phoenix.PubSub.subscribe(LibraryReadService.PubSub, "events")
{:ok, state}
end

def handle_info(event, state) do
case event do
%BookAdded{} -> QueryModel.apply_event(event)
%BookRemoved{} -> QueryModel.apply_event(event)
%BookBorrowed{} -> QueryModel.apply_event(event)
end
{:noreply, state}
end
end


This module listens for events and updates the query model by applying the events.

Query Model

The QueryModel maintains the current state of the inventory and applies events to update the state.


defmodule LibraryReadService.QueryModel do
use GenServer

def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

def get_inventory do
GenServer.call(__MODULE__, :get_inventory)
end

def apply_event(event) do
GenServer.cast(__MODULE__, {:apply_event, event})
end

@impl true
def init(state) do
{:ok, state}
end

@impl true
def handle_call(:get_inventory, _from, state) do
{:reply, state, state}
end

@impl true
def handle_cast({:apply_event, %BookAdded{isbn: isbn, title: title, author: author, quantity: quantity}}, state) do
new_state = Map.put(state, isbn, %{title: title, author: author, quantity: quantity})
{:noreply, new_state}
end

@impl true
def handle_cast({:apply_event, %BookRemoved{isbn: isbn}}, state) do
new_state = Map.delete(state, isbn)
{:noreply, new_state}
end

@impl true
def handle_cast({:apply_event, %BookBorrowed{isbn: isbn}}, state) do
new_state = update_in(state[isbn].quantity, &(&1 - 1))
{:noreply, state}
end
end


Example Usage

Let's see how we can use these services in practice.

Write Service Commands

To add, borrow, and remove books, you would use the LibraryWriteService:


LibraryWriteService.add_book("978-1593275846", "Elixir in Action", "Sasa Juric", 10)
LibraryWriteService.add_book("978-1680502992", "Programming Elixir 1.6", "Dave Thomas", 5)
LibraryWriteService.borrow_book("978-1593275846", "user123")
LibraryWriteService.remove_book("978-1680502992")

  • add_book/4: Adds a new book to the inventory.
  • borrow_book/2: Records that a user has borrowed a book.
  • remove_book/1: Removes a book from the inventory.

Read Service Query

To get the current inventory, you would use the LibraryReadService:


LibraryReadService.get_inventory()


Conclusion

By implementing CQRS and Event Sourcing with Phoenix PubSub, we created a scalable and maintainable architecture for our library inventory management system. Let's break down the key components and their roles:

Write Service

Purpose: Handle state-changing commands and generate corresponding events.

  1. Commands:
    • add_book/4: Adds a new book to the inventory.
    • borrow_book/2: Records a book being borrowed.
    • remove_book/1: Removes a book from the inventory.

Flow:

  • Commands are received and processed by the CommandHandler.
  • The CommandHandler generates events based on the commands.
  • Events are appended to the EventStore.
  • Events are broadcasted using Phoenix PubSub.

Event Store

Purpose: Persist events and provide event history for state reconstruction.

Flow:

  • Events are appended to an in-memory list (or a database in a production system).
  • Events can be retrieved for replaying or auditing purposes.

Read Service

Purpose: Handle queries and provide the current state of the system.

  1. Event Handler:
    • Subscribes to events and updates the QueryModel with new events.

Flow:

  • Subscribes to the event stream.
  • Applies events to update the current state in the QueryModel.
  1. QueryModel:
    • Maintains the current state of the inventory.
    • Provides methods to query the current state

Key Benefits

  1. Auditability: By recording every change as an event, we maintain a complete and immutable history of all actions, making it easy to audit and trace any modifications.
  2. Scalability: Separating read and write operations allows each to be scaled independently, optimizing for performance.
  3. Flexibility: The event-based architecture makes it easier to introduce new features or change existing ones without disrupting the overall system.
  4. Debugging: The ability to replay events to reconstruct the state at any point in time simplifies debugging and ensures consistency.

By using CQRS and Event Sourcing, we've built a robust and flexible system that can handle complex operations efficiently, providing clear benefits in terms of scalability, maintainability, and auditability.

Comments

Popular posts from this blog

Building a Real-Time Collaborative Editing Application with Phoenix LiveView and Presence

In this blog post, we will walk through the process of building a real-time collaborative document editing application using Phoenix LiveView. We will cover everything from setting up the project, creating the user interface, implementing real-time updates, and handling user presence. By the end of this tutorial, you'll have a fully functional collaborative editor that allows multiple users to edit a document simultaneously, with real-time updates and presence tracking. User Flow  Before diving into the code, let's outline the user flow and wireframes for our application. This will help us understand the overall structure and functionality we aim to achieve. Landing Page: The user is greeted with a landing page that prompts them to enter their name. Upon entering their name and clicking "Submit", they are redirected to the document list page. Document List Page: The user sees a list of available documents. Each document title is a clickable link that takes the user to...

Handling Massive Concurrency with Elixir and OTP: Advanced Practical Example

For advanced Elixir developers, handling massive concurrency involves not just understanding the basics of GenServers, Tasks, and Supervisors, but also effectively utilizing more complex patterns and strategies to optimize performance and ensure fault tolerance. In this blog post, we'll dive deeper into advanced techniques and provide a practical example involving a distributed, fault-tolerant system. Practical Example: Distributed Web Crawler We'll build a distributed web crawler that can handle massive concurrency by distributing tasks across multiple nodes, dynamically supervising crawling processes, and implementing rate limiting to control the crawling rate. In this example, we will build a distributed web crawler that simulates handling massive concurrency and backpressure. To achieve this, we will: Generate 100 unique API URLs that will be processed by our system. Create an API within the application that simulates slow responses using :timer.sleep to introduce artificia...

Integrating Elixir with Rust for Advanced WebSocket Message Decoding

As developers, we often face scenarios where we need to push the boundaries of performance and efficiency. One such case is decoding complex WebSocket messages in real-time financial applications. In this blog post, we'll explore how to leverage Rust's performance within an Elixir application to decode WebSocket messages from Zerodha's Kite Connect API. Why Integrate Elixir with Rust? Elixir is known for its concurrency and fault-tolerance, making it an excellent choice for building scalable applications. However, Rust offers unmatched performance and memory safety, making it ideal for CPU-intensive tasks like decoding binary WebSocket messages. By integrating Rust with Elixir, we can achieve the best of both worlds. The Challenge: Decoding Kite Connect WebSocket Messages Zerodha's Kite Connect API provides market data via WebSocket in binary format. These messages need to be decoded efficiently to be useful. While Elixir is powerful, decoding binary data is an area whe...