Sooner or later, someone asks: “Why did the system calculate this margin at $50K last Tuesday when it’s $47K today?” In a traditional database, you shrug. The old value is gone—overwritten by the current state.
I’ve watched teams try to solve this with audit tables, trigger-based change logs, temporal database extensions. These work for a while. Then the audit table hits a billion rows and queries take minutes. Or the change log misses an edge case and you’re back to guessing. Or someone asks about the state at 3:47 PM specifically and you realize your hourly snapshots aren’t granular enough.
Event sourcing sidesteps these problems by storing what happened rather than what is. The current portfolio balance isn’t a database row—it’s a computation over the history of deposits, withdrawals, trades, and dividends. That history is the source of truth.
Why This Matters for Financial Systems
The audit trail is the main reason I reach for event sourcing in finance. Regulators don’t want to hear “the system says $47K now.” They want the exact sequence of operations that produced any number, reproducible on demand. Event sourcing gives you this without bolting on logging infrastructure after the fact—the log is the data model.
Debugging follows naturally. When a calculation looks wrong, you replay events and watch the state evolve. No guessing, no printf statements, no “well it works now so I guess we’re fine.” The bug is in the event history or it’s in your replay logic. Either way, you can find it.
The flip side: event sourcing is genuinely more complex than a PostgreSQL table with UPDATE statements. You need to think about event ordering in distributed systems (harder than it sounds), projection lag, schema evolution for events that live forever, and the operational overhead of eventually-consistent read models. I wouldn’t use it for a system where “just query the current row” actually solves the problem.
What Goes in an Event
Events capture facts about your domain: a trade executed, an order placed, a dividend received. The trick is deciding what fields each event needs—too few and you can’t reconstruct state; too many and you’re serializing half your database on every write.
Code examples use Scala 3 with ZIO 2. The patterns work in any language with decent sum types.
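import java.time.Instant
import java.util.UUID
trait DomainEvent {
  def aggregateId: String // Which entity this affects (account, portfolio)
  def eventId: UUID       // Unique identifier for this event
  def timestamp: Instant  // When this happened
  def version: Long       // Ordering within this aggregate
}
enum TradeSide { case Buy, Sell }
case class TradeExecuted(
  aggregateId: String,
  eventId: UUID,
  timestamp: Instant,
  version: Long,
  symbol: String,
  quantity: BigDecimal,
  price: BigDecimal,
  side: TradeSide,
  executionVenue: String,
  regulatoryReportingFlags: Set[String],
  counterpartyId: Option[String]
) extends DomainEvent
// Used by the projection below; only `amount` is read there, other fields are assumed
case class DividendReceived(
  aggregateId: String,
  eventId: UUID,
  timestamp: Instant,
  version: Long,
  symbol: String,
  amount: BigDecimal
) extends DomainEvent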
The version field matters more than it might appear. In distributed systems, timestamps alone cannot establish ordering—clock skew between servers can scramble event sequences. Version numbers provide reliable ordering within an aggregate.
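Version numbers only give reliable ordering if the store refuses two events with the same version for one aggregate. A common way to get that is an expected-version check on append (optimistic concurrency). Here is a minimal in-memory sketch, assuming each writer passes the version it last read; `VersionedLog` and `VersionConflict` are hypothetical names, not part of the article's code.

```scala
// Hypothetical error returned when another writer appended first
final case class VersionConflict(aggregateId: String, expected: Long, actual: Long)

// In-memory sketch of an expected-version check (optimistic concurrency)
final class VersionedLog {
  // Highest stored version per aggregate; 0 means "no events yet"
  private var current = Map.empty[String, Long]

  def versionOf(aggregateId: String): Long = synchronized {
    current.getOrElse(aggregateId, 0L)
  }

  // Appends `count` events only if the caller's view of the aggregate is still current.
  // Returns the new highest version, or a conflict the caller resolves by re-reading.
  def append(aggregateId: String, expectedVersion: Long, count: Int): Either[VersionConflict, Long] =
    synchronized {
      val actual = current.getOrElse(aggregateId, 0L)
      if (actual != expectedVersion) Left(VersionConflict(aggregateId, expectedVersion, actual))
      else {
        val newVersion = actual + count
        current = current.updated(aggregateId, newVersion)
        Right(newVersion)
      }
    }
}

// Usage: writers A and B both read version 0; A appends first
val store = VersionedLog()
val writerA = store.append("acct-1", expectedVersion = 0L, count = 2)
val writerB = store.append("acct-1", expectedVersion = 0L, count = 1)
val writerBRetry = store.append("acct-1", expectedVersion = 2L, count = 1)
```

Writer B's first attempt is rejected instead of silently producing a second "version 1"; after re-reading it retries against version 2.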
The regulatory fields (regulatoryReportingFlags, executionVenue, counterpartyId) are not optional extras. Most jurisdictions require this information for trade reporting.
The Event Store Itself
You can build an event store on PostgreSQL, Kafka, EventStoreDB, or a dozen other backends. The interface is simple—append events, read events for an aggregate, read all events globally. The complexity is in the implementation.
import zio.Chunk
import zio.stream.ZStream
trait EventStore[F[_]] {
  def append[E <: DomainEvent](events: Chunk[E]): F[Unit]
  def readStream(aggregateId: String, fromVersion: Long): ZStream[Any, Throwable, DomainEvent]
  def readAllEvents(fromOffset: Long): ZStream[Any, Throwable, EventEnvelope]
}
// Extended trait for transactional operations
trait TransactionalEventStore[F[_]] extends EventStore[F] {
  def appendTransactional[E <: DomainEvent](events: Chunk[E], tx: Transaction): F[Unit]
}
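To make the two read paths concrete, here is a minimal in-memory sketch, assuming a single process and no persistence. It uses plain Scala collections instead of ZIO so it runs on its own; `Event`, `Envelope`, and `InMemoryEventStore` are illustrative stand-ins for the article's `DomainEvent`, `EventEnvelope`, and `EventStore`. What it shows: `readStream` is keyed by the per-aggregate version, while `readAllEvents` is keyed by a global offset that the store assigns in append order.

```scala
import scala.collection.mutable

// Minimal stand-in for DomainEvent: only the fields the store needs
trait Event {
  def aggregateId: String
  def version: Long
}
final case class Envelope(offset: Long, event: Event)

final class InMemoryEventStore {
  private val entries = mutable.ArrayBuffer.empty[Envelope]

  // Each appended event gets the next global offset, starting at 0
  def append(events: Seq[Event]): Unit = synchronized {
    events.foreach(e => entries += Envelope(entries.size.toLong, e))
  }

  // Per-aggregate read: this aggregate's events from fromVersion on, in version order
  def readStream(aggregateId: String, fromVersion: Long): Vector[Event] = synchronized {
    entries.iterator
      .map(_.event)
      .filter(e => e.aggregateId == aggregateId && e.version >= fromVersion)
      .toVector
      .sortBy(_.version)
  }

  // Global read: every event at or after fromOffset, in append order
  def readAllEvents(fromOffset: Long): Vector[Envelope] = synchronized {
    entries.iterator.filter(_.offset >= fromOffset).toVector
  }
}

// Usage
final case class Deposit(aggregateId: String, version: Long, amount: BigDecimal) extends Event
val store = InMemoryEventStore()
store.append(Seq(
  Deposit("a", 1L, BigDecimal(10)),
  Deposit("b", 1L, BigDecimal(5)),
  Deposit("a", 2L, BigDecimal(7))
))
val streamA = store.readStream("a", fromVersion = 2L)
val globalTail = store.readAllEvents(fromOffset = 1L)
```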
A production implementation has more to handle: partitioned writes, retries, timeouts, and metrics.
import java.util.concurrent.TimeoutException
import zio._
import zio.stream.ZStream
class ProductionEventStore(
  storage: EventStorage,
  clock: Clock,
  metrics: MetricsCollector
) extends EventStore[Task] {
  def append[E <: DomainEvent](events: Chunk[E]): Task[Unit] = {
    for {
      now <- clock.instant
      partitionedEvents = events.groupBy(e => partitionFor(e.aggregateId))
      _ <- ZIO.foreachParDiscard(partitionedEvents) { case (partition, partitionEvents) =>
        val envelopes = partitionEvents.map(event =>
          EventEnvelope(
            event = event,
            partition = partition,
            offset = 0L, // placeholder; assumes the storage layer assigns the real offset
            processingTime = now
          )
        )
        storage.writeEvents(envelopes)
          .retry(Schedule.exponential(100.millis) && Schedule.recurs(3))
          .timeoutFail(new TimeoutException("Write timeout"))(30.seconds)
          .tap(_ => metrics.recordSuccessfulWrite(partition, envelopes.size))
      }
    } yield ()
  }
  def readStream(aggregateId: String, fromVersion: Long): ZStream[Any, Throwable, DomainEvent] = {
    ZStream
      .fromIterableZIO(storage.loadEvents(aggregateId, fromVersion))
      .rechunk(256)
      .map(_.event)
  }
  def readAllEvents(fromOffset: Long): ZStream[Any, Throwable, EventEnvelope] = {
    ZStream.fromIterableZIO(storage.loadAllEvents(fromOffset))
  }
}
A few things to notice: we batch events by partition and write in parallel—single-event writes are painfully slow under load. The retry with exponential backoff has saved us during transient network issues more times than I can count, but you need that timeout or a hung write blocks everything behind it. And track your metrics. When your event store slows down, you want to know before your users do.
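The article's `partitionFor` isn't shown. Here is a minimal sketch of the batching step, assuming partitions are chosen by hashing the aggregate ID modulo a fixed partition count (a hypothetical choice) and batches are capped at 500 events, the top of the range mentioned later in this post. `String.hashCode` is fixed by the Java specification, so an aggregate maps to the same partition across restarts, which keeps all of its events in one partition.

```scala
// Hypothetical partitioner: stable hash of the aggregate ID, modulo the partition count.
// floorMod keeps the result non-negative even when hashCode is negative.
def partitionFor(aggregateId: String, partitions: Int): Int =
  Math.floorMod(aggregateId.hashCode, partitions)

// Group by partition, then split each partition into batches of at most maxBatch.
// groupBy preserves relative order inside each group, so per-aggregate order survives.
def planWrites[E](
    events: Seq[E],
    aggregateIdOf: E => String,
    partitions: Int,
    maxBatch: Int
): Map[Int, Seq[Seq[E]]] =
  events
    .groupBy(e => partitionFor(aggregateIdOf(e), partitions))
    .view
    .mapValues(_.grouped(maxBatch).toSeq)
    .toMap

// Usage: 1,200 events spread over 7 aggregates, 4 partitions, batches of at most 500
val ids = (1 to 1200).map(i => s"acct-${i % 7}")
val plan = planWrites(ids, identity[String], partitions = 4, maxBatch = 500)
```

Each entry of `plan` can then be written as one parallel unit, with its batches written in order.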
Event Ordering Is Harder Than You Think
Events arrive out of order. Not sometimes—regularly. Clock skew between servers, network delays, retries after failures. In an e-commerce system, maybe you don’t care if two cart updates arrive in the wrong order. In a trading system, processing a trade confirmation before the trade execution can trigger risk alerts, compliance violations, or just wrong position calculations.
The fix is a reordering buffer. Hold out-of-order events until their predecessors arrive, then release them in sequence:
case class EventBuffer[E <: DomainEvent](
  pending: Map[Long, E] = Map.empty,
  nextExpectedVersion: Long = 1L,
  maxBufferSize: Int = 1000
) {
  def addEvent(event: E): (EventBuffer[E], Chunk[E]) = {
    if (event.version == nextExpectedVersion) {
      // Event arrived in order: process it and drain any buffered successors
      val (newBuffer, consecutive) = this
        .copy(nextExpectedVersion = nextExpectedVersion + 1)
        .drainConsecutive()
      (newBuffer, Chunk.single(event) ++ consecutive)
    } else if (event.version > nextExpectedVersion && pending.size < maxBufferSize) {
      // Future event: buffer it
      (copy(pending = pending + (event.version -> event)), Chunk.empty)
    } else {
      // Duplicate, stale, or buffer full: discard
      (this, Chunk.empty)
    }
  }
  private def drainConsecutive(): (EventBuffer[E], Chunk[E]) = {
    val consecutive = LazyList
      .from(0)
      .map(i => pending.get(nextExpectedVersion + i))
      .takeWhile(_.isDefined)
      .flatten
      .toList
    val newNextVersion = nextExpectedVersion + consecutive.length
    val remainingPending = pending -- consecutive.map(_.version)
    (copy(pending = remainingPending, nextExpectedVersion = newNextVersion),
      Chunk.fromIterable(consecutive))
  }
}
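Versions below nextExpectedVersion are treated as duplicates or stale redeliveries and dropped, so a retried delivery is harmless. The maxBufferSize limit prevents unbounded memory growth when an event is lost entirely, but once the buffer is full, later events are discarded too, so a gap that never closes should trigger a re-read of the missing versions from the event store rather than an indefinite wait.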
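A lost event shows up as a gap that never closes. One way to act on it, sketched under the assumption that the consumer can see `nextExpectedVersion` and the buffered versions (the keys of `pending`): compute the missing range, and once it has been open longer than some threshold, re-read that range with `readStream(aggregateId, fromVersion = start)`. `missingRange` and `shouldRefetch` are hypothetical helpers, and the threshold is a tuning choice.

```scala
import java.time.{Duration, Instant}

// Versions missing between what the buffer expects next and the earliest buffered event.
// None means no gap: nothing is buffered, or the next expected event is already here.
def missingRange(nextExpectedVersion: Long, bufferedVersions: Set[Long]): Option[(Long, Long)] =
  if (bufferedVersions.isEmpty) None
  else {
    val earliestBuffered = bufferedVersions.min
    if (earliestBuffered > nextExpectedVersion) Some((nextExpectedVersion, earliestBuffered - 1))
    else None
  }

// Re-read from the store once a gap has stayed open for at least maxWait
def shouldRefetch(gapFirstSeen: Instant, now: Instant, maxWait: Duration): Boolean =
  Duration.between(gapFirstSeen, now).compareTo(maxWait) >= 0

// Usage: expecting version 4, but only 6 and 7 have arrived
val gap = missingRange(4L, Set(6L, 7L))
val refetch = shouldRefetch(Instant.ofEpochSecond(0), Instant.ofEpochSecond(10), Duration.ofSeconds(5))
```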
Building Read Models from Events
Events are append-only, which is great for writes but awkward for reads. Nobody wants to replay 50,000 events to answer “what’s the current portfolio value?” Projections solve this—they’re read-optimized views built by folding over the event stream.
case class PortfolioProjection(
  portfolioId: String,
  positions: Map[String, BigDecimal],
  cashBalance: BigDecimal,
  lastUpdated: Instant,
  version: Long
)
object PortfolioProjection {
  def empty(portfolioId: String): PortfolioProjection =
    PortfolioProjection(portfolioId, Map.empty, BigDecimal(0), Instant.EPOCH, 0L)
}
def applyEvent(
  projection: PortfolioProjection,
  event: DomainEvent
): PortfolioProjection = {
  event match {
    case trade: TradeExecuted =>
      val positionDelta = if (trade.side == TradeSide.Buy) trade.quantity else -trade.quantity
      val cashDelta = trade.quantity * trade.price * (if (trade.side == TradeSide.Buy) -1 else 1)
      projection.copy(
        positions = projection.positions.updated(
          trade.symbol,
          projection.positions.getOrElse(trade.symbol, BigDecimal(0)) + positionDelta
        ),
        cashBalance = projection.cashBalance + cashDelta,
        lastUpdated = trade.timestamp,
        version = trade.version
      )
    case dividend: DividendReceived =>
      projection.copy(
        cashBalance = projection.cashBalance + dividend.amount,
        lastUpdated = dividend.timestamp,
        version = dividend.version
      )
    case _ => projection
  }
}
Projections can run in real-time (streaming) or on-demand (replay from events). For frequently accessed data, maintain a streaming projection that stays current. For historical queries, replay events to the target timestamp.
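Historical replay is the same fold with a cutoff. Here is a self-contained sketch using simplified stand-ins for the article's events and projection (`Trade`, `Dividend`, and `Portfolio` are hypothetical and trimmed to the fields the fold needs): events are replayed in version order, and replay stops at the first event recorded after the requested instant. The timestamp serves only as a cutoff, never for ordering, in line with the earlier point about clock skew; the cutoff itself is only as precise as the writer's clock.

```scala
import java.time.Instant

sealed trait PortfolioEvent { def version: Long; def timestamp: Instant }
final case class Trade(
    version: Long, timestamp: Instant, symbol: String,
    quantity: BigDecimal, price: BigDecimal, isBuy: Boolean
) extends PortfolioEvent
final case class Dividend(version: Long, timestamp: Instant, amount: BigDecimal) extends PortfolioEvent

final case class Portfolio(positions: Map[String, BigDecimal], cash: BigDecimal, version: Long)
object Portfolio { val empty: Portfolio = Portfolio(Map.empty, BigDecimal(0), 0L) }

// Same shape as applyEvent: buys add to the position and spend cash, sells do the reverse
def applyPortfolioEvent(p: Portfolio, e: PortfolioEvent): Portfolio = e match {
  case t: Trade =>
    val sign = if (t.isBuy) BigDecimal(1) else BigDecimal(-1)
    val held = p.positions.getOrElse(t.symbol, BigDecimal(0))
    p.copy(
      positions = p.positions.updated(t.symbol, held + sign * t.quantity),
      cash = p.cash - sign * t.quantity * t.price,
      version = t.version
    )
  case d: Dividend => p.copy(cash = p.cash + d.amount, version = d.version)
}

// State as of `asOf`: replay in version order, stop at the first event recorded after it
def stateAsOf(events: Seq[PortfolioEvent], asOf: Instant): Portfolio =
  events
    .sortBy(_.version)
    .takeWhile(e => !e.timestamp.isAfter(asOf))
    .foldLeft(Portfolio.empty)(applyPortfolioEvent)

// Usage: buy 100 @ 10, receive a 25 dividend, sell 40 @ 12, one minute apart
val t0 = Instant.parse("2024-01-02T15:00:00Z")
val history = Seq(
  Trade(1L, t0, "ACME", BigDecimal(100), BigDecimal(10), isBuy = true),
  Dividend(2L, t0.plusSeconds(60), BigDecimal(25)),
  Trade(3L, t0.plusSeconds(120), "ACME", BigDecimal(40), BigDecimal(12), isBuy = false)
)
val beforeSale = stateAsOf(history, t0.plusSeconds(90))
val latest = stateAsOf(history, t0.plusSeconds(3600))
```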
Real-Time Projection Pipeline
class PortfolioProjector(eventStore: EventStore[Task], snapshotStore: SnapshotStore) {
  def startProjection(): Task[Fiber[Throwable, Unit]] = {
    eventStore
      .readAllEvents(fromOffset = 0L)
      .groupByKey(_.event.aggregateId) { case (portfolioId, eventStream) =>
        // groupByKey expects a stream per key, so the inner pipeline stays a ZStream
        eventStream
          .scan(PortfolioProjection.empty(portfolioId)) { (projection, envelope) =>
            applyEvent(projection, envelope.event)
          }
          .drop(1) // scan emits its seed first; skip saving the empty projection
          .changes
          .mapZIO(snapshotStore.save)
      }
      .runDrain
      .fork
  }
}
The groupByKey operator partitions the event stream by aggregate ID, creating a sub-stream for each portfolio. Events within each sub-stream maintain their original order, enabling parallel processing across portfolios while preserving per-portfolio consistency. The changes operator emits only when the projection value differs from the previous, reducing unnecessary writes.
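The semantics of that pipeline can be checked with plain collections. This sketch uses hypothetical helper names, with integer cash deltas standing in for events and a zero delta standing for an event the projection ignores. It splits a globally ordered sequence per aggregate, runs a scan within each group, drops the scan's initial seed value, and keeps only values that differ from the previous one, which is what `changes` does. Each remaining value corresponds to one snapshot write.

```scala
// Keep an element only if it differs from the one right before it (like ZStream#changes)
def changes[A](xs: Seq[A]): Vector[A] =
  xs.foldLeft(Vector.empty[A]) { (kept, a) =>
    if (kept.lastOption.contains(a)) kept else kept :+ a
  }

// Per-aggregate running balance; returns the values that would be saved for each aggregate
def snapshotWrites(events: Seq[(String, Int)]): Map[String, Vector[Int]] =
  events
    .groupBy { case (aggregateId, _) => aggregateId } // order within each group is preserved
    .view
    .mapValues { group =>
      val states = group.scanLeft(0) { case (balance, (_, delta)) => balance + delta }
      changes(states.drop(1)) // drop the seed value scanLeft emits first
    }
    .toMap

// Usage: one global stream interleaving two portfolios
val writes = snapshotWrites(Seq("p1" -> 10, "p2" -> 5, "p1" -> 0, "p1" -> -3, "p2" -> 0))
```

The zero-delta events for `p1` and `p2` leave the state unchanged, so they produce no extra writes.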
Making the Audit Trail Tamper-Evident
Event sourcing gives you an audit trail, but a sufficiently motivated (or compromised) admin could still UPDATE the event table directly. Hash chains make tampering detectable.
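The idea is simple: each event’s hash includes the previous event’s hash. Change any historical event and every hash after it becomes invalid. An auditor can verify the entire chain by recomputing hashes from the beginning. One caveat: someone with write access can also recompute the whole chain after an edit, so the latest hash has to be recorded somewhere that person cannot rewrite (a separate system, write-once storage, or a periodically signed checkpoint) for tampering to show up.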
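Here is a self-contained sketch of building and verifying such a chain with the JDK's `MessageDigest` (SHA-256). It is simplified relative to `buildHashChain` below: payloads are plain strings and the user ID is left out of the hashed data. `Link`, `buildChain`, and `firstBrokenLink` are hypothetical names. The usage lines show both halves of the argument: editing one event breaks verification at that event, while rewriting the chain from that point on verifies cleanly but ends in a different head hash.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

def sha256Hex(s: String): String =
  MessageDigest
    .getInstance("SHA-256")
    .digest(s.getBytes(StandardCharsets.UTF_8))
    .map(b => f"${b & 0xff}%02x")
    .mkString

final case class Link(payload: String, previousHash: String, hash: String)

// Each link's hash covers the previous link's hash plus this payload
def buildChain(payloads: Seq[String], genesis: String = "genesis"): Vector[Link] =
  payloads.foldLeft(Vector.empty[Link]) { (chain, payload) =>
    val prev = chain.lastOption.map(_.hash).getOrElse(genesis)
    chain :+ Link(payload, prev, sha256Hex(s"$prev|$payload"))
  }

// Recompute every link from the start; returns the index of the first link that fails
def firstBrokenLink(chain: IndexedSeq[Link], genesis: String = "genesis"): Option[Int] =
  chain.indices.find { i =>
    val expectedPrev = if (i == 0) genesis else chain(i - 1).hash
    val link = chain(i)
    link.previousHash != expectedPrev ||
      link.hash != sha256Hex(s"${link.previousHash}|${link.payload}")
  }

// Usage
val original = buildChain(Seq("deposit:100", "trade:ACME:+10", "withdraw:50"))
val edited = original.updated(1, original(1).copy(payload = "trade:ACME:+1000"))
val rewritten = buildChain(Seq("deposit:100", "trade:ACME:+1000", "withdraw:50"))
```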
import zio._
class AuditableEventStore(
  underlying: EventStore[Task],
  auditLogger: AuditLogger,
  hashStore: HashStore
) {
  // Assumes every event in a batch belongs to the same aggregate
  def appendWithAudit[E <: DomainEvent](
    events: Chunk[E],
    context: AuditContext
  ): Task[Unit] =
    if (events.isEmpty) ZIO.unit // nothing to chain, and events.head would throw
    else
      for {
        lastHash <- hashStore.getLastHash(events.head.aggregateId)
          .catchSome { case _: NoSuchElementException => ZIO.succeed("genesis") }
        auditableEvents = buildHashChain(events, lastHash, context)
        // As written, these three writes are NOT atomic; production code should run
        // them in one database transaction so a failure leaves none of them committed
        _ <- underlying.append(auditableEvents.map(_.event))
        _ <- auditLogger.recordEventChain(auditableEvents)
        _ <- hashStore.updateLastHash(
          events.head.aggregateId,
          auditableEvents.last.eventHash
        )
      } yield ()
  private def buildHashChain[E <: DomainEvent](
    events: Chunk[E],
    startHash: String,
    context: AuditContext
  ): Chunk[AuditableEvent[E]] = {
    events.scanLeft((startHash, Option.empty[AuditableEvent[E]])) {
      case ((previousHash, _), event) =>
        val chainData = s"$previousHash|${serialize(event)}|${context.userId}"
        val currentHash = sha256(chainData)
        val auditable = AuditableEvent(
          event = event,
          previousHash = previousHash,
          eventHash = currentHash,
          userId = context.userId,
          sessionId = context.sessionId
        )
        (currentHash, Some(auditable))
    }.collect { case (_, Some(event)) => event }
  }
}
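As written, appendWithAudit makes three separate calls, so a failure partway through can leave events stored without audit records, or a last hash that no longer matches the stream. For the audit trail to hold, the event append, the audit record, and the last-hash update have to commit in one database transaction (the kind of operation TransactionalEventStore.appendTransactional exists for): if any write fails, none is committed. Guard the last-hash record with a row lock or a version check as well; otherwise two concurrent appends to the same aggregate can read the same previous hash and fork the chain.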
Performance Reality
Writes are the easy part. Appending to a log is fast—faster than UPDATE statements that need to find and lock rows. Batch your writes (we batch into groups of 100-500 events) and partition by aggregate ID so you can parallelize across partitions.
Reads are where event sourcing gets expensive. Every “what’s the current balance?” query potentially replays the entire history. For a portfolio with 10,000 trades, that’s 10,000 events to fold through before you can answer.
The fix is projections and caching. Keep hot data in memory or a fast read store, updated by a streaming projection. Accept that the projection lags behind writes—usually by milliseconds, sometimes by seconds during load spikes. Your UI needs to handle this. Display “as of 10:32:15” timestamps, or use optimistic updates and reconcile when the projection catches up.
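One way to implement the “as of” display, sketched with hypothetical names: the write path returns the aggregate version it produced, and the UI compares that with the version the projection has reached. If the projection has caught up, show its timestamp; if not, show the timestamp with a pending marker and re-read later. Formatting in UTC is an assumption; a real UI would use the user's time zone.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// What the read side knows: the last version folded in, and when that event happened
final case class ProjectionView(version: Long, lastUpdated: Instant)

val clockFormat: DateTimeFormatter =
  DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneOffset.UTC)

// lastWrittenVersion: the version returned by this user's most recent append
def displayLabel(view: ProjectionView, lastWrittenVersion: Long): String = {
  val asOf = clockFormat.format(view.lastUpdated)
  if (view.version >= lastWrittenVersion) s"as of $asOf"
  else s"as of $asOf, update pending"
}

// Usage: the projection has folded version 41; the user just wrote version 42
val view = ProjectionView(41L, Instant.parse("2024-01-02T10:32:15Z"))
val caughtUp = displayLabel(view, lastWrittenVersion = 41L)
val lagging = displayLabel(view, lastWrittenVersion = 42L)
```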
import zio._
import zio.cache.{Cache, Lookup}
class OptimizedEventStore(
  storage: EventStorage,
  config: EventStoreConfig
) extends EventStore[Task] {
  // Projection cache reduces read latency significantly for hot data
  private val projectionCache: Cache[String, Throwable, PortfolioProjection] =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(
        Cache.make(
          capacity = 10000,
          timeToLive = 5.minutes,
          lookup = Lookup(buildProjectionFromEvents)
        )
      ).getOrThrow()
    }
  def getCurrentProjection(aggregateId: String): Task[PortfolioProjection] = {
    projectionCache.get(aggregateId)
  }
  def append[E <: DomainEvent](events: Chunk[E]): Task[Unit] = {
    val write =
      if (events.size < config.batchThreshold) appendWithBatching(events)
      else appendImmediate(events)
    // Drop cached projections for the touched aggregates so the next read rebuilds
    // them; otherwise a cached entry can trail writes by up to the 5-minute TTL
    write *> ZIO.foreachDiscard(events.map(_.aggregateId).toSet)(id => projectionCache.invalidate(id))
  }
  private def buildProjectionFromEvents(aggregateId: String): Task[PortfolioProjection] = {
    readStream(aggregateId, fromVersion = 0L)
      .runFold(PortfolioProjection.empty(aggregateId))(applyEvent)
  }
}
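A cache like this only helps if entries are dropped when their aggregate changes; a time-to-live alone lets a cached projection trail writes by up to the TTL. Here is a minimal stdlib sketch of read-through caching with invalidate-on-append, assuming a single process. `ProjectionCache` is hypothetical (it is not zio-cache's API), and the projection is reduced to a balance.

```scala
import scala.collection.mutable

// Read-through cache: a miss calls `load`; invalidate drops entries so the next read rebuilds
final class ProjectionCache[P](load: String => P) {
  private val entries = mutable.Map.empty[String, P]
  private var rebuilds = 0

  def loads: Int = synchronized(rebuilds)

  def get(aggregateId: String): P = synchronized {
    entries.getOrElseUpdate(aggregateId, { rebuilds += 1; load(aggregateId) })
  }

  def invalidate(aggregateIds: Iterable[String]): Unit = synchronized {
    entries --= aggregateIds
  }
}

// Usage: a toy event log where each aggregate's projection is the sum of its amounts
val eventLog = mutable.Map("p1" -> Vector(BigDecimal(100)))
val cache = ProjectionCache[BigDecimal](id => eventLog.getOrElse(id, Vector.empty).sum)

def appendAndInvalidate(id: String, amount: BigDecimal): Unit = {
  eventLog(id) = eventLog.getOrElse(id, Vector.empty) :+ amount
  cache.invalidate(Seq(id))
}

val first = cache.get("p1")
val second = cache.get("p1") // served from the cache, no rebuild
appendAndInvalidate("p1", BigDecimal(50))
val afterAppend = cache.get("p1")
```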
Getting Events Out to Other Systems
At some point you’ll need to publish events to Kafka, notify a risk system, or sync to a reporting database. The naive approach—write the event, then publish the message—breaks when the publish fails after the write succeeds. Now your event store and message broker disagree about what happened.
The transactional outbox pattern solves this. Write the event and a “to be published” record in the same database transaction. A separate process reads unpublished records and pushes them to the broker. If the publish fails, retry. If it succeeds, mark the record as published.
// Transactional outbox ensures events and messages are written atomically
def processCommand(command: TradeCommand): Task[Unit] = {
  for {
    events <- ZIO.attempt(command.toEvents())
    // Single transaction: write events + outbox records together
    _ <- database.transaction { tx =>
      for {
        _ <- tx.insertEvents(events)
        _ <- tx.insertOutboxMessages(events.map(toOutboxMessage))
      } yield ()
    }
    // Background process polls outbox and publishes to broker
    // On success, marks outbox records as published
  } yield ()
}
// Outbox publisher runs continuously
def outboxPublisher: ZStream[Any, Throwable, Unit] = {
  ZStream
    .repeatZIOWithSchedule(
      outboxTable.fetchUnpublished(limit = 100),
      Schedule.fixed(100.millis)
    )
    .mapZIO { messages =>
      ZIO.foreachDiscard(messages) { msg =>
        messageBroker.publish(msg) *> outboxTable.markPublished(msg.id)
      }
    }
}
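Without the outbox pattern, you risk “dual write” failures: the event is stored but the message publication fails (or vice versa), leaving systems inconsistent. The outbox pattern guarantees at-least-once delivery by persisting the intent to publish within the same transaction as the event. At-least-once also means duplicates: if the publisher dies after publish succeeds but before markPublished runs, the message goes out again, so consumers should deduplicate, for example on eventId.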
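The duplicate case is easy to reproduce. This in-memory simulation (all names hypothetical; no real broker or database) runs the relay twice. In the first pass, marking row 2 as published fails after the row was already delivered, so the second pass delivers it again. The consumer deduplicates on the event ID, so the duplicate delivery is counted but applied only once.

```scala
import java.util.UUID
import scala.collection.mutable

final case class OutboxRow(id: Long, eventId: UUID, payload: String)

// Stand-in for the outbox table: rows plus the set of IDs already marked published
final class Outbox(rows: Seq[OutboxRow]) {
  private val published = mutable.Set.empty[Long]
  def fetchUnpublished(limit: Int): Seq[OutboxRow] = rows.filterNot(r => published(r.id)).take(limit)
  def markPublished(id: Long): Unit = published += id
}

// Consumer that applies each event at most once, keyed on eventId
final class IdempotentConsumer {
  private val seen = mutable.Set.empty[UUID]
  val applied = mutable.ArrayBuffer.empty[String]
  var deliveries = 0
  def handle(row: OutboxRow): Unit = {
    deliveries += 1
    if (seen.add(row.eventId)) applied += row.payload
  }
}

// One relay pass: publish, then mark. markFails simulates markPublished failing for a row.
def relayOnce(outbox: Outbox, consumer: IdempotentConsumer, markFails: Long => Boolean): Unit =
  outbox.fetchUnpublished(limit = 100).foreach { row =>
    consumer.handle(row)
    if (!markFails(row.id)) outbox.markPublished(row.id)
  }

// Usage
val outbox = Outbox(Seq(
  OutboxRow(1L, UUID.randomUUID(), "TradeExecuted#1"),
  OutboxRow(2L, UUID.randomUUID(), "TradeExecuted#2")
))
val consumer = IdempotentConsumer()
relayOnce(outbox, consumer, markFails = _ == 2L)    // row 2 delivered but left unpublished
relayOnce(outbox, consumer, markFails = _ => false) // row 2 delivered a second time
```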
Snapshotting for Long Event Streams
Aggregates with thousands of events become expensive to rebuild. Snapshotting periodically persists the computed projection, allowing replay to start from the snapshot rather than the beginning.
case class Snapshot[P](
  aggregateId: String,
  projection: P,
  version: Long,
  createdAt: Instant
)
def rebuildWithSnapshot(
  aggregateId: String,
  snapshotStore: SnapshotStore,
  eventStore: EventStore[Task]
): Task[PortfolioProjection] = {
  for {
    // Try loading most recent snapshot
    maybeSnapshot <- snapshotStore.loadLatest(aggregateId)
    // Determine starting point
    (startProjection, startVersion) = maybeSnapshot match {
      case Some(snap) => (snap.projection, snap.version + 1)
      case None => (PortfolioProjection.empty(aggregateId), 0L)
    }
    // Replay only events after the snapshot
    projection <- eventStore
      .readStream(aggregateId, fromVersion = startVersion)
      .runFold(startProjection)(applyEvent)
  } yield projection
}
How often to snapshot? I’ve settled on every 500-1000 events for most aggregates, but honestly it depends on your read patterns. If you’re rebuilding the same portfolio projection 50 times a second for a trading dashboard, snapshot aggressively. If it’s a batch job that runs overnight, don’t bother—just replay from the beginning.
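Two parts of that advice can be made concrete. The sketch below (hypothetical names, with a running balance standing in for the projection) shows an interval-based snapshot trigger and checks the property every snapshot scheme has to preserve: replaying from a snapshot gives exactly the same state as replaying from the first event.

```scala
// Snapshot once `interval` events have been applied since the last snapshot
def shouldSnapshot(currentVersion: Long, lastSnapshotVersion: Long, interval: Long): Boolean =
  currentVersion - lastSnapshotVersion >= interval

final case class BalanceEvent(version: Long, delta: BigDecimal)
final case class BalanceSnapshot(balance: BigDecimal, version: Long)

// Apply only events newer than the starting snapshot, in version order
def replayFrom(start: BalanceSnapshot, events: Seq[BalanceEvent]): BalanceSnapshot =
  events
    .filter(_.version > start.version)
    .sortBy(_.version)
    .foldLeft(start)((s, e) => BalanceSnapshot(s.balance + e.delta, e.version))

// Usage: 2,500 events, with a snapshot taken at version 2,000
val events = (1 to 2500).map(v => BalanceEvent(v.toLong, BigDecimal(v % 7 - 3)))
val fromScratch = replayFrom(BalanceSnapshot(BigDecimal(0), 0L), events)
val snapshotAt2000 = replayFrom(BalanceSnapshot(BigDecimal(0), 0L), events.take(2000))
val fromSnapshot = replayFrom(snapshotAt2000, events)
```

The rebuild from the snapshot folds 500 events instead of 2,500 and must land on the same state; if it doesn't, the snapshot or the fold is wrong.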
When to Skip Event Sourcing
I’ve seen teams adopt event sourcing because it sounds sophisticated, then spend months fighting complexity that a PostgreSQL table would have avoided. Some warning signs:
If nobody will ever ask “what was the state at time T?” (if you genuinely only care about current values), you’re adding infrastructure for a problem you don’t have. A user profile service doesn’t need event sourcing. Neither does a feature flag system.
If a single entity gets thousands of updates per day, you’ll spend more time on snapshotting and projection optimization than on your actual domain. High-frequency trading systems often use event sourcing for the audit trail but maintain separate hot-path state stores that don’t replay events on every read.
If your team hasn’t built an event-sourced system before, budget real time for the learning curve. The mental model shift—from “update the row” to “append an event and rebuild”—trips people up longer than you’d expect. I’d rather a team ship a well-built CRUD system than a half-understood event-sourced one.
The Short Version
Event sourcing stores what happened, not what is. For financial systems, this buys you auditable history, point-in-time queries, and debugging capabilities that traditional databases can’t match without bolting on significant infrastructure.
The cost is real complexity: event ordering, projection lag, schema evolution, and operational overhead. Worth it when regulators or your own sanity demand knowing exactly how the system reached any given state. Overkill when “SELECT * FROM accounts WHERE id = ?” would have been fine.

Susan Potter
Quant
Work with me
I spent the first half of my career building risk models and market data infrastructure at BNP Paribas, Bank of America, and Citadel, then fourteen years shipping production systems at scale. Now I bring both sides to quantitative trading. If you're a trading firm, family office, or fund looking to tighten the connection between your research ideas and your production trading systems, whether that's building validation pipelines, formalizing signal logic, or getting microstructure analytics into a deployable state, I'd like to hear what you're working on. Reach me at me@susanpotter.net.