Introduction
Event Sourcing and CQRS (Command Query Responsibility Segregation) are powerful architectural patterns that can help build scalable and maintainable applications. While these concepts might seem complex, they can be implemented effectively using Elixir and Phoenix PubSub. This post will explain these patterns and demonstrate their implementation through a practical example of a library inventory management system.
Understanding Event Sourcing and CQRS
What is Event Sourcing?
Event Sourcing is a pattern where changes to an application's state are stored as a sequence of events. Instead of maintaining the current state directly, the application records each change as an event and reconstructs the current state by replaying these events. This approach provides a complete history of changes and facilitates powerful features such as audit logging and temporal queries.
What is CQRS?
CQRS stands for Command Query Responsibility Segregation. It separates the responsibilities for handling commands (which modify the state) and queries (which read the state). By using distinct models for read and write operations, CQRS allows each model to be optimized for its specific task, improving performance and scalability.
Why Use Event Sourcing and CQRS?
- Scalability: Independent scaling of read and write services allows the system to handle high loads efficiently.
- Optimized Performance: The write model can focus on efficient state changes, while the read model can be tailored for fast queries.
- Auditability: Storing every change as an event provides a full history, which is useful for auditing and debugging.
- Flexibility: The architecture is adaptable to changes, allowing new features to be added without disrupting existing functionality.
Practical Example: Library Inventory Management System
We will implement a library inventory management system using Event Sourcing and CQRS. The system will consist of two separate services: a write service for handling commands and a read service for handling queries. Phoenix PubSub will be used to propagate events between the services.
Project Setup
Create two Elixir applications:
- library_write_service for command handling.
- library_read_service for query handling.
Write Service
Folder Structure
We will start by setting up the folder structure for the write service.
library_write_service/
├── lib/
│ ├── library_write_service/
│ │ ├── application.ex
│ │ ├── command_handler.ex
│ │ ├── commands/
│ │ │ ├── add_book.ex
│ │ │ ├── borrow_book.ex
│ │ │ └── remove_book.ex
│ │ ├── event_store.ex
│ │ ├── events/
│ │ │ ├── book_added.ex
│ │ │ ├── book_borrowed.ex
│ │ │ └── book_removed.ex
│ └── library_write_service.ex
├── config/
│ └── config.exs
├── mix.exs
└── README.md
This structure organizes the commands and events into separate directories for clarity and maintainability.
Application Module
Next, we define the application module which starts the necessary processes.
defmodule LibraryWriteService.Application do
use Application
def start(_type, _args) do
children = [
{Phoenix.PubSub, name: LibraryWriteService.PubSub},
{LibraryWriteService.EventStore, []}
]
opts = [strategy: :one_for_one, name: LibraryWriteService.Supervisor]
Supervisor.start_link(children, opts)
end
end
This module starts the Phoenix PubSub and the EventStore, ensuring they are available when the application runs.
Command Handler
We now implement the command handler that processes incoming commands and generates corresponding events.
defmodule LibraryWriteService.CommandHandler do
alias LibraryWriteService.{Events, EventStore}
def handle(%LibraryWriteService.Commands.AddBook{} = command) do
event = %Events.BookAdded{
isbn: command.isbn,
title: command.title,
author: command.author,
quantity: command.quantity,
timestamp: DateTime.utc_now()
}
EventStore.append_event(event)
Phoenix.PubSub.broadcast(LibraryWriteService.PubSub, "events", event)
end
def handle(%LibraryWriteService.Commands.RemoveBook{} = command) do
event = %Events.BookRemoved{
isbn: command.isbn,
timestamp: DateTime.utc_now()
}
EventStore.append_event(event)
Phoenix.PubSub.broadcast(LibraryWriteService.PubSub, "events", event)
end
def handle(%LibraryWriteService.Commands.BorrowBook{} = command) do
event = %Events.BookBorrowed{
isbn: command.isbn,
user_id: command.user_id,
timestamp: DateTime.utc_now()
}
EventStore.append_event(event)
Phoenix.PubSub.broadcast(LibraryWriteService.PubSub, "events", event)
end
end
Here, each command generates an event, which is then appended to the EventStore and broadcasted using Phoenix PubSub.
Event Store
The EventStore stores all events and provides functions to append and retrieve events.
defmodule LibraryWriteService.EventStore do
use GenServer
# Client API
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def append_event(event) do
GenServer.call(__MODULE__, {:append_event, event})
end
def get_events do
GenServer.call(__MODULE__, :get_events)
end
# Server Callbacks
@impl true
def init(_) do
{:ok, []}
end
@impl true
def handle_call({:append_event, event}, _from, state) do
{:reply, :ok, [event | state]}
end
@impl true
def handle_call(:get_events, _from, state) do
{:reply, Enum.reverse(state), state}
end
end
This module uses a GenServer to manage an in-memory list of events. In a production system, you would likely use a database to persist events.
Read Service
Folder Structure
Next, we set up the folder structure for the read service.
library_read_service/
├── lib/
│ ├── library_read_service/
│ │ ├── application.ex
│ │ ├── event_handler.ex
│ │ ├── query_model.ex
│ └── library_read_service.ex
├── config/
│ └── config.exs
├── mix.exs
└── README.md
Application Module
The application module for the read service starts the necessary processes.
defmodule LibraryReadService.Application do
use Application
def start(_type, _args) do
children = [
{Phoenix.PubSub, name: LibraryReadService.PubSub},
{LibraryReadService.QueryModel, []},
{LibraryReadService.EventHandler, []}
]
opts = [strategy: :one_for_one, name: LibraryReadService.Supervisor]
Supervisor.start_link(children, opts)
end
end
This module starts the Phoenix PubSub, QueryModel, and EventHandler processes.
Event Handler
The EventHandler subscribes to events and updates the read model accordingly.
defmodule LibraryReadService.EventHandler do
use GenServer
alias LibraryReadService.QueryModel
alias LibraryReadService.Events.{BookAdded, BookRemoved, BookBorrowed}
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(state) do
Phoenix.PubSub.subscribe(LibraryReadService.PubSub, "events")
{:ok, state}
end
def handle_info(event, state) do
case event do
%BookAdded{} -> QueryModel.apply_event(event)
%BookRemoved{} -> QueryModel.apply_event(event)
%BookBorrowed{} -> QueryModel.apply_event(event)
end
{:noreply, state}
end
end
This module listens for events and updates the query model by applying the events.
Query Model
The QueryModel maintains the current state of the inventory and applies events to update the state.
defmodule LibraryReadService.QueryModel do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def get_inventory do
GenServer.call(__MODULE__, :get_inventory)
end
def apply_event(event) do
GenServer.cast(__MODULE__, {:apply_event, event})
end
@impl true
def init(state) do
{:ok, state}
end
@impl true
def handle_call(:get_inventory, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast({:apply_event, %BookAdded{isbn: isbn, title: title, author: author, quantity: quantity}}, state) do
new_state = Map.put(state, isbn, %{title: title, author: author, quantity: quantity})
{:noreply, new_state}
end
@impl true
def handle_cast({:apply_event, %BookRemoved{isbn: isbn}}, state) do
new_state = Map.delete(state, isbn)
{:noreply, new_state}
end
@impl true
def handle_cast({:apply_event, %BookBorrowed{isbn: isbn}}, state) do
new_state = update_in(state[isbn].quantity, &(&1 - 1))
{:noreply, state}
end
end
Example Usage
Let's see how we can use these services in practice.
Write Service Commands
To add, borrow, and remove books, you would use the LibraryWriteService:
LibraryWriteService.add_book("978-1593275846", "Elixir in Action", "Sasa Juric", 10)
LibraryWriteService.add_book("978-1680502992", "Programming Elixir 1.6", "Dave Thomas", 5)
LibraryWriteService.borrow_book("978-1593275846", "user123")
LibraryWriteService.remove_book("978-1680502992")
add_book/4: Adds a new book to the inventory.borrow_book/2: Records that a user has borrowed a book.remove_book/1: Removes a book from the inventory.
Read Service Query
To get the current inventory, you would use the LibraryReadService:
LibraryReadService.get_inventory()
Conclusion
By implementing CQRS and Event Sourcing with Phoenix PubSub, we created a scalable and maintainable architecture for our library inventory management system. Let's break down the key components and their roles:
Write Service
Purpose: Handle state-changing commands and generate corresponding events.
- Commands:
add_book/4: Adds a new book to the inventory.borrow_book/2: Records a book being borrowed.remove_book/1: Removes a book from the inventory.
Flow:
- Commands are received and processed by the
CommandHandler. - The
CommandHandler generates events based on the commands. - Events are appended to the
EventStore. - Events are broadcasted using Phoenix PubSub.
Event Store
Purpose: Persist events and provide event history for state reconstruction.
Flow:
- Events are appended to an in-memory list (or a database in a production system).
- Events can be retrieved for replaying or auditing purposes.
Read Service
Purpose: Handle queries and provide the current state of the system.
- Event Handler:
- Subscribes to events and updates the
QueryModel with new events.
Flow:
- Subscribes to the event stream.
- Applies events to update the current state in the
QueryModel.
- QueryModel:
- Maintains the current state of the inventory.
- Provides methods to query the current state
Key Benefits
- Auditability: By recording every change as an event, we maintain a complete and immutable history of all actions, making it easy to audit and trace any modifications.
- Scalability: Separating read and write operations allows each to be scaled independently, optimizing for performance.
- Flexibility: The event-based architecture makes it easier to introduce new features or change existing ones without disrupting the overall system.
- Debugging: The ability to replay events to reconstruct the state at any point in time simplifies debugging and ensures consistency.
By using CQRS and Event Sourcing, we've built a robust and flexible system that can handle complex operations efficiently, providing clear benefits in terms of scalability, maintainability, and auditability.
Comments
Post a Comment