The Transactional Outbox Pattern in Rust: Guaranteed Message Delivery Without Distributed Transactions

Backend developer. Expert in Rust and GoLang.

The failure scenario

At 09:14:33, a customer places a £29.99 order on your platform. The order is saved to the database. The process crashes before publishing order.created to RabbitMQ. The fulfilment team never hears about it. The order sits in your database with status pending, indefinitely. Three days later, the customer calls support. Your team opens the database, finds the order, and manually triggers the fulfilment flow.

That is a recoverable situation. Now imagine it is not three days, it is three milliseconds. The crash is transient, the process restarts, and nobody notices. The order is in your database. The customer expects it to ship. It never does. Nobody calls support because the confirmation email went out. Nobody looks at the database because no error was logged.

This is the dual-write problem. Every service that writes to a database and publishes to a broker has it. The naive implementation does two things in sequence. Between them, anything can go wrong.

What makes this especially dangerous is that it is silent. No constraint is violated. No error is raised. The database is consistent with itself. The inconsistency lives in the gap between two systems, invisible to both.


Why the naive fixes do not work

The first instinct is to reverse the order: publish to RabbitMQ first, then write to the database. This trades one problem for another: now you can publish a message for an order that never gets saved. Downstream services process a ghost.

The second instinct is to wrap both operations in a try-catch and retry on failure. This makes duplicate messages more likely, not less. If the publish succeeds and the retry fires anyway, the broker receives the same message twice. Consumers must handle it. You have shifted the problem downstream.

The third instinct is to reconcile on startup: scan the orders table and republish anything that seems undelivered. This requires knowing which orders were never published, which means querying two separate systems that may now be inconsistent. You cannot trust either one to tell you the truth about the other.

The fourth instinct is a distributed transaction: a two-phase commit across Postgres and RabbitMQ. RabbitMQ does not support two-phase commit, so this option is not available.

The outbox pattern is not a clever trick. It is a direct consequence of accepting one constraint: you can only have one atomic unit, and that unit must be a database transaction.


The outbox pattern -- the principle

The pattern has three parts.

The producer writes business data and a message record in the same transaction. The producer knows nothing about RabbitMQ. Its job ends at commit.

The outbox table is the durability boundary. Once a row commits here, the message is guaranteed to eventually be published. The table bridges the synchronous world of database writes and the asynchronous world of message brokers.

The poller is a background process that reads unpublished rows and publishes them. It runs independently of the producer. It is the only component that touches RabbitMQ.

The key insight: once the transaction commits, the message is a database row. Database rows do not disappear when processes crash. The poller will find it on the next tick, and the tick after that, until it succeeds.
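
To make the three roles concrete, here is a minimal in-memory sketch, not the article's implementation. Store, OutboxRow, create_order, and poll_once are hypothetical names: a VecDeque stands in for the outbox table, a single method call stands in for the database transaction, and the publish closure stands in for RabbitMQ.

```rust
use std::collections::VecDeque;

/// One outbox row: the message exactly as it was written at commit time.
#[derive(Debug, Clone, PartialEq)]
pub struct OutboxRow {
    pub order_id: u64,
    pub event_type: String,
}

/// Hypothetical in-memory stand-in for the database.
#[derive(Default)]
pub struct Store {
    pub orders: Vec<u64>,
    pub outbox: VecDeque<OutboxRow>,
}

impl Store {
    /// Producer: both writes happen in one step (the "transaction").
    /// It knows nothing about the broker.
    pub fn create_order(&mut self, order_id: u64) {
        self.orders.push(order_id);
        self.outbox.push_back(OutboxRow {
            order_id,
            event_type: "order.created".to_string(),
        });
    }

    /// Poller: one tick. A row leaves the outbox only after `publish`
    /// reports success; otherwise it stays for the next tick.
    pub fn poll_once(&mut self, mut publish: impl FnMut(&OutboxRow) -> bool) {
        let mut remaining = VecDeque::new();
        while let Some(row) = self.outbox.pop_front() {
            if !publish(&row) {
                remaining.push_back(row);
            }
        }
        self.outbox = remaining;
    }
}
```

Calling poll_once with a closure that returns false models a broker outage: the row is still there afterwards, and a later tick with a working closure drains it.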


The design

Guarantees the system makes

  1. Atomic dual write -- an order and its outbox message are written together or not at all. There is no state where an order exists without a message, or a message exists without an order.

  2. At-least-once delivery -- if the order commits, the order.created event will eventually reach RabbitMQ.

  3. Concurrent poller safety -- multiple poller instances running simultaneously will never claim the same message at the same time.

  4. Crash recovery -- a message claimed by a crashed poller is automatically reclaimed after a TTL expires.

Schema

CREATE TABLE orders (
    id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID        NOT NULL,
    amount      BIGINT      NOT NULL,
    status      TEXT        NOT NULL DEFAULT 'pending',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),

    CONSTRAINT amount_positive CHECK (amount > 0),
    CONSTRAINT valid_status    CHECK (status IN ('pending','confirmed','cancelled'))
);

CREATE TABLE outbox_messages (
    id           UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type   TEXT        NOT NULL,
    payload      JSONB       NOT NULL,
    aggregate_id TEXT        NOT NULL,
    status       TEXT        NOT NULL DEFAULT 'pending',
    attempts     INT         NOT NULL DEFAULT 0,
    locked_until TIMESTAMPTZ,
    last_error   TEXT,
    published_at TIMESTAMPTZ,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),

    CONSTRAINT valid_status          CHECK (status IN ('pending','processing','published','failed')),
    CONSTRAINT attempts_non_negative CHECK (attempts >= 0)
);

-- Partial index: only pending rows are indexed, keeps polling fast as the table grows
CREATE INDEX outbox_pending_idx
    ON outbox_messages (created_at)
    WHERE status = 'pending';

Every field is deliberate:

  • amount BIGINT -- money in cents, never a float. £29.99 is stored as 2999.

  • CHECK (amount > 0) -- the database rejects invalid amounts even if the application layer has a bug.

  • locked_until TIMESTAMPTZ -- the crash recovery mechanism. When a poller claims a message, it sets locked_until = now() + 30s. If the poller crashes, the lock expires and the next tick reclaims it.

  • last_error TEXT -- publish failures are recorded with their reason. Failed messages are visible, not silently stuck.

  • payload JSONB -- the message content is stored verbatim at write time. No recomputation on retry.

  • outbox_pending_idx -- a partial index on status = 'pending' keeps polling fast as the table grows. Only pending rows are indexed.
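
The "cents, never a float" rule applies on the way in as well. As an illustration only, the hypothetical helper below (parse_cents is not part of the article's code) turns a decimal string into minor units with integer arithmetic, so "29.99" becomes 2999 without passing through a float. It assumes at most two fractional digits and rejects anything that is not strictly positive, matching the amount_positive constraint.

```rust
/// Parse a decimal amount such as "29.99" into minor units (2999) using
/// integer arithmetic only. Returns None for malformed input, more than two
/// fractional digits, or amounts that are not strictly positive.
pub fn parse_cents(input: &str) -> Option<i64> {
    let (whole, frac) = match input.split_once('.') {
        Some((w, f)) => (w, f),
        None => (input, ""),
    };
    if whole.is_empty() || frac.len() > 2 {
        return None;
    }
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    if !all_digits(whole) || !all_digits(frac) {
        return None;
    }
    let whole: i64 = whole.parse().ok()?;
    // Right-pad the fraction so "5" means 50 cents and "" means 0 cents.
    let frac: i64 = format!("{frac:0<2}").parse().ok()?;
    let cents = whole.checked_mul(100)?.checked_add(frac)?;
    if cents > 0 { Some(cents) } else { None }
}
```

The checked_mul and checked_add calls make overflow a None rather than a wrapped value, which keeps the function in line with the schema's habit of rejecting bad amounts instead of storing them.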

Failure modes

Failure | What the system does
--- | ---
Process crashes after order INSERT, before outbox INSERT | Transaction rolls back -- both tables empty, no inconsistency
Poller crashes after claiming, before publishing | locked_until expires -- next tick reclaims and retries
Poller crashes after publishing, before marking published | Message published again on retry -- consumer must deduplicate
Two concurrent pollers claim the same row | FOR UPDATE SKIP LOCKED prevents this -- second poller skips the row
Broker permanently rejects a message | Status set to failed, last_error recorded -- visible for investigation
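
The "consumer must deduplicate" case deserves a concrete shape. Below is a minimal sketch of one way to do it, assuming the consumer keys on the message id that the publisher attaches to each message. Deduplicator and handle_once are hypothetical names, and the in-memory HashSet is a simplification: a real consumer would need to persist seen ids, ideally in the same transaction as its own side effects.

```rust
use std::collections::HashSet;

/// Hypothetical consumer-side guard that remembers handled message ids.
#[derive(Default)]
pub struct Deduplicator {
    seen: HashSet<String>,
}

impl Deduplicator {
    /// Runs `handler` only the first time `message_id` is seen.
    /// Returns true if the handler ran, false if this was a redelivery.
    pub fn handle_once(&mut self, message_id: &str, handler: impl FnOnce()) -> bool {
        // HashSet::insert returns false when the value was already present.
        if !self.seen.insert(message_id.to_string()) {
            return false;
        }
        handler();
        true
    }
}
```

With this guard in place, a second delivery of the same id is acknowledged and dropped, so at-least-once delivery from the outbox does not turn into a double shipment downstream.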

The implementation -- five phases

The system was built in five phases. Each phase adds exactly one guarantee and leaves the previous phases unchanged.

Phase 1 -- Types + schema     ->  invalid money is unrepresentable
Phase 2 -- DB + order service ->  dual write is atomic
Phase 3 -- Poller             ->  concurrent pollers are safe
Phase 4 -- Publisher          ->  broker confirms before ack
Phase 5 -- Shutdown + tests   ->  all guarantees proven

Phase 1 -- Types and schema: the domain before any connection

Before a connection is opened, the type system defines what the domain looks like and what it forbids.

#[derive(Debug, Clone, Copy)]
pub struct Money(i64);

impl Money {
    pub fn from_cents(c: i64) -> Result<Self, OutboxError> {
        if c <= 0 {
            return Err(OutboxError::InvalidAmount);
        }
        Ok(Self(c))
    }

    pub fn cents(&self) -> i64 { self.0 }
}

Money(i64) is a newtype. The inner field is private. The only way to construct a Money value is through from_cents, which rejects zero and negative amounts at the boundary -- before the value reaches any database query.

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct OrderId(Uuid);

impl OrderId {
    pub fn new() -> Self { OrderId(Uuid::new_v4()) }
    pub fn as_uuid(&self) -> &Uuid { &self.0 }
    pub fn from_uuid(uuid: Uuid) -> Self { Self(uuid) }
}

#[derive(Debug, Clone, Copy)]
pub struct CustomerId(Uuid);

impl CustomerId {
    pub fn new() -> Self { CustomerId(Uuid::new_v4()) }
    pub fn as_uuid(&self) -> &Uuid { &self.0 }
    pub fn from_uuid(uuid: Uuid) -> Self { Self(uuid) }
}

OrderId and CustomerId are both wrappers around Uuid, but they are distinct types. Passing an OrderId where a CustomerId is expected is a compile error. In Go, both would be uuid.UUID -- the distinction is a variable name, not a type.

EventType wraps a String and rejects empty strings at construction time:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventType(String);

impl EventType {
    pub fn new(s: impl Into<String>) -> Result<Self, OutboxError> {
        let s = s.into();
        if s.trim().is_empty() {
            return Err(OutboxError::InvalidEventType);
        }
        Ok(Self(s))
    }

    pub fn as_str(&self) -> &str { &self.0 }
}

The status types mirror the CHECK constraints in the schema exactly. The two must be kept in step by hand: a status added on one side only surfaces at runtime, either as a CHECK violation on write or as a TryFrom error on read:

#[derive(Debug)]
pub enum OrderStatus { Pending, Confirmed, Cancelled }

impl TryFrom<&str> for OrderStatus {
    type Error = OutboxError;
    fn try_from(s: &str) -> Result<Self, Self::Error> {
        match s {
            "pending"   => Ok(Self::Pending),
            "confirmed" => Ok(Self::Confirmed),
            "cancelled" => Ok(Self::Cancelled),
            _ => Err(OutboxError::Config(format!("unknown order status: {s}"))),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageStatus { Pending, Processing, Published, Failed }
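
The article shows the string conversion for OrderStatus only. A comparable sketch for MessageStatus might look like the following; as_str and the String error type are assumptions made to keep the block self-contained (the article's code would presumably return OutboxError), and the serde derives are omitted.

```rust
use std::convert::TryFrom;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageStatus { Pending, Processing, Published, Failed }

impl MessageStatus {
    /// The exact strings allowed by the outbox_messages CHECK constraint.
    pub fn as_str(self) -> &'static str {
        // No wildcard arm: adding a variant forces this match to be updated.
        match self {
            Self::Pending    => "pending",
            Self::Processing => "processing",
            Self::Published  => "published",
            Self::Failed     => "failed",
        }
    }
}

impl TryFrom<&str> for MessageStatus {
    type Error = String; // simplified stand-in for OutboxError

    fn try_from(s: &str) -> Result<Self, Self::Error> {
        match s {
            "pending"    => Ok(Self::Pending),
            "processing" => Ok(Self::Processing),
            "published"  => Ok(Self::Published),
            "failed"     => Ok(Self::Failed),
            other => Err(format!("unknown message status: {other}")),
        }
    }
}
```

A round trip through as_str and try_from returns the original variant, and any string outside the four allowed values is rejected on read.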

OutboxMessage is the struct the poller works with after claiming rows from the database:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxMessage {
    pub id:           Uuid,
    pub event_type:   EventType,
    pub payload:      serde_json::Value,
    pub aggregate_id: String,
    pub status:       MessageStatus,
    pub attempts:     i32,
    pub last_error:   Option<String>,
    pub published_at: Option<DateTime<Utc>>,
    pub created_at:   DateTime<Utc>,
}

The error type covers every distinct failure mode. A match on OutboxError with no wildcard arm is checked for exhaustiveness -- adding a variant without handling it there is a compile error:

#[derive(Debug, thiserror::Error)]
pub enum OutboxError {
    #[error("invalid amount: must be greater than zero cents")]
    InvalidAmount,
    #[error("invalid event type: must not be empty")]
    InvalidEventType,
    #[error("order not found: {0}")]
    OrderNotFound(Uuid),
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    #[error("broker nacked message: {0}")]
    BrokerNack(Uuid),
    #[error("publisher confirms not enabled on channel")]
    ConfirmsNotEnabled,
    #[error("broker connection error: {0}")]
    BrokerConnection(#[from] lapin::Error),
    #[error("configuration error: {0}")]
    Config(String),
}

Phase 2 -- DB layer: the dual write

Phase 2 is the core of the pattern. The most important line in the entire codebase is a function signature:

pub async fn insert_order(
    tx: &mut Transaction<'_, Postgres>,  // <- not &PgPool
    customer_id: CustomerId,
    amount: Money,
) -> Result<OrderId, OutboxError>

insert_order takes &mut Transaction, not &PgPool. This is not a convention or a guideline; it is a type constraint enforced by the compiler. Passing a pool to insert_order is a compile error. The dual-write guarantee is structural, not disciplinary.

In Go, the equivalent function typically takes an interface that both *sql.DB and *sql.Tx satisfy. Passing the pool instead of a transaction is valid Go: it compiles, runs, and silently breaks the atomicity guarantee.

The full implementation of insert_order uses query_scalar! to get the database-generated id back in the same round trip:

pub async fn insert_order(
    tx: &mut Transaction<'_, Postgres>,
    customer_id: CustomerId,
    amount: Money,
) -> Result<OrderId, OutboxError> {
    let id = sqlx::query_scalar!(
        r#"
        INSERT INTO orders (customer_id, amount)
        VALUES ($1, $2)
        RETURNING id
        "#,
        customer_id.as_uuid(),
        amount.cents(),
    )
    .fetch_one(&mut **tx)
    .await?;

    Ok(OrderId::from_uuid(id))
}

insert_outbox_message writes the event into outbox_messages inside the same transaction. Passing a &EventType instead of a raw &str means an empty event type is rejected before the query runs:

pub async fn insert_outbox_message(
    tx: &mut Transaction<'_, Postgres>,
    event_type: &EventType,
    payload: serde_json::Value,
    aggregate_id: &str,
) -> Result<Uuid, OutboxError> {
    let id = sqlx::query_scalar!(
        r#"
        INSERT INTO outbox_messages (event_type, payload, aggregate_id)
        VALUES ($1, $2, $3)
        RETURNING id
        "#,
        event_type.as_str(),
        payload,
        aggregate_id,
    )
    .fetch_one(&mut **tx)
    .await?;

    Ok(id)
}

create_order owns the transaction boundary. Both writes happen inside a single with_transaction closure. If insert_outbox_message fails for any reason, the ? propagates the error, the closure returns Err, and with_transaction drops the transaction without committing, rolling back the order insert too:

pub async fn create_order(
    &self,
    customer_id: CustomerId,
    amount: Money,
) -> Result<OrderId, OutboxError> {
    self.with_transaction(|mut tx| async move {
        let order_id = Db::insert_order(&mut tx, customer_id, amount).await?;

        Db::insert_outbox_message(
            &mut tx,
            &EventType::new("order.created")?,
            serde_json::json!({
                "order_id":    order_id.as_uuid(),
                "customer_id": customer_id.as_uuid(),
                "amount":      amount.cents(),
            }),
            &order_id.to_string(),
        ).await?;

        Ok((order_id, tx))
    }).await
}

The with_transaction helper commits on success; on any error the transaction is dropped without committing, which rolls it back. The FnOnce bound means the closure is consumed and can only be called once:

pub async fn with_transaction<'a, F, Fut, T>(&self, f: F) -> Result<T, OutboxError>
where
    F: FnOnce(Transaction<'a, Postgres>) -> Fut,
    Fut: Future<Output = Result<(T, Transaction<'a, Postgres>), OutboxError>>,
{
    let tx = self.pool.begin().await?;
    let (result, tx) = f(tx).await?;  // error here -> tx dropped -> rollback
    tx.commit().await?;
    Ok(result)
}

The gap cannot exist. If either write fails, neither persists.
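
The commit-or-discard behaviour can be shown without a database. The sketch below is a synchronous, in-memory analogue and not the sqlx-based helper: Db, Tx, and insert are hypothetical stand-ins, and "rollback" is modelled by never copying the staged rows into the committed set.

```rust
/// Hypothetical in-memory database; `committed` plays the role of both tables.
#[derive(Default)]
pub struct Db {
    pub committed: Vec<String>,
}

/// Writes staged inside a transaction, invisible until commit.
#[derive(Default)]
pub struct Tx {
    staged: Vec<String>,
}

impl Tx {
    pub fn insert(&mut self, row: &str) {
        self.staged.push(row.to_string());
    }
}

impl Db {
    /// Synchronous analogue of with_transaction: apply staged writes on Ok,
    /// discard them on Err.
    pub fn with_transaction<T, E>(
        &mut self,
        f: impl FnOnce(&mut Tx) -> Result<T, E>,
    ) -> Result<T, E> {
        let mut tx = Tx::default();
        // On Err the `?` returns early; `tx` is dropped and nothing is applied.
        let value = f(&mut tx)?;
        // "Commit": every staged row becomes visible together.
        self.committed.append(&mut tx.staged);
        Ok(value)
    }
}
```

A closure that stages an order row and then returns Err leaves committed empty, while a closure that stages both rows and returns Ok makes both visible in one step.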


Phase 3 -- Poller: concurrent safety with FOR UPDATE SKIP LOCKED

The PollerConfig makes the three tunable parameters explicit and gives them safe defaults:

#[derive(Debug, Clone)]
pub struct PollerConfig {
    pub batch_size:    i64,
    pub poll_interval: Duration,
    pub lock_secs:     i64,
}

impl Default for PollerConfig {
    fn default() -> Self {
        Self {
            batch_size:    10,
            poll_interval: Duration::from_secs(2),
            lock_secs:     30,
        }
    }
}

The poll query claims messages atomically, with no separate select-then-update. The claim and the status transition happen in one statement:

pub async fn poll(
    &self,
    batch_size: i64,
    lock_secs: i64,
) -> Result<Vec<OutboxMessage>, OutboxError> {
    let rows = sqlx::query!(
        r#"
        UPDATE outbox_messages
        SET    status       = 'processing',
               attempts     = attempts + 1,
               locked_until = now() + ($1 || ' seconds')::interval
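        WHERE  id IN (
            SELECT id FROM outbox_messages
            -- pending rows, plus rows left in 'processing' by a crashed poller
            -- once their lease has expired
            WHERE  status IN ('pending', 'processing')
            AND    (locked_until IS NULL OR locked_until < now())
            ORDER  BY created_at
            LIMIT  $2
            FOR UPDATE SKIP LOCKED
        )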
        RETURNING id, event_type, payload, aggregate_id, attempts, created_at
        "#,
        lock_secs.to_string(),
        batch_size,
    )
    .fetch_all(&self.pool)
    .await?;

    rows.into_iter()
        .map(|r| Ok(OutboxMessage {
            id:           r.id,
            event_type:   EventType::new(r.event_type)?,
            payload:      r.payload,
            aggregate_id: r.aggregate_id,
            status:       MessageStatus::Processing,
            attempts:     r.attempts,
            last_error:   None,
            published_at: None,
            created_at:   r.created_at,
        }))
        .collect()
}

FOR UPDATE SKIP LOCKED is the concurrent safety mechanism. When two pollers run simultaneously, each acquires a row-level lock on the rows it selects. The second poller's SELECT skips any row that is already locked. The two pollers claim disjoint sets of rows. No message is claimed twice.
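
The skip-instead-of-wait behaviour has a close analogue in the standard library: Mutex::try_lock returns immediately when another thread holds the lock. The sketch below uses that to imitate SKIP LOCKED with threads in place of pollers. It is an analogy under stated assumptions, not how Postgres implements row locks: Status, claim, and run_claimers are hypothetical names, and each lock is held only for the status flip rather than until the end of a transaction.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status { Pending, Processing }

/// Claim up to `limit` pending rows. A row whose lock is held by another
/// worker is skipped rather than waited on, as with SKIP LOCKED.
pub fn claim(rows: &[Mutex<Status>], limit: usize) -> Vec<usize> {
    let mut claimed = Vec::new();
    for (id, row) in rows.iter().enumerate() {
        if claimed.len() == limit {
            break;
        }
        // try_lock never blocks: Err means another worker holds this row.
        if let Ok(mut status) = row.try_lock() {
            if *status == Status::Pending {
                *status = Status::Processing;
                claimed.push(id);
            }
        }
    }
    claimed
}

/// Run `workers` concurrent claimers over `n` pending rows and return
/// every id that any of them claimed.
pub fn run_claimers(n: usize, workers: usize) -> Vec<usize> {
    let rows: Arc<Vec<Mutex<Status>>> =
        Arc::new((0..n).map(|_| Mutex::new(Status::Pending)).collect());
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rows = Arc::clone(&rows);
            thread::spawn(move || claim(&rows, n))
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

Whichever worker gets a row's lock while it is still Pending is the only one that can claim it, so the combined result contains each id exactly once regardless of how the threads interleave.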

locked_until is the crash recovery mechanism. If a poller claims a batch and then crashes before marking the messages published, the locks expire after 30 seconds. The next poller tick finds those rows eligible again (locked_until < now()) and reclaims them.
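
The lease rule is easy to state as a pure function. The sketch below is one reading of the intent described above, under the assumption that a row abandoned in processing becomes claimable again once its lease has passed. Row, is_claimable, and claim are hypothetical names, and timestamps are plain seconds so the behaviour is deterministic.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status { Pending, Processing, Published, Failed }

/// Simplified outbox row; `locked_until` is seconds on an arbitrary clock.
pub struct Row {
    pub status: Status,
    pub locked_until: Option<u64>,
}

/// A row may be claimed if it is unfinished and no live lease covers it.
pub fn is_claimable(row: &Row, now: u64) -> bool {
    // Mirrors `locked_until IS NULL OR locked_until < now()`.
    let lease_free = row.locked_until.map_or(true, |t| t < now);
    match row.status {
        Status::Pending | Status::Processing => lease_free,
        Status::Published | Status::Failed => false,
    }
}

/// Take the lease if the row is claimable; returns whether the claim succeeded.
pub fn claim(row: &mut Row, now: u64, lock_secs: u64) -> bool {
    if !is_claimable(row, now) {
        return false;
    }
    row.status = Status::Processing;
    row.locked_until = Some(now + lock_secs);
    true
}
```

A row claimed at second 100 with a 30 second lease cannot be claimed again at 110 or at exactly 130, and becomes claimable from 131 onwards, which is the window a crashed poller's messages wait before another poller picks them up.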

After publishing, the poller calls mark_published or mark_failed:

pub async fn mark_published(&self, id: Uuid) -> Result<(), OutboxError> {
    sqlx::query!(
        r#"
        UPDATE outbox_messages
        SET    status       = 'published',
               published_at = now(),
               locked_until = NULL
        WHERE  id = $1
        "#,
        id,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}

pub async fn mark_failed(&self, id: Uuid, error: &str) -> Result<(), OutboxError> {
    sqlx::query!(
        r#"
        UPDATE outbox_messages
        SET    status       = 'failed',
               last_error   = $1,
               locked_until = NULL
        WHERE  id = $2
        "#,
        error, id,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}

Phase 4 -- Publisher: confirm before ack

Publishing to RabbitMQ is behind a concrete Publisher struct. The poller depends on it via a PublisherTrait so tests can substitute a MockPublisher without a live broker:

pub trait PublisherTrait: Send + Sync {
    async fn publish(&self, msg: &OutboxMessage) -> Result<(), OutboxError>;
}
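
The MockPublisher itself is not shown in the article. As a rough idea of what such a test double could look like, here is a simplified, synchronous sketch: Publish, MockPublisher, fail_with, and publish_all are all hypothetical names, and the real trait is async and takes an OutboxMessage rather than two strings.

```rust
use std::sync::Mutex;

/// Simplified, synchronous stand-in for the article's async PublisherTrait.
pub trait Publish {
    fn publish(&self, message_id: &str, payload: &str) -> Result<(), String>;
}

/// Test double: records messages instead of talking to a broker.
#[derive(Default)]
pub struct MockPublisher {
    pub sent: Mutex<Vec<(String, String)>>,
    /// When set, every publish fails with this reason.
    pub fail_with: Option<String>,
}

impl Publish for MockPublisher {
    fn publish(&self, message_id: &str, payload: &str) -> Result<(), String> {
        if let Some(reason) = &self.fail_with {
            return Err(reason.clone());
        }
        self.sent
            .lock()
            .unwrap()
            .push((message_id.to_string(), payload.to_string()));
        Ok(())
    }
}

/// Code under test depends only on the trait, so a real publisher and the
/// mock are interchangeable. Returns how many messages were published.
pub fn publish_all(publisher: &dyn Publish, batch: &[(&str, &str)]) -> usize {
    batch
        .iter()
        .filter(|(id, body)| publisher.publish(id, body).is_ok())
        .count()
}
```

Setting fail_with lets a test exercise the failure branch of the poller without having to make a real broker reject anything.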

Publisher::connect declares a durable topic exchange and enables publisher confirms on the channel before returning:

pub async fn connect(amqp_url: &str, exchange: &str) -> Result<Self, OutboxError> {
    let conn    = Connection::connect(amqp_url, ConnectionProperties::default()).await?;
    let channel = conn.create_channel().await?;

    channel.exchange_declare(
        exchange,
        lapin::ExchangeKind::Topic,
        ExchangeDeclareOptions { durable: true, ..Default::default() },
        Default::default(),
    ).await?;

    channel.confirm_select(ConfirmSelectOptions::default()).await?;

    Ok(Self { channel, exchange: exchange.to_string() })
}

publish sends the message and waits for the broker's durable confirm. The double .await is intentional:

pub async fn publish(&self, msg: &OutboxMessage) -> Result<(), OutboxError> {
    let payload     = serde_json::to_vec(&msg.payload)?;
    let routing_key = msg.event_type.as_str();

    let confirm = self.channel
        .basic_publish(
            &self.exchange,
            routing_key,
            BasicPublishOptions::default(),
            &payload,
            BasicProperties::default()
                .with_message_id(msg.id.to_string().into())
                .with_content_type("application/json".into())
                .with_delivery_mode(2),  // 2 = persistent
        )
        .await?   // first await: message enters broker TCP buffer
        .await?;  // second await: broker has written to disk and sent Ack

    match confirm {
        Confirmation::Ack(_)          => Ok(()),
        Confirmation::Nack(_)         => Err(OutboxError::BrokerNack(msg.id)),
        Confirmation::NotRequested    => Err(OutboxError::ConfirmsNotEnabled),
    }
}

Without confirm_select(), the first .await returns Ok as soon as the message enters the broker's TCP buffer, before it is durable. A broker crash at that moment loses the message silently. With confirms enabled, the second .await yields Ack only after the broker has committed the message to disk. The poller only calls mark_published after both awaits return Ok.

The poller loop acts on the result:

match publisher.publish(msg).await {
    Ok(()) => {
        if let Err(e) = self.db.mark_published(msg.id).await {
            tracing::error!(id = %msg.id, error = %e,
                "failed to mark published -- locked_until will expire");
        }
    }
    Err(e) => {
        self.db.mark_failed(msg.id, &e.to_string()).await?;
    }
}

mark_failed sets status = 'failed' and records the error in last_error. The message is not stuck in processing: it is visible, queryable, and can be retried.


Phase 5 -- Shutdown and tests

The HTTP server and the poller run as independent Tokio tasks. Graceful shutdown coordinates them via watch::channel:

let (shutdown_tx, shutdown_rx) = watch::channel(false);

let handle = tokio::spawn(async move {
    poller.run(Arc::clone(&publisher), shutdown_rx).await
});

axum::serve(listener, app)
    .with_graceful_shutdown(async { tokio::signal::ctrl_c().await.ok(); })
    .await?;

shutdown_tx.send(true)?;
handle.await??;

Inside the poller, tokio::select! races the shutdown signal against the poll interval. The poller does not abandon a batch mid-flight: once the tick branch is running it finishes the current batch, and the shutdown signal is observed on the next pass through the loop:

pub async fn run(
    &self,
    publisher: Arc<Publisher>,
    mut shutdown: watch::Receiver<bool>,
) -> Result<(), OutboxError> {
    let mut ticker = interval(self.config.poll_interval);
    loop {
        tokio::select! {
            _ = shutdown.changed() => {
                if *shutdown.borrow() { return Ok(()); }
            }
            _ = ticker.tick() => {
                let messages = match self.db.poll(
                    self.config.batch_size,
                    self.config.lock_secs,
                ).await {
                    Ok(m)  => m,
                    Err(e) => {
                        tracing::error!(error = %e, "poll failed, will retry next tick");
                        continue;
                    }
                };

                for msg in &messages {
                    match publisher.publish(msg).await {
                        Ok(())  => { self.db.mark_published(msg.id).await?; }
                        Err(e)  => { self.db.mark_failed(msg.id, &e.to_string()).await?; }
                    }
                }
            }
        }
    }
}
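
The same "wait for shutdown or the next tick, whichever comes first" shape can be sketched with the standard library alone, which makes the control flow easy to test without a runtime. This is a thread-based analogue under stated assumptions, not the Tokio code: run_poller is a hypothetical name, an mpsc channel replaces watch::channel, and recv_timeout plays the role of select!.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Wait for either a shutdown signal or the poll interval.
/// `tick` processes one full batch; the return value is the number of
/// batches that ran before shutdown.
pub fn run_poller(
    shutdown: Receiver<()>,
    poll_interval: Duration,
    mut tick: impl FnMut(),
) -> u32 {
    let mut ticks = 0;
    loop {
        match shutdown.recv_timeout(poll_interval) {
            // Signal received, or the sender was dropped: stop before
            // starting another batch.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return ticks,
            // Interval elapsed with no signal: run one batch to completion.
            Err(RecvTimeoutError::Timeout) => {
                tick();
                ticks += 1;
            }
        }
    }
}
```

Because the signal is only checked between batches, a shutdown that arrives while tick is running takes effect after that batch completes. Treating a dropped sender as shutdown also avoids a loop that spins when the other side has gone away.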

Proving it works -- five adversarial tests

Test 1 -- Dual write is atomic

#[tokio::test]
async fn create_order_writes_both_rows_atomically() {
    let db = setup().await;

    let order_id = db.create_order(customer(), Money::from_cents(1000).unwrap()).await.unwrap();

    let orders_count  = db.count_orders().await.unwrap();
    let pending_count = db.count_outbox_by_status("pending").await.unwrap();

    assert_eq!(orders_count,  1, "exactly one order");
    assert_eq!(pending_count, 1, "exactly one pending outbox message");

    let msg = db.latest_outbox_message().await.unwrap().unwrap();
    assert_eq!(msg.aggregate_id, order_id.to_string());
    assert_eq!(msg.payload["order_id"].as_str().unwrap(), order_id.as_uuid().to_string());
}

Both rows exist, the aggregate_id links them, and the payload is correct. This is the happy path. Test 2 proves the failure path.

Test 2 -- The dual-write gap demonstrated and closed

#[tokio::test]
async fn the_dual_write_gap_is_visible_naively_and_closed_by_outbox() {
    let db = setup().await;

    // Part A -- show the gap is real
    sqlx::query!("INSERT INTO orders (customer_id, amount) VALUES ($1, $2)",
        Uuid::new_v4(), 1000i64)
        .execute(&db.pool).await.unwrap();

    assert_eq!(db.count_orders().await.unwrap(),  1, "order exists");
    assert_eq!(db.count_outbox_by_status("pending").await.unwrap(), 0,
        "no outbox message -- the gap is real");

    sqlx::query("DELETE FROM orders").execute(&db.pool).await.unwrap();

    // Part B -- show the outbox pattern closes it
    let _ = db.with_transaction(|mut tx| async move {
        Db::insert_order(&mut tx, customer(), ten_cents()).await?;
        // Simulate crash after order INSERT, before outbox INSERT
        Err::<((), _), OutboxError>(OutboxError::Config("simulated crash".into()))
    }).await;

    assert_eq!(db.count_orders().await.unwrap(),  0, "order rolled back");
    assert_eq!(db.count_outbox_by_status("pending").await.unwrap(), 0,
        "gap cannot exist with the outbox pattern");
}

Part A shows the gap with a direct insert. Part B proves the pattern closes it by simulating a crash mid-transaction. A reader who runs this test understands the pattern.

Test 3 -- Concurrent pollers never claim the same message

#[tokio::test]
async fn concurrent_pollers_never_claim_same_message() {
    let db = setup().await;

    for _ in 0..10 {
        db.create_order(customer(), ten_cents()).await.unwrap();
    }

    let mut set = tokio::task::JoinSet::new();
    for _ in 0..3 {
        let db = db.clone();
        set.spawn(async move { db.poll(10, 30).await.unwrap() });
    }

    let mut all_ids: Vec<Uuid> = Vec::new();
    while let Some(result) = set.join_next().await {
        for msg in result.unwrap() { all_ids.push(msg.id); }
    }

    let unique: HashSet<_> = all_ids.iter().collect();
    assert_eq!(unique.len(), all_ids.len(), "no message claimed twice");
    assert_eq!(all_ids.len(), 10, "all messages claimed exactly once");
}

If FOR UPDATE SKIP LOCKED were missing, two pollers would claim overlapping rows. The HashSet check would fail with "no message claimed twice." One clause, one failure mode, one test.

Test 4 -- End-to-end: poller publishes with correct payload

#[tokio::test]
async fn poller_publishes_order_message_with_correct_payload() {
    let db        = setup().await;
    let publisher = Arc::new(Publisher::connect(&amqp_url, "orders").await.unwrap());

    let order_id = db.create_order(customer(), ten_cents()).await.unwrap();

    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let poller_db = db.clone();  // clone first: `async move` would otherwise take `db` itself
    let handle = tokio::spawn(async move {
        fast_poller(poller_db).run(Arc::clone(&publisher), shutdown_rx).await
    });

    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    loop {
        if db.count_outbox_by_status("published").await.unwrap() == 1 { break; }
        if tokio::time::Instant::now() > deadline { panic!("timeout"); }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    shutdown_tx.send(true).unwrap();
    handle.await.unwrap().unwrap();

    let msg = db.latest_outbox_message().await.unwrap().unwrap();
    assert_eq!(msg.status, MessageStatus::Published);
    assert_eq!(msg.payload["order_id"].as_str().unwrap(), order_id.as_uuid().to_string());
}

This is the only test that requires a live RabbitMQ connection. It proves the full path from create_order to status = 'published'.

Test 5 -- Publish failure is visible and recoverable

#[tokio::test]
async fn publish_failure_marks_message_failed_and_leaves_order_intact() {
    let db       = setup().await;
    let order_id = db.create_order(customer(), Money::from_cents(1000).unwrap()).await.unwrap();

    let mut messages = db.poll(10, 30).await.unwrap();
    let msg = messages.remove(0);

    db.mark_failed(msg.id, "simulated broker rejection").await.unwrap();

    assert_eq!(db.count_outbox_by_status("failed").await.unwrap(),     1);
    assert_eq!(db.count_outbox_by_status("processing").await.unwrap(), 0);

    let stored = db.latest_outbox_message().await.unwrap().unwrap();
    assert_eq!(stored.status, MessageStatus::Failed);
    assert_eq!(stored.last_error.as_deref(), Some("simulated broker rejection"));

    let order = db.get_order(order_id).await.unwrap().unwrap();
    assert_eq!(order.amount.cents(), 1000, "order unchanged after failed publish");
}

A failed publish does not lose the message and does not corrupt the order. The message is in failed status with its error reason recorded. The order is untouched.

Running the tests

docker compose up -d postgres rabbitmq
sqlx migrate run
cargo test
test create_order_writes_both_rows_atomically                    ... ok
test the_dual_write_gap_is_visible_naively_and_closed_by_outbox  ... ok
test concurrent_pollers_never_claim_same_message                 ... ok
test poller_publishes_order_message_with_correct_payload         ... ok
test publish_failure_marks_message_failed_and_leaves_order_intact ... ok

test result: ok. 5 passed; 0 failed

The Go comparison

Guarantee | Rust mechanism | Go mechanism
--- | --- | ---
Money type safety | Money(i64) newtype, private field | type Money int64 -- convention only
Transaction boundary enforced | insert_order(tx: &mut Transaction) -- pool impossible | insertOrder(ctx, q QueryerContext) -- the interface also accepts the pool
Publisher abstraction | PublisherTrait -- dyn PublisherTrait in poller | Publisher interface -- same concept, same ergonomics
Error handling | Result<T, E> with ? -- ignoring it raises a must_use warning | error return -- if err != nil can be omitted
Shutdown coordination | watch::channel + tokio::select! | context.Context + select
Exhaustive message status | enum MessageStatus -- unhandled variant = compile error | const strings -- new value silently falls through

What Rust enforces that Go leaves to discipline

Transaction boundary. insert_order takes &mut Transaction<'_, Postgres>, not &PgPool. Calling it with a pool is a compile error. In Go, *sql.Tx and *sql.DB both satisfy a QueryerContext-style interface, so the developer must ensure the right one is passed.

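Publisher result. Result is marked must_use, so dropping the Result<(), OutboxError> from publish without handling it produces a compiler warning; discarding it takes an explicit let _ =. In Go, calling publish and ignoring its error return compiles without any diagnostic.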

Enum exhaustiveness. Adding a new MessageStatus variant breaks every match that does not handle it. In Go, a new status string passes silently through every switch that lacks a case for it.

What Go does better

Go's concurrency model is simpler to read. The poller loop in Go with select on a context.Done() channel is idiomatic and familiar to any Go developer. The Rust equivalent with tokio::select! and watch::Receiver achieves the same thing but requires understanding async Rust's task model. Go also starts faster and compiles faster -- relevant for a background service that is restarted frequently.


What I learned

The most instructive thing about this project was Test 2 -- the test that demonstrates the gap before proving it is closed. Writing that test forced a clear definition of what the pattern actually prevents. The gap is not abstract. You can reproduce it in five lines of SQL. You can prove it is closed in ten lines of Rust. The test is the clearest explanation of the pattern that could be written.

The second thing: FOR UPDATE SKIP LOCKED is doing a lot of work that is easy to take for granted. The concurrent poller test (three tasks, ten messages, zero duplicates) passes because of one clause in one SQL query. Replace it with plain FOR UPDATE, and the pollers serialise rather than parallelise. Remove it entirely and duplicates appear. One clause, two distinct failure modes. Worth understanding before you need it in production.



Part of a series on building backend systems in Rust and Go.