The Problem 💀

Consider a typical e-commerce flow:
// User places an order
const order = Order.create({ items, total: 150 });
await repo.save(order);           // ✅ Persisted to DB

// Dispatch domain events
await order.dispatchAll(bus);     // 💀 CRASH HERE

The Failure Window

There's a gap between save() and dispatchAll(). If the process dies in that gap:
| What happens | Impact |
| --- | --- |
| Order is saved to DB | ✅ Data is safe |
| OrderPlaced event | ❌ Silently lost |
| Inventory reservation | ❌ Never happens |
| Confirmation email | ❌ Never sent |
| Analytics tracking | ❌ Missing |
This isn't theoretical. It happens in production:
  • Process crashes (unhandled exceptions, OOM kills)
  • Deployments (container is replaced mid-request)
  • Network blips (message broker unreachable at that exact moment)
  • Infrastructure (node restart, pod eviction)
The event is silently lost: no trace, no retry, no recovery.

The Solution: Transactional Outbox Pattern

The Transactional Outbox Pattern ensures events are persisted atomically with your aggregate write, then published by a background process:
┌──────────────────────────────────────────────────┐
│  Transaction Boundary                            │
│                                                  │
│  INSERT INTO orders (...)                        │
│  INSERT INTO outbox (eventId, payload, ...)      │
│                                                  │
│  ═══════════ COMMIT ═══════════                  │
│  ✅ Both rows written atomically                  │
└──────────────────────────────────────────────────┘
         │
         ▼
┌──────────────────────────────────────────────────┐
│  OutboxPublisher (background process)            │
│                                                  │
│  SELECT * FROM outbox WHERE status = 'pending'   │
│  → publish to message broker                     │
│  → UPDATE outbox SET status = 'published'        │
│                                                  │
│  On failure:                                     │
│  → UPDATE outbox SET status = 'failed', retries++│
│  → Retry on next poll                            │
└──────────────────────────────────────────────────┘
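
The two boxes above can be sketched in TypeScript with an in-memory stand-in for the database. Everything here (FakeDb, placeOrder, pollOnce) is hypothetical and only illustrates the shape of the pattern; it is not how the library is implemented.

```typescript
// Hypothetical in-memory stand-in for a database with transactions.
type OutboxRow = {
  id: string;
  eventName: string;
  status: "pending" | "published" | "failed";
  retries: number;
};
type OrderRow = { id: string; total: number };
type Tables = { orders: OrderRow[]; outbox: OutboxRow[] };

class FakeDb {
  orders: OrderRow[] = [];
  outbox: OutboxRow[] = [];

  // Runs `work` against copies of the tables and commits only if it does not throw.
  transaction(work: (tx: Tables) => void): void {
    const tx: Tables = { orders: [...this.orders], outbox: [...this.outbox] };
    work(tx); // a throw here leaves the real tables untouched
    this.orders = tx.orders; // COMMIT: both tables change together
    this.outbox = tx.outbox;
  }
}

// Writes the order row and its event row inside one transaction.
function placeOrder(db: FakeDb, orderId: string, total: number): void {
  db.transaction((tx) => {
    tx.orders.push({ id: orderId, total });
    tx.outbox.push({ id: `evt-${orderId}`, eventName: "OrderPlaced", status: "pending", retries: 0 });
  });
}

// One poll cycle: try to publish every pending row and record the outcome.
function pollOnce(db: FakeDb, publish: (row: OutboxRow) => void): void {
  for (const row of db.outbox) {
    if (row.status !== "pending") continue;
    try {
      publish(row);
      row.status = "published";
    } catch {
      row.status = "failed";
      row.retries += 1;
    }
  }
}
```

Because the event row is committed together with the order row, a crash after the commit can delay the event but cannot lose it: the next poll still finds it.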

Immediate-First, Outbox as Safety Net

Rich Domain's implementation uses a decorator pattern, so your code stays identical:
// Your use case code, unchanged
await repo.save(order);
await order.dispatchAll(bus);   // Still publishes immediately!
Behind the scenes:
  1. Immediate publish: dispatchAll() tries the real bus first (RabbitMQ, Kafka, etc.)
  2. Success → outboxStore.markPublished(eventId) marks the event as done
  3. Failure → outboxStore.markFailed(eventId, error) records the error
  4. Background safety net: OutboxPublisher polls for pending events and retries
The outbox is the safety net, not the primary path. You get low-latency delivery when the broker is healthy, and guaranteed delivery when it's not.
The repo.save() call also auto-saves uncommitted events to the outbox table in the same transaction, so even if dispatchAll() isn't called at all, your events are preserved.

Proven Pattern

This isn't new. Established frameworks have used this pattern for years:
| Framework | Ecosystem | How they do it |
| --- | --- | --- |
| MassTransit | .NET | Transactional outbox built-in; stores messages in the DB transaction, publishes them from a background delivery service |
| NServiceBus | .NET | Built-in Outbox feature; stores outgoing messages in the same DB transaction as business data |
| CAP | .NET | EQueue outbox pattern; [CapSubscribe] attribute + transactional message storage |
| Debezium | Java | CDC-based outbox; tails the DB transaction log to pick up outbox events |
| Axon Framework | Java | Event sourcing + outbox; persists domain events in the same transaction as the aggregate |
Frameworks like Dapr and Spring Cloud Stream (Java) also support variations of this pattern. @woltz/rich-domain-outbox brings the same reliability guarantee to the TypeScript/Node.js ecosystem, with zero external dependencies beyond your existing database.

Quick Start

Step 1: Install

npm install @woltz/rich-domain-outbox

Step 2: Create the Outbox Table

The outbox table has a simple, fixed schema. The id column stores the event's own eventId, which is how markPublished(eventId) does a direct primary key lookup:
| Column | Type | Description |
| --- | --- | --- |
| id | TEXT | PRIMARY KEY. Stores event.eventId directly |
| eventName | TEXT | The event class name (e.g. "OrderPlaced") |
| payload | JSONB / JSON | The event payload |
| occurredOn | TIMESTAMPTZ | When the event was created |
| status | TEXT | 'pending' → 'published' (or 'failed') |
| retries | INTEGER | Number of publish attempts. Default 0 |
| lastError | TEXT NULL | Last error message if publish failed |
| createdAt | TIMESTAMPTZ | When the row was inserted |
Pick your ORM. With Prisma, copy this into your schema.prisma:
model Outbox {
  id          String   @id
  eventName   String
  payload     Json
  occurredOn  DateTime
  status      String   @default("pending")
  retries     Int      @default(0)
  lastError   String?
  createdAt   DateTime @default(now())

  @@index([status])
}
Then import the schema constant for reference:
import { PRISMA_OUTBOX_SCHEMA } from "@woltz/rich-domain-prisma";
Then run your migration as usual (prisma migrate dev, drizzle-kit generate, typeorm migration:run, etc.).
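
To make the column table concrete, here is a minimal sketch of how a domain event could map onto one outbox row. The DomainEventLike shape and the toOutboxRow helper are assumptions for illustration; the adapters' outbox stores do this mapping for you.

```typescript
// Assumed minimal event shape; the real IDomainEvent lives in @woltz/rich-domain.
interface DomainEventLike {
  eventId: string;
  eventName: string;
  occurredOn: Date;
  payload: unknown;
}

// Row shape taken from the column table above.
interface OutboxRow {
  id: string;
  eventName: string;
  payload: unknown;
  occurredOn: Date;
  status: "pending" | "published" | "failed";
  retries: number;
  lastError: string | null;
  createdAt: Date;
}

// Hypothetical helper: builds the row a freshly raised event would be stored as.
function toOutboxRow(event: DomainEventLike, now: Date = new Date()): OutboxRow {
  return {
    id: event.eventId, // the event's own id doubles as the primary key
    eventName: event.eventName,
    payload: event.payload,
    occurredOn: event.occurredOn,
    status: "pending",
    retries: 0,
    lastError: null,
    createdAt: now,
  };
}
```

Reusing eventId as the primary key is what lets markPublished(eventId) update the row without a secondary lookup.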

Step 3: Wrap your EventBus

import { OutboxEventBusDecorator } from "@woltz/rich-domain-outbox";

// Your real event bus (RabbitMQ, Kafka, InMemory, etc.)
const realBus = new RabbitMQEventBus(connection);

// Wrap it with the outbox decorator.
// outboxStore comes from your ORM adapter (see ORM Integration below).
const bus = new OutboxEventBusDecorator(realBus, outboxStore);

Step 4: Start the OutboxPublisher

import { OutboxPublisher } from "@woltz/rich-domain-outbox";

const publisher = new OutboxPublisher(outboxStore, realBus, {
  pollIntervalMs: 5000,  // Poll every 5 seconds
  batchSize: 50,         // Process up to 50 events per batch
  maxRetries: 3,         // Give up after 3 failed attempts
});

publisher.start();

// Graceful shutdown
process.on("SIGTERM", async () => {
  await publisher.stop();
});

Step 5: Your Code Stays the Same

// This code doesn't change at all
const order = Order.create({ items, total: 150 });

await repo.save(order);              // Events auto-saved to outbox
await order.dispatchAll(bus);        // Still publishes immediately
The outbox is the safety net, not a replacement. Events are published immediately when possible. The background publisher only picks up events that failed to publish (or were never dispatched at all).

ORM Integration

Each ORM adapter provides an outbox store that integrates with the adapter's transaction management.

Prisma

import { PrismaClient } from "@prisma/client";
import { PrismaUnitOfWork, PrismaOutboxStore, PrismaRepository } from "@woltz/rich-domain-prisma";
import { OutboxEventBusDecorator, OutboxPublisher } from "@woltz/rich-domain-outbox";

const prisma = new PrismaClient();
const uow = new PrismaUnitOfWork(prisma);
const outboxStore = new PrismaOutboxStore(prisma);

// Auto-save in repository
class OrderRepository extends PrismaRepository<Order, OrderRecord> {
  constructor(prisma: PrismaClient, uow: PrismaUnitOfWork) {
    super(
      new OrderToPersistenceMapper(prisma, uow),
      new OrderToDomainMapper(),
      prisma,
      uow,
      outboxStore  // ← pass the outbox store
    );
  }
}

// Wrap the event bus
const bus = new OutboxEventBusDecorator(rabbitBus, outboxStore);

// Background publisher
const publisher = new OutboxPublisher(outboxStore, rabbitBus, {
  pollIntervalMs: 5000,
});
publisher.start();
When you call repo.save(order) inside a uow.transaction(), the outbox events are written in the same database transaction as the aggregate, which guarantees atomicity.

Drizzle

import { drizzle } from "drizzle-orm/node-postgres";
import { DrizzleUnitOfWork, DrizzleOutboxStore } from "@woltz/rich-domain-drizzle";
import { OutboxEventBusDecorator, OutboxPublisher } from "@woltz/rich-domain-outbox";

const db = drizzle(pool, { schema });
const uow = new DrizzleUnitOfWork(db);
const outboxStore = new DrizzleOutboxStore(db);

// Configure repository with outboxStore in DrizzleRepositoryConfig
const repo = new OrderRepository({
  db,
  table: schema.orders,
  toDomainMapper: new OrderToDomainMapper(),
  toPersistenceMapper: new OrderToPersistenceMapper(),
  uow,
  outboxStore,  // ← auto-save enabled
});

// Same decorator + publisher pattern
const bus = new OutboxEventBusDecorator(rabbitBus, outboxStore);
const publisher = new OutboxPublisher(outboxStore, rabbitBus);
publisher.start();

TypeORM

import { DataSource } from "typeorm";
import { TypeORMUnitOfWork, TypeORMOutboxStore } from "@woltz/rich-domain-typeorm";
import { OutboxEventBusDecorator, OutboxPublisher } from "@woltz/rich-domain-outbox";

const dataSource = new DataSource({ /* ... */ });
const uow = new TypeORMUnitOfWork(dataSource);
const outboxStore = new TypeORMOutboxStore(dataSource);

// Configure repository with outboxStore in TypeORMRepositoryConfig
const repo = new OrderRepository({
  typeormRepository: dataSource.getRepository(OrderEntity),
  toDomainMapper: new OrderToDomainMapper(),
  toPersistenceMapper: new OrderToPersistenceMapper(),
  uow,
  outboxStore,  // ← auto-save enabled
});

// Same decorator + publisher pattern
const bus = new OutboxEventBusDecorator(rabbitBus, outboxStore);
const publisher = new OutboxPublisher(outboxStore, rabbitBus);
publisher.start();

How Auto-Save Works

When you configure an outboxStore on your repository, the save() method automatically:
  1. Extracts uncommitted domain events from the aggregate (using duck-typing, with no direct dependency on BaseAggregate)
  2. Clears the events from the aggregate (so dispatchAll() won't double-publish)
  3. Saves the events to the outbox table in the same transaction context
// Inside the repository base class:
async save(entity: TDomain): Promise<void> {
  const events = this.extractEvents(entity);  // 1. Extract events
  await this.toPersistenceMapper.build(entity); // 2. Persist aggregate
  entity.markAsPersisted();

  if (events.length > 0 && this.outboxStore) {
    await this.outboxStore.save(events);      // 3. Save to outbox (same tx)
  }
}
This means that even if dispatchAll() is never called, your events are safely stored in the outbox. The OutboxPublisher will pick them up on the next poll cycle.
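
A minimal sketch of what duck-typed extraction could look like. The method names getUncommittedEvents and clearEvents are hypothetical (the source only says the events are read by duck-typing), so treat this as an illustration of the technique and not the repository's actual code.

```typescript
// Hypothetical method names, assumed for this sketch only.
interface EventCarrier {
  getUncommittedEvents(): unknown[];
  clearEvents(): void;
}

// Duck-typing check: any object exposing both methods counts as an event carrier.
function isEventCarrier(value: unknown): value is EventCarrier {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate["getUncommittedEvents"] === "function" &&
    typeof candidate["clearEvents"] === "function"
  );
}

// Returns a copy of the pending events and clears them on the entity;
// objects that do not carry events yield an empty list.
function extractEvents(entity: unknown): unknown[] {
  if (!isEventCarrier(entity)) return [];
  const events = [...entity.getUncommittedEvents()];
  entity.clearEvents();
  return events;
}
```

Checking for methods at runtime keeps the repository base class free of any import from the aggregate base class, at the cost of a weaker compile-time guarantee.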

API Reference

OutboxEventBusDecorator

Wraps an IDomainEventBus to track publish success/failure in the outbox.
class OutboxEventBusDecorator implements IDomainEventBus {
  constructor(
    inner: IDomainEventBus,
    outboxStore: IOutboxStore
  );

  async publish(event: IDomainEvent): Promise<void>;
  async publishAll(events: IDomainEvent[]): Promise<void>;
}
| Method | Description |
| --- | --- |
| publish(event) | Publish immediately. On success → markPublished(eventId). On failure → markFailed(eventId, error) + re-throw |
| publishAll(events) | Same as publish but for multiple events. Empty array is a no-op |
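
The behaviour in the table can be sketched as follows. The interfaces are trimmed-down assumptions, and this sketch re-throws the original error unchanged; the real decorator may wrap it (for example in OutboxPublishError).

```typescript
// Minimal shapes assumed for illustration; the real interfaces live in @woltz/rich-domain.
interface DomainEvent { eventId: string }
interface EventBus { publish(event: DomainEvent): Promise<void> }
interface OutboxMarks {
  markPublished(eventId: string): Promise<void>;
  markFailed(eventId: string, error: string): Promise<void>;
}

class SketchOutboxDecorator implements EventBus {
  constructor(
    private readonly inner: EventBus,
    private readonly store: OutboxMarks,
  ) {}

  async publish(event: DomainEvent): Promise<void> {
    try {
      await this.inner.publish(event); // immediate publish on the real bus
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      await this.store.markFailed(event.eventId, message);
      throw err; // re-throw so the caller can react
    }
    await this.store.markPublished(event.eventId);
  }

  async publishAll(events: DomainEvent[]): Promise<void> {
    // With an empty array the loop never runs, so the call is a no-op.
    for (const event of events) await this.publish(event);
  }
}
```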

OutboxPublisher

Background process that polls the outbox table and publishes pending events.
class OutboxPublisher {
  constructor(
    store: IOutboxStore,
    bus: IDomainEventBus,
    config?: OutboxPublisherConfig
  );

  start(): void;
  stop(): Promise<void>;
  processOnce(): Promise<{ processed: number; failed: number }>;
  get isRunning(): boolean;
}
| Method | Description |
| --- | --- |
| start() | Begin polling loop. Idempotent: safe to call multiple times |
| stop() | Graceful shutdown. Waits for in-flight batch to complete |
| processOnce() | Process a single batch (useful for cron jobs or manual triggers) |
| isRunning | Whether the polling loop is active |
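
To illustrate the start/stop semantics in the table, here is a small polling-loop sketch. SketchPoller is hypothetical and takes the batch function as a parameter; it shows one way to get an idempotent start() and a stop() that waits for the in-flight batch, not the library's internals.

```typescript
// Hypothetical polling loop; processBatch stands in for "fetch pending rows and publish them".
class SketchPoller {
  private timer: ReturnType<typeof setTimeout> | null = null;
  private inFlight: Promise<void> = Promise.resolve();
  private running = false;

  constructor(
    private readonly processBatch: () => Promise<void>,
    private readonly pollIntervalMs: number,
  ) {}

  get isRunning(): boolean {
    return this.running;
  }

  start(): void {
    if (this.running) return; // idempotent: a second call changes nothing
    this.running = true;
    this.schedule(0);
  }

  async stop(): Promise<void> {
    this.running = false;
    if (this.timer !== null) clearTimeout(this.timer);
    this.timer = null;
    await this.inFlight; // wait for the batch that is already running, if any
  }

  private schedule(delayMs: number): void {
    this.timer = setTimeout(() => {
      this.timer = null;
      this.inFlight = this.processBatch()
        .catch(() => undefined) // a failed batch must not kill the loop
        .then(() => {
          if (this.running) this.schedule(this.pollIntervalMs);
        });
    }, delayMs);
  }
}
```

Scheduling the next poll only after the current batch settles means batches never overlap, even when one batch takes longer than the poll interval.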

OutboxPublisherConfig

| Option | Default | Description |
| --- | --- | --- |
| pollIntervalMs | 5000 | How often to poll the outbox table (milliseconds) |
| batchSize | 50 | Maximum events to process per poll cycle |
| maxRetries | 3 | Stop retrying after this many failed attempts |
| logger | console | Logger instance (must have info, warn, error methods) |

IOutboxStore

The contract that all ORM-specific outbox stores implement.
interface IOutboxStore {
  save(events: IDomainEvent[]): Promise<void>;
  fetchPending(batchSize?: number): Promise<OutboxEntryData[]>;
  markPublished(eventId: string): Promise<void>;
  markFailed(eventId: string, error: string): Promise<void>;
}
| Method | Description |
| --- | --- |
| save(events) | Insert events into the outbox table. Uses the transactional client when inside a UoW transaction |
| fetchPending(batchSize?) | Fetch pending events ordered by createdAt ASC |
| markPublished(eventId) | UPDATE outbox SET status = 'published' WHERE id = $eventId (direct PK lookup) |
| markFailed(eventId, error) | UPDATE outbox SET status = 'failed', retries = retries + 1, lastError = $error WHERE id = $eventId |
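
For unit tests it can help to have a store that needs no database. The sketch below follows the four methods in the table using locally defined, assumed types. One simplification to note: fetchPending here returns only rows whose status is 'pending'; whether the real adapters also return retryable 'failed' rows is not stated in this document.

```typescript
// Assumed minimal shapes; the real types come from @woltz/rich-domain.
type Status = "pending" | "published" | "failed";
interface EventLike { eventId: string; eventName: string; occurredOn: Date; payload: unknown }
interface Row {
  id: string;
  eventName: string;
  payload: unknown;
  occurredOn: Date;
  status: Status;
  retries: number;
  lastError: string | null;
  createdAt: Date;
}

class InMemoryOutboxStore {
  private readonly rows = new Map<string, Row>();

  async save(events: EventLike[]): Promise<void> {
    for (const e of events) {
      this.rows.set(e.eventId, {
        id: e.eventId,
        eventName: e.eventName,
        payload: e.payload,
        occurredOn: e.occurredOn,
        status: "pending",
        retries: 0,
        lastError: null,
        createdAt: new Date(),
      });
    }
  }

  // Oldest rows first, capped at batchSize.
  async fetchPending(batchSize = 50): Promise<Row[]> {
    return Array.from(this.rows.values())
      .filter((r) => r.status === "pending")
      .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime())
      .slice(0, batchSize);
  }

  async markPublished(eventId: string): Promise<void> {
    const row = this.rows.get(eventId);
    if (row) row.status = "published";
  }

  async markFailed(eventId: string, error: string): Promise<void> {
    const row = this.rows.get(eventId);
    if (!row) return;
    row.status = "failed";
    row.retries += 1;
    row.lastError = error;
  }
}
```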

OutboxEntry

Plain immutable class representing an outbox row.
class OutboxEntry {
  readonly id: string;
  readonly eventName: string;
  readonly payload: unknown;
  readonly occurredOn: Date;
  readonly status: OutboxStatus;
  readonly retries: number;
  readonly lastError: string | null;
  readonly createdAt: Date;

  canRetry(maxRetries: number): boolean;
  get isPublished(): boolean;
  get isPending(): boolean;
  get isFailed(): boolean;
  toJSON(): Record<string, unknown>;
}

OutboxStatus

type OutboxStatus = "pending" | "published" | "failed";
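
The status getters and canRetry() could plausibly behave as in the sketch below. The retry rule (unpublished and under the limit) is an assumption; the document does not spell out the exact condition.

```typescript
type OutboxStatus = "pending" | "published" | "failed";

// Hypothetical, trimmed-down entry with only the fields the helpers need.
class SketchOutboxEntry {
  constructor(
    readonly id: string,
    readonly status: OutboxStatus,
    readonly retries: number,
  ) {}

  get isPublished(): boolean {
    return this.status === "published";
  }

  get isPending(): boolean {
    return this.status === "pending";
  }

  get isFailed(): boolean {
    return this.status === "failed";
  }

  // Assumed rule: retry while the entry is unpublished and under the retry limit.
  canRetry(maxRetries: number): boolean {
    return !this.isPublished && this.retries < maxRetries;
  }
}
```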

Best Practices

Polling Interval

Tune pollIntervalMs to your latency tolerance:
| Interval | Use Case |
| --- | --- |
| 1000 (1s) | Low-latency requirements, high-throughput systems |
| 5000 (5s) | General purpose (default) |
| 30000 (30s) | Background processes, analytics events |
Polling too frequently adds unnecessary database load. Start with 5 seconds and adjust based on your event volume and latency requirements.

Batch Size

batchSize controls how many events are fetched per poll cycle:
// High-throughput system
const highThroughputPublisher = new OutboxPublisher(store, bus, {
  batchSize: 200,
  pollIntervalMs: 2000,
});

// Low-volume system
const lowVolumePublisher = new OutboxPublisher(store, bus, {
  batchSize: 10,
  pollIntervalMs: 10000,
});
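
A quick way to reason about these two settings together is to compute the upper bound on how fast the background publisher can drain the outbox. The helpers below are hypothetical and assume at most one batch per poll interval, ignoring publish latency and newly arriving events.

```typescript
// Upper bound on events the background publisher can drain per second.
function maxDrainRate(batchSize: number, pollIntervalMs: number): number {
  return batchSize / (pollIntervalMs / 1000);
}

// Rough time in seconds to clear a backlog of `backlog` events.
function secondsToDrain(backlog: number, batchSize: number, pollIntervalMs: number): number {
  const polls = Math.ceil(backlog / batchSize);
  return (polls * pollIntervalMs) / 1000;
}

maxDrainRate(50, 5000);         // defaults: 10 events/s
maxDrainRate(200, 2000);        // high-throughput config: 100 events/s
maxDrainRate(10, 10000);        // low-volume config: 1 event/s
secondsToDrain(1000, 50, 5000); // 20 polls at 5 s each: 100 s
```

If the broker is down for a while and a backlog builds up, this estimate tells you how long recovery takes once it comes back.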

Graceful Shutdown

Always stop the publisher during application shutdown to avoid in-flight message loss:
const publisher = new OutboxPublisher(store, bus);
publisher.start();

async function shutdown() {
  console.log("Shutting down...");
  await publisher.stop();  // Waits for current batch to finish
  process.exit(0);
}

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

Monitoring

Monitor the outbox table for events stuck in failed status:
-- Events that have exhausted retries
SELECT COUNT(*) FROM outbox
WHERE status = 'failed' AND retries >= 3;

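-- Events stuck in pending (possibly orphaned)
-- Note: on PostgreSQL, quote camelCase identifiers (e.g. "createdAt") if your table was created with quoted names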
SELECT COUNT(*) FROM outbox
WHERE status = 'pending'
  AND createdAt < NOW() - INTERVAL '1 hour';
Set up alerts when these counts exceed a threshold: it means the background publisher isn't keeping up or the broker is down.
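
Once a job has run the two queries above, turning the counts into alerts is a small step. The outboxAlerts helper and its field names are hypothetical; how you obtain the counts and where you send the messages depends on your stack.

```typescript
// Counts produced by the two monitoring queries above.
interface OutboxCounts {
  exhaustedRetries: number; // status = 'failed' AND retries >= maxRetries
  stalePending: number;     // status = 'pending' and older than one hour
}

// Hypothetical helper: returns one message per count that exceeds the threshold.
function outboxAlerts(counts: OutboxCounts, threshold = 0): string[] {
  const alerts: string[] = [];
  if (counts.exhaustedRetries > threshold) {
    alerts.push(`${counts.exhaustedRetries} outbox events exhausted their retries`);
  }
  if (counts.stalePending > threshold) {
    alerts.push(`${counts.stalePending} outbox events pending for over an hour`);
  }
  return alerts;
}
```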

Error Handling

The OutboxEventBusDecorator re-throws publish errors so your use case code can respond:
import { OutboxPublishError } from "@woltz/rich-domain-outbox";

// Inside your use case handler:
try {
  await order.dispatchAll(bus);
} catch (error) {
  if (error instanceof OutboxPublishError) {
    // Immediate publish failed, but the event is safely in the outbox
    // Respond to the user; the event will be retried
    return { status: "accepted", retryExpected: true };
  }
  throw error;
}

Exports Summary

// Main package: @woltz/rich-domain-outbox
export { OutboxEventBusDecorator } from "./outbox-event-bus-decorator";
export { OutboxPublisher } from "./outbox-publisher";
export { OutboxEntry } from "./outbox-entry";
export { OutboxError, OutboxPublishError, OutboxStoreError } from "./outbox-errors";
export { OUTBOX_DDL } from "./outbox-ddl";
export type { OutboxPublisherConfig } from "./outbox-publisher";

// Core types (from @woltz/rich-domain)
export type { IOutboxStore, OutboxEntryData, OutboxFetchResult, OutboxStatus } from "@woltz/rich-domain";

// ORM adapters
export { PrismaOutboxStore, PRISMA_OUTBOX_SCHEMA } from "@woltz/rich-domain-prisma";
export { DrizzleOutboxStore, outboxTable } from "@woltz/rich-domain-drizzle";
export { TypeORMOutboxStore, OutboxEntity } from "@woltz/rich-domain-typeorm";