Overview
@woltz/rich-domain-prisma integrates rich-domain with Prisma ORM. It uses Prisma’s nested writes and transaction support to back rich-domain’s change tracking and Unit of Work patterns.
npm install @woltz/rich-domain @woltz/rich-domain-prisma
Unit of Work
Request-isolated transactions with AsyncLocalStorage
Repository Base Class
PrismaRepository with built-in Criteria support
Change Tracking
PrismaToPersistence with automatic change detection
Batch Operations
PrismaBatchExecutor for efficient bulk writes
Quick Start
1. Setup
import { PrismaClient } from "@prisma/client";
import { PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
const prisma = new PrismaClient();
const uow = new PrismaUnitOfWork(prisma);
2. Create Repository
import { PrismaRepository } from "@woltz/rich-domain-prisma";
class UserRepository extends PrismaRepository<User, UserRecord> {
protected readonly model = "user";
protected readonly includes = { posts: true, profile: true };
constructor(prisma: PrismaClient, uow: PrismaUnitOfWork) {
super(
new UserToPersistenceMapper(prisma, uow),
new UserToDomainMapper(),
prisma,
uow
);
}
}
3. Use It
const userRepository = new UserRepository(prisma, uow);
// Create
const user = new User({
name: "John",
email: "john@example.com",
status: "active",
posts: [],
});
await userRepository.save(user);
// Find with Criteria
const criteria = Criteria.create<User>()
.where("name", "contains", "John")
.orderBy("createdAt", "desc")
.paginate(1, 10);
const result = await userRepository.find(criteria);
// Update with change tracking
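const found = await userRepository.findById(user.id.value);
if (!found) throw new Error("User not found"); // findById returns null when nothing matches
found.name = "John Updated";
found.posts.push(new Post({ title: "Hello", content: "World", published: false }));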
await userRepository.save(found); // Only changed data is persisted
// Delete
await userRepository.delete(found);
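To make "only changed data is persisted" more concrete, here is a minimal sketch of snapshot-based change detection on a child collection. It is not the library's implementation: the `Identified` and `CollectionChanges` types and the `diffCollection` function are hypothetical names used only for illustration, and the comparison is a naive JSON-based one.

```typescript
// Hypothetical shapes for illustration; not the library's AggregateChanges type.
interface Identified {
  id: string;
}

interface CollectionChanges<T extends Identified> {
  created: T[];
  updated: T[];
  deleted: T[];
}

// Compares a snapshot taken at load time with the current collection state.
function diffCollection<T extends Identified>(
  before: readonly T[],
  after: readonly T[]
): CollectionChanges<T> {
  const beforeById = new Map<string, T>();
  for (const item of before) beforeById.set(item.id, item);
  const afterIds = new Set(after.map((item) => item.id));

  const created: T[] = [];
  const updated: T[] = [];
  for (const item of after) {
    const previous = beforeById.get(item.id);
    if (previous === undefined) {
      created.push(item); // id not seen at load time
    } else if (JSON.stringify(previous) !== JSON.stringify(item)) {
      updated.push(item); // naive deep comparison, fine for plain JSON-like data
    }
  }
  // Anything present at load time but missing now was removed.
  const deleted = before.filter((item) => !afterIds.has(item.id));
  return { created, updated, deleted };
}

const loadedPosts = [
  { id: "p1", title: "Hello" },
  { id: "p2", title: "Draft" },
];
const currentPosts = [
  { id: "p1", title: "Hello, world" }, // edited
  { id: "p3", title: "New post" }, // added; p2 was removed
];
const postChanges = diffCollection(loadedPosts, currentPosts);
```

With a change set like this, a persistence layer can issue one write per changed item instead of rewriting the whole aggregate.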
PrismaUnitOfWork
Manages transactions with per-request isolation using AsyncLocalStorage.
import { PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
const uow = new PrismaUnitOfWork(prisma);
Transaction Execution
// Execute multiple operations atomically
await uow.transaction(async () => {
await userRepository.save(user);
await orderRepository.save(order);
await notificationRepository.save(notification);
// All or nothing - auto rollback on failure
});
Request Isolation
Each HTTP request gets its own transaction context, preventing cross-request interference:
// Request 1
app.post("/users", async (req, res) => {
await uow.transaction(async () => {
// This transaction is isolated to Request 1
await userRepository.save(user);
});
});
// Request 2 (concurrent)
app.get("/users/:id", async (req, res) => {
// NOT affected by Request 1's transaction
const user = await userRepository.findById(req.params.id);
});
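The isolation described above rests on Node's `AsyncLocalStorage`, which ties a value to an asynchronous call chain. The following is a minimal sketch of that mechanism only, not of PrismaUnitOfWork itself: `RequestContext`, `requestStorage`, and `handleRequest` are hypothetical names, and a string id stands in for a transaction client.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical context shape; the library's real context type may differ.
interface RequestContext {
  requestId: string;
}

const requestStorage = new AsyncLocalStorage<RequestContext>();

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Simulates one request: enters its own context, awaits, then reads it back.
async function handleRequest(requestId: string, delayMs: number): Promise<string> {
  return requestStorage.run({ requestId }, async () => {
    await sleep(delayMs); // other requests interleave here
    const store = requestStorage.getStore();
    return store ? store.requestId : "none";
  });
}

// Two overlapping requests; each should read back its own id.
const isolationDemo = Promise.all([
  handleRequest("req-1", 20),
  handleRequest("req-2", 5),
]);
```

Because the store follows the async call chain rather than living in a global variable, the second request finishing first does not change what the first request reads.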
API Reference
| Method | Description |
|---|---|
| transaction(work) | Execute work function in a transaction |
| isInTransaction() | Check if currently in a transaction |
| getCurrentContext() | Get current transaction context or null |
PrismaRepository
Base class for repositories with full Criteria support.
import { PrismaRepository } from "@woltz/rich-domain-prisma";
abstract class PrismaRepository<TDomain, TPersistence> {
// Required: Prisma model name
protected abstract get model(): string;
// Optional: relations to include
protected readonly includes: Record<string, any> = {};
// Built-in methods
async find(criteria: Criteria<TDomain>): Promise<PaginatedResult<TDomain>>;
async findById(id: string): Promise<TDomain | null>;
async findOne(criteria: Criteria<TDomain>): Promise<TDomain | null>;
async count(criteria?: Criteria<TDomain>): Promise<number>;
async exists(id: string): Promise<boolean>;
async save(entity: TDomain): Promise<void>;
async delete(entity: TDomain): Promise<void>;
async deleteById(id: string): Promise<void>;
async transaction<T>(work: () => Promise<T>): Promise<T>;
}
Complete Implementation
import { PrismaClient } from "@prisma/client";
import { PrismaRepository, PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
import { Criteria } from "@woltz/rich-domain";
interface UserRecord {
id: string;
name: string;
email: string;
status: string;
posts: PostRecord[];
}
class UserRepository extends PrismaRepository<User, UserRecord> {
protected readonly model = "user";
protected readonly includes = {
posts: true,
profile: true,
};
constructor(prisma: PrismaClient, uow: PrismaUnitOfWork) {
super(
new UserToPersistenceMapper(prisma, uow),
new UserToDomainMapper(),
prisma,
uow
);
}
// Custom query methods
async findByEmail(email: string): Promise<User | null> {
const data = await this.modelAccessor.findUnique({
where: { email },
include: this.includes,
});
return data ? this.mapperToDomain.build(data) : null;
}
async findActiveUsers(): Promise<User[]> {
const criteria = Criteria.create<User>()
.whereEquals("status", "active")
.orderByDesc("createdAt");
const result = await this.find(criteria);
return result.data;
}
async findRecentlyJoined(days: number): Promise<User[]> {
const since = new Date();
since.setDate(since.getDate() - days);
const criteria = Criteria.create<User>()
.where("createdAt", "greaterThan", since)
.orderByDesc("createdAt");
const result = await this.find(criteria);
return result.data;
}
}
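The Criteria queries above eventually have to become Prisma `findMany` arguments. How the library performs that translation is not shown here, so the following is only a sketch under stated assumptions: `SimpleCriteria`, `FindManyArgs`, and `toFindManyArgs` are hypothetical, pages are assumed to be 1-based, and only three operators are covered. The Prisma-side keys (`equals`, `contains`, `gt`, `orderBy`, `skip`, `take`) are standard Prisma Client options.

```typescript
// Hypothetical, simplified criteria shape; the real Criteria class is richer.
type Operator = "equals" | "contains" | "greaterThan";

interface SimpleCriteria {
  filters: Array<{ field: string; operator: Operator; value: unknown }>;
  orderBy?: { field: string; direction: "asc" | "desc" };
  page?: number; // assumed 1-based
  pageSize?: number;
}

interface FindManyArgs {
  where: Record<string, unknown>;
  orderBy?: Record<string, "asc" | "desc">;
  skip?: number;
  take?: number;
}

// Prisma filter key for each operator in this sketch.
const prismaOperator: Record<Operator, string> = {
  equals: "equals",
  contains: "contains",
  greaterThan: "gt",
};

function toFindManyArgs(criteria: SimpleCriteria): FindManyArgs {
  const where: Record<string, unknown> = {};
  for (const { field, operator, value } of criteria.filters) {
    // Note: a second filter on the same field overwrites the first here.
    where[field] = { [prismaOperator[operator]]: value };
  }
  const args: FindManyArgs = { where };
  if (criteria.orderBy) {
    args.orderBy = { [criteria.orderBy.field]: criteria.orderBy.direction };
  }
  if (criteria.page !== undefined && criteria.pageSize !== undefined) {
    args.skip = (criteria.page - 1) * criteria.pageSize;
    args.take = criteria.pageSize;
  }
  return args;
}

const findManyArgs = toFindManyArgs({
  filters: [{ field: "name", operator: "contains", value: "John" }],
  orderBy: { field: "createdAt", direction: "desc" },
  page: 2,
  pageSize: 10,
});
```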
Context Awareness
The repository automatically uses the transaction context when available:
class UserRepository extends PrismaRepository<User, UserRecord> {
// ...
// Use this.context instead of this.prisma for transaction support
async customQuery(): Promise<User[]> {
const data = await (this.context as any).user.findMany({
where: { status: "active" },
include: this.includes,
});
return data.map((d) => this.mapperToDomain.build(d));
}
}
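One plausible way to picture `this.context` is as a getter that prefers the active transaction client and falls back to the root client. The sketch below shows that fallback with `AsyncLocalStorage`; `ClientLike`, `rootClient`, `txStorage`, and `resolveContext` are hypothetical names, and the library's internals may differ.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical stand-in for a Prisma client or transaction client.
interface ClientLike {
  label: string;
}

const rootClient: ClientLike = { label: "root" };
const txStorage = new AsyncLocalStorage<ClientLike>();

// Returns the transaction client when one is active, else the root client.
function resolveContext(): ClientLike {
  return txStorage.getStore() ?? rootClient;
}

// Outside any transaction the root client is used.
const outsideLabel = resolveContext().label;

// Inside run() the stored transaction client wins.
const insideLabel = txStorage.run({ label: "tx" }, () => resolveContext().label);
```

This is why custom queries should go through the context rather than the root client: a query sent to the root client would run outside the surrounding transaction.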
PrismaToPersistence
Base mapper class with change tracking integration.
import { PrismaToPersistence } from "@woltz/rich-domain-prisma";
import { EntitySchemaRegistry, AggregateChanges } from "@woltz/rich-domain";
abstract class PrismaToPersistence<TDomain> extends Mapper<TDomain, void> {
// Required: registry for field mapping
protected abstract readonly registry: EntitySchemaRegistry;
// Required: handle entity creation
protected abstract onCreate(entity: TDomain): Promise<void>;
// Required: handle entity update with changes
protected abstract onUpdate(
entity: TDomain,
changes: AggregateChanges
): Promise<void>;
// Available: current context (transaction or prisma)
protected get context(): PrismaClient | Transaction;
}
EntitySchemaRegistry
Maps domain entities to database tables and fields:
import { EntitySchemaRegistry } from "@woltz/rich-domain";
const schemaRegistry = new EntitySchemaRegistry()
.register({
entity: "User",
table: "user",
})
.register({
entity: "Post",
table: "post",
fields: {
content: "main_content", // Domain field → DB column
},
parentFk: {
field: "authorId",
parentEntity: "User",
},
})
.register({
entity: "Comment",
table: "comment",
parentFk: {
field: "postId",
parentEntity: "Post",
},
});
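The `fields` entry renames domain fields to database columns. As a rough illustration of what such a mapping does to a record, here is a small sketch; `FieldMap` and `toColumns` are hypothetical helpers, not part of the registry's API.

```typescript
// Hypothetical helper; the registry's real lookup API is not shown here.
type FieldMap = Record<string, string>;

// Renames mapped keys to their column names and passes the rest through.
function toColumns(
  data: Record<string, unknown>,
  fields: FieldMap = {}
): Record<string, unknown> {
  const row: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(data)) {
    row[fields[key] ?? key] = value;
  }
  return row;
}

const postRow = toColumns(
  { title: "Hello", content: "World", authorId: "u1" },
  { content: "main_content" } // same mapping as the Post registration
);
```

Fields without an entry, such as `title` and `authorId`, keep their names.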
Complete Mapper Implementation
import {
PrismaToPersistence,
PrismaBatchExecutor,
PrismaUnitOfWork,
} from "@woltz/rich-domain-prisma";
import { EntitySchemaRegistry, AggregateChanges } from "@woltz/rich-domain";
class UserToPersistenceMapper extends PrismaToPersistence<User> {
protected readonly registry = new EntitySchemaRegistry()
.register({ entity: "User", table: "user" })
.register({
entity: "Post",
table: "post",
fields: { content: "main_content" },
parentFk: { field: "authorId", parentEntity: "User" },
});
protected async onCreate(user: User): Promise<void> {
await this.context.user.create({
data: {
id: user.id.value,
name: user.name,
email: user.email,
status: user.status,
createdAt: new Date(),
updatedAt: new Date(),
// Nested create for posts
posts: user.posts.length
? {
createMany: {
data: user.posts.map((post) => ({
id: post.id.value,
title: post.title,
main_content: post.content,
published: post.published,
authorId: user.id.value,
createdAt: new Date(),
updatedAt: new Date(),
})),
},
}
: undefined,
},
});
}
protected async onUpdate(
user: User,
changes: AggregateChanges
): Promise<void> {
// Use BatchExecutor for efficient change processing
const executor = new PrismaBatchExecutor(this.context, {
registry: this.registry,
rootId: user.id.value,
dataMappers: {
Post: (item) => ({
id: item.data.id.value,
title: item.data.title,
main_content: item.data.content,
published: item.data.published,
authorId: item.parentId,
createdAt: item.data.createdAt ?? new Date(),
updatedAt: new Date(),
}),
},
});
await executor.execute(changes);
}
}
PrismaBatchExecutor
Executes batch operations from AggregateChanges efficiently.
import { PrismaBatchExecutor } from "@woltz/rich-domain-prisma";
const executor = new PrismaBatchExecutor(prismaContext, {
registry: schemaRegistry,
rootId: aggregate.id.value,
dataMappers: {
Post: (item) => ({ ... }),
Comment: (item) => ({ ... }),
},
});
await executor.execute(changes);
Operation Order
The executor processes operations in the correct order for foreign key constraints:
- Deletes: Leaf → Root (depth DESC) - delete children before parents
- Creates: Root → Leaf (depth ASC) - create parents before children
- Updates: Any order - no FK dependencies
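The ordering rules above can be sketched as a sort over a flat list of operations. This is an illustration, not the executor's code: the `Operation` type and `orderOperations` function are hypothetical, and running deletes first, then creates, then updates is an assumption, since the list only fixes the order within each group.

```typescript
type OperationKind = "create" | "update" | "delete";

// Hypothetical operation record; depth 0 is the aggregate root.
interface Operation {
  kind: OperationKind;
  entity: string;
  depth: number;
}

function orderOperations(ops: readonly Operation[]): Operation[] {
  // Deletes: deepest first, so children go before their parents.
  const deletes = ops
    .filter((op) => op.kind === "delete")
    .sort((a, b) => b.depth - a.depth);
  // Creates: shallowest first, so parents exist before their children.
  const creates = ops
    .filter((op) => op.kind === "create")
    .sort((a, b) => a.depth - b.depth);
  // Updates: no FK dependencies, so the original order is kept.
  const updates = ops.filter((op) => op.kind === "update");
  return [...deletes, ...creates, ...updates];
}

const orderedOps = orderOperations([
  { kind: "update", entity: "User", depth: 0 },
  { kind: "create", entity: "Comment", depth: 2 },
  { kind: "delete", entity: "Post", depth: 1 },
  { kind: "create", entity: "Post", depth: 1 },
  { kind: "delete", entity: "Comment", depth: 2 },
]);
```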
Configuration
interface BatchExecutorConfig {
// Schema registry for table/field mapping
registry: EntitySchemaRegistry;
// Root entity ID (used as parentId for children)
rootId?: string;
// Custom data mappers per entity
dataMappers?: Record<string, EntityDataMapper>;
}
type EntityDataMapper<T = any> = (item: {
data: T;
parentId?: string;
parentEntity?: string;
}) => Record<string, any>;
Convenience Function
For one-off usage:
import { executeBatch } from "@woltz/rich-domain-prisma";
await executeBatch(prismaContext, changes, {
registry: schemaRegistry,
rootId: user.id.value,
});
@Transactional Decorator
Automatically wraps methods in a transaction.
import { Transactional, PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
import { EntityAlreadyExistsError } from "@woltz/rich-domain";
class CreateUserUseCase {
constructor(
private readonly userRepository: UserRepository,
private readonly uow: PrismaUnitOfWork // Required!
) {}
@Transactional()
async execute(input: CreateUserInput): Promise<User> {
// Everything here runs in a transaction
const existing = await this.userRepository.findByEmail(input.email);
if (existing) {
throw new EntityAlreadyExistsError("User", existing.id.value);
}
const user = new User({
name: input.name,
email: input.email,
status: "active",
posts: [],
});
await this.userRepository.save(user);
return user;
}
}
Behavior
| Scenario | Behavior |
|---|---|
| Direct call | Creates new transaction |
| Already in transaction | Reuses existing one |
| Error thrown | Automatic rollback |
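The three behaviors in the table can be reproduced with a small in-memory stand-in, which may help when reasoning about nesting. This sketch uses a plain method rather than decorator syntax, and `InMemoryUnitOfWork` and `transactionDemo` are hypothetical: writes are buffered per transaction and only committed if the work function resolves.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// In-memory stand-in for a unit of work; hypothetical, not the library's class.
class InMemoryUnitOfWork {
  readonly committed: string[] = [];
  transactionsStarted = 0;
  private readonly storage = new AsyncLocalStorage<string[]>();

  isInTransaction(): boolean {
    return this.storage.getStore() !== undefined;
  }

  // Buffers the write while a transaction is active, else commits directly.
  write(value: string): void {
    const pending = this.storage.getStore();
    if (pending) pending.push(value);
    else this.committed.push(value);
  }

  async transaction<T>(work: () => Promise<T>): Promise<T> {
    if (this.isInTransaction()) return work(); // already in one: reuse it
    this.transactionsStarted++; // direct call: start a new one
    const pending: string[] = [];
    // If work() rejects, this await throws and pending is never committed.
    const result = await this.storage.run(pending, work);
    this.committed.push(...pending);
    return result;
  }
}

async function transactionDemo(): Promise<InMemoryUnitOfWork> {
  const uow = new InMemoryUnitOfWork();
  await uow.transaction(async () => {
    uow.write("user");
    await uow.transaction(async () => uow.write("post")); // reuses the outer one
  });
  await uow
    .transaction(async () => {
      uow.write("orphan");
      throw new Error("boom");
    })
    .catch(() => undefined); // rolled back: "orphan" is discarded
  return uow;
}
```

A real unit of work would delegate commit and rollback to the database; the buffer here only mimics the all-or-nothing outcome.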
The class must have a uow property of type PrismaUnitOfWork for the decorator to work.
Complete Example
Putting it all together:
Domain Model
import { Aggregate, Entity, Id } from "@woltz/rich-domain";
import { z } from "zod";
// Post Entity
const postSchema = z.object({
id: z.custom<Id>((v) => v instanceof Id),
title: z.string().min(1),
content: z.string(),
published: z.boolean(),
});
class Post extends Entity<z.infer<typeof postSchema>> {
protected static validation = { schema: postSchema };
get title() {
return this.props.title;
}
get content() {
return this.props.content;
}
get published() {
return this.props.published;
}
publish() {
this.props.published = true;
}
}
// User Aggregate
const userSchema = z.object({
id: z.custom<Id>((v) => v instanceof Id),
name: z.string().min(2),
email: z.string().email(),
status: z.enum(["active", "inactive"]),
posts: z.array(z.custom<Post>((v) => v instanceof Post)),
});
class User extends Aggregate<z.infer<typeof userSchema>> {
protected static validation = { schema: userSchema };
get name() {
return this.props.name;
}
set name(value: string) {
this.props.name = value;
}
get email() {
return this.props.email;
}
get status() {
return this.props.status;
}
get posts() {
return this.props.posts;
}
addPost(post: Post) {
this.props.posts.push(post);
}
removePost(postId: Id) {
const index = this.props.posts.findIndex((p) => p.id.equals(postId));
if (index !== -1) {
this.props.posts.splice(index, 1);
}
}
deactivate() {
this.props.status = "inactive";
}
}
Mappers
import {
Mapper,
EntitySchemaRegistry,
AggregateChanges,
Id,
} from "@woltz/rich-domain";
import {
PrismaToPersistence,
PrismaBatchExecutor,
} from "@woltz/rich-domain-prisma";
// Schema Registry
const schemaRegistry = new EntitySchemaRegistry()
.register({ entity: "User", table: "user" })
.register({
entity: "Post",
table: "post",
parentFk: { field: "authorId", parentEntity: "User" },
});
// Domain Mapper
class UserToDomainMapper extends Mapper<UserRecord, User> {
build(record: UserRecord): User {
return new User({
id: Id.from(record.id),
name: record.name,
email: record.email,
status: record.status as "active" | "inactive",
posts: record.posts.map(
(p) =>
new Post({
id: Id.from(p.id),
title: p.title,
content: p.content,
published: p.published,
})
),
});
}
}
// Persistence Mapper
class UserToPersistenceMapper extends PrismaToPersistence<User> {
protected readonly registry = schemaRegistry;
protected async onCreate(user: User): Promise<void> {
await this.context.user.create({
data: {
id: user.id.value,
name: user.name,
email: user.email,
status: user.status,
posts: {
createMany: {
data: user.posts.map((p) => ({
id: p.id.value,
title: p.title,
content: p.content,
published: p.published,
authorId: user.id.value,
})),
},
},
},
});
}
protected async onUpdate(
user: User,
changes: AggregateChanges
): Promise<void> {
const executor = new PrismaBatchExecutor(this.context, {
registry: this.registry,
rootId: user.id.value,
dataMappers: {
Post: (item) => ({
id: item.data.id.value,
title: item.data.title,
content: item.data.content,
published: item.data.published,
authorId: item.parentId,
}),
},
});
await executor.execute(changes);
}
}
Repository
import { PrismaClient } from "@prisma/client";
import { PrismaRepository, PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
class UserRepository extends PrismaRepository<User, UserRecord> {
protected readonly model = "user";
protected readonly includes = { posts: true };
constructor(prisma: PrismaClient, uow: PrismaUnitOfWork) {
super(
new UserToPersistenceMapper(prisma, uow),
new UserToDomainMapper(),
prisma,
uow
);
}
async findByEmail(email: string): Promise<User | null> {
const data = await this.modelAccessor.findUnique({
where: { email },
include: this.includes,
});
return data ? this.mapperToDomain.build(data) : null;
}
}
Use Case
import { Transactional, PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
import {
EntityNotFoundError,
EntityAlreadyExistsError,
} from "@woltz/rich-domain";
class UserService {
constructor(
private readonly userRepository: UserRepository,
private readonly uow: PrismaUnitOfWork
) {}
@Transactional()
async createUser(input: { name: string; email: string }): Promise<User> {
const existing = await this.userRepository.findByEmail(input.email);
if (existing) {
throw new EntityAlreadyExistsError("User", existing.id.value);
}
const user = new User({
name: input.name,
email: input.email,
status: "active",
posts: [],
});
await this.userRepository.save(user);
return user;
}
@Transactional()
async addPost(
userId: string,
postData: { title: string; content: string }
): Promise<User> {
const user = await this.userRepository.findById(userId);
if (!user) {
throw new EntityNotFoundError("User", userId);
}
const post = new Post({
title: postData.title,
content: postData.content,
published: false,
});
user.addPost(post);
await this.userRepository.save(user); // Change tracking detects the new post
return user;
}
@Transactional()
async publishAllPosts(userId: string): Promise<User> {
const user = await this.userRepository.findById(userId);
if (!user) {
throw new EntityNotFoundError("User", userId);
}
for (const post of user.posts) {
post.publish();
}
await this.userRepository.save(user); // Change tracking detects the updates
return user;
}
}
API Route (Fastify)
import Fastify from "fastify";
import { PrismaClient } from "@prisma/client";
import { PrismaUnitOfWork } from "@woltz/rich-domain-prisma";
import { Criteria } from "@woltz/rich-domain";
const prisma = new PrismaClient();
const uow = new PrismaUnitOfWork(prisma);
const userRepository = new UserRepository(prisma, uow);
const userService = new UserService(userRepository, uow);
const app = Fastify();
// Create user
app.post("/users", async (request, reply) => {
const user = await userService.createUser(request.body as any);
return reply.status(201).send(user.toJSON());
});
// List users with Criteria
app.get("/users", async (request, reply) => {
const criteria = Criteria.fromQueryParams<User>(
request.query as Record<string, string>
);
const result = await userRepository.find(criteria);
return result.toJSON();
});
// Add post to user
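app.post("/users/:id/posts", async (request, reply) => {
// Fastify types params as unknown by default, so narrow it before use
const { id } = request.params as { id: string };
const user = await userService.addPost(id, request.body as any);
return user.toJSON();
});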
app.listen({ port: 3000 });
API Reference
Exports
// Unit of Work
export { PrismaUnitOfWork, Transactional, getCurrentPrismaContext };
export type {
PrismaTransactionContext,
PrismaClientLike,
PrismaTransactionClient,
};
// Repository
export { PrismaRepository };
export type { PrismaRepositoryConfig };
// Mapper
export { PrismaToPersistence };
// Batch Executor
export { PrismaBatchExecutor, executeBatch };
export type { EntityDataMapper, BatchExecutorConfig };
PrismaUnitOfWork Methods
| Method | Returns | Description |
|---|---|---|
| transaction(work) | Promise<T> | Execute work in transaction |
| isInTransaction() | boolean | Check if in transaction |
| getCurrentContext() | PrismaTransactionContext \| null | Get current context |
PrismaRepository Methods
| Method | Returns | Description |
|---|---|---|
| find(criteria) | Promise<PaginatedResult<T>> | Find with criteria |
| findById(id) | Promise<T \| null> | Find by ID |
| findOne(criteria) | Promise<T \| null> | Find first match |
| count(criteria?) | Promise<number> | Count entities |
| exists(id) | Promise<boolean> | Check if exists |
| save(entity) | Promise<void> | Create or update |
| delete(entity) | Promise<void> | Delete entity |
| deleteById(id) | Promise<void> | Delete by ID |
| transaction(work) | Promise<T> | Execute in transaction |