Skip to main content

Overview

@woltz/rich-domain-typeorm provides full integration between rich-domain and TypeORM, bringing Domain-Driven Design patterns with automatic change tracking and batch operations.
npm install @woltz/rich-domain-typeorm @woltz/rich-domain typeorm

Change Tracking

Automatic detection and persistence of aggregate changes

Batch Operations

Optimized bulk inserts, updates, and deletes

N:N Relations

Smart handling of owned (1:N) and reference (N:N) collections

Transaction Support

Full ACID compliance with @Transactional decorator

Transactional Outbox

Optional outboxStore in repository config for guaranteed event delivery
For a complete working example, see the fastify-with-typeorm example in the repository.

Quick Start

1. Setup DataSource and UnitOfWork

import { DataSource } from "typeorm";
import { TypeORMUnitOfWork } from "@woltz/rich-domain-typeorm";

const dataSource = new DataSource({
  type: "postgres",
  host: "localhost",
  port: 5432,
  username: "user",
  password: "password",
  database: "mydb",
  entities: [UserEntity, PostEntity, TagEntity],
  synchronize: true,
});

await dataSource.initialize();
const uow = new TypeORMUnitOfWork(dataSource);

2. Define Domain Entity

import { Aggregate, Id, EntityValidation } from "@woltz/rich-domain";
import { z } from "zod";

const userSchema = z.object({
  id: z.instanceof(Id),
  email: z.string().email(),
  name: z.string(),
  posts: z.array(z.instanceof(Post)),
  createdAt: z.date(),
  updatedAt: z.date(),
});

type UserProps = z.infer<typeof userSchema>;

export class User extends Aggregate<UserProps> {
  protected static validation: EntityValidation<UserProps> = {
    schema: userSchema,
  };

  get email() { return this.props.email; }
  get name() { return this.props.name; }
  get posts() { return this.props.posts; }

  addPost(post: Post): void {
    this.props.posts.push(post);
  }

  removePost(postId: Id): void {
    const index = this.props.posts.findIndex(p => p.id.equals(postId));
    if (index !== -1) {
      this.props.posts.splice(index, 1);
    }
  }
}

3. Create TypeORM Entity

import { Entity, PrimaryColumn, Column, OneToMany } from "typeorm";

@Entity("users")
export class UserEntity {
  @PrimaryColumn("uuid")
  id!: string;

  @Column()
  email!: string;

  @Column()
  name!: string;

  @OneToMany(() => PostEntity, post => post.author)
  posts!: PostEntity[];

  @Column()
  createdAt!: Date;

  @Column()
  updatedAt!: Date;
}

4. Create Mappers

import { Mapper, Id } from "@woltz/rich-domain";
import {
  TypeORMToPersistence,
  TypeORMToDomain,
} from "@woltz/rich-domain-typeorm";
import { EntitySchemaRegistry } from "@woltz/rich-domain";

// Domain Mapper
export class UserToDomainMapper extends Mapper<UserEntity, User> {
  build(entity: UserEntity): User {
    return User.reconstitute({
      id: Id.from(entity.id),
      email: entity.email,
      name: entity.name,
      posts: entity.posts?.map(p => this.mapPost(p)) ?? [],
      createdAt: entity.createdAt,
      updatedAt: entity.updatedAt,
    });
  }

  private mapPost(entity: PostEntity): Post {
    return Post.reconstitute({
      id: Id.from(entity.id),
      title: entity.title,
      content: entity.mainContent,
      published: entity.published,
      createdAt: entity.createdAt,
      updatedAt: entity.updatedAt,
    });
  }
}

// Persistence Mapper
export class UserToPersistenceMapper extends TypeORMToPersistence<User> {
  protected readonly registry = new EntitySchemaRegistry()
    .register({
      entity: "User",
      table: "users",
      collections: {
        posts: { type: "owned", entity: "Post" },
      },
    })
    .register({
      entity: "Post",
      table: "posts",
      fields: { content: "main_content" },
      parentFk: { field: "authorId", parentEntity: "User" },
    });

  protected readonly entityClasses = new Map<string, new () => any>([
    ["User", UserEntity],
    ["Post", PostEntity],
  ]);

  protected async onCreate(aggregate: User, em: EntityManager): Promise<void> {
    const entity = new UserEntity();
    entity.id = aggregate.id.value;
    entity.email = aggregate.email;
    entity.name = aggregate.name;
    entity.createdAt = aggregate.createdAt;
    entity.updatedAt = aggregate.updatedAt;
    await em.save(entity);

    for (const post of aggregate.posts) {
      const postEntity = new PostEntity();
      postEntity.id = post.id.value;
      postEntity.title = post.title;
      postEntity.mainContent = post.content;
      postEntity.authorId = aggregate.id.value;
      await em.save(postEntity);
    }
  }
}

5. Create Repository

import {
  TypeORMRepository,
  SearchableField,
} from "@woltz/rich-domain-typeorm";

export class UserRepository extends TypeORMRepository<User, UserEntity> {
  constructor(
    typeormRepo: Repository<UserEntity>,
    toDomainMapper: UserToDomainMapper,
    toPersistenceMapper: UserToPersistenceMapper,
    uow: TypeORMUnitOfWork
  ) {
    super({
      typeormRepository: typeormRepo,
      toDomainMapper,
      toPersistenceMapper,
      uow,
      alias: "user",
    });
  }

  protected getDefaultRelations(): string[] {
    return ["posts"];
  }

  protected getSearchableFields(): SearchableField<UserEntity>[] {
    return ["name", "email", "posts.title"];
  }
}

6. Use It

const userRepo = new UserRepository(
  dataSource.getRepository(UserEntity),
  new UserToDomainMapper(),
  new UserToPersistenceMapper(uow),
  uow
);

// Create
const user = new User({
  id: Id.create(),
  email: "john@example.com",
  name: "John",
  posts: [],
  createdAt: new Date(),
  updatedAt: new Date(),
});
await userRepo.save(user);

// Find with Criteria
const criteria = Criteria.create<User>()
  .whereEquals("name", "John")
  .orderByDesc("createdAt")
  .paginate(1, 10);

const result = await userRepo.find(criteria);

// Update with change tracking
user.addPost(new Post({ ... }));
await userRepo.save(user); // Only persists the new post

TypeORMUnitOfWork

Manages transactions with per-request isolation using AsyncLocalStorage.

Setup

import { TypeORMUnitOfWork } from "@woltz/rich-domain-typeorm";

const uow = new TypeORMUnitOfWork(dataSource);
The DataSource must be initialized before creating the UnitOfWork.

Transaction Execution

await uow.transaction(async () => {
  await userRepository.save(user);
  await orderRepository.save(order);
  // All or nothing - auto rollback on failure
});

Request Isolation

Each HTTP request gets its own transaction context:
// Request 1
app.post("/users", async (req, res) => {
  await uow.transaction(async () => {
    // Isolated to Request 1
    await userRepository.save(user);
  });
});

// Request 2 (concurrent)
app.get("/users/:id", async (req, res) => {
  // NOT affected by Request 1's transaction
  const user = await userRepository.findById(req.params.id);
});

API Reference

MethodReturnsDescription
transaction(fn)Promise<T>Execute function in transaction
isInTransaction()booleanCheck if in active transaction
getCurrentContext()TypeORMTransactionContext | nullGet current context
getCurrentEntityManager()EntityManagerGet current or default manager
getDataSource()DataSourceGet underlying DataSource

@Transactional Decorator

Automatically wraps methods in transactions.
import { Transactional } from "@woltz/rich-domain-typeorm";

class UserService {
  constructor(
    private readonly userRepo: UserRepository,
    private readonly uow: TypeORMUnitOfWork
  ) {}

  @Transactional()
  async createUserWithPosts(data: CreateUserData): Promise<User> {
    const user = new User({ ...data, posts: [] });
    await this.userRepo.save(user);

    for (const postData of data.posts) {
      user.addPost(new Post(postData));
    }
    await this.userRepo.save(user);

    return user;
    // Commits on success, rolls back on error
  }
}

Nested Transactions

The decorator is idempotent - if already in a transaction, it reuses it:
@Transactional()
async outer() {
  await this.methodA(); // Uses same transaction
  await this.methodB(); // Uses same transaction
}

@Transactional()
async methodA() {
  // Detects existing transaction and reuses it
}

With Explicit UoW

@Transactional(myUnitOfWork)
async execute() {
  // Uses the explicitly provided UoW
}

TypeORMRepository

Base class for repositories with full Criteria support.

Configuration

export class UserRepository extends TypeORMRepository<User, UserEntity> {
  constructor(config: TypeORMRepositoryConfig<User, UserEntity>) {
    super(config);
  }

  // Override for default relations (eager loading)
  protected getDefaultRelations(): string[] {
    return ["posts", "posts.tags"];
  }

  // Override for searchable fields
  protected getSearchableFields(): SearchableField<UserEntity>[] {
    return [
      "name",                                  // Case-insensitive (default)
      "email",
      { field: "code", caseSensitive: true }, // Case-sensitive
      "posts.title",                           // Nested relation
    ];
  }
}

Transactional Outbox

Pass an optional outboxStore in TypeORMRepositoryConfig. When set, save() automatically persists uncommitted domain events to the outbox table in the same database transaction as the aggregate write.
PropertyTypeRequiredDescription
outboxStoreTypeORMOutboxStoreNoEnables auto-save of domain events on save()
Use TypeORMOutboxStore from @woltz/rich-domain-typeorm. For the full setup (entity registration, event bus decorator, background publisher), see Transactional Outbox.
import { TypeORMOutboxStore } from "@woltz/rich-domain-typeorm";

const outboxStore = new TypeORMOutboxStore(dataSource);

const repo = new OrderRepository({
  typeormRepository: dataSource.getRepository(OrderEntity),
  toDomainMapper: new OrderToDomainMapper(),
  toPersistenceMapper: new OrderToPersistenceMapper(),
  uow,
  outboxStore, // ← events saved atomically with the aggregate
});

Methods

MethodReturnsDescription
find(criteria?)Promise<PaginatedResult<T>>Find with criteria
findById(id)Promise<T | null>Find by ID
findOne(criteria)Promise<T | null>Find first match
count(criteria?)Promise<number>Count matching entities
exists(id)Promise<boolean>Check if exists
save(entity)Promise<void>Save (create or update)
delete(entity)Promise<void>Delete entity

Criteria Queries

const criteria = Criteria.create<User>()
  .whereEquals("status", "active")
  .where("age", "greaterThan", 18)
  .whereContains("email", "@company.com")
  .search("john")
  .orderByDesc("createdAt")
  .paginate(1, 20);

const result = await userRepo.find(criteria);
// result.data - User[]
// result.meta - { page, limit, total, totalPages }
protected getSearchableFields(): SearchableField<UserEntity>[] {
  return [
    "title",                                   // Case-insensitive
    { field: "code", caseSensitive: true },   // Case-sensitive
    "author.name",                             // Nested relation
  ];
}

// Usage
const criteria = Criteria.create<Post>()
  .search("hello");

// Generates:
// WHERE (LOWER(post.title) LIKE LOWER('%hello%')
//        OR LOWER(author.name) LIKE LOWER('%hello%'))

TypeORMToPersistence

Base class for mapping domain aggregates to persistence.

Registry Configuration

export class UserToPersistenceMapper extends TypeORMToPersistence<User> {
  protected readonly registry = new EntitySchemaRegistry()
    .register({
      entity: "User",
      table: "users",
      collections: {
        posts: { type: "owned", entity: "Post" },
      },
    })
    .register({
      entity: "Post",
      table: "posts",
      fields: {
        content: "main_content", // Domain field → DB column
      },
      parentFk: {
        field: "authorId",
        parentEntity: "User",
      },
      collections: {
        tags: {
          type: "reference", // N:N
          entity: "Tag",
          junction: {
            table: "_PostToTag",
            sourceKey: "A",
            targetKey: "B",
          },
        },
      },
    });

  protected readonly entityClasses = new Map([
    ["User", UserEntity],
    ["Post", PostEntity],
    ["Tag", TagEntity],
  ]);

  protected async onCreate(aggregate: User, em: EntityManager): Promise<void> {
    // Create root entity
    const entity = new UserEntity();
    entity.id = aggregate.id.value;
    entity.email = aggregate.email;
    entity.name = aggregate.name;
    await em.save(entity);

    // Create owned entities
    for (const post of aggregate.posts) {
      const postEntity = new PostEntity();
      postEntity.id = post.id.value;
      postEntity.title = post.title;
      postEntity.authorId = aggregate.id.value;
      await em.save(postEntity);
    }
  }
}

Collection Types

TypeBehaviorUse Case
ownedCreates/deletes child entities1:N relationships (User → Posts)
referenceConnects/disconnects via junctionN:N relationships (Post ↔ Tags)

TypeORMBatchExecutor

Executes batch operations from AggregateChanges.

Execution Order

  1. Deletes (leaf → root by depth DESC)
    • Owned: Delete entities
    • Reference: Unlink from junction table
  2. Creates (root → leaf by depth ASC)
    • Owned: Create entities
    • Reference: Insert into junction table
  3. Updates (any order)

Direct Usage

import { TypeORMBatchExecutor } from "@woltz/rich-domain-typeorm";

const executor = new TypeORMBatchExecutor({
  registry: schemaRegistry,
  entityManager: uow.getCurrentEntityManager(),
  entityClasses: new Map([
    ["Post", PostEntity],
    ["Comment", CommentEntity],
    ["Tag", TagEntity],
  ]),
});

await executor.execute(aggregateChanges);

Convenience Function

import { executeBatch } from "@woltz/rich-domain-typeorm";

await executeBatch(entityManager, changes, {
  registry: schemaRegistry,
  entityClasses: entityClassMap,
});

N:N Relations

Configuration

protected readonly registry = new EntitySchemaRegistry()
  .register({
    entity: "Post",
    table: "posts",
    collections: {
      tags: {
        type: "reference",
        entity: "Tag",
        junction: {
          table: "_PostToTag",  // Junction table name
          sourceKey: "A",       // Column for Post ID
          targetKey: "B",       // Column for Tag ID
        },
      },
    },
  });

Usage

const post = await postRepo.findById(postId);

// Add tag (inserts into junction table)
post.addTag(new Tag({ id: Id.from("promo"), name: "Promo" }));
await postRepo.save(post);
// → INSERT INTO "_PostToTag" ("A", "B") VALUES (postId, 'promo')

// Remove tag (deletes from junction table)
post.removeTag(Id.from("promo"));
await postRepo.save(post);
// → DELETE FROM "_PostToTag" WHERE "A" = postId AND "B" = 'promo'

Error Handling

The adapter provides specific error types:
import {
  TypeORMAdapterError,
  EntityClassNotFoundError,
  TableNotFoundError,
  BatchOperationError,
  NoRecordsAffectedError,
  TypeORMRepositoryError,
} from "@woltz/rich-domain-typeorm";

try {
  await userRepo.save(user);
} catch (error) {
  if (error instanceof EntityClassNotFoundError) {
    // Entity class not registered in entityClasses map
  } else if (error instanceof TableNotFoundError) {
    // Entity not in registry
  } else if (error instanceof BatchOperationError) {
    // Batch operation failed
  } else if (error instanceof TypeORMRepositoryError) {
    // General repository error
  }
}

Complete Example

See the fastify-with-typeorm example for a complete working application demonstrating:
  • User aggregate with Posts (1:N owned)
  • Post with Tags (N:N reference via junction table)
  • Case-insensitive search
  • Transaction management
  • CRUD operations
  • Domain events with BullMQ

API Reference

Exports

// Unit of Work
export { TypeORMUnitOfWork, UOWStorage, getCurrentTypeORMContext };
export type { TypeORMTransactionContext };

// Repository
export { TypeORMRepository };
export type { TypeORMRepositoryConfig };

// Outbox
export { TypeORMOutboxStore, OutboxEntity };

// Mappers
export { TypeORMToPersistence };
export { TypeORMToDomain };

// Batch Executor
export { TypeORMBatchExecutor, executeBatch };
export type { TypeORMBatchExecutorConfig };

// Query Builder
export { TypeORMQueryBuilder };
export type { SearchableField, SearchableFieldConfig };

// Decorator
export { Transactional };

// Errors
export {
  TypeORMAdapterError,
  EntityClassNotFoundError,
  TableNotFoundError,
  BatchOperationError,
  NoRecordsAffectedError,
  TypeORMRepositoryError,
  OutboxStoreError,
};

TypeORMRepositoryConfig

interface TypeORMRepositoryConfig<TDomain, TEntity> {
  typeormRepository: Repository<TEntity>;
  toDomainMapper: Mapper<TEntity, TDomain>;
  toPersistenceMapper: TypeORMToPersistence<TDomain>;
  uow: TypeORMUnitOfWork;
  alias?: string; // Default: "entity"
  /** Optional — auto-save domain events on `save()`. See [Transactional Outbox](/integrations/outbox). */
  outboxStore?: TypeORMOutboxStore;
}

SearchableField

type SearchableField<T> =
  | keyof T                    // Simple field
  | `${string}.${string}`      // Nested field
  | {
      field: string;
      caseSensitive?: boolean; // Default: false
    };

EntitySchemaRegistry Configuration

interface SchemaConfig {
  entity: string;
  table: string;
  fields?: Record<string, string>;
  collections?: Record<string, {
    type: "owned" | "reference";
    entity: string;
    junction?: {
      table: string;
      sourceKey: string;
      targetKey: string;
    };
  }>;
  parentFk?: {
    field: string;
    parentEntity: string;
  };
}