In the previous article, Introduction to Event Sourcing and CQRS, we got familiar with the main concepts of Event Sourcing and reviewed the pros and cons of this approach.
Implementing Event Sourcing in Rails can be a powerful way to handle complex business logic and maintain a reliable audit trail of changes to your application's state. Event Sourcing involves storing a sequence of events that represent changes to the state of your application over time.
Now, before we plunge into using Event Sourcing in Rails with the help of battle-tested solutions, let's cover the basics and learn how to implement things from scratch. However, be aware: you should think twice before going with a custom implementation (there are too many chances you'll miss some small detail, and the consequences may be huge). For this purpose let's create a simple application for listing rental advertisements on a website.
The process may be quite complicated, but for the sake of simplicity, let's assume it goes as below (we'll sketch the resulting event stream right after the list):
- the ad listing is created
- the content is updated
- the listing gets published on the website
- the ad listing is removed
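In Event Sourcing terms, each of those steps becomes an event appended to the ad's stream. Just to set expectations, one ad's stream could end up looking roughly like this (the event names match the ones we'll implement later in the article; the payloads are purely illustrative):
# Illustrative contents of a single ad's event stream, oldest first
[
  {event_type: "Events::AdCreated",   data: {title: "Cozy flat", body: "2 rooms, city center"}},
  {event_type: "Events::AdModified",  data: {title: "Cozy flat", body: "2 rooms, pets allowed"}},
  {event_type: "Events::AdPublished", data: {remote_id: "abc123"}},
  {event_type: "Events::AdRemoved",   data: {}}
]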
Preparations
the one where we start our journey and get familiar with events
So, let's create a new project for our implementation purposes first:
rails new event_sourced_ads --database=postgresql --skip-test
Skipping the default tests, since I'm an RSpec fan, so we'll use that instead:
gem "rspec-rails", "~> 6.1.0"
Then run bundle and rails generate rspec:install.
Now that the initial setup is done, we're going to need events, so let's implement that part. And guess what we're starting with:
class CreateEvents < ActiveRecord::Migration[7.1]
  def change
    create_table :events, id: :uuid do |t|
      t.string :stream_name
      t.string :event_type
      t.jsonb :data

      t.timestamps
    end
  end
end
We use UUID here for primary keys. When dealing with distributed systems and the need for worldwide uniqueness, opting for a UUID could be the optimal decision.
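A practical note: Rails generates these UUID primary keys on the PostgreSQL side, so depending on your PostgreSQL version you may need to enable an extension first. A hedged sketch, assuming pgcrypto is what your setup needs:
# Only needed on older PostgreSQL versions where gen_random_uuid() isn't built in
class EnablePgcryptoExtension < ActiveRecord::Migration[7.1]
  def change
    enable_extension "pgcrypto"
  end
end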
We need streams to group events related to a particular entity. In our case I'm going to group the events related to one single ad, so they can be fetched easily. Frankly speaking, storing the stream name as a plain column on the event isn't a great idea, since one event may belong to several streams. For the sake of simplicity, let's consider doing some evil here (we'll do more before the end).
We need a basic event class that we can publish and validate input with, and we also need some way to do pub-sub (quite a crucial part). I'm a fan of the dry-rb stack: I can't say it's perfect, but it usually suits all my needs perfectly. So I'm turning on the imagination and seeing what it brings…
ImaginationCompleted.publish(data: {idea: "Create BaseEvent", pub_sub: "KISS rails has one built-in"})
Who am I to argue with that 😇 Let’s start with a spec:
it "persists an event record" do
expect { publish }.to change { Event.count }.by(1)
expect(Event.last).to have_attributes(
event_type: "FakeEvent",
data: {"name" => "whatever"},
stream_name: "123123"
)
end
it "sends a notification" do
allow(ActiveSupport::Notifications).to receive(:instrument)
publish
expect(ActiveSupport::Notifications).to have_received(:instrument).with(
"FakeEvent", data: {name: "whatever"}, stream_name: "123123"
)
end
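For context, the examples above assume a bit of setup that I haven't shown: a throwaway FakeEvent subclass and a publish subject. The names are hypothetical; something along these lines would do (in a real suite you may prefer stub_const over a top-level test class):
# spec/lib/events/base_event_spec.rb (assumed setup for the examples above)
class FakeEvent < Events::BaseEvent
  schema do
    Dry::Schema.Params do
      required(:name).filled(:string)
    end
  end
end

RSpec.describe Events::BaseEvent do
  subject(:publish) do
    FakeEvent.publish(data: {name: "whatever"}, stream_name: "123123")
  end

  # the two examples above go here
end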
and after some struggle, we come up with:
# lib/events/base_event.rb
module Events
  class BaseEvent
    class InvalidAttributes < StandardError; end
    class MissingContract < StandardError; end

    attr_reader :data

    # Defines the dry-schema contract used to validate the event payload
    def self.schema(&block)
      inner_schema = block.call

      define_method(:params_schema) do
        Dry::Schema.Params do
          required(:data).hash(inner_schema)
        end
      end
    end

    def self.publish(**args)
      new(**args.slice(:data)).publish(stream_name: args[:stream_name])
    end

    def initialize(**args)
      validate_input(args)
      @data = args[:data]
    end

    # Persists the event and broadcasts it via ActiveSupport::Notifications
    def publish(stream_name: nil)
      Event.create!(
        event_type: self.class.name, data:, stream_name:
      )
      ActiveSupport::Notifications.instrument(self.class.name, data:, stream_name:)
      self
    end

    # Overridden by .schema in subclasses; calling it without a contract raises
    def params_schema
      ->(_) { raise MissingContract, "Contract needs to be implemented" }
    end

    def validate_input(args)
      data_validation = params_schema.call(args)
      raise InvalidAttributes.new(data_validation.errors.to_h) if data_validation.errors.any?
    end
  end
end
And let’s try using our new pet. You know where to start…
RSpec.describe Events::AdCreated do
  describe ".publish" do
    subject(:publish) do
      described_class.publish(
        data: {title: "Some title", body: "Some description"},
        stream_name: "123456789",
      )
    end

    it "persists the event in database" do
      expect { publish }.to change { Event.count }.by(1)
      expect(Event.last).to have_attributes(
        event_type: "Events::AdCreated",
        data: {
          "title" => "Some title",
          "body" => "Some description"
        },
        stream_name: "123456789"
      )
    end
  end
end
and the event itself:
# lib/events/ad_created.rb
class Events::AdCreated < Events::BaseEvent
  schema do
    Dry::Schema.Params do
      required(:title).filled(:string)
      required(:body).filled(:string)
    end
  end
end
Aggregate part
the one where we learn to manipulate our ads
So, as a user we should be able to create a new ad, possibly modify it, and publish it. However, we shouldn't be able to edit an already published ad. So we need consistency in our actions, with the corresponding event published after each action is executed. That's where the aggregate comes into play.
So, we create the AdAggregate class and start with a test for a new instance:
# spec/services/ad_aggregate_spec.rb
RSpec.describe AdAggregate do
  subject(:aggregate) { described_class.new }

  it "has valid attributes on initialization" do
    expect(aggregate).to have_attributes(
      id: kind_of(String),
      state: :new
    )
  end
end
# app/services/ad_aggregate.rb
class AdAggregate
  attr_reader :id, :attributes, :state

  def initialize(id = nil)
    @id = id || SecureRandom.uuid
    @state = :new
  end
end
Next we need the possibility to actually create a new draft and have its attributes in the aggregate. We also need to publish an event saying the draft was created.
describe "#create_draft" do
subject(:create_draft) { aggregate.create_draft(**attributes) }
context "with valid attributes" do
let(:attributes) { valid_attributes }
it "updates attributes and state" do
create_draft
expect(aggregate).to have_attributes(
attributes: {
title: "Test title",
body: "Test description"
},
state: :draft
)
end
end
end
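A small aside: the specs here and further down reference a valid_attributes helper that I don't show; assume a simple let defined at the top of the spec file, something like:
# Assumed helper shared by the AdAggregate specs
let(:valid_attributes) do
  {title: "Test title", body: "Test description"}
end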
This one is easy to implement, but we face a problem here. We should be able to restore the state of the aggregate later, when we want to apply the next actions to it. The aggregate is supposed to be an event-sourced one. So we need a way to apply events to it, and all we actually need to do here is apply an event:
def create_draft(title:, body:)
  apply Events::AdCreated.new(data: {ad_id: id, title:, body:})
end
We need handlers in the aggregate that describe how the attributes are modified, how the state changes, and how we can restore the state of an aggregate from a history of events (for this purpose we'll create another class in a while 😉). Also, the events should be published when we store the aggregate. So let's add handler methods explaining how we want to modify the aggregate's state on each event, plus a common apply method that also maintains a queue of unpublished events:
def unpublished_events
  @unpublished_events ||= []
end

# Public: used when replaying already-stored events (see the repository below)
def apply_event(event)
  send("apply_#{event.class.name.demodulize.underscore}", event)
end

private

def apply(event)
  unpublished_events << event
  apply_event(event)
end

def apply_ad_created(event)
  @state = :draft
  # symbolize_keys: data restored from the jsonb column comes back with string keys
  @attributes = event.data.symbolize_keys.slice(:title, :body)
end
So, when a new event is applied we save it in a queue of unpublished events and call the corresponding handler. But what's the point of that without having the events stored? How do we fetch a previously created aggregate? We could implement that here in this class, though according to the Single Responsibility Principle it's definitely work that someone else should do. That's where we need a repository:
# frozen_string_literal: true

require "rails_helper"

RSpec.describe Repository do
  describe ".load" do
    subject(:load) { described_class.load(aggregate_class, stream_name) }

    let(:aggregate_class) { AdAggregate }
    let(:stream_name) { SecureRandom.uuid }

    context "without events" do
      it "loads new aggregate" do
        expect(load).to be_instance_of(aggregate_class).and have_attributes(
          id: stream_name,
          state: :new
        )
      end
    end

    context "with existing events" do
      context "when applying AdCreated event" do
        before do
          Event.create(
            event_type: "Events::AdCreated", stream_name:,
            data: {ad_id: stream_name, title: "title", body: "body"}
          )
        end

        it "applies event to aggregate" do
          expect(load).to be_a(AdAggregate).and have_attributes(
            id: stream_name,
            state: :draft,
            attributes: {
              title: "title",
              body: "body"
            }
          )
        end

        context "when applying AdPublished" do
          before do
            Event.create(
              event_type: "Events::AdPublished", stream_name:,
              data: {ad_id: stream_name, remote_id: "xosfjoj"}
            )
          end

          it "applies event to aggregate" do
            expect(load).to be_a(AdAggregate).and have_attributes(
              id: stream_name,
              state: :published,
              attributes: {
                title: "title",
                body: "body"
              }
            )
          end
        end
      end
    end
  end

  describe ".store" do
    subject(:store) { described_class.store(aggregate) }

    context "with unpublished events" do
      let(:aggregate) do
        instance_double(AdAggregate, id: stream_name, unpublished_events: [event])
      end
      let(:stream_name) { SecureRandom.uuid }
      let(:event) do
        Events::AdCreated.new(data: {ad_id: stream_name, title: "title", body: "body"})
      end

      it "publishes pending events" do
        expect { store }.to change { Event.count }.by(1)
        expect(Event.last).to have_attributes(
          stream_name:,
          event_type: "Events::AdCreated",
          data: {
            "ad_id" => stream_name,
            "title" => "title",
            "body" => "body"
          }
        )
      end
    end
  end
end
and the implementation is easy enough. I'll omit the walkthrough to save some precious space and time:
module Repository
  extend self

  def load(aggregate_class, stream_name)
    # Replay order matters, so apply events in the order they were recorded
    events = Event.where(stream_name:).order(:created_at).map do |event|
      event.event_type.constantize.new(data: event.data)
    end

    aggregate_class.new(stream_name).tap do |aggregate|
      events.each do |event|
        aggregate.apply_event(event)
      end
    end
  end

  def store(aggregate)
    aggregate.unpublished_events.each do |event|
      event.publish(stream_name: aggregate.id)
    end
  end
end
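Putting the pieces together, a typical command flow could look like this (just a usage sketch of the classes above, called from wherever your command handling lives, e.g. a controller or a service):
# Rebuild the aggregate from its stream, run a command, then persist the new events
stream_name = SecureRandom.uuid # a brand new ad
aggregate = Repository.load(AdAggregate, stream_name)
aggregate.create_draft(title: "Cozy flat", body: "2 rooms, city center")
Repository.store(aggregate)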
We now have the possibility to store the aggregate and to load it from existing events. However, we are missing one of the main purposes of the aggregate. We should disallow editing already published ads, and we definitely can't publish the same ad twice (well, technically we can, but that's clearly wrong). So, as usual:
describe "#update_content" do
subject(:update_content) { aggregate.update_content(**new_attributes) }
let(:aggregate) { described_class.new }
let(:new_attributes) do
{title: "Updated title", body: "Updated description"}
end
context "when ad is in draft state" do
before { aggregate.create_draft(**valid_attributes) }
it "updates ad attributes" do
update_content
expect(aggregate).to have_attributes(
attributes: {
title: "Updated title",
body: "Updated description"
},
state: :draft
)
end
end
context "when ad is in published state" do
before do
aggregate.create_draft(**valid_attributes)
aggregate.publish
end
it "raises an error" do
expect { update_content }.to raise_error(described_class::AlreadyPublished)
end
end
end
describe "#publish" do
subject(:publish) { aggregate.publish }
let(:aggregate) { described_class.new }
context "when ad is in draft state" do
before { aggregate.create_draft(**valid_attributes) }
it "updates state to published" do
publish
expect(aggregate.state).to eq(:published)
end
end
context "when ad is in published state" do
before do
aggregate.create_draft(**valid_attributes)
aggregate.publish
end
it "raises an error" do
expect { publish }.to raise_error(described_class::AlreadyPublished)
end
end
end
You can check the implementation of these methods in the project repository, and it'd be a good idea to try implementing them yourself first 😉
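If you'd like something to compare against afterwards, here is one possible minimal sketch that satisfies the specs above; the event payloads (the remote_id in particular) are my assumption, not necessarily what the project does:
# app/services/ad_aggregate.rb (sketch of the missing pieces)
class AlreadyPublished < StandardError; end

def update_content(title:, body:)
  raise AlreadyPublished if state == :published

  apply Events::AdModified.new(data: {ad_id: id, title:, body:})
end

def publish
  raise AlreadyPublished if state == :published

  apply Events::AdPublished.new(data: {ad_id: id, remote_id: SecureRandom.hex(8)})
end

private

def apply_ad_modified(event)
  @attributes = event.data.symbolize_keys.slice(:title, :body)
end

def apply_ad_published(_event)
  @state = :published
end
The point is that the invariant (no edits after publishing) lives in the aggregate itself, right next to where the events are applied.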
CQRS part
the one where we get familiar with read models and presentation to users
OK, the pub part is ready, now it's time for the sub part:
# config/initializers/event_listeners.rb
Rails.application.config.after_initialize do
  {
    AdEventListener: [
      Events::AdCreated,
    ]
  }.each do |listener, events|
    events.each { |event| ActiveSupport::Notifications.subscribe(event.to_s, listener.to_s.constantize) }
  end
end
So, here we wire up which listener subscribes to which events. In the example, AdEventListener will receive the ActiveSupport notification we broadcast from BaseEvent as a call. Perfect… but not exactly what we need:
class ApplicationEventListener
  # Unpacks the ActiveSupport::Notifications event and dispatches it
  # to the matching apply_* handler on the listener class
  def self.call(event)
    public_send(
      "apply_#{event.name.demodulize.underscore}", **event.payload
    )
  end
end
and now we should be able to create listeners in a very convenient form:
class AdEventListener < ApplicationEventListener
  class << self
    def apply_ad_created(data:, stream_name:)
      Ad.create!(id: stream_name, **data)
    end
  end
end
🤔 …but stop, something’s wrong here. What’s Ad.create!? We don’t have that implemented… The part is omitted for a reason.
What we implemented above is a CQRS setup, and Ad is a read model. Its exact structure isn't important here; it should simply suit your needs. In this example project I've also implemented Events::AdModified, Events::AdPublished and Events::AdRemoved. You can get familiar with the project.
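For illustration, the read model can be as simple as a regular ActiveRecord-backed table that the listener writes to and your controllers read from. A hypothetical sketch (the column set is my assumption based on the event payloads above, not the project's actual schema):
# db/migrate/..._create_ads.rb (hypothetical read model table)
class CreateAds < ActiveRecord::Migration[7.1]
  def change
    create_table :ads, id: :uuid do |t|
      t.uuid :ad_id        # mirrors the ad_id carried in event payloads
      t.string :title
      t.text :body
      t.string :remote_id
      t.boolean :published, default: false

      t.timestamps
    end
  end
end

# app/models/ad.rb
class Ad < ApplicationRecord
end
Since the read model is rebuilt entirely from events, you can reshape or even drop and re-project this table whenever your read needs change.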
Retrospective part
the one where we look over what we did
We've just implemented an application using Event Sourcing from scratch. I would definitely recommend staying away from self-made solutions in production. Several simplifications were made along the way (though you may need to revisit them once your project grows). Anyway, it's good to know what's inside the black box (gem) you use.
In the upcoming articles we're going to play with some recognized tools for building event-sourced applications.