Transactional Outbox

The transactional outbox pattern solves one of the trickiest problems in distributed systems: how do you reliably update your database and publish an event to a message broker like Kafka in a way that guarantees both happen or neither happens?

I’ve dealt with this problem multiple times—you write to your database, the write succeeds, then your service crashes before publishing the event to Kafka. Now your database is updated but downstream services never got notified. Or worse, the opposite happens: the event gets published but the database write fails, and now you’ve sent an event for something that didn’t actually happen.

The Problem

Consider an order service that needs to:

  1. Save an order to the database
  2. Publish an “order-created” event to Kafka

The naive approach looks like this:

func CreateOrder(ctx context.Context, order Order) error {
    if err := db.SaveOrder(ctx, order); err != nil {
        return err
    }
    
    if err := kafka.Publish(ctx, "order-created", order); err != nil {
        // 😱 the database is updated but the event failed
        // What do we do now?
        return err
    }
    
    return nil
}

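This doesn’t work. If the Kafka publish fails, the database transaction has already been committed. You could try to undo the write with a compensating delete, but what if that fails too? And if the process crashes between the two calls, nothing runs the cleanup at all. Either way, you’re left in an inconsistent state.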

You might think “just publish to Kafka first, then save to the database,” but that has the same problem in reverse. If the database save fails after publishing, you’ve sent an event for something that didn’t happen.

The root issue is that you’re trying to make two separate systems (your database and Kafka) participate in the same atomic operation. Without distributed transactions (which come with their own complexity and performance costs), you need a different approach.

The Solution

The transactional outbox pattern solves this by storing the event in the same database transaction as your business data. Here’s how it works:

  1. Write your business data to the database
  2. In the same transaction, write the event to an “outbox” collection/table
  3. Commit the transaction
  4. A separate process reads events from the outbox and publishes them to Kafka
  5. After successful publication, mark the event as published or delete it

Now you have atomicity. Either both the business data and the outbox event are written, or neither is. The database transaction guarantees this.

  flowchart TD
    %% Nodes
    Service[🛒 Order Service]:::service
    
    subgraph DB["💾 Database (Transactional Boundary)"]
        direction TB
        Orders[📦 orders<br/>table]:::db
        Outbox[📮 outbox<br/>table]:::db
    end
    
    Publisher[🔄 Outbox Publisher]:::process
    Kafka[📨 Kafka Topic]:::queue
    Downstream[⚙️ Other Services]:::service

    %% Flow
    Service -->|"① Begin Transaction<br/>Write Order + Event<br/>COMMIT"| DB
    Publisher -->|"② Poll every 2s"| Outbox
    Publisher -->|"③ Publish Event"| Kafka
    Publisher -->|"④ Mark as published"| Outbox
    Kafka -->|"⑤ Consume Event"| Downstream

Key insight: The service writes to both tables in one atomic transaction. If either write fails, both are rolled back. A separate publisher polls the outbox and publishes events to Kafka asynchronously.
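To make the atomicity argument concrete, here is a minimal in-memory sketch with no real database. The hypothetical MemStore stages writes in a transaction and applies them together on Commit, standing in for what your database's transaction provides. The types are trimmed-down versions of the ones used later, and the failOutbox flag exists only to simulate an outbox insert failing.

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// Trimmed-down versions of the article's Order and OutboxEvent types.
type Order struct {
	ID     string
	Amount float64
}

type OutboxEvent struct {
	ID          string
	AggregateID string
	EventType   string
	CreatedAt   time.Time
}

// MemStore is a hypothetical in-memory stand-in for the database.
type MemStore struct {
	mu         sync.Mutex
	orders     map[string]Order
	outbox     []OutboxEvent
	failOutbox bool // simulation hook: make outbox inserts fail
}

func NewMemStore() *MemStore {
	return &MemStore{orders: make(map[string]Order)}
}

// Tx stages writes; nothing is visible in the store until Commit.
type Tx struct {
	store  *MemStore
	orders []Order
	events []OutboxEvent
}

func (s *MemStore) Begin() *Tx { return &Tx{store: s} }

func (tx *Tx) InsertOrder(o Order) error {
	tx.orders = append(tx.orders, o)
	return nil
}

func (tx *Tx) InsertOutboxEvent(e OutboxEvent) error {
	if tx.store.failOutbox {
		return errors.New("outbox insert failed")
	}
	tx.events = append(tx.events, e)
	return nil
}

// Commit applies both kinds of staged rows under one lock, so readers see
// either all of them or none of them.
func (tx *Tx) Commit() {
	tx.store.mu.Lock()
	defer tx.store.mu.Unlock()
	for _, o := range tx.orders {
		tx.store.orders[o.ID] = o
	}
	tx.store.outbox = append(tx.store.outbox, tx.events...)
}

// CreateOrder mirrors the article's flow: stage the order, stage the event,
// commit. Returning early discards the Tx, which acts as the rollback.
func CreateOrder(s *MemStore, o Order) error {
	tx := s.Begin()
	if err := tx.InsertOrder(o); err != nil {
		return err
	}
	event := OutboxEvent{ID: "evt-" + o.ID, AggregateID: o.ID, EventType: "order-created", CreatedAt: time.Now()}
	if err := tx.InsertOutboxEvent(event); err != nil {
		return err
	}
	tx.Commit()
	return nil
}

// Counts reports how many committed orders and outbox events exist.
func (s *MemStore) Counts() (orders, events int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.orders), len(s.outbox)
}
```

If the outbox insert fails, CreateOrder returns before Commit, so neither the order nor the event becomes visible. A real database gives the same all-or-nothing behavior through its transaction, including across crashes, which an in-memory map cannot.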

Implementation

Outbox Schema

The outbox needs to store events alongside your business data. The exact schema depends on your database, but the concept is the same:

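For SQL databases (this example uses PostgreSQL syntax; MySQL needs an explicit length on each VARCHAR and has no partial indexes, so an index on (published_at, created_at) is a reasonable substitute there):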

CREATE TABLE outbox (
    id VARCHAR PRIMARY KEY,
    aggregate_type VARCHAR NOT NULL,
    aggregate_id VARCHAR NOT NULL,
    event_type VARCHAR NOT NULL,
    payload JSON NOT NULL,
    created_at TIMESTAMP NOT NULL,
    published_at TIMESTAMP
);

CREATE INDEX idx_outbox_unpublished ON outbox(created_at) 
WHERE published_at IS NULL;
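With the PostgreSQL schema above, the publisher's two queries might look like the following, using only database/sql. This is a sketch: it assumes a PostgreSQL driver (such as lib/pq or pgx's stdlib adapter) is registered, and these function signatures take a *sql.DB directly rather than matching any particular interface.

```go
package main

import (
	"context"
	"database/sql"
	"time"
)

type OutboxEvent struct {
	ID            string
	AggregateType string
	AggregateID   string
	EventType     string
	Payload       []byte
	CreatedAt     time.Time
}

// Same predicate as the partial index, sorted by its key column.
const selectUnpublished = `
SELECT id, aggregate_type, aggregate_id, event_type, payload, created_at
FROM outbox
WHERE published_at IS NULL
ORDER BY created_at
LIMIT $1`

const markPublished = `UPDATE outbox SET published_at = $2 WHERE id = $1`

// GetUnpublishedEvents returns up to limit unpublished events, oldest first.
func GetUnpublishedEvents(ctx context.Context, db *sql.DB, limit int) ([]OutboxEvent, error) {
	rows, err := db.QueryContext(ctx, selectUnpublished, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var events []OutboxEvent
	for rows.Next() {
		var e OutboxEvent
		if err := rows.Scan(&e.ID, &e.AggregateType, &e.AggregateID, &e.EventType, &e.Payload, &e.CreatedAt); err != nil {
			return nil, err
		}
		events = append(events, e)
	}
	// Surface any error that ended iteration early
	return events, rows.Err()
}

// MarkEventPublished sets published_at, which drops the row out of the partial index.
func MarkEventPublished(ctx context.Context, db *sql.DB, id string, at time.Time) error {
	_, err := db.ExecContext(ctx, markPublished, id, at)
	return err
}
```

Filtering on published_at IS NULL and sorting by created_at mirrors the partial index, so PostgreSQL can walk idx_outbox_unpublished in order and stop after LIMIT rows rather than scanning the whole table.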

For MongoDB:

{
  _id: "uuid",
  aggregateType: "order",
  aggregateId: "order-123",
  eventType: "order-created",
  payload: { /* event data */ },
  createdAt: ISODate("2024-01-15T10:30:00Z"),
  publishedAt: null  // null means unpublished
}

// Index for efficient queries
db.outbox.createIndex(
  { createdAt: 1 }, 
  { partialFilterExpression: { publishedAt: null } }
)

The fields serve specific purposes (named in camelCase here; the SQL columns are the snake_case equivalents):

  • aggregateType and aggregateId: identify what entity this event is about (e.g., “order”, “order-123”)
  • eventType: the type of event (e.g., “order-created”, “order-cancelled”)
  • payload: the event data
  • createdAt: when the event was created (for ordering)
  • publishedAt: null means unpublished, timestamp means published

Writing to the Outbox

When creating an order, you write to both your business collection/table and the outbox in a single transaction:

type OutboxEvent struct {
    ID            string     `json:"id" bson:"_id"`
    AggregateType string     `json:"aggregateType" bson:"aggregateType"`
    AggregateID   string     `json:"aggregateId" bson:"aggregateId"`
    EventType     string     `json:"eventType" bson:"eventType"`
    Payload       []byte     `json:"payload" bson:"payload"`
    CreatedAt     time.Time  `json:"createdAt" bson:"createdAt"`
    // No bson omitempty: a nil pointer is stored as an explicit null,
    // matching the publishedAt: null document shown above
    PublishedAt   *time.Time `json:"publishedAt,omitempty" bson:"publishedAt"`
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTransaction(ctx)
    if err != nil {
        return err
    }
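    // Undo both writes if anything below fails; a Rollback after a
    // successful Commit should be a harmless no-op
    defer tx.Rollback(ctx)

    if err := tx.InsertOrder(ctx, order); err != nil {
        return err
    }

    // Surface serialization errors instead of writing an empty payload
    payload, err := json.Marshal(order)
    if err != nil {
        return err
    }

    event := OutboxEvent{
        ID:            generateID(),
        AggregateType: "order",
        AggregateID:   order.ID,
        EventType:     "order-created",
        Payload:       payload,
        CreatedAt:     time.Now(),
    }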
    
    if err := tx.InsertOutboxEvent(ctx, event); err != nil {
        return err
    }
    
    // Commit atomically
    return tx.Commit(ctx)
}

Both writes succeed or both fail together. You’ve achieved atomicity.
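The snippet above leaves generateID undefined (the complete example later uses github.com/google/uuid). If you would rather avoid the dependency, one stdlib-only option is a random version 4 UUID from crypto/rand, wrapped in a hypothetical NewOutboxEvent constructor that also marshals the payload and reports serialization errors:

```go
package main

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"time"
)

type OutboxEvent struct {
	ID            string
	AggregateType string
	AggregateID   string
	EventType     string
	Payload       []byte
	CreatedAt     time.Time
}

// newEventID returns a random version 4 UUID string.
func newEventID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// NewOutboxEvent (hypothetical helper) fills in the ID and timestamp and
// marshals the payload, returning an error instead of an empty payload.
func NewOutboxEvent(aggregateType, aggregateID, eventType string, payload any) (OutboxEvent, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return OutboxEvent{}, fmt.Errorf("marshal %s payload: %w", eventType, err)
	}
	id, err := newEventID()
	if err != nil {
		return OutboxEvent{}, err
	}
	return OutboxEvent{
		ID:            id,
		AggregateType: aggregateType,
		AggregateID:   aggregateID,
		EventType:     eventType,
		Payload:       data,
		CreatedAt:     time.Now().UTC(),
	}, nil
}
```

Inside CreateOrder, a single call such as NewOutboxEvent("order", order.ID, "order-created", order) would then replace the struct literal.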

Publishing from the Outbox

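The second part is publishing events from the outbox to Kafka. You have two main approaches: polling the outbox table, or change data capture (CDC), which tails the database’s transaction log and turns outbox inserts into Kafka messages. Polling is the simpler of the two, and it is the one used here: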

Polling

A background worker periodically queries for unpublished events:

func RunOutboxPublisher(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := publishPendingEvents(ctx); err != nil {
                log.Printf("Error publishing events: %v", err)
            }
        }
    }
}

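func publishPendingEvents(ctx context.Context) error {
    // Fetch unpublished events, oldest first (ordered by creation time)
    events, err := db.GetUnpublishedEvents(ctx, 100)
    if err != nil {
        return err
    }

    for _, event := range events {
        // Publish to Kafka, keyed by aggregate ID so all events for one
        // aggregate land on the same partition
        record := &kgo.Record{
            Topic: event.AggregateType + "-events",
            Key:   []byte(event.AggregateID),
            Value: event.Payload,
            // Illustrative header names: the event ID lets consumers
            // deduplicate, the event type lets them route
            Headers: []kgo.RecordHeader{
                {Key: "event-id", Value: []byte(event.ID)},
                {Key: "event-type", Value: []byte(event.EventType)},
            },
        }

        if err := kafka.ProduceSync(ctx, record).FirstErr(); err != nil {
            // Stop instead of skipping ahead: a later event for the same
            // aggregate must not overtake this one. It is retried next tick.
            log.Printf("Failed to publish event %s: %v", event.ID, err)
            return err
        }

        // Mark as published
        if err := db.MarkEventPublished(ctx, event.ID, time.Now()); err != nil {
            // The event is already in Kafka and will be published again on
            // the next tick (at-least-once delivery); stopping here keeps
            // that duplicate ahead of any newer events
            log.Printf("Failed to mark event %s as published: %v", event.ID, err)
            return err
        }
    }

    return nil
}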

This approach is simple and works with any database. The trade-off is latency—the polling interval creates a delay between writing the event and publishing it. Typically I use 1-5 second intervals, which is acceptable for most use cases.

Guarantees and Trade-offs

At-Least-Once Delivery

The transactional outbox provides at-least-once delivery. An event might be published multiple times if:

  • The publish succeeds but marking it as published fails
  • The publisher crashes after publishing but before marking
  • The same event is processed by multiple publisher instances

This means consumers must be idempotent—processing the same event multiple times should produce the same result. Common approaches:

  • Store processed event IDs in your database
  • Use upserts instead of inserts
  • Design operations to be naturally idempotent
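The first approach, recording processed event IDs, can be sketched as below. It assumes the consumer receives the outbox event’s ID along with the payload (for example in a Kafka record header or inside the payload). The in-memory map and the Dedup type are illustrative only: in a real consumer the ID would be written to its own database in the same transaction as the side effect, so the two cannot diverge.

```go
package main

import "sync"

// Dedup remembers which event IDs have already been applied (hypothetical type).
type Dedup struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDedup() *Dedup { return &Dedup{seen: make(map[string]struct{})} }

// Handle runs apply at most once per eventID and reports whether it ran.
func (d *Dedup) Handle(eventID string, apply func() error) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[eventID]; ok {
		return false, nil // duplicate delivery: skip the side effect
	}
	if err := apply(); err != nil {
		return false, err // not recorded, so a redelivery retries it
	}
	d.seen[eventID] = struct{}{}
	return true, nil
}
```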

Ordering

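Events in the outbox can be read back in createdAt order (an auto-incrementing sequence column is a sturdier sort key, since createdAt comes from application clocks that can drift between instances). If you publish them in that order and use the same partition key (aggregate ID) when writing to Kafka, you maintain ordering within that aggregate.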

However, if multiple publisher instances run concurrently, they might publish events out of order. To maintain strict ordering:

  • Use a single publisher instance, or
  • Partition the outbox by aggregate ID and assign partitions to publishers
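Partitioning can be as simple as hashing the aggregate ID and letting each of n publisher instances handle only its own share. A sketch using FNV-1a from the standard library; ShardFor and Owns are hypothetical names, and how each instance learns its index and n is left to your deployment:

```go
package main

import "hash/fnv"

// ShardFor maps an aggregate ID to one of n publisher instances. Every event
// for the same aggregate maps to the same instance, so per-aggregate order
// holds while different aggregates are published in parallel.
func ShardFor(aggregateID string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(n))
}

// Owns reports whether publisher instance self (0..n-1) handles this aggregate.
func Owns(self, n int, aggregateID string) bool {
	return ShardFor(aggregateID, n) == self
}
```

Changing n reassigns aggregates between instances, so a resize should let in-flight batches drain first; otherwise two instances can briefly publish for the same aggregate.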

Performance Considerations

The outbox grows over time. You need a cleanup strategy:

Option 1: Delete published events

db.DeletePublishedEvents(ctx, olderThan(24 * time.Hour))

Simple but you lose the event history.

Option 2: Archive published events

db.ArchivePublishedEvents(ctx, olderThan(7 * 24 * time.Hour))

Keeps history but requires managing the archive collection/table.

Option 3: TTL indexes (MongoDB)

db.outbox.createIndex(
  { publishedAt: 1 },
  { expireAfterSeconds: 86400 }  // Delete after 24 hours
)

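TTL indexes only expire documents whose indexed field holds a date, so unpublished events (publishedAt: null) are never removed by this index.

The outbox adds one extra insert per business transaction, plus an update when the publisher marks the event as published. In my experience, this overhead is negligible compared to the consistency guarantees it provides.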

Complete Example

Here’s a full implementation showing the pattern with a generic database interface:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/twmb/franz-go/pkg/kgo"
)

type Order struct {
    ID         string    `json:"id"`
    CustomerID string    `json:"customer_id"`
    Amount     float64   `json:"amount"`
    CreatedAt  time.Time `json:"created_at"`
}

type OutboxEvent struct {
    ID            string
    AggregateType string
    AggregateID   string
    EventType     string
    Payload       []byte
    CreatedAt     time.Time
    PublishedAt   *time.Time
}

// Generic database interface - implement this for your database
type Database interface {
    BeginTransaction(ctx context.Context) (Transaction, error)
    GetUnpublishedEvents(ctx context.Context, limit int) ([]OutboxEvent, error)
    MarkEventPublished(ctx context.Context, eventID string, publishedAt time.Time) error
}

type Transaction interface {
    InsertOrder(ctx context.Context, order Order) error
    InsertOutboxEvent(ctx context.Context, event OutboxEvent) error
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// OrderService handles order operations
type OrderService struct {
    db Database
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTransaction(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    // Insert order
    if err := tx.InsertOrder(ctx, order); err != nil {
        return err
    }

    // Insert outbox event
    payload, err := json.Marshal(order)
    if err != nil {
        return err
    }
    event := OutboxEvent{
        ID:            uuid.New().String(),
        AggregateType: "order",
        AggregateID:   order.ID,
        EventType:     "order-created",
        Payload:       payload,
        CreatedAt:     time.Now(),
    }

    if err := tx.InsertOutboxEvent(ctx, event); err != nil {
        return err
    }

    return tx.Commit(ctx)
}

// OutboxPublisher publishes events from outbox to Kafka
type OutboxPublisher struct {
    db     Database
    kafka  *kgo.Client
    ticker *time.Ticker
}

func NewOutboxPublisher(db Database, kafka *kgo.Client, interval time.Duration) *OutboxPublisher {
    return &OutboxPublisher{
        db:     db,
        kafka:  kafka,
        ticker: time.NewTicker(interval),
    }
}

func (p *OutboxPublisher) Run(ctx context.Context) {
    defer p.ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-p.ticker.C:
            if err := p.publishPending(ctx); err != nil {
                log.Printf("Error publishing events: %v", err)
            }
        }
    }
}

func (p *OutboxPublisher) publishPending(ctx context.Context) error {
    events, err := p.db.GetUnpublishedEvents(ctx, 100)
    if err != nil {
        return err
    }

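    for _, event := range events {
        // Publish to Kafka, keyed by aggregate ID so one aggregate's events
        // share a partition and keep their order
        record := &kgo.Record{
            Topic: event.AggregateType + "-events",
            Key:   []byte(event.AggregateID),
            Value: event.Payload,
            // Illustrative header names: the event ID lets consumers
            // deduplicate, the event type lets them route
            Headers: []kgo.RecordHeader{
                {Key: "event-id", Value: []byte(event.ID)},
                {Key: "event-type", Value: []byte(event.EventType)},
            },
        }

        if err := p.kafka.ProduceSync(ctx, record).FirstErr(); err != nil {
            // Stop rather than skip ahead, so a later event for the same
            // aggregate cannot overtake this one; it is retried next tick
            log.Printf("Failed to publish event %s: %v", event.ID, err)
            return err
        }

        // Mark as published. On failure, stop as well: the next tick
        // republishes this event (a duplicate) before anything newer
        if err := p.db.MarkEventPublished(ctx, event.ID, time.Now()); err != nil {
            log.Printf("Failed to mark event %s as published: %v", event.ID, err)
            return err
        }
    }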

    return nil
}

func main() {
    ctx := context.Background()

    // Initialize your database implementation (PostgresDB, MongoDB, etc.).
    // As written, db is nil and the calls below will panic until you do.
    var db Database

    kafka, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
    if err != nil {
        log.Fatal(err)
    }
    defer kafka.Close()

    // Create order service
    service := &OrderService{db: db}

    // Create an order
    order := Order{
        ID:         uuid.New().String(),
        CustomerID: "customer-123",
        Amount:     99.99,
        CreatedAt:  time.Now(),
    }

    if err := service.CreateOrder(ctx, order); err != nil {
        log.Fatal(err)
    }

    log.Println("Order created, event will be published by the outbox publisher")

    // Run the publisher in the foreground so the process stays alive long
    // enough to publish; in a real service it runs for the process lifetime
    publisher := NewOutboxPublisher(db, kafka, 2*time.Second)
    publisher.Run(ctx)
}

When to Use This Pattern

I reach for the transactional outbox when:

You need guaranteed consistency between database writes and event publishing. If losing events or publishing phantom events is unacceptable, this pattern is worth the complexity.

You’re building event-driven systems where downstream services depend on receiving accurate events about state changes.

You can handle at-least-once delivery in your consumers. If you need effectively-once processing end-to-end, you’ll need additional mechanisms, such as idempotent consumers that deduplicate by event ID.

Alternatives

The transactional outbox isn’t always necessary. Consider alternatives:

Accept occasional event loss: For non-critical events, it might be okay to lose one now and then. Logging the error from the Kafka publish (perhaps after a few retries) might suffice.

Distributed transactions: If your database and message broker both support a common protocol such as XA, two-phase commit ensures atomicity. Kafka does not participate in XA transactions, though, and two-phase commit is complex and has performance implications.

Event sourcing: Store events as your primary data model. The database is the event log, eliminating the need for separate publishing.