Skip to main content

Overview

@woltz/rich-domain-typeorm provides full integration between rich-domain and TypeORM, bringing Domain-Driven Design patterns with automatic change tracking and batch operations.
npm install @woltz/rich-domain-typeorm @woltz/rich-domain typeorm

Change Tracking

Automatic detection and persistence of aggregate changes

Batch Operations

Optimized bulk inserts, updates, and deletes

N:N Relations

Smart handling of owned (1:N) and reference (N:N) collections

Transaction Support

Full ACID compliance with @Transactional decorator
For a complete working example, see the fastify-with-typeorm example in the repository.

Quick Start

1. Setup DataSource and UnitOfWork

import { DataSource } from "typeorm";
import { TypeORMUnitOfWork } from "@woltz/rich-domain-typeorm";

// Connection settings for the example database; every TypeORM entity class
// used by the aggregates is listed under `entities`.
const dataSource = new DataSource({
  type: "postgres",
  host: "localhost",
  port: 5432,
  username: "user",
  password: "password",
  database: "mydb",
  entities: [UserEntity, PostEntity, TagEntity],
  // NOTE(review): TypeORM's documentation warns against `synchronize: true`
  // in production — confirm this is meant for local development only.
  synchronize: true,
});

// The DataSource must be initialized before the UnitOfWork is created.
await dataSource.initialize();
const uow = new TypeORMUnitOfWork(dataSource);

2. Define Domain Entity

import { Aggregate, Id, EntityValidation } from "@woltz/rich-domain";
import { z } from "zod";

// Zod schema describing the props held by the User aggregate.
// `posts` holds Post domain objects (not persistence entities).
const userSchema = z.object({
  id: z.instanceof(Id),
  email: z.string().email(),
  name: z.string(),
  posts: z.array(z.instanceof(Post)),
  createdAt: z.date(),
  updatedAt: z.date(),
});

// Props type is derived from the schema so the two cannot drift apart.
type UserProps = z.infer<typeof userSchema>;

/**
 * User aggregate root. Owns its posts: they are added and removed only
 * through the methods below, which mutate the `posts` array in place.
 */
export class User extends Aggregate<UserProps> {
  /** Validation config exposing `userSchema` to the base class. */
  protected static validation: EntityValidation<UserProps> = { schema: userSchema };

  get email() {
    return this.props.email;
  }

  get name() {
    return this.props.name;
  }

  get posts() {
    return this.props.posts;
  }

  /** Appends `post` to this user's posts. */
  addPost(post: Post): void {
    this.props.posts.push(post);
  }

  /** Removes the post whose id equals `postId`; does nothing when no post matches. */
  removePost(postId: Id): void {
    const position = this.props.posts.findIndex((candidate) => candidate.id.equals(postId));
    if (position === -1) {
      return;
    }
    this.props.posts.splice(position, 1);
  }
}

3. Create TypeORM Entity

import { Entity, PrimaryColumn, Column, OneToMany } from "typeorm";

/** TypeORM persistence model mapped to the `users` table. */
@Entity("users")
export class UserEntity {
  // Declared with PrimaryColumn, so the id is supplied by the caller
  // (the persistence mapper copies it from the aggregate's id).
  @PrimaryColumn("uuid")
  id!: string;

  @Column()
  email!: string;

  @Column()
  name!: string;

  // Posts of this user; the inverse side of the relation is `PostEntity.author`.
  @OneToMany(() => PostEntity, post => post.author)
  posts!: PostEntity[];

  @Column()
  createdAt!: Date;

  @Column()
  updatedAt!: Date;
}

4. Create Mappers

import { Mapper, Id } from "@woltz/rich-domain";
import {
  TypeORMToPersistence,
  TypeORMToDomain,
} from "@woltz/rich-domain-typeorm";
import { EntitySchemaRegistry } from "@woltz/rich-domain";

// Domain Mapper
/**
 * Maps a persisted UserEntity (and its loaded posts) back into the User
 * aggregate.
 */
export class UserToDomainMapper extends Mapper<UserEntity, User> {
  /**
   * Rebuilds a User from its entity. When the `posts` relation was not
   * loaded (undefined/null), the aggregate gets an empty post list.
   */
  build(entity: UserEntity): User {
    const id = Id.from(entity.id);
    const email = entity.email;
    const name = entity.name;
    const posts = entity.posts?.map((postEntity) => this.mapPost(postEntity)) ?? [];

    return User.reconstitute({
      id,
      email,
      name,
      posts,
      createdAt: entity.createdAt,
      updatedAt: entity.updatedAt,
    });
  }

  /** Rebuilds a Post; the entity's `mainContent` becomes the domain `content`. */
  private mapPost(entity: PostEntity): Post {
    const id = Id.from(entity.id);
    const { title, mainContent, published, createdAt, updatedAt } = entity;

    return Post.reconstitute({
      id,
      title,
      content: mainContent,
      published,
      createdAt,
      updatedAt,
    });
  }
}

// Persistence Mapper
/**
 * Persistence mapper for the User aggregate: declares how User and its owned
 * posts map to tables, and how a brand-new aggregate is inserted.
 */
export class UserToPersistenceMapper extends TypeORMToPersistence<User> {
  // Schema registry: User owns a `posts` collection; Post rows point back to
  // their User through `authorId`, and domain `content` maps to `main_content`.
  protected readonly registry = new EntitySchemaRegistry()
    .register({
      entity: "User",
      table: "users",
      collections: {
        posts: { type: "owned", entity: "Post" },
      },
    })
    .register({
      entity: "Post",
      table: "posts",
      fields: { content: "main_content" },
      parentFk: { field: "authorId", parentEntity: "User" },
    });

  // Registry entity name → TypeORM entity class.
  protected readonly entityClasses = new Map<string, new () => any>([
    ["User", UserEntity],
    ["Post", PostEntity],
  ]);

  /**
   * Inserts a new aggregate: the user row first, then one row per post,
   * saved one at a time in collection order.
   *
   * NOTE(review): `published`, `createdAt` and `updatedAt` of each post are
   * not copied here although the domain mapper reads them from PostEntity —
   * confirm PostEntity supplies defaults for those columns.
   */
  protected async onCreate(aggregate: User, em: EntityManager): Promise<void> {
    const userRow = Object.assign(new UserEntity(), {
      id: aggregate.id.value,
      email: aggregate.email,
      name: aggregate.name,
      createdAt: aggregate.createdAt,
      updatedAt: aggregate.updatedAt,
    });
    await em.save(userRow);

    for (const post of aggregate.posts) {
      const postRow = Object.assign(new PostEntity(), {
        id: post.id.value,
        title: post.title,
        mainContent: post.content,
        authorId: aggregate.id.value,
      });
      await em.save(postRow);
    }
  }
}

5. Create Repository

import {
  TypeORMRepository,
  SearchableField,
} from "@woltz/rich-domain-typeorm";

/**
 * Repository for the User aggregate. Forwards its collaborators to the base
 * class and declares which relations are loaded and which fields are searched.
 */
export class UserRepository extends TypeORMRepository<User, UserEntity> {
  constructor(
    typeormRepo: Repository<UserEntity>,
    toDomainMapper: UserToDomainMapper,
    toPersistenceMapper: UserToPersistenceMapper,
    uow: TypeORMUnitOfWork
  ) {
    super({
      typeormRepository: typeormRepo,
      toDomainMapper,
      toPersistenceMapper,
      uow,
      // Alias passed to the base class in place of its default ("entity").
      alias: "user",
    });
  }

  // Relations loaded by default with every user.
  protected getDefaultRelations(): string[] {
    return ["posts"];
  }

  // Fields matched by Criteria `.search(...)`; "posts.title" reaches into the relation.
  protected getSearchableFields(): SearchableField<UserEntity>[] {
    return ["name", "email", "posts.title"];
  }
}

6. Use It

// Wire the repository: TypeORM repository, both mappers, and the shared UoW.
const userRepo = new UserRepository(
  dataSource.getRepository(UserEntity),
  new UserToDomainMapper(),
  new UserToPersistenceMapper(uow),
  uow
);

// Create
const user = new User({
  id: Id.create(),
  email: "john@example.com",
  name: "John",
  posts: [],
  createdAt: new Date(),
  updatedAt: new Date(),
});
await userRepo.save(user);

// Find with Criteria
const criteria = Criteria.create<User>()
  .whereEquals("name", "John")
  .orderByDesc("createdAt")
  .paginate(1, 10);

// `find` resolves to a paginated result (`result.data`, `result.meta`).
const result = await userRepo.find(criteria);

// Update with change tracking
// (`{ ... }` is a placeholder for the post's props.)
user.addPost(new Post({ ... }));
await userRepo.save(user); // Only persists the new post

TypeORMUnitOfWork

Manages transactions with per-request isolation using AsyncLocalStorage.

Setup

import { TypeORMUnitOfWork } from "@woltz/rich-domain-typeorm";

const uow = new TypeORMUnitOfWork(dataSource);
The DataSource must be initialized before creating the UnitOfWork.

Transaction Execution

// Both saves run inside one transaction started by the UnitOfWork.
await uow.transaction(async () => {
  await userRepository.save(user);
  await orderRepository.save(order);
  // All or nothing - auto rollback on failure
});

Request Isolation

Each HTTP request gets its own transaction context:
// Request 1
app.post("/users", async (req, res) => {
  // The transaction context is scoped to this request's async call chain.
  await uow.transaction(async () => {
    // Isolated to Request 1
    await userRepository.save(user);
  });
});

// Request 2 (concurrent)
app.get("/users/:id", async (req, res) => {
  // NOT affected by Request 1's transaction
  // No transaction is opened here, so the read runs outside any transaction context.
  const user = await userRepository.findById(req.params.id);
});

API Reference

| Method | Returns | Description |
| --- | --- | --- |
| `transaction(fn)` | `Promise<T>` | Execute function in transaction |
| `isInTransaction()` | `boolean` | Check if in active transaction |
| `getCurrentContext()` | `TypeORMTransactionContext \| null` | Get current context |
| `getCurrentEntityManager()` | `EntityManager` | Get current or default manager |
| `getDataSource()` | `DataSource` | Get underlying DataSource |

@Transactional Decorator

Automatically wraps methods in transactions.
import { Transactional } from "@woltz/rich-domain-typeorm";

/**
 * Application service showing `@Transactional()`: the whole method body runs
 * in a single transaction.
 */
class UserService {
  constructor(
    private readonly userRepo: UserRepository,
    // NOTE(review): not referenced in this class body — presumably read by
    // the @Transactional decorator; confirm.
    private readonly uow: TypeORMUnitOfWork
  ) {}

  /**
   * Creates the user without posts, then attaches each post and saves again.
   * Commits on success, rolls back on error.
   */
  @Transactional()
  async createUserWithPosts(data: CreateUserData): Promise<User> {
    const created = new User({ ...data, posts: [] });
    await this.userRepo.save(created);

    // Posts are attached after the first save and persisted by the second.
    for (const draft of data.posts) {
      const post = new Post(draft);
      created.addPost(post);
    }
    await this.userRepo.save(created);

    return created;
  }
}

Nested Transactions

The decorator is idempotent - if already in a transaction, it reuses it:
// Outer entry point: opens the transaction that the nested calls join.
@Transactional()
async outer() {
  await this.methodA(); // Uses same transaction
  await this.methodB(); // Uses same transaction
}

// Also decorated, so it can be called on its own or from `outer`.
@Transactional()
async methodA() {
  // Detects existing transaction and reuses it
}

With Explicit UoW

// Passing a UnitOfWork to the decorator selects it explicitly.
@Transactional(myUnitOfWork)
async execute() {
  // Uses the explicitly provided UoW
}

TypeORMRepository

Base class for repositories with full Criteria support.

Configuration

/**
 * Repository configured through a single `TypeORMRepositoryConfig` object,
 * with overrides for default relations and searchable fields.
 */
export class UserRepository extends TypeORMRepository<User, UserEntity> {
  constructor(config: TypeORMRepositoryConfig<User, UserEntity>) {
    super(config);
  }

  // Override for default relations (eager loading)
  // Dotted paths such as "posts.tags" name a nested relation.
  protected getDefaultRelations(): string[] {
    return ["posts", "posts.tags"];
  }

  // Override for searchable fields
  // Plain strings are matched case-insensitively; the object form opts a
  // field into case-sensitive matching.
  protected getSearchableFields(): SearchableField<UserEntity>[] {
    return [
      "name",                                  // Case-insensitive (default)
      "email",
      { field: "code", caseSensitive: true }, // Case-sensitive
      "posts.title",                           // Nested relation
    ];
  }
}

Methods

| Method | Returns | Description |
| --- | --- | --- |
| `find(criteria?)` | `Promise<PaginatedResult<T>>` | Find with criteria |
| `findById(id)` | `Promise<T \| null>` | Find by ID |
| `findOne(criteria)` | `Promise<T \| null>` | Find first match |
| `count(criteria?)` | `Promise<number>` | Count matching entities |
| `exists(id)` | `Promise<boolean>` | Check if exists |
| `save(entity)` | `Promise<void>` | Save (create or update) |
| `delete(entity)` | `Promise<void>` | Delete entity |

Criteria Queries

// Filters, free-text search, ordering and pagination are chained on one Criteria.
const criteria = Criteria.create<User>()
  .whereEquals("status", "active")
  .where("age", "greaterThan", 18)
  .whereContains("email", "@company.com")
  .search("john")
  .orderByDesc("createdAt")
  .paginate(1, 20);

const result = await userRepo.find(criteria);
// result.data - User[]
// result.meta - { page, limit, total, totalPages }
// Searchable fields for a Post repository (alias "post"). The fields listed
// belong to PostEntity, so the element type is SearchableField<PostEntity>:
// "title" is not a key of UserEntity and would be rejected by the type.
protected getSearchableFields(): SearchableField<PostEntity>[] {
  return [
    "title",                                   // Case-insensitive
    { field: "code", caseSensitive: true },   // Case-sensitive
    "author.name",                             // Nested relation
  ];
}

// Usage
// A single search term is matched against every searchable field, OR-ed together.
const criteria = Criteria.create<Post>()
  .search("hello");

// Generates:
// WHERE (LOWER(post.title) LIKE LOWER('%hello%')
//        OR LOWER(author.name) LIKE LOWER('%hello%'))
// NOTE(review): the case-sensitive `code` field configured above is not shown
// in this SQL — presumably it adds a LIKE without LOWER(); confirm.

TypeORMToPersistence

Base class for mapping domain aggregates to persistence.

Registry Configuration

/**
 * Persistence mapper for the User aggregate, including the N:N relation
 * between posts and tags.
 */
export class UserToPersistenceMapper extends TypeORMToPersistence<User> {
  // Schema registry: User owns posts (1:N); Post references tags (N:N) through
  // the `_PostToTag` junction table.
  protected readonly registry = new EntitySchemaRegistry()
    .register({
      entity: "User",
      table: "users",
      collections: {
        posts: { type: "owned", entity: "Post" },
      },
    })
    .register({
      entity: "Post",
      table: "posts",
      fields: {
        content: "main_content", // Domain field → DB column
      },
      parentFk: {
        field: "authorId",
        parentEntity: "User",
      },
      collections: {
        tags: {
          type: "reference", // N:N
          entity: "Tag",
          junction: {
            table: "_PostToTag",
            sourceKey: "A",
            targetKey: "B",
          },
        },
      },
    });

  // Registry entity name → TypeORM entity class. The explicit type arguments
  // are needed because the values are unrelated classes with no common type
  // for inference to settle on.
  protected readonly entityClasses = new Map<string, new () => any>([
    ["User", UserEntity],
    ["Post", PostEntity],
    ["Tag", TagEntity],
  ]);

  /**
   * Inserts a new aggregate: the root row first, then one row per owned post.
   *
   * NOTE(review): `published`, `createdAt` and `updatedAt` of each post are
   * not copied here — confirm PostEntity supplies defaults for those columns.
   */
  protected async onCreate(aggregate: User, em: EntityManager): Promise<void> {
    // Create root entity
    const entity = new UserEntity();
    entity.id = aggregate.id.value;
    entity.email = aggregate.email;
    entity.name = aggregate.name;
    // Both timestamp columns are declared with a bare @Column() on UserEntity,
    // so they must be populated before the insert.
    entity.createdAt = aggregate.createdAt;
    entity.updatedAt = aggregate.updatedAt;
    await em.save(entity);

    // Create owned entities
    for (const post of aggregate.posts) {
      const postEntity = new PostEntity();
      postEntity.id = post.id.value;
      postEntity.title = post.title;
      // Domain `content` is stored in the entity's `mainContent` field.
      postEntity.mainContent = post.content;
      postEntity.authorId = aggregate.id.value;
      await em.save(postEntity);
    }
  }
}

Collection Types

| Type | Behavior | Use Case |
| --- | --- | --- |
| `owned` | Creates/deletes child entities | 1:N relationships (User → Posts) |
| `reference` | Connects/disconnects via junction | N:N relationships (Post ↔ Tags) |

TypeORMBatchExecutor

Executes batch operations from AggregateChanges.

Execution Order

  1. Deletes (leaf → root by depth DESC)
    • Owned: Delete entities
    • Reference: Unlink from junction table
  2. Creates (root → leaf by depth ASC)
    • Owned: Create entities
    • Reference: Insert into junction table
  3. Updates (any order)

Direct Usage

import { TypeORMBatchExecutor } from "@woltz/rich-domain-typeorm";

// Build an executor bound to the entity manager the UnitOfWork currently
// exposes (the transaction's manager when inside one, otherwise the default).
const executor = new TypeORMBatchExecutor({
  registry: schemaRegistry,
  entityManager: uow.getCurrentEntityManager(),
  // Registry entity name → TypeORM entity class.
  entityClasses: new Map([
    ["Post", PostEntity],
    ["Comment", CommentEntity],
    ["Tag", TagEntity],
  ]),
});

// Applies the aggregate's changes: deletes, then creates, then updates.
await executor.execute(aggregateChanges);

Convenience Function

import { executeBatch } from "@woltz/rich-domain-typeorm";

// Convenience call: takes the entity manager, the changes, and the same
// registry/entityClasses options, with no executor instance to construct.
await executeBatch(entityManager, changes, {
  registry: schemaRegistry,
  entityClasses: entityClassMap,
});

N:N Relations

Configuration

// Registers Post with a `tags` collection of type "reference": tags are
// linked and unlinked through the junction table rather than created or deleted.
protected readonly registry = new EntitySchemaRegistry()
  .register({
    entity: "Post",
    table: "posts",
    collections: {
      tags: {
        type: "reference",
        entity: "Tag",
        junction: {
          table: "_PostToTag",  // Junction table name
          sourceKey: "A",       // Column for Post ID
          targetKey: "B",       // Column for Tag ID
        },
      },
    },
  });

Usage

// `findById` resolves to `null` when no post matches, so guard before use.
const post = await postRepo.findById(postId);
if (post === null) {
  throw new Error("Post not found");
}

// Add tag (inserts into junction table)
post.addTag(new Tag({ id: Id.from("promo"), name: "Promo" }));
await postRepo.save(post);
// → INSERT INTO "_PostToTag" ("A", "B") VALUES (postId, 'promo')

// Remove tag (deletes from junction table)
post.removeTag(Id.from("promo"));
await postRepo.save(post);
// → DELETE FROM "_PostToTag" WHERE "A" = postId AND "B" = 'promo'

Error Handling

The adapter provides specific error types:
import {
  TypeORMAdapterError,
  EntityClassNotFoundError,
  TableNotFoundError,
  BatchOperationError,
  NoRecordsAffectedError,
  TypeORMRepositoryError,
} from "@woltz/rich-domain-typeorm";

// Distinguish adapter failures by error class; each branch is a placeholder
// for application-specific handling. Errors matching none of the branches
// are silently ignored by this example — rethrow them in real code.
try {
  await userRepo.save(user);
} catch (error) {
  if (error instanceof EntityClassNotFoundError) {
    // Entity class not registered in entityClasses map
  } else if (error instanceof TableNotFoundError) {
    // Entity not in registry
  } else if (error instanceof BatchOperationError) {
    // Batch operation failed
  } else if (error instanceof TypeORMRepositoryError) {
    // General repository error
  }
}

Complete Example

See the fastify-with-typeorm example for a complete working application demonstrating:
  • User aggregate with Posts (1:N owned)
  • Post with Tags (N:N reference via junction table)
  • Case-insensitive search
  • Transaction management
  • CRUD operations
  • Domain events with BullMQ

API Reference

Exports

// Unit of Work
export { TypeORMUnitOfWork, UOWStorage, getCurrentTypeORMContext };
export type { TypeORMTransactionContext };

// Repository
export { TypeORMRepository };
export type { TypeORMRepositoryConfig };

// Mappers
export { TypeORMToPersistence };
export { TypeORMToDomain };

// Batch Executor
export { TypeORMBatchExecutor, executeBatch };
export type { TypeORMBatchExecutorConfig };

// Query Builder
export { TypeORMQueryBuilder };
export type { SearchableField, SearchableFieldConfig };

// Decorator
export { Transactional };

// Errors
export {
  TypeORMAdapterError,
  EntityClassNotFoundError,
  TableNotFoundError,
  BatchOperationError,
  NoRecordsAffectedError,
  TypeORMRepositoryError,
};

TypeORMRepositoryConfig

/**
 * Constructor options for `TypeORMRepository`.
 * `TDomain` is the aggregate type, `TEntity` the TypeORM entity class.
 */
interface TypeORMRepositoryConfig<TDomain, TEntity> {
  /** TypeORM repository for the entity, e.g. from `dataSource.getRepository(...)`. */
  typeormRepository: Repository<TEntity>;
  /** Maps a loaded entity to the domain aggregate. */
  toDomainMapper: Mapper<TEntity, TDomain>;
  /** Maps the aggregate to persistence. */
  toPersistenceMapper: TypeORMToPersistence<TDomain>;
  /** Unit of work used by the repository. */
  uow: TypeORMUnitOfWork;
  /** Optional alias for the root entity. */
  alias?: string; // Default: "entity"
}

SearchableField

/**
 * A field that Criteria `.search(...)` matches against: an entity key, a
 * dotted path into a relation, or an object that can opt into
 * case-sensitive matching.
 */
type SearchableField<T> =
  | keyof T                    // Simple field
  | `${string}.${string}`      // Nested field
  | {
      field: string;
      caseSensitive?: boolean; // Default: false
    };

EntitySchemaRegistry Configuration

/** Shape of one entry passed to `EntitySchemaRegistry.register(...)`. */
interface SchemaConfig {
  /** Domain entity name; also the key used in the `entityClasses` map. */
  entity: string;
  /** Database table the entity is stored in. */
  table: string;
  /** Domain field → DB column overrides, e.g. `{ content: "main_content" }`. */
  fields?: Record<string, string>;
  /** Child collections, keyed by the collection's property name. */
  collections?: Record<string, {
    /** "owned" = 1:N children created/deleted; "reference" = N:N via junction. */
    type: "owned" | "reference";
    /** Entity name of the collection's items. */
    entity: string;
    /** Junction table description, shown in the examples for "reference" collections. */
    junction?: {
      table: string;
      /** Junction column holding the owning entity's id. */
      sourceKey: string;
      /** Junction column holding the referenced entity's id. */
      targetKey: string;
    };
  }>;
  /** Foreign key from an owned child back to its parent entity. */
  parentFk?: {
    field: string;
    parentEntity: string;
  };
}