Transactional Outbox
The transactional outbox pattern solves one of the trickiest problems in distributed systems: how do you reliably update your database and publish an event to a message broker like Kafka in a way that guarantees both happen or neither happens?
I’ve dealt with this problem multiple times—you write to your database, the write succeeds, then your service crashes before publishing the event to Kafka. Now your database is updated but downstream services never got notified. Or worse, the opposite happens: the event gets published but the database write fails, and now you’ve sent an event for something that didn’t actually happen.
The Problem
Consider an order service that needs to:
- Save an order to the database
- Publish an “order-created” event to Kafka
The naive approach looks like this:
func CreateOrder(ctx context.Context, order Order) error {
	if err := db.SaveOrder(ctx, order); err != nil {
		return err
	}
	if err := kafka.Publish(ctx, "order-created", order); err != nil {
		// 😱 the database is updated but event failed
		// What do we do now?
		return err
	}
	return nil
}
This doesn’t work. If the Kafka publish fails, you’ve already committed the database transaction. You could try to roll back the database write, but what if that fails too? You’re in an inconsistent state.
You might think “just publish to Kafka first, then save to the database,” but that has the same problem in reverse. If the database save fails after publishing, you’ve sent an event for something that didn’t happen.
The root issue is that you’re trying to make two separate systems (your database and Kafka) participate in the same atomic operation. Without distributed transactions (which come with their own complexity and performance costs), you need a different approach.
The Solution
The transactional outbox pattern solves this by storing the event in the same database transaction as your business data. Here’s how it works:
- Write your business data to the database
- In the same transaction, write the event to an “outbox” collection/table
- Commit the transaction
- A separate process reads events from the outbox and publishes them to Kafka
- After successful publication, mark the event as published or delete it
Now you have atomicity. Either both the business data and the outbox event are written, or neither is. The database transaction guarantees this.
flowchart TD
%% Nodes
Service[🛒 Order Service]:::service
subgraph DB["💾 Database (Transactional Boundary)"]
direction TB
Orders[📦 orders<br/>table]:::db
Outbox[📮 outbox<br/>table]:::db
end
Publisher[🔄 Outbox Publisher]:::process
Kafka[📨 Kafka Topic]:::queue
Downstream[⚙️ Other Services]:::service
%% Flow
Service -->|"① Begin Transaction<br/>Write Order + Event<br/>COMMIT"| DB
Publisher -->|"② Poll every 2s"| Outbox
Publisher -->|"③ Publish Event"| Kafka
Publisher -->|"④ Mark as published"| Outbox
Kafka -->|"⑤ Consume Event"| Downstream
Key insight: The service writes to both tables in one atomic transaction. If either write fails, both roll back. A separate publisher polls the outbox and publishes events to Kafka asynchronously.
Implementation
Outbox Schema
The outbox needs to store events alongside your business data. The exact schema depends on your database, but the concept is the same:
For SQL databases (PostgreSQL syntax shown; MySQL requires explicit VARCHAR lengths and does not support partial indexes, so adapt accordingly):
CREATE TABLE outbox (
    id VARCHAR PRIMARY KEY,
    aggregate_type VARCHAR NOT NULL,
    aggregate_id VARCHAR NOT NULL,
    event_type VARCHAR NOT NULL,
    payload JSON NOT NULL,
    created_at TIMESTAMP NOT NULL,
    published_at TIMESTAMP
);
CREATE INDEX idx_outbox_unpublished ON outbox(created_at)
    WHERE published_at IS NULL;
For MongoDB:
{
  _id: "uuid",
  aggregateType: "order",
  aggregateId: "order-123",
  eventType: "order-created",
  payload: { /* event data */ },
  createdAt: ISODate("2024-01-15T10:30:00Z"),
  publishedAt: null // null means unpublished
}
// Index for efficient queries
db.outbox.createIndex(
  { createdAt: 1 },
  { partialFilterExpression: { publishedAt: null } }
)
The fields serve specific purposes:
- aggregateType and aggregateId: identify what entity this event is about (e.g., “order”, “order-123”)
- eventType: the type of event (e.g., “order-created”, “order-cancelled”)
- payload: the event data
- createdAt: when the event was created (for ordering)
- publishedAt: null means unpublished, timestamp means published
Writing to the Outbox
When creating an order, you write to both your business collection/table and the outbox in a single transaction:
type OutboxEvent struct {
	ID            string     `json:"id" bson:"_id"`
	AggregateType string     `json:"aggregateType" bson:"aggregateType"`
	AggregateID   string     `json:"aggregateId" bson:"aggregateId"`
	EventType     string     `json:"eventType" bson:"eventType"`
	Payload       []byte     `json:"payload" bson:"payload"`
	CreatedAt     time.Time  `json:"createdAt" bson:"createdAt"`
	PublishedAt   *time.Time `json:"publishedAt,omitempty" bson:"publishedAt,omitempty"`
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
	tx, err := s.db.BeginTransaction(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)
	if err := tx.InsertOrder(ctx, order); err != nil {
		return err
	}
	event := OutboxEvent{
		ID:            generateID(),
		AggregateType: "order",
		AggregateID:   order.ID,
		EventType:     "order-created",
		Payload:       marshalJSON(order),
		CreatedAt:     time.Now(),
	}
	if err := tx.InsertOutboxEvent(ctx, event); err != nil {
		return err
	}
	// Commit atomically
	return tx.Commit(ctx)
}
Both writes succeed or both fail together. You’ve achieved atomicity.
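To make the all-or-nothing behaviour concrete, here is a minimal in-memory sketch. The names `memStore`, `memTx`, and `createOrder` are hypothetical stand-ins, not part of any real driver; the sketch assumes a transaction that buffers writes and only applies them on commit, which is roughly how the rollback path above behaves from the caller's point of view.

```go
package main

import (
	"errors"
	"fmt"
)

// memStore is a hypothetical in-memory stand-in for a database with two tables.
type memStore struct {
	orders []string
	outbox []string
}

// memTx buffers writes and applies them to the store only on Commit.
type memTx struct {
	store  *memStore
	orders []string
	outbox []string
}

func (s *memStore) Begin() *memTx { return &memTx{store: s} }

func (t *memTx) InsertOrder(id string) { t.orders = append(t.orders, id) }

// InsertOutboxEvent rejects an empty event type to simulate a failed write.
func (t *memTx) InsertOutboxEvent(eventType string) error {
	if eventType == "" {
		return errors.New("event type is required")
	}
	t.outbox = append(t.outbox, eventType)
	return nil
}

// Commit applies both buffered writes together.
func (t *memTx) Commit() {
	t.store.orders = append(t.store.orders, t.orders...)
	t.store.outbox = append(t.store.outbox, t.outbox...)
}

// createOrder mirrors the shape of CreateOrder: returning before Commit
// discards everything that was buffered in the transaction.
func createOrder(s *memStore, orderID, eventType string) error {
	tx := s.Begin()
	tx.InsertOrder(orderID)
	if err := tx.InsertOutboxEvent(eventType); err != nil {
		return err // neither the order nor the event reaches the store
	}
	tx.Commit()
	return nil
}

func main() {
	s := &memStore{}
	fmt.Println(createOrder(s, "order-1", "order-created"), len(s.orders), len(s.outbox))
	fmt.Println(createOrder(s, "order-2", ""), len(s.orders), len(s.outbox))
}
```

The second call fails on the outbox write, and the store still holds exactly one order and one event: the order from the failed call never becomes visible.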
Publishing from the Outbox
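The second part is publishing events from the outbox to Kafka. There are two main approaches: polling the outbox, covered here, and change data capture, which streams outbox inserts from the database’s change log.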
Polling
A background worker periodically queries for unpublished events:
func RunOutboxPublisher(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := publishPendingEvents(ctx); err != nil {
				log.Printf("Error publishing events: %v", err)
			}
		}
	}
}

func publishPendingEvents(ctx context.Context) error {
	// Fetch unpublished events (ordered by creation time)
	events, err := db.GetUnpublishedEvents(ctx, 100)
	if err != nil {
		return err
	}
	for _, event := range events {
		// Publish to Kafka
		record := &kgo.Record{
			Topic: event.AggregateType + "-events",
			Key:   []byte(event.AggregateID),
			Value: event.Payload,
		}
		if err := kafka.ProduceSync(ctx, record).FirstErr(); err != nil {
			log.Printf("Failed to publish event %s: %v", event.ID, err)
			// Skipping ahead keeps the batch moving, but a later event for the
			// same aggregate may then be published before this one is retried.
			continue
		}
		// Mark as published
		if err := db.MarkEventPublished(ctx, event.ID, time.Now()); err != nil {
			log.Printf("Failed to mark event %s as published: %v", event.ID, err)
			// Event might be published again (at-least-once delivery)
		}
	}
	return nil
}
This approach is simple and works with any database. The trade-off is latency: the polling interval creates a delay between writing the event and publishing it. Typically I use 1-5 second intervals, which is acceptable for most use cases.
Guarantees and Trade-offs
At-Least-Once Delivery
The transactional outbox provides at-least-once delivery. An event might be published multiple times if:
- The publish succeeds but marking it as published fails
- The publisher crashes after publishing but before marking
- The same event is processed by multiple publisher instances
This means consumers must be idempotent—processing the same event multiple times should produce the same result. Common approaches:
- Store processed event IDs in your database
- Use upserts instead of inserts
- Design operations to be naturally idempotent
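The first approach, storing processed event IDs, can be sketched as follows. This is an in-memory illustration only: `Event`, `Consumer`, and `Handle` are hypothetical names, and a real consumer would record the ID in the same database transaction as the effect it guards. It also assumes the outbox event ID travels with the message (for example as a record header); the publisher shown earlier sends only the payload, so that would need to be added.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical consumed message; ID is assumed to be the outbox event ID.
type Event struct {
	ID     string
	Amount int
}

// Consumer applies each event at most once by remembering processed IDs.
type Consumer struct {
	mu        sync.Mutex
	processed map[string]bool
	total     int
}

func NewConsumer() *Consumer {
	return &Consumer{processed: make(map[string]bool)}
}

// Handle applies the event and reports true, or reports false for a duplicate.
func (c *Consumer) Handle(e Event) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.processed[e.ID] {
		return false // already applied; a redelivery must not change state
	}
	c.total += e.Amount
	c.processed[e.ID] = true
	return true
}

func main() {
	c := NewConsumer()
	e := Event{ID: "evt-1", Amount: 50}
	first := c.Handle(e)
	second := c.Handle(e) // simulated redelivery of the same event
	fmt.Println(first, second, c.total)
}
```

Delivering the same event twice changes the total only once, which is the property at-least-once delivery requires of consumers.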
Ordering
Events in the outbox are naturally ordered by createdAt. If you publish them in that order and use the same partition key (aggregate ID) when writing to Kafka, you maintain ordering within that aggregate.
However, if multiple publisher instances run concurrently, they might publish events out of order. To maintain strict ordering:
- Use a single publisher instance, or
- Partition the outbox by aggregate ID and assign partitions to publishers
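One possible way to implement the second option is to hash the aggregate ID into a fixed number of partitions and give each publisher one partition. The sketch below is an assumption about how that could look, not a prescribed design: `partitionFor` and `ownedBy` are hypothetical helpers, and in practice the partition would more likely be stored as a column and filtered in the outbox query rather than in application code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps an aggregate ID to one of n outbox partitions (n must be > 0).
// The same ID always maps to the same partition, so a single publisher
// sees every event for a given aggregate.
func partitionFor(aggregateID string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(n))
}

// ownedBy returns the aggregate IDs that the publisher with the given index handles.
func ownedBy(aggregateIDs []string, publisher, n int) []string {
	var owned []string
	for _, id := range aggregateIDs {
		if partitionFor(id, n) == publisher {
			owned = append(owned, id)
		}
	}
	return owned
}

func main() {
	ids := []string{"order-1", "order-2", "order-3", "order-4"}
	for p := 0; p < 2; p++ {
		fmt.Printf("publisher %d owns %v\n", p, ownedBy(ids, p, 2))
	}
}
```

Because every aggregate has exactly one owner, two publishers never race on events for the same aggregate, while different aggregates can still be published in parallel.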
Performance Considerations
The outbox grows over time. You need a cleanup strategy:
Option 1: Delete published events
db.DeletePublishedEvents(ctx, olderThan(24 * time.Hour))
Simple, but you lose the event history.
Option 2: Archive published events
db.ArchivePublishedEvents(ctx, olderThan(7 * 24 * time.Hour))
Keeps history but requires managing the archive collection/table.
Option 3: TTL indexes (MongoDB)
db.outbox.createIndex(
  { publishedAt: 1 },
  { expireAfterSeconds: 86400 } // Delete after 24 hours
)
The outbox adds one additional write per transaction. In my experience, this overhead is negligible compared to the consistency guarantees it provides.
Complete Example
Here’s a full implementation showing the pattern with a generic database interface:
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/twmb/franz-go/pkg/kgo"
)

type Order struct {
	ID         string    `json:"id"`
	CustomerID string    `json:"customer_id"`
	Amount     float64   `json:"amount"`
	CreatedAt  time.Time `json:"created_at"`
}

type OutboxEvent struct {
	ID            string
	AggregateType string
	AggregateID   string
	EventType     string
	Payload       []byte
	CreatedAt     time.Time
	PublishedAt   *time.Time
}

// Generic database interface - implement this for your database
type Database interface {
	BeginTransaction(ctx context.Context) (Transaction, error)
	GetUnpublishedEvents(ctx context.Context, limit int) ([]OutboxEvent, error)
	MarkEventPublished(ctx context.Context, eventID string, publishedAt time.Time) error
}

type Transaction interface {
	InsertOrder(ctx context.Context, order Order) error
	InsertOutboxEvent(ctx context.Context, event OutboxEvent) error
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}
// OrderService handles order operations
type OrderService struct {
	db Database
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
	tx, err := s.db.BeginTransaction(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)
	// Insert order
	if err := tx.InsertOrder(ctx, order); err != nil {
		return err
	}
	// Insert outbox event
	payload, err := json.Marshal(order)
	if err != nil {
		return err
	}
	event := OutboxEvent{
		ID:            uuid.New().String(),
		AggregateType: "order",
		AggregateID:   order.ID,
		EventType:     "order-created",
		Payload:       payload,
		CreatedAt:     time.Now(),
	}
	if err := tx.InsertOutboxEvent(ctx, event); err != nil {
		return err
	}
	return tx.Commit(ctx)
}
// OutboxPublisher publishes events from outbox to Kafka
type OutboxPublisher struct {
	db     Database
	kafka  *kgo.Client
	ticker *time.Ticker
}

func NewOutboxPublisher(db Database, kafka *kgo.Client, interval time.Duration) *OutboxPublisher {
	return &OutboxPublisher{
		db:     db,
		kafka:  kafka,
		ticker: time.NewTicker(interval),
	}
}

func (p *OutboxPublisher) Run(ctx context.Context) {
	defer p.ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-p.ticker.C:
			if err := p.publishPending(ctx); err != nil {
				log.Printf("Error publishing events: %v", err)
			}
		}
	}
}

func (p *OutboxPublisher) publishPending(ctx context.Context) error {
	events, err := p.db.GetUnpublishedEvents(ctx, 100)
	if err != nil {
		return err
	}
	for _, event := range events {
		// Publish to Kafka
		record := &kgo.Record{
			Topic: event.AggregateType + "-events",
			Key:   []byte(event.AggregateID),
			Value: event.Payload,
		}
		if err := p.kafka.ProduceSync(ctx, record).FirstErr(); err != nil {
			log.Printf("Failed to publish event %s: %v", event.ID, err)
			continue
		}
		// Mark as published
		if err := p.db.MarkEventPublished(ctx, event.ID, time.Now()); err != nil {
			log.Printf("Failed to mark event %s as published: %v", event.ID, err)
		}
	}
	return nil
}
func main() {
	ctx := context.Background()
	// Initialize your database implementation.
	// It must be set to a real implementation before the calls below run.
	var db Database // PostgresDB, MongoDB, etc.
	kafka, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer kafka.Close()
	// Create order service
	service := &OrderService{db: db}
	// Create and run publisher
	publisher := NewOutboxPublisher(db, kafka, 2*time.Second)
	go publisher.Run(ctx)
	// Create an order
	order := Order{
		ID:         uuid.New().String(),
		CustomerID: "customer-123",
		Amount:     99.99,
		CreatedAt:  time.Now(),
	}
	if err := service.CreateOrder(ctx, order); err != nil {
		log.Fatal(err)
	}
	log.Println("Order created, event will be published by the outbox publisher")
}
When to Use This Pattern
I reach for the transactional outbox when:
You need guaranteed consistency between database writes and event publishing. If losing events or publishing phantom events is unacceptable, this pattern is worth the complexity.
You’re building event-driven systems where downstream services depend on receiving accurate events about state changes.
You can handle at-least-once delivery in your consumers. If you need exactly-once semantics end-to-end, you’ll need additional mechanisms like idempotent consumers.
Alternatives
The transactional outbox isn’t always necessary. Consider alternatives:
Accept eventual consistency: For non-critical events, it might be okay to lose some occasionally. A try-catch around Kafka publishing with logging might suffice.
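In Go, the "try-catch with logging" idea might look like the sketch below. The `Publisher` interface and both implementations are hypothetical stand-ins for a Kafka producer, used here so the example runs without a broker. The point is only that a publish failure is logged and swallowed rather than propagated, which means the event is lost.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Publisher is a hypothetical abstraction over a Kafka producer.
type Publisher interface {
	Publish(topic string, payload []byte) error
}

// okPublisher and failingPublisher simulate a healthy and an unavailable broker.
type okPublisher struct{}

func (okPublisher) Publish(string, []byte) error { return nil }

type failingPublisher struct{}

func (failingPublisher) Publish(string, []byte) error {
	return errors.New("broker unavailable")
}

// publishBestEffort logs a failed publish and moves on instead of returning the error.
// It reports whether the event was actually sent.
func publishBestEffort(p Publisher, topic string, payload []byte) bool {
	if err := p.Publish(topic, payload); err != nil {
		log.Printf("dropping %s event: %v", topic, err)
		return false
	}
	return true
}

func main() {
	payload := []byte(`{"id":"order-1"}`)
	fmt.Println(publishBestEffort(okPublisher{}, "order-created", payload))
	fmt.Println(publishBestEffort(failingPublisher{}, "order-created", payload))
}
```

This keeps the write path simple, at the cost of the guarantee the outbox provides: when the broker is down, the database change stands and the event is gone.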
Distributed transactions: If your database and message broker support them, two-phase commit ensures atomicity. But it’s complex and has performance implications.
Event sourcing: Store events as your primary data model. The database is the event log, eliminating the need for separate publishing.