The Transactional Outbox Pattern in Go: What the Compiler Enforces and What Go Leaves to You

The failure scenario
At 09:14:33, a customer places a £29.99 order on your platform. The order is saved to the database. The process crashes before publishing order.created to RabbitMQ. The fulfilment team never hears about it. The order sits in your database with status pending, indefinitely. Three days later, the customer calls support. Your team opens the database, finds the order, and manually triggers the fulfilment flow.
That is a recoverable situation. Now imagine it is not three days, it is three milliseconds. The crash is transient, the process restarts, and nobody notices. The order is in your database. The customer expects it to ship. It never does. Nobody calls support because the confirmation email went out. Nobody looks at the database because no error was logged.
This is the dual-write problem. Every service that writes to a database and publishes to a broker has it. The naive implementation does two things in sequence. Between them, anything can go wrong.
What makes this especially dangerous is that it is silent. No constraint is violated. No error is raised. The database is consistent with itself. The inconsistency lives in the gap between two systems, invisible to both.
Why the naive fixes do not work
The first instinct is to reverse the order: publish to RabbitMQ first, then write to the database. This trades one problem for another: now you can publish a message for an order that never gets saved. Downstream services process a ghost.
The second instinct is to wrap both operations in a retry. This makes duplicate messages more likely, not less. If the publish succeeds and the retry fires anyway, the broker receives the same message twice. Consumers must handle it. You have shifted the problem downstream.
The third instinct is a distributed transaction, a two-phase commit across Postgres and RabbitMQ. RabbitMQ does not support two-phase transactions. This option is not available.
The outbox pattern is not a clever trick. It is a direct consequence of accepting one constraint: you can only have one atomic unit, and that unit must be a database transaction.
The outbox pattern -- the principle
The pattern has three parts:
The producer writes business data and a message record in the same transaction. The producer knows nothing about RabbitMQ. Its job ends at commit.
The outbox table is the durability boundary. Once a row commits here, the message is guaranteed to eventually be published. The table bridges the synchronous world of database writes and the asynchronous world of message brokers.
The poller is a background goroutine that reads unpublished rows and publishes them. It runs independently of the producer. It is the only component that touches RabbitMQ.
The key insight: once the transaction commits, the message is a database row. Database rows do not disappear when processes crash. The poller will find it on the next tick, and the tick after that, until it succeeds.
The contract this side of the boundary is at-least-once delivery, not exactly-once. The poller may publish a message and then crash before marking it published. The next tick will publish it again. Consumers must be idempotent. This is the tradeoff: eventual consistency built on a single atomic operation, rather than an attempt at distributed atomicity, which does not exist in this stack.
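To make "consumers must be idempotent" concrete, here is a minimal sketch of a consumer-side guard keyed on the message ID. The Dedup type and HandleOnce method are hypothetical names for illustration, and the in-memory map is an assumption that keeps the example short; a real consumer would need a durable store for the seen IDs.

```go
package main

import (
	"fmt"
	"sync"
)

// Dedup remembers which message IDs have already been handled.
// Hypothetical type: a map stands in for a durable store.
type Dedup struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewDedup() *Dedup { return &Dedup{seen: make(map[string]bool)} }

// HandleOnce runs handle only the first time it sees id.
// It reports whether handle ran and succeeded.
func (d *Dedup) HandleOnce(id string, handle func() error) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false, nil // duplicate delivery: skip the side effects
	}
	if err := handle(); err != nil {
		return false, err // not recorded, so a redelivery will retry
	}
	d.seen[id] = true
	return true, nil
}

func main() {
	d := NewDedup()
	shipped := 0
	ship := func() error { shipped++; return nil }

	// The same message ID arrives twice, as at-least-once delivery allows.
	for i := 0; i < 2; i++ {
		ran, err := d.HandleOnce("msg-1", ship)
		fmt.Println("handler ran:", ran, "error:", err)
	}
	fmt.Println("shipments:", shipped)
}
```

The second delivery is recognised by its ID and the handler is skipped, so the order ships once even though the message arrived twice.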
The design
Schema
CREATE TABLE orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL,
amount BIGINT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT amount_positive CHECK (amount > 0),
CONSTRAINT valid_status CHECK (status IN ('pending','confirmed','cancelled'))
);
CREATE TABLE outbox_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
aggregate_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INT NOT NULL DEFAULT 0,
locked_until TIMESTAMPTZ,
last_error TEXT,
published_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT valid_status CHECK (status IN ('pending','processing','published','failed')),
CONSTRAINT attempts_non_negative CHECK (attempts >= 0)
);
CREATE INDEX outbox_pending_idx
ON outbox_messages (created_at)
WHERE status = 'pending';
Every field is deliberate:
amount BIGINT -- money in cents, never a float. £29.99 is stored as 2999.
CHECK (amount > 0) -- the database rejects invalid amounts even if the application layer has a bug.
locked_until TIMESTAMPTZ -- the crash recovery mechanism. When a poller claims a message, it sets locked_until = now() + 30s. If the poller crashes, the lock expires and the next tick reclaims it.
last_error TEXT -- publish failures are recorded with their reason. Failed messages are visible, not silently stuck.
payload JSONB -- the message content is stored verbatim at write time. No recomputation on retry.
outbox_pending_idx -- a partial index on status = 'pending' keeps polling fast as the table grows. Only pending rows are indexed.
Failure modes
| Failure | What the system does |
|---|---|
| Process crashes after order INSERT, before outbox INSERT | Transaction rolls back -- both tables empty, no inconsistency |
| Poller crashes after claiming, before publishing | locked_until expires -- next tick reclaims and retries |
| Poller crashes after publishing, before marking published | Message published again on retry -- consumer must deduplicate |
| Two concurrent pollers claim the same row | FOR UPDATE SKIP LOCKED prevents this -- second poller skips the row |
| Broker permanently rejects a message | Status set to failed, last_error recorded -- visible for investigation |
The implementation -- five phases
Phase 1 -- Types + schema -> invalid money is unrepresentable
Phase 2 -- DB + order service -> dual write is atomic
Phase 3 -- Poller -> concurrent pollers are safe
Phase 4 -- Publisher -> broker confirms before ack
Phase 5 -- Shutdown + tests -> all guarantees proven
Phase 1 -- Types and schema: making invalid states unrepresentable
Before a connection is opened, the type system defines what the domain looks like and what it forbids.
type Money struct {
cents int64
}
func FromCents(c int64) (Money, error) {
if c <= 0 {
return Money{}, fmt.Errorf("amount must be a positive number of cents")
}
return Money{cents: c}, nil
}
func (m Money) Cents() int64 { return m.cents }
Money is a struct with a private cents field. Outside its package, the only way to obtain a non-zero Money is through FromCents, which rejects zero and negative amounts at the boundary. The zero value Money{} can still be declared anywhere, which is why the schema's CHECK (amount > 0) remains the backstop. A caller cannot pass a float64 -- the type mismatch is a compile error. A caller cannot pass -1 -- FromCents rejects it before the value reaches the database.
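A short usage sketch of the constructor, repeating the Money type from above so the example runs on its own. The Format method is a hypothetical helper added only to show the stored value; it is not part of the original code.

```go
package main

import "fmt"

type Money struct {
	cents int64
}

func FromCents(c int64) (Money, error) {
	if c <= 0 {
		return Money{}, fmt.Errorf("amount must be a positive number of cents")
	}
	return Money{cents: c}, nil
}

func (m Money) Cents() int64 { return m.cents }

// Format renders the amount as a decimal string, e.g. 2999 becomes "29.99".
// Hypothetical helper for this sketch only.
func (m Money) Format() string {
	return fmt.Sprintf("%d.%02d", m.cents/100, m.cents%100)
}

func main() {
	for _, c := range []int64{2999, 0, -1} {
		m, err := FromCents(c)
		if err != nil {
			fmt.Printf("%d rejected: %v\n", c, err)
			continue
		}
		fmt.Printf("%d accepted: %s\n", c, m.Format())
	}
}
```

Only 2999 is accepted; zero and -1 are rejected at the boundary, before any database call is made.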
type OrderID uuid.UUID
type CustomerID uuid.UUID
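Both are distinct named types. Passing an OrderID where a CustomerID is expected is a compile error. Without them, both IDs would be plain uuid.UUID values and the distinction would live only in variable names and developer discipline. The Rust equivalent is a pair of newtype wrappers around Uuid. One caveat in Go: an explicit conversion such as CustomerID(orderID) still compiles, because both types share the same underlying type.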
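The following sketch shows what the compiler does and does not reject for two named ID types. To stay free of third-party imports it uses a local UUID stand-in rather than uuid.UUID, and describeCustomer is a hypothetical function invented for the example.

```go
package main

import "fmt"

// UUID stands in for uuid.UUID so the sketch needs no external module.
type UUID [16]byte

type OrderID UUID
type CustomerID UUID

// describeCustomer is a hypothetical function that only accepts a CustomerID.
func describeCustomer(id CustomerID) string {
	return fmt.Sprintf("customer %x", id[:4])
}

func main() {
	orderID := OrderID{0xde, 0xad, 0xbe, 0xef}
	customerID := CustomerID{0xca, 0xfe, 0xf0, 0x0d}

	fmt.Println(describeCustomer(customerID))

	// describeCustomer(orderID) // does not compile: OrderID is not CustomerID

	// An explicit conversion does compile, so it is worth flagging in review.
	fmt.Println(describeCustomer(CustomerID(orderID)))
}
```

The accidental mix-up is caught at compile time; the deliberate conversion is not, which is where review still matters.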
type MessageStatus string
const (
MessageStatusPending MessageStatus = "pending"
MessageStatusProcessing MessageStatus = "processing"
MessageStatusPublished MessageStatus = "published"
MessageStatusFailed MessageStatus = "failed"
)
These mirror the CHECK constraints in the schema, so the database and the application agree on the set of valid values. By convention, a new status is declared here first. Go will still accept an untyped string literal where a MessageStatus is expected, so the CHECK constraint is the backstop. The error type covers every distinct failure mode. Go does not check that a switch over statuses or error kinds is exhaustive: a new variant that is not handled everywhere is a compile error in Rust, but not in Go.
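Because Go will not flag an unhandled status, one way to compensate is to validate at the boundary and make the default branch loud. The sketch below assumes a hypothetical ParseMessageStatus helper; it is not part of the original code.

```go
package main

import "fmt"

type MessageStatus string

const (
	MessageStatusPending    MessageStatus = "pending"
	MessageStatusProcessing MessageStatus = "processing"
	MessageStatusPublished  MessageStatus = "published"
	MessageStatusFailed     MessageStatus = "failed"
)

// ParseMessageStatus converts raw text (for example a value read from the
// status column) into a MessageStatus, rejecting anything undeclared.
// Hypothetical helper for this sketch.
func ParseMessageStatus(s string) (MessageStatus, error) {
	switch st := MessageStatus(s); st {
	case MessageStatusPending, MessageStatusProcessing,
		MessageStatusPublished, MessageStatusFailed:
		return st, nil
	default:
		// The compiler will not warn about a missing case, so the
		// default branch turns an unknown value into an explicit error.
		return "", fmt.Errorf("unknown message status %q", s)
	}
}

func main() {
	for _, raw := range []string{"pending", "archived"} {
		st, err := ParseMessageStatus(raw)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("accepted:", st)
	}
}
```

This moves the exhaustiveness check from compile time to run time. It is weaker than a Rust match, but an unknown status surfaces as an error instead of falling through silently.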
Phase 2 -- DB layer: the dual write
Phase 2 is the core of the pattern. The most important line in the entire codebase is a function signature:
func (d *Db) CreateOrder(ctx context.Context, customerID types.CustomerID, amount types.Money) (types.OrderID, error) {
	tx, err := d.pool.Begin(ctx)
	if err != nil {
		return types.OrderID{}, apperrors.Database(err)
	}
	// Registered before any write, so every early return rolls back.
	defer tx.Rollback(ctx)

	orderID, err := insertOrder(ctx, tx, customerID, amount)
	if err != nil {
		return types.OrderID{}, err
	}

	// The payload is serialized once, here, and stored verbatim.
	payload, err := json.Marshal(map[string]any{
		"order_id":    orderID.UUID().String(),
		"customer_id": customerID.UUID().String(),
		"amount":      amount.Cents(),
	})
	if err != nil {
		return types.OrderID{}, fmt.Errorf("marshal outbox payload: %w", err)
	}

	// eventType is defined elsewhere in the package (not shown here).
	_, err = insertOutboxMessage(ctx, tx, eventType, payload, orderID.UUID().String())
	if err != nil {
		return types.OrderID{}, err
	}

	// Both rows become visible together, or not at all.
	if err := tx.Commit(ctx); err != nil {
		return types.OrderID{}, apperrors.Database(err)
	}
	return orderID, nil
}
defer tx.Rollback(ctx) is written immediately after Begin. If insertOrder fails, if insertOutboxMessage fails, if anything returns early, the deferred rollback fires. Both writes are unwound. The gap cannot exist. The dual-write guarantee is enforced by convention and test coverage in Go, while in Rust it is enforced by the type system.
The most important design detail is the transaction parameter that insertOrder and insertOutboxMessage take right after ctx:
func insertOrder(ctx context.Context, tx pgx.Tx, customerID types.CustomerID, amount types.Money) (types.OrderID, error)
func insertOutboxMessage(ctx context.Context, tx pgx.Tx, eventType types.EventType, payload []byte, aggregateID string) (uuid.UUID, error)
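Both take pgx.Tx, not *pgxpool.Pool. Because *pgxpool.Pool has no Commit or Rollback method, it does not satisfy the pgx.Tx interface, so handing the pool to these helpers is rejected by the compiler. What the compiler cannot check is that the helpers stay that strict. A future change that widens the parameter to a shared querier interface (anything with Exec, Query and QueryRow), or that calls d.pool directly inside CreateOrder, compiles fine and silently breaks the atomicity guarantee. The type system cannot catch that. Code review and tests can.
This is where the Go version leans hardest on discipline compared with the Rust implementation, where insert_order(tx: &mut Transaction<'_, Postgres>) makes passing a pool connection a compile error. In Go, the guarantee lives in the function signatures and in test coverage.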
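tx.Commit is always the last operation. After a successful commit, the deferred tx.Rollback finds the transaction already closed and does nothing: pgx returns ErrTxClosed, which is safe to ignore. If Commit itself fails, the error is returned to the caller.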
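One way to make the begin, deferred rollback, commit sequence mechanical is to wrap it in a helper. The sketch below is an illustration, not the original implementation: inTx, the narrow Tx interface, fakeTx and runWithFake are all hypothetical names, and the fake transaction exists only so the behaviour can be observed without a database.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Tx is the small part of a transaction this sketch needs. pgx.Tx has
// Commit and Rollback methods with these signatures.
type Tx interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

// inTx begins a transaction, runs fn, and commits only if fn succeeds.
func inTx(ctx context.Context, begin func(context.Context) (Tx, error), fn func(Tx) error) error {
	tx, err := begin(ctx)
	if err != nil {
		return err
	}
	// Registered immediately after begin; harmless after a successful Commit.
	defer tx.Rollback(ctx)

	if err := fn(tx); err != nil {
		return err
	}
	return tx.Commit(ctx)
}

// fakeTx records what happened to it.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit(context.Context) error { f.committed = true; return nil }

func (f *fakeTx) Rollback(context.Context) error {
	if f.committed {
		return errors.New("tx is closed") // rollback after commit changes nothing
	}
	f.rolledBack = true
	return nil
}

var errInsert = errors.New("outbox insert failed")

// runWithFake executes fn inside inTx against a fresh fakeTx.
func runWithFake(fn func(Tx) error) (*fakeTx, error) {
	tx := &fakeTx{}
	err := inTx(context.Background(), func(context.Context) (Tx, error) { return tx, nil }, fn)
	return tx, err
}

func main() {
	okTx, err := runWithFake(func(Tx) error { return nil })
	fmt.Println("success path:", err, okTx.committed, okTx.rolledBack)

	badTx, err := runWithFake(func(Tx) error { return errInsert })
	fmt.Println("failure path:", err, badTx.committed, badTx.rolledBack)
}
```

On the success path the transaction is committed and the deferred rollback changes nothing. On the failure path nothing is committed and the rollback runs.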
Phase 3 -- Poller: concurrent safety with FOR UPDATE SKIP LOCKED
The poller claims messages from the outbox with a single atomic query. No separate select-then-update. The claim and the transition happen together:
func (d *Db) Poll(ctx context.Context, batchSize int, lockSecs int) ([]types.OutboxMessage, error) {
	// Claim in one statement. Eligible rows are pending rows, plus rows left in
	// 'processing' whose lease has expired (a poller crashed after claiming them).
	rows, err := d.pool.Query(ctx, `
		UPDATE outbox_messages
		SET status = 'processing',
		    attempts = attempts + 1,
		    locked_until = now() + ($1 || ' seconds')::interval
		WHERE id IN (
			SELECT id FROM outbox_messages
			WHERE (status = 'pending' AND (locked_until IS NULL OR locked_until < now()))
			   OR (status = 'processing' AND locked_until < now())
			ORDER BY created_at
			LIMIT $2
			FOR UPDATE SKIP LOCKED
		)
		RETURNING id, event_type, payload, aggregate_id, attempts, created_at
	`, fmt.Sprintf("%d", lockSecs), batchSize)
	// ...
}
FOR UPDATE SKIP LOCKED is the concurrent safety mechanism. When two pollers run simultaneously, each acquires a row-level lock on the rows it selects. The second poller's SELECT skips any row that is already locked. No message is claimed twice.
locked_until is the crash recovery mechanism. If a poller claims a batch and then crashes before marking the messages published, the locks expire after 30 seconds. The next tick finds those rows eligible again (locked_until < now()) and reclaims them.
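The reclaim rule is easier to reason about as a pure function. The sketch below is an in-memory model of the eligibility check, assuming a claimed row stays in 'processing' until it is marked published or failed, and becomes claimable again once its lease has expired. The row type, claimable function and demoNow variable are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// row models only the two outbox columns that decide eligibility.
type row struct {
	status      string
	lockedUntil *time.Time // nil means the row was never claimed
}

// claimable reports whether a poller may claim r at time now.
func claimable(r row, now time.Time) bool {
	expired := r.lockedUntil != nil && r.lockedUntil.Before(now)
	switch r.status {
	case "pending":
		return r.lockedUntil == nil || expired
	case "processing":
		return expired // the claiming poller's lease has lapsed
	default:
		return false // published and failed rows are never polled again
	}
}

// demoNow is a fixed reference time so the example is deterministic.
var demoNow = time.Date(2024, 1, 1, 9, 15, 0, 0, time.UTC)

func main() {
	live := demoNow.Add(20 * time.Second)
	lapsed := demoNow.Add(-1 * time.Second)

	cases := []struct {
		name string
		r    row
	}{
		{"fresh pending row", row{status: "pending"}},
		{"processing, lease still live", row{status: "processing", lockedUntil: &live}},
		{"processing, lease expired", row{status: "processing", lockedUntil: &lapsed}},
		{"published", row{status: "published", lockedUntil: &lapsed}},
	}
	for _, c := range cases {
		fmt.Printf("%-30s claimable=%v\n", c.name, claimable(c.r, demoNow))
	}
}
```

A row held by a healthy poller stays untouched, while a row abandoned by a crashed poller becomes eligible again as soon as its lease runs out.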
The Config struct makes these parameters tunable without code changes:
type Config struct {
BatchSize int
PollInterval time.Duration
LockSecs int
}
The poller loop uses select on a ticker and a done channel:
func (p *Poller) Run(pub *publisher.Publisher, done <-chan struct{}) {
	// Run takes no context parameter, so database calls use a background context.
	ctx := context.Background()
	ticker := time.NewTicker(p.config.PollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			slog.Info("poller received shutdown, draining current batch")
			return
		case <-ticker.C:
			messages, err := p.db.Poll(ctx, p.config.BatchSize, p.config.LockSecs)
			if err != nil {
				// A failed poll is not fatal; the next tick tries again.
				slog.Error("poll failed, will retry next tick", "error", err)
				continue
			}
			// process batch...
		}
	}
}
If Poll fails (a transient database error, a connection reset), the error is logged and the loop continues on the next tick. A temporary database hiccup does not kill the poller.
Phase 4 -- Publisher: confirm before ack
The publisher uses amqp091-go with confirm mode enabled. A message is only considered delivered after the broker acknowledges it has been durably stored:
func Connect(amqpURL, exchangeName string) (*Publisher, error) {
// ...
err = ch.Confirm(false)
if err != nil {
return nil, apperrors.BrokerConnection(fmt.Errorf("confirm select: %w", err))
}
return &Publisher{channel: ch, exchange: exchangeName}, nil
}
ch.Confirm(false) puts the channel into publisher confirm mode. In this mode the broker answers every publish with an Ack or a Nack, and Publish waits for that answer before returning:
func (p *Publisher) Publish(msg *types.OutboxMessage) error {
	// The deferred confirmation is tied to this specific message. Calling
	// NotifyPublish inside Publish would register a new listener on every
	// call, and unread listeners eventually block the channel.
	confirmation, err := p.channel.PublishWithDeferredConfirmWithContext(
		context.Background(),
		p.exchange,
		msg.EventType.String(),
		false, false,
		amqp.Publishing{
			MessageId:    msg.ID.String(),
			ContentType:  "application/json",
			DeliveryMode: amqp.Persistent,
			Body:         msg.Payload,
		},
	)
	if err != nil {
		return apperrors.BrokerConnection(fmt.Errorf("basic publish: %w", err))
	}
	// Wait blocks until the broker acks or nacks this message.
	if !confirmation.Wait() {
		return apperrors.BrokerNack(msg.ID)
	}
	return nil
}
confirmation.Wait() returns only after the broker has taken responsibility for the message and sent the acknowledgement back; for a persistent message routed to a durable queue, that happens after it is written to disk. Without confirm mode, a publish call returns as soon as the frame has been handed to the TCP connection -- before the broker has stored anything. A broker crash at that moment loses the message silently. With confirms, Publish returns nil only after the broker has acknowledged the message.
The poller only calls MarkPublished after Publish returns nil:
if err := pub.Publish(msg); err != nil {
slog.Warn("publish failed", "id", msg.ID, "error", err)
p.db.MarkFailed(ctx, msg.ID, err.Error())
continue
}
p.db.MarkPublished(ctx, msg.ID)
MarkFailed sets status = 'failed' and records the error in last_error. The message is not stuck in processing -- it is visible, queryable, and can be retried manually or by a recovery process.
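The ordering rule (publish first, mark second) can be exercised without a broker by putting the publisher and the store behind small interfaces. The sketch below is an illustration under that assumption: Message, Publisher, Store, processBatch, rejectingPublisher and memStore are hypothetical names, not the types used in the original code.

```go
package main

import (
	"errors"
	"fmt"
)

type Message struct {
	ID      string
	Payload []byte
}

// Publisher and Store are hypothetical interfaces for this sketch.
type Publisher interface {
	Publish(msg Message) error
}

type Store interface {
	MarkPublished(id string) error
	MarkFailed(id, reason string) error
}

// processBatch publishes each message and records the outcome afterwards.
func processBatch(msgs []Message, pub Publisher, store Store) {
	for _, msg := range msgs {
		if err := pub.Publish(msg); err != nil {
			if markErr := store.MarkFailed(msg.ID, err.Error()); markErr != nil {
				fmt.Println("could not record failure:", markErr)
			}
			continue
		}
		// Marked only after the publish succeeded. A crash between the two
		// steps causes a re-publish later, never a lost message.
		if err := store.MarkPublished(msg.ID); err != nil {
			fmt.Println("could not mark published:", err)
		}
	}
}

// rejectingPublisher fails for the IDs listed in reject.
type rejectingPublisher struct{ reject map[string]bool }

func (p rejectingPublisher) Publish(m Message) error {
	if p.reject[m.ID] {
		return errors.New("broker nack")
	}
	return nil
}

// memStore keeps statuses in a map instead of the outbox table.
type memStore struct{ status map[string]string }

func (s memStore) MarkPublished(id string) error { s.status[id] = "published"; return nil }
func (s memStore) MarkFailed(id, reason string) error {
	s.status[id] = "failed: " + reason
	return nil
}

func main() {
	store := memStore{status: map[string]string{}}
	pub := rejectingPublisher{reject: map[string]bool{"m2": true}}
	processBatch([]Message{{ID: "m1"}, {ID: "m2"}}, pub, store)
	fmt.Println("m1:", store.status["m1"])
	fmt.Println("m2:", store.status["m2"])
}
```

With the publisher behind an interface, the failure path can be driven by a fake instead of a simulated MarkFailed call, and only the end-to-end test needs a live RabbitMQ.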
Phase 5 -- Shutdown and tests
The HTTP server and the poller run independently. Graceful shutdown coordinates them via a done channel and a sync.WaitGroup:
done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
p := poller.New(database, poller.DefaultConfig())
p.Run(pub, done)
}()
// ... start HTTP server ...
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
slog.Info("SIGINT/SIGTERM received, shutting down")
srv.Shutdown(context.Background())
close(done)
wg.Wait()
slog.Info("shutdown complete")
close(done) signals the poller to stop. The poller's select unblocks on <-done and returns after finishing its current iteration. wg.Wait() ensures main does not exit until the poller has drained. No messages are abandoned mid-batch.
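The shutdown handshake can be tried in isolation. This is a minimal runnable sketch of the same done channel and WaitGroup coordination, with a short sleep standing in for "serve until a signal arrives"; runPoller and its onTick callback are hypothetical simplifications of the real poller.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runPoller calls onTick on every tick until done is closed.
func runPoller(interval time.Duration, done <-chan struct{}, onTick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			onTick()
		}
	}
}

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	ticks := 0

	wg.Add(1)
	go func() {
		defer wg.Done()
		runPoller(5*time.Millisecond, done, func() {
			mu.Lock()
			ticks++
			mu.Unlock()
		})
	}()

	time.Sleep(30 * time.Millisecond) // stand-in for waiting on SIGINT/SIGTERM

	close(done) // tell the poller to stop
	wg.Wait()   // returns only after runPoller has exited

	fmt.Println("poller stopped after", ticks, "ticks")
}
```

Because wg.Wait() returns only after the goroutine has finished, reading ticks afterwards is safe, and main cannot exit while a tick is still being processed.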
Proving it works -- five adversarial tests
Test 1 -- Dual write is atomic
func TestCreateOrderWritesBothRowsAtomically(t *testing.T) {
database := setup(t)
ctx := context.Background()
orderID, err := database.CreateOrder(ctx, newCustomer(), tenCents(t))
if err != nil {
t.Fatalf("CreateOrder failed: %v", err)
}
ordersCount, _ := database.CountOrders(ctx)
pendingCount, _ := database.CountOutboxByStatus(ctx, "pending")
if ordersCount != 1 {
t.Errorf("expected 1 order, got %d", ordersCount)
}
if pendingCount != 1 {
t.Errorf("expected 1 pending outbox message, got %d", pendingCount)
}
msg, _ := database.LatestOutboxMessage(ctx)
if msg.AggregateID != orderID.UUID().String() {
t.Errorf("aggregate_id mismatch")
}
var payload map[string]any
json.Unmarshal(msg.Payload, &payload)
if payload["order_id"] != orderID.UUID().String() {
t.Errorf("payload order_id mismatch")
}
}
Both rows exist, the aggregate_id links them, and the payload is correct. If CreateOrder did two separate writes instead of one transaction, a crash between them would leave one table with a row and the other without. This test proves the happy path. Test 2 proves the failure path.
Test 2 -- The dual-write gap demonstrated and closed
func TestDualWriteGapIsClosedByOutbox(t *testing.T) {
	database := setup(t)
	ctx := context.Background()
	// Part A -- show the gap naively: insert order directly, outside any transaction
	_, err := pool.Exec(ctx, "INSERT INTO orders (id, customer_id, amount) VALUES ($1, $2, $3)",
		uuid.New(), uuid.New(), int64(1000))
	if err != nil {
		t.Fatalf("direct insert failed: %v", err)
	}
	ordersBefore, _ := database.CountOrders(ctx)
	pendingBefore, _ := database.CountOutboxByStatus(ctx, "pending")
	// An order with no outbox row: the gap is real
	if ordersBefore != 1 || pendingBefore != 0 {
		t.Errorf("gap not reproduced: %d orders, %d pending messages", ordersBefore, pendingBefore)
	}
	pool.Exec(ctx, "DELETE FROM orders")
	// Part B -- show the outbox pattern closes it
	tx, err := pool.Begin(ctx)
	if err != nil {
		t.Fatalf("begin failed: %v", err)
	}
	tx.Exec(ctx, "INSERT INTO orders (id, customer_id, amount) VALUES ($1, $2, $3)",
		uuid.New(), uuid.New(), int64(1000))
	_ = tx.Rollback(ctx) // simulate crash mid-transaction
	ordersAfter, _ := database.CountOrders(ctx)
	pendingAfter, _ := database.CountOutboxByStatus(ctx, "pending")
	// Neither row survives the rollback: the gap cannot exist
	if ordersAfter != 0 || pendingAfter != 0 {
		t.Errorf("rollback left data behind: %d orders, %d pending messages", ordersAfter, pendingAfter)
	}
}
This test does two things no other test does: it shows the gap is real with a direct insert, then proves the outbox pattern closes it. Part A is the before. Part B is the after. A reader who runs this test understands the pattern.
Test 3 -- Concurrent pollers never claim the same message
func TestConcurrentPollersNeverClaimSameMessage(t *testing.T) {
database := setup(t)
ctx := context.Background()
for i := 0; i < 10; i++ {
database.CreateOrder(ctx, newCustomer(), tenCents(t))
}
results := make(chan result, 3)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
msgs, err := database.Poll(ctx, 10, 30)
ids := make([]uuid.UUID, len(msgs))
for j, m := range msgs { ids[j] = m.ID }
results <- result{ids: ids, err: err}
}()
}
wg.Wait()
close(results)
allIDs := make(map[uuid.UUID]int)
for r := range results {
for _, id := range r.ids { allIDs[id]++ }
}
for id, count := range allIDs {
if count > 1 {
t.Errorf("message %v was claimed %d times -- FOR UPDATE SKIP LOCKED is not working", id, count)
}
}
if len(allIDs) != 10 {
t.Errorf("expected all 10 messages to be claimed, got %d unique IDs", len(allIDs))
}
}
If FOR UPDATE SKIP LOCKED were missing from the poll query, two pollers would claim overlapping sets. A message would appear in allIDs with a count greater than 1. The test would fail with a message naming the duplicated ID. SKIP LOCKED is doing a lot of work that is easy to take for granted -- this test makes it visible.
Test 4 -- End-to-end: poller publishes with correct payload
func TestPollerPublishesOrderMessageWithCorrectPayload(t *testing.T) {
database := setup(t)
ctx := context.Background()
pub, _ := publisher.Connect(amqpURL, "orders")
defer pub.Close()
orderID, _ := database.CreateOrder(ctx, newCustomer(), tenCents(t))
done := make(chan struct{})
go func() {
p := poller.New(database, fastPollerConfig())
p.Run(pub, done)
}()
defer close(done)
deadline := time.Now().Add(5 * time.Second)
for {
published, _ := database.CountOutboxByStatus(ctx, "published")
if published == 1 { break }
if time.Now().After(deadline) { t.Fatal("timeout") }
time.Sleep(50 * time.Millisecond)
}
msg, _ := database.LatestOutboxMessage(ctx)
if msg.Status != types.MessageStatusPublished {
t.Errorf("status: got %q, want published", msg.Status)
}
var payload map[string]any
json.Unmarshal(msg.Payload, &payload)
if payload["order_id"] != orderID.UUID().String() {
t.Errorf("payload order_id mismatch")
}
}
This is the only test that requires a live RabbitMQ connection. It proves the full path from CreateOrder to status = 'published'.
Test 5 -- Publish failure is visible and recoverable
func TestPublishFailureMarksMessageFailed(t *testing.T) {
database := setup(t)
ctx := context.Background()
orderID, _ := database.CreateOrder(ctx, newCustomer(), tenCents(t))
messages, _ := database.Poll(ctx, 10, 30)
msg := messages[0]
database.MarkFailed(ctx, msg.ID, "simulated broker rejection")
failedCount, _ := database.CountOutboxByStatus(ctx, "failed")
processingCount, _ := database.CountOutboxByStatus(ctx, "processing")
if failedCount != 1 { t.Errorf("expected 1 failed message, got %d", failedCount) }
if processingCount != 0 { t.Errorf("expected 0 processing, got %d", processingCount) }
stored, _ := database.LatestOutboxMessage(ctx)
if stored.Status != types.MessageStatusFailed {
t.Errorf("expected status 'failed', got %q", stored.Status)
}
if *stored.LastError != "simulated broker rejection" {
t.Errorf("last_error mismatch")
}
order, _ := database.GetOrder(ctx, orderID)
if order.Amount.Cents() != 1000 {
t.Errorf("order must be unchanged by publish failure")
}
if order.Status != types.OrderStatusPending {
t.Errorf("order status must be unchanged by publish failure")
}
}
A failed publish does not lose the message and does not corrupt the order. The message is in failed status with its error reason recorded. The order is untouched. Both are true because the poller only writes to outbox_messages; it has no code that touches orders.
Running the tests
docker compose up -d postgres rabbitmq
DATABASE_URL=postgres://postgres:password@localhost:5434/order-service-outbox \
AMQP_URL=amqp://guest:guest@localhost:5673/%2f \
go test ./tests/... -v
--- PASS: TestCreateOrderWritesBothRowsAtomically (0.09s)
--- PASS: TestDualWriteGapIsClosedByOutbox (0.07s)
--- PASS: TestConcurrentPollersNeverClaimSameMessage (0.12s)
--- PASS: TestPollerPublishesOrderMessageWithCorrectPayload (0.31s)
--- PASS: TestPublishFailureMarksMessageFailed (0.08s)
PASS
ok github.com/lethuzulu/order-service-with-outbox-pattern-go/tests
The Rust comparison
| Guarantee | Rust mechanism | Go mechanism |
|---|---|---|
| Money type safety | Money(i64) newtype, private field | Money struct, private cents field |
| Transaction boundary enforced | insert_order(tx: &mut Transaction) -- pool impossible | insertOrder(ctx, tx pgx.Tx) -- pool rejected by the parameter type, but keeping helpers on pgx.Tx is convention |
| Publisher abstraction | PublisherTrait -- dyn PublisherTrait in poller | Publisher interface -- same concept, same ergonomics |
| Error handling | Result<T, E> with ? -- cannot be silently ignored | error return -- if err != nil can be omitted |
| Shutdown coordination | watch::channel + tokio::select! | done channel + select |
| Exhaustive message status | enum MessageStatus -- unhandled variant = compile error | const strings -- new value silently falls through |
What Rust enforces that Go leaves to discipline
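Transaction boundary. insert_order takes &mut Transaction<'_, Postgres>, not &PgPool. Calling it with a pool connection is a compile error. In Go, a pgx.Tx parameter likewise rejects a *pgxpool.Pool, but nothing stops a helper from being written against a looser interface or from calling the pool directly, so keeping every write on the transaction is enforced by convention and tests, not the compiler.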
Publisher result. Result<(), OutboxError> from publish cannot be silently discarded. In Go, ignoring the error return from a publish call is a one-character omission.
Enum exhaustiveness. Adding a new MessageStatus variant breaks every match that does not handle it. In Go, a new status string passes silently through every switch that lacks a case for it.
What Go does better
Go's concurrency model is simpler to read. The poller loop in Go, a select on a ticker and a done channel, is idiomatic and familiar to any Go developer. The Rust equivalent with tokio::select! and watch::Receiver achieves the same thing but requires understanding async Rust's task model. Go also starts faster and compiles faster, which is relevant for a background service that is restarted frequently.
What I learned
The most instructive thing about this project was Test 2, the test that demonstrates the gap before proving it is closed. Writing that test forced a clear definition of what the pattern actually prevents. The gap is not abstract. You can reproduce it in three lines of SQL. You can prove it is closed in ten more. The test is the clearest explanation of the pattern I could have written.
The second thing: FOR UPDATE SKIP LOCKED is doing a lot of work that is easy to take for granted. The concurrent poller test (three goroutines, ten messages, zero duplicates) passes because of one clause in one SQL query. Remove SKIP LOCKED and replace it with FOR UPDATE, and the pollers serialize rather than parallelize. Remove it entirely and duplicates appear. One clause, two distinct failure modes. Worth understanding before you need it in production.
The third thing: defer tx.Rollback(ctx) must be written immediately after Begin. Not after the first query. Not after the business logic. Immediately after Begin. The further it is placed from Begin, the more likely an early return path does not have a rollback. Placing it immediately makes the pattern mechanical and impossible to forget.
Where to go next
Chris Richardson's original outbox pattern writeup -- the canonical description, language-agnostic
Postgres FOR UPDATE SKIP LOCKED -- the official documentation on lock modes and skip locked
RabbitMQ publisher confirms -- how confirms work and why they matter for at-least-once delivery
Source code -- the full implementation
Part of a series on building backend systems in Go and Rust.




