The Problem π
Consider a typical e-commerce flow:
// User places an order
const order = Order.create({ items, total: 150 });
await repo.save(order); // β
Persisted to DB
// Dispatch domain events
await order.dispatchAll(bus); // π CRASH HERE
The Failure Window
Thereβs a gap between save() and dispatchAll(). If the process dies in that gap:
| What happens | Impact |
|---|
| Order is saved to DB | β
Data is safe |
OrderPlaced event | β Silently lost |
| Inventory reservation | β Never happens |
| Confirmation email | β Never sent |
| Analytics tracking | β Missing |
This isnβt theoretical. It happens in production:
- Process crashes (unhandled exceptions, OOM kills)
- Deployments (container is replaced mid-request)
- Network blips (message broker unreachable at that exact moment)
- Infrastructure (node restart, pod eviction)
The event is silently lost β no trace, no retry, no recovery.
The Solution β Transactional Outbox Pattern
The Transactional Outbox Pattern ensures events are persisted atomically with your aggregate write, then published by a background process:
ββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Transaction Boundary β
β β
β INSERT INTO orders (...) β
β INSERT INTO outbox (eventId, payload, ...) β
β β
β βββββββββββ COMMIT βββββββββββ β
β β
Both rows written atomically β
ββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
ββββββββββββββββββββββββββββββββββββββββββββββββββββ
β OutboxPublisher (background process) β
β β
β SELECT * FROM outbox WHERE status = 'pending' β
β β publish to message broker β
β β UPDATE outbox SET status = 'published' β
β β
β On failure: β
β β UPDATE outbox SET status = 'failed', retries++β
β β Retry on next poll β
ββββββββββββββββββββββββββββββββββββββββββββββββββββ
Rich Domainβs implementation uses a decorator pattern β your code stays identical:
// Your use case code β unchanged
await repo.save(order);
await order.dispatchAll(bus); // Still publishes immediately!
Behind the scenes:
- Immediate publish β
dispatchAll() tries the real bus first (RabbitMQ, Kafka, etc.)
- Success β
outboxStore.markPublished(eventId) β marks as done
- Failure β
outboxStore.markFailed(eventId, error) β records the error
- Background safety net β
OutboxPublisher polls for pending events and retries
The outbox is the safety net, not the primary path. You get low-latency delivery when the broker is healthy, and guaranteed delivery when itβs not.
The repo.save() call also auto-saves uncommitted events to the outbox table in the same transaction β so even if dispatchAll() isnβt called at all, your events are preserved.
Proven Pattern
This isnβt new. Established frameworks have used this pattern for years:
| Framework | Ecosystem | How they do it |
|---|
| MassTransit | .NET | Transactional outbox built-in; stores messages in DB transaction, publishes via Quartz.NET scheduled job |
| NServiceBus | .NET | Outbox feature since v6; stores outgoing messages in same DB transaction as business data |
| CAP | .NET | EQueue outbox pattern; [CapSubscribe] attribute + transactional message storage |
| Debezium | Java | CDC-based outbox; tails the DB transaction log to pick up outbox events |
| Axon Framework | Java | Event sourcing + outbox; persists domain events in the same transaction as the aggregate |
Frameworks like Dapper (.NET) and Spring Cloud Stream (Java) also support variations of this pattern.
@woltz/rich-domain-outbox brings the same reliability guarantee to the TypeScript/Node.js ecosystem β with zero external dependencies beyond your existing database.
Quick Start
Step 1: Install
npm install @woltz/rich-domain-outbox
Step 2: Create the Outbox Table
The outbox table has a simple, fixed schema. The id column stores the eventβs own eventId β this is how markPublished(eventId) does a direct primary key lookup:
| Column | Type | Description |
|---|
id | TEXT | PRIMARY KEY. Stores event.eventId directly |
eventName | TEXT | The event class name (e.g. "OrderPlaced") |
payload | JSONB / JSON | The event payload |
occurredOn | TIMESTAMPTZ | When the event was created |
status | TEXT | 'pending' β 'published' β (or 'failed') |
retries | INTEGER | Number of publish attempts. Default 0 |
lastError | TEXT NULL | Last error message if publish failed |
createdAt | TIMESTAMPTZ | When the row was inserted |
Pick your ORM:
Prisma
Drizzle
TypeORM
Raw SQL
Copy this into your schema.prisma:model Outbox {
id String @id
eventName String
payload Json
occurredOn DateTime
status String @default("pending")
retries Int @default(0)
lastError String?
createdAt DateTime @default(now())
@@index([status])
}
Then import the schema constant for reference:import { PRISMA_OUTBOX_SCHEMA } from "@woltz/rich-domain-prisma";
Import the table definition from @woltz/rich-domain-drizzle:import { outboxTable } from "@woltz/rich-domain-drizzle";
// Add to your schema
export const schema = {
users: usersTable,
posts: postsTable,
outbox: outboxTable,
};
Add OutboxEntity to your DataSource entities array:import { OutboxEntity } from "@woltz/rich-domain-typeorm";
const dataSource = new DataSource({
// ...
entities: [User, Post, OutboxEntity],
});
Use the exported DDL:import { OUTBOX_DDL } from "@woltz/rich-domain-outbox";
// PostgreSQL
await db.execute(OUTBOX_DDL.postgresql);
// MySQL
await db.execute(OUTBOX_DDL.mysql);
Then run your migration as usual (prisma migrate dev, drizzle-kit generate, typeorm migration:run, etc.).
Step 3: Wrap your EventBus
import { OutboxEventBusDecorator } from "@woltz/rich-domain-outbox";
// Your real event bus (RabbitMQ, Kafka, InMemory, etc.)
const realBus = new RabbitMQEventBus(connection);
// Wrap it with the outbox decorator
const bus = new OutboxEventBusDecorator(realBus, outboxStore);
Step 4: Start the OutboxPublisher
import { OutboxPublisher } from "@woltz/rich-domain-outbox";
const publisher = new OutboxPublisher(outboxStore, realBus, {
pollIntervalMs: 5000, // Poll every 5 seconds
batchSize: 50, // Process up to 50 events per batch
maxRetries: 3, // Give up after 3 failed attempts
});
publisher.start();
// Graceful shutdown
process.on("SIGTERM", async () => {
await publisher.stop();
});
Step 5: Your Code Stays the Same
// This code doesn't change at all
const order = Order.create({ items, total: 150 });
await repo.save(order); // Events auto-saved to outbox
await order.dispatchAll(bus); // Still publishes immediately
The outbox is the safety net, not a replacement. Events are published immediately when possible. The background publisher only picks up events that failed to publish (or were never dispatched at all).
ORM Integration
Each ORM adapter provides an outbox store that integrates with the adapterβs transaction management.
import { PrismaClient } from "@prisma/client";
import { PrismaUnitOfWork, PrismaOutboxStore } from "@woltz/rich-domain-prisma";
import { OutboxEventBusDecorator, OutboxPublisher } from "@woltz/rich-domain-outbox";
const prisma = new PrismaClient();
const uow = new PrismaUnitOfWork(prisma);
const outboxStore = new PrismaOutboxStore(prisma);
// Auto-save in repository
class OrderRepository extends PrismaRepository<Order, OrderRecord> {
constructor(prisma: PrismaClient, uow: PrismaUnitOfWork) {
super(
new OrderToPersistenceMapper(prisma, uow),
new OrderToDomainMapper(),
prisma,
uow,
outboxStore // β pass the outbox store
);
}
}
// Wrap the event bus
const bus = new OutboxEventBusDecorator(rabbitBus, outboxStore);
// Background publisher
const publisher = new OutboxPublisher(outboxStore, rabbitBus, {
pollIntervalMs: 5000,
});
publisher.start();
When you call repo.save(order) inside a uow.transaction(), the outbox events are written in the same database transaction as the aggregate β guaranteeing atomicity.
Drizzle
import { drizzle } from "drizzle-orm/node-postgres";
import { DrizzleUnitOfWork, DrizzleOutboxStore } from "@woltz/rich-domain-drizzle";
import { OutboxEventBusDecorator, OutboxPublisher } from "@woltz/rich-domain-outbox";
const db = drizzle(pool, { schema });
const uow = new DrizzleUnitOfWork(db);
const outboxStore = new DrizzleOutboxStore(db);
// Configure repository with outboxStore in DrizzleRepositoryConfig
const repo = new OrderRepository({
db,
table: schema.orders,
toDomainMapper: new OrderToDomainMapper(),
toPersistenceMapper: new OrderToPersistenceMapper(),
uow,
outboxStore, // β auto-save enabled
});
// Same decorator + publisher pattern
const bus = new OutboxEventBusDecorator(rabbitBus, outboxStore);
const publisher = new OutboxPublisher(outboxStore, rabbitBus);
publisher.start();
TypeORM
import { DataSource } from "typeorm";
import { TypeORMUnitOfWork, TypeORMOutboxStore } from "@woltz/rich-domain-typeorm";
import { OutboxEventBusDecorator, OutboxPublisher } from "@woltz/rich-domain-outbox";
const dataSource = new DataSource({ /* ... */ });
const uow = new TypeORMUnitOfWork(dataSource);
const outboxStore = new TypeORMOutboxStore(dataSource);
// Configure repository with outboxStore in TypeORMRepositoryConfig
const repo = new OrderRepository({
typeormRepository: dataSource.getRepository(OrderEntity),
toDomainMapper: new OrderToDomainMapper(),
toPersistenceMapper: new OrderToPersistenceMapper(),
uow,
outboxStore, // β auto-save enabled
});
// Same decorator + publisher pattern
const bus = new OutboxEventBusDecorator(rabbitBus, outboxStore);
const publisher = new OutboxPublisher(outboxStore, rabbitBus);
publisher.start();
How Auto-Save Works
When you configure an outboxStore on your repository, the save() method automatically:
- Extracts uncommitted domain events from the aggregate (using duck-typing β no direct dependency on
BaseAggregate)
- Clears the events from the aggregate (so
dispatchAll() wonβt double-publish)
- Saves the events to the outbox table in the same transaction context
// Inside the repository base class:
async save(entity: TDomain): Promise<void> {
const events = this.extractEvents(entity); // 1. Extract events
await this.toPersistenceMapper.build(entity); // 2. Persist aggregate
entity.markAsPersisted();
if (events.length > 0 && this.outboxStore) {
await this.outboxStore.save(events); // 3. Save to outbox (same tx)
}
}
This means that even if dispatchAll() is never called, your events are safely stored in the outbox. The OutboxPublisher will pick them up on the next poll cycle.
API Reference
OutboxEventBusDecorator
Wraps an IDomainEventBus to track publish success/failure in the outbox.
class OutboxEventBusDecorator implements IDomainEventBus {
constructor(
inner: IDomainEventBus,
outboxStore: IOutboxStore
);
async publish(event: IDomainEvent): Promise<void>;
async publishAll(events: IDomainEvent[]): Promise<void>;
}
| Method | Description |
|---|
publish(event) | Publish immediately. On success β markPublished(eventId). On failure β markFailed(eventId, error) + re-throw |
publishAll(events) | Same as publish but for multiple events. Empty array is a no-op |
OutboxPublisher
Background process that polls the outbox table and publishes pending events.
class OutboxPublisher {
constructor(
store: IOutboxStore,
bus: IDomainEventBus,
config?: OutboxPublisherConfig
);
start(): void;
stop(): Promise<void>;
processOnce(): Promise<{ processed: number; failed: number }>;
get isRunning(): boolean;
}
| Method | Description |
|---|
start() | Begin polling loop. Idempotent β safe to call multiple times |
stop() | Graceful shutdown. Waits for in-flight batch to complete |
processOnce() | Process a single batch (useful for cron jobs or manual triggers) |
isRunning | Whether the polling loop is active |
OutboxPublisherConfig
| Option | Default | Description |
|---|
pollIntervalMs | 5000 | How often to poll the outbox table (milliseconds) |
batchSize | 50 | Maximum events to process per poll cycle |
maxRetries | 3 | Stop retrying after this many failed attempts |
logger | console | Logger instance (must have info, warn, error methods) |
IOutboxStore
The contract that all ORM-specific outbox stores implement.
interface IOutboxStore {
save(events: IDomainEvent[]): Promise<void>;
fetchPending(batchSize?: number): Promise<OutboxEntryData[]>;
markPublished(eventId: string): Promise<void>;
markFailed(eventId: string, error: string): Promise<void>;
}
| Method | Description |
|---|
save(events) | Insert events into the outbox table. Uses the transactional client when inside a UoW transaction |
fetchPending(batchSize?) | Fetch pending events ordered by createdAt ASC |
markPublished(eventId) | UPDATE outbox SET status = 'published' WHERE id = $eventId β direct PK lookup |
markFailed(eventId, error) | UPDATE outbox SET status = 'failed', retries = retries + 1, lastError = $error WHERE id = $eventId |
OutboxEntry
Plain immutable class representing an outbox row.
class OutboxEntry {
readonly id: string;
readonly eventName: string;
readonly payload: unknown;
readonly occurredOn: Date;
readonly status: OutboxStatus;
readonly retries: number;
readonly lastError: string | null;
readonly createdAt: Date;
canRetry(maxRetries: number): boolean;
get isPublished(): boolean;
get isPending(): boolean;
get isFailed(): boolean;
toJSON(): Record<string, unknown>;
}
OutboxStatus
type OutboxStatus = "pending" | "published" | "failed";
Best Practices
Polling Interval
Tune pollIntervalMs to your latency tolerance:
| Interval | Use Case |
|---|
1000 (1s) | Low-latency requirements, high-throughput systems |
5000 (5s) | General purpose (default) |
30000 (30s) | Background processes, analytics events |
Polling too frequently adds unnecessary database load. Start with 5 seconds and adjust based on your event volume and latency requirements.
Batch Size
batchSize controls how many events are fetched per poll cycle:
// High-throughput system
const publisher = new OutboxPublisher(store, bus, {
batchSize: 200,
pollIntervalMs: 2000,
});
// Low-volume system
const publisher = new OutboxPublisher(store, bus, {
batchSize: 10,
pollIntervalMs: 10000,
});
Graceful Shutdown
Always stop the publisher during application shutdown to avoid in-flight message loss:
const publisher = new OutboxPublisher(store, bus);
publisher.start();
async function shutdown() {
console.log("Shutting down...");
await publisher.stop(); // Waits for current batch to finish
process.exit(0);
}
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
Monitoring
Monitor the outbox table for events stuck in failed status:
-- Events that have exhausted retries
SELECT COUNT(*) FROM outbox
WHERE status = 'failed' AND retries >= 3;
-- Events stuck in pending (possibly orphaned)
SELECT COUNT(*) FROM outbox
WHERE status = 'pending'
AND createdAt < NOW() - INTERVAL '1 hour';
Set up alerts when these counts exceed a threshold β it means the background publisher isnβt keeping up or the broker is down.
Error Handling
The OutboxEventBusDecorator re-throws publish errors so your use case code can respond:
try {
await order.dispatchAll(bus);
} catch (error) {
if (error instanceof OutboxPublishError) {
// Immediate publish failed, but event is safely in outbox
// Respond to the user β the event will be retried
return { status: "accepted", retryExpected: true };
}
throw error;
}
Exports Summary
// Main package: @woltz/rich-domain-outbox
export { OutboxEventBusDecorator } from "./outbox-event-bus-decorator";
export { OutboxPublisher } from "./outbox-publisher";
export { OutboxEntry } from "./outbox-entry";
export { OutboxError, OutboxPublishError, OutboxStoreError } from "./outbox-errors";
export { OUTBOX_DDL } from "./outbox-ddl";
export type { OutboxPublisherConfig } from "./outbox-publisher";
// Core types (from @woltz/rich-domain)
export type { IOutboxStore, OutboxEntryData, OutboxFetchResult, OutboxStatus };
// ORM adapters
export { PrismaOutboxStore, PRISMA_OUTBOX_SCHEMA } from "@woltz/rich-domain-prisma";
export { DrizzleOutboxStore, outboxTable } from "@woltz/rich-domain-drizzle";
export { TypeORMOutboxStore, OutboxEntity } from "@woltz/rich-domain-typeorm";