事件驱动架构概述
事件驱动架构(EDA)是一种通过事件的产生、检测和消费来实现系统组件间松耦合通信的架构风格。
事件驱动架构优势:
┌─────────────────────────────────────────┐
│ ✓ 松耦合:生产者不知道消费者是谁 │
│ ✓ 可扩展:新增消费者不影响现有系统 │
│ ✓ 审计追踪:完整的事件历史记录 │
│ ✓ 时间解耦:异步处理,提高响应速度 │
│ ✓ 弹性:失败可重试,不影响主流程 │
└─────────────────────────────────────────┘
领域事件设计
事件定义
// src/domain/events/baseEvent.ts
export interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
timestamp: Date;
version: number;
metadata?: Record<string, any>;
}
// src/domain/events/orderEvents.ts
export class OrderCreatedEvent implements DomainEvent {
readonly eventId: string;
readonly eventType = 'OrderCreated';
readonly aggregateType = 'Order';
readonly timestamp: Date;
readonly version = 1;
constructor(
public readonly aggregateId: string,
public readonly userId: string,
public readonly items: OrderItem[],
public readonly totalAmount: number,
public readonly shippingAddress: Address,
metadata?: Record<string, any>
) {
this.eventId = generateUUID();
this.timestamp = new Date();
this.metadata = metadata;
}
}
export class OrderConfirmedEvent implements DomainEvent {
readonly eventId: string;
readonly eventType = 'OrderConfirmed';
readonly aggregateType = 'Order';
readonly timestamp: Date;
readonly version = 1;
constructor(
public readonly aggregateId: string,
public readonly confirmedAt: Date,
public readonly estimatedDelivery: Date,
metadata?: Record<string, any>
) {
this.eventId = generateUUID();
this.timestamp = new Date();
this.metadata = metadata;
}
}
export class OrderCancelledEvent implements DomainEvent {
readonly eventId: string;
readonly eventType = 'OrderCancelled';
readonly aggregateType = 'Order';
readonly timestamp: Date;
readonly version = 1;
constructor(
public readonly aggregateId: string,
public readonly reason: string,
public readonly cancelledBy: string,
metadata?: Record<string, any>
) {
this.eventId = generateUUID();
this.timestamp = new Date();
this.metadata = metadata;
}
}
export class OrderShippedEvent implements DomainEvent {
readonly eventId: string;
readonly eventType = 'OrderShipped';
readonly aggregateType = 'Order';
readonly timestamp: Date;
readonly version = 1;
constructor(
public readonly aggregateId: string,
public readonly trackingNumber: string,
public readonly carrier: string,
public readonly shippedAt: Date,
metadata?: Record<string, any>
) {
this.eventId = generateUUID();
this.timestamp = new Date();
this.metadata = metadata;
}
}
聚合根发布事件
// src/domain/entities/Order.ts
import { DomainEvent } from '../events/baseEvent';
import { OrderCreatedEvent, OrderConfirmedEvent, OrderCancelledEvent } from '../events/orderEvents';
export class Order {
private domainEvents: DomainEvent[] = [];
constructor(
public readonly id: string,
public userId: string,
public items: OrderItem[],
public totalAmount: number,
public shippingAddress: Address,
public status: OrderStatus = 'PENDING',
public createdAt: Date = new Date()
) {}
static create(
userId: string,
items: OrderItem[],
shippingAddress: Address
): Order {
const totalAmount = items.reduce(
(sum, item) => sum + item.price * item.quantity,
0
);
const order = new Order(
generateUUID(),
userId,
items,
totalAmount,
shippingAddress
);
// 发布领域事件
order.addDomainEvent(
new OrderCreatedEvent(
order.id,
userId,
items,
totalAmount,
shippingAddress,
{ correlationId: generateCorrelationId() }
)
);
return order;
}
confirm(estimatedDelivery: Date): void {
if (this.status !== 'PENDING') {
throw new BusinessRuleError('Only pending orders can be confirmed');
}
this.status = 'CONFIRMED';
this.addDomainEvent(
new OrderConfirmedEvent(
this.id,
new Date(),
estimatedDelivery
)
);
}
cancel(reason: string, cancelledBy: string): void {
if (this.status === 'SHIPPED') {
throw new BusinessRuleError('Cannot cancel shipped order');
}
this.status = 'CANCELLED';
this.addDomainEvent(
new OrderCancelledEvent(
this.id,
reason,
cancelledBy
)
);
}
// 获取并清除领域事件
pullDomainEvents(): DomainEvent[] {
const events = [...this.domainEvents];
this.domainEvents = [];
return events;
}
private addDomainEvent(event: DomainEvent): void {
this.domainEvents.push(event);
}
}
// src/application/services/OrderApplicationService.ts
export class OrderApplicationService {
constructor(
private orderRepository: OrderRepository,
private eventPublisher: EventPublisher
) {}
async createOrder(command: CreateOrderCommand): Promise<string> {
// 1. 创建聚合根
const order = Order.create(
command.userId,
command.items,
command.shippingAddress
);
// 2. 保存到数据库(事务)
await this.orderRepository.save(order);
// 3. 发布领域事件
const events = order.pullDomainEvents();
await this.eventPublisher.publishAll(events);
return order.id;
}
async confirmOrder(orderId: string, estimatedDelivery: Date): Promise<void> {
const order = await this.orderRepository.findById(orderId);
if (!order) {
throw new NotFoundError('Order', orderId);
}
order.confirm(estimatedDelivery);
await this.orderRepository.save(order);
const events = order.pullDomainEvents();
await this.eventPublisher.publishAll(events);
}
}
事件总线实现
内存事件总线
// src/infrastructure/events/InMemoryEventBus.ts
import { EventEmitter } from 'events';
import { DomainEvent } from '../../domain/events/baseEvent';
type EventHandler = (event: DomainEvent) => Promise<void>;
export class InMemoryEventBus {
private emitter: EventEmitter;
constructor() {
this.emitter = new EventEmitter();
this.emitter.setMaxListeners(100); // 增加监听器数量限制
}
subscribe(eventType: string, handler: EventHandler): void {
this.emitter.on(eventType, async (event: DomainEvent) => {
try {
await handler(event);
} catch (error) {
console.error(`Error handling event ${eventType}:`, error);
// 可以发送到死信队列
}
});
}
async publish(event: DomainEvent): Promise<void> {
this.emitter.emit(event.eventType, event);
}
async publishAll(events: DomainEvent[]): Promise<void> {
for (const event of events) {
await this.publish(event);
}
}
}
// 使用示例
const eventBus = new InMemoryEventBus();
// 订阅订单创建事件
eventBus.subscribe('OrderCreated', async (event: OrderCreatedEvent) => {
await inventoryService.reserveStock(event.aggregateId, event.items);
});
eventBus.subscribe('OrderCreated', async (event: OrderCreatedEvent) => {
await notificationService.sendOrderConfirmation(event.userId, event.aggregateId);
});
// 订阅订单确认事件
eventBus.subscribe('OrderConfirmed', async (event: OrderConfirmedEvent) => {
await warehouseService.prepareShipment(event.aggregateId);
});
Kafka事件总线
// src/infrastructure/events/KafkaEventBus.ts
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
import { DomainEvent } from '../../domain/events/baseEvent';
export class KafkaEventBus {
private producer: Producer;
private consumers: Map<string, Consumer> = new Map();
private topic: string;
constructor(
private brokers: string[],
private clientId: string,
topic: string = 'domain-events'
) {
const kafka = new Kafka({
clientId,
brokers
});
this.producer = kafka.producer();
this.topic = topic;
}
async connect(): Promise<void> {
await this.producer.connect();
console.log('Kafka producer connected');
}
async publish(event: DomainEvent): Promise<void> {
await this.producer.send({
topic: this.topic,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
eventType: event.eventType,
aggregateType: event.aggregateType,
timestamp: event.timestamp.toISOString()
}
}
]
});
console.log(`Published event: ${event.eventType} for ${event.aggregateId}`);
}
async subscribe(
eventType: string,
groupId: string,
handler: (event: DomainEvent) => Promise<void>
): Promise<void> {
const kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers
});
const consumer = kafka.consumer({ groupId });
await consumer.connect();
await consumer.subscribe({ topic: this.topic, fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const eventTypeHeader = message.headers?.eventType?.toString();
if (eventTypeHeader === eventType) {
const event = JSON.parse(message.value?.toString() || '{}');
try {
await handler(event);
} catch (error) {
console.error(`Error handling event ${eventType}:`, error);
// 发送到死信队列或重试
throw error;
}
}
}
});
this.consumers.set(`${eventType}-${groupId}`, consumer);
console.log(`Subscribed to ${eventType} with group ${groupId}`);
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
for (const consumer of this.consumers.values()) {
await consumer.disconnect();
}
console.log('Kafka disconnected');
}
}
// 使用示例
const eventBus = new KafkaEventBus(
['kafka1:9092', 'kafka2:9092'],
'order-service'
);
await eventBus.connect();
// 发布事件
await eventBus.publish(orderCreatedEvent);
// 订阅事件
await eventBus.subscribe(
'OrderCreated',
'inventory-service-group',
async (event) => {
await inventoryService.reserveStock(event.aggregateId, event.items);
}
);
事件溯源(Event Sourcing)
事件存储
// src/infrastructure/persistence/EventStore.ts
import { Pool } from 'pg';
import { DomainEvent } from '../../domain/events/baseEvent';
export class PostgresEventStore {
constructor(private pool: Pool) {
this.initializeSchema();
}
private async initializeSchema(): Promise<void> {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS events (
event_id UUID PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(aggregate_id, version)
);
CREATE INDEX IF NOT EXISTS idx_events_aggregate
ON events(aggregate_id, version);
CREATE INDEX IF NOT EXISTS idx_events_type
ON events(event_type, created_at);
`);
}
async saveEvents(
aggregateId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 乐观锁检查
const currentVersion = await this.getAggregateVersion(client, aggregateId);
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but got ${currentVersion}`
);
}
// 保存事件
for (const event of events) {
await client.query(
`INSERT INTO events
(event_id, event_type, aggregate_id, aggregate_type,
event_data, metadata, version, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.eventType,
event.aggregateId,
event.aggregateType,
JSON.stringify(event),
event.metadata ? JSON.stringify(event.metadata) : null,
event.version,
event.timestamp
]
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async loadEvents(aggregateId: string): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT event_data FROM events
WHERE aggregate_id = $1
ORDER BY version ASC`,
[aggregateId]
);
return result.rows.map(row => {
const eventData = row.event_data;
return this.deserializeEvent(eventData);
});
}
private async getAggregateVersion(
client: any,
aggregateId: string
): Promise<number> {
const result = await client.query(
`SELECT MAX(version) as version FROM events WHERE aggregate_id = $1`,
[aggregateId]
);
return result.rows[0]?.version || 0;
}
private deserializeEvent(eventData: any): DomainEvent {
// 根据eventType反序列化为具体的事件类
const eventMap = {
'OrderCreated': OrderCreatedEvent,
'OrderConfirmed': OrderConfirmedEvent,
'OrderCancelled': OrderCancelledEvent,
// ...
};
const EventClass = eventMap[eventData.eventType];
if (!EventClass) {
throw new Error(`Unknown event type: ${eventData.eventType}`);
}
return Object.assign(new EventClass(), eventData);
}
}
// src/domain/aggregates/EventSourcedOrder.ts
export class EventSourcedOrder {
private version: number = 0;
constructor(
public readonly id: string,
public userId: string = '',
public items: OrderItem[] = [],
public totalAmount: number = 0,
public status: OrderStatus = 'PENDING',
private changes: DomainEvent[] = []
) {}
// 从事件流重建状态
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.apply(event, false);
}
}
// 创建新订单
static create(
userId: string,
items: OrderItem[],
shippingAddress: Address
): EventSourcedOrder {
const order = new EventSourcedOrder(generateUUID());
const totalAmount = items.reduce(
(sum, item) => sum + item.price * item.quantity,
0
);
const event = new OrderCreatedEvent(
order.id,
userId,
items,
totalAmount,
shippingAddress
);
order.apply(event, true);
return order;
}
confirm(estimatedDelivery: Date): void {
if (this.status !== 'PENDING') {
throw new BusinessRuleError('Only pending orders can be confirmed');
}
const event = new OrderConfirmedEvent(
this.id,
new Date(),
estimatedDelivery
);
this.apply(event, true);
}
// 应用事件
private apply(event: DomainEvent, isNew: boolean): void {
switch (event.eventType) {
case 'OrderCreated':
const createdEvent = event as OrderCreatedEvent;
this.userId = createdEvent.userId;
this.items = createdEvent.items;
this.totalAmount = createdEvent.totalAmount;
this.status = 'PENDING';
break;
case 'OrderConfirmed':
this.status = 'CONFIRMED';
break;
case 'OrderCancelled':
this.status = 'CANCELLED';
break;
}
this.version++;
if (isNew) {
this.changes.push(event);
}
}
// 获取未保存的变更
getChanges(): DomainEvent[] {
return this.changes;
}
clearChanges(): void {
this.changes = [];
}
}
// src/application/services/EventSourcedOrderService.ts
export class EventSourcedOrderService {
constructor(
private eventStore: PostgresEventStore,
private eventPublisher: EventPublisher
) {}
async createOrder(command: CreateOrderCommand): Promise<string> {
const order = EventSourcedOrder.create(
command.userId,
command.items,
command.shippingAddress
);
const changes = order.getChanges();
await this.eventStore.saveEvents(order.id, changes, 0);
await this.eventPublisher.publishAll(changes);
order.clearChanges();
return order.id;
}
async confirmOrder(orderId: string, estimatedDelivery: Date): Promise<void> {
const events = await this.eventStore.loadEvents(orderId);
if (events.length === 0) {
throw new NotFoundError('Order', orderId);
}
const order = new EventSourcedOrder(orderId);
order.loadFromHistory(events);
const currentVersion = events.length;
order.confirm(estimatedDelivery);
const changes = order.getChanges();
await this.eventStore.saveEvents(orderId, changes, currentVersion);
await this.eventPublisher.publishAll(changes);
}
}
CQRS(命令查询职责分离)
// src/application/commands/OrderCommands.ts
export interface Command {
commandId: string;
timestamp: Date;
}
export class CreateOrderCommand implements Command {
readonly commandId = generateUUID();
readonly timestamp = new Date();
constructor(
public readonly userId: string,
public readonly items: OrderItem[],
public readonly shippingAddress: Address
) {}
}
// src/application/queries/OrderQueries.ts
export interface OrderReadModel {
id: string;
userId: string;
items: OrderItem[];
totalAmount: number;
status: string;
createdAt: Date;
customerName?: string;
productNames?: string[];
}
export class GetOrderQuery {
constructor(public readonly orderId: string) {}
}
export class GetUserOrdersQuery {
constructor(
public readonly userId: string,
public readonly page: number = 1,
public readonly pageSize: number = 20
) {}
}
// src/infrastructure/persistence/OrderReadModelProjector.ts
export class OrderReadModelProjector {
constructor(private readDb: Pool) {
this.initializeSchema();
}
private async initializeSchema(): Promise<void> {
await this.readDb.query(`
CREATE TABLE IF NOT EXISTS order_read_models (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
items JSONB NOT NULL,
total_amount DECIMAL NOT NULL,
status VARCHAR(50) NOT NULL,
customer_name VARCHAR(255),
product_names TEXT[],
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_orders_user
ON order_read_models(user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_orders_status
ON order_read_models(status);
`);
}
async project(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.handleOrderCreated(event as OrderCreatedEvent);
break;
case 'OrderConfirmed':
await this.updateOrderStatus(event.aggregateId, 'CONFIRMED');
break;
case 'OrderCancelled':
await this.updateOrderStatus(event.aggregateId, 'CANCELLED');
break;
}
}
private async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
// 查询客户信息
const customer = await this.readDb.query(
'SELECT name FROM users WHERE id = $1',
[event.userId]
);
// 查询产品名称
const productIds = event.items.map(item => item.productId);
const products = await this.readDb.query(
'SELECT id, name FROM products WHERE id = ANY($1)',
[productIds]
);
const productNames = products.rows.map(p => p.name);
await this.readDb.query(
`INSERT INTO order_read_models
(id, user_id, items, total_amount, status, customer_name, product_names, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $8)`,
[
event.aggregateId,
event.userId,
JSON.stringify(event.items),
event.totalAmount,
'PENDING',
customer.rows[0]?.name || 'Unknown',
productNames,
event.timestamp
]
);
}
private async updateOrderStatus(orderId: string, status: string): Promise<void> {
await this.readDb.query(
`UPDATE order_read_models
SET status = $1, updated_at = NOW()
WHERE id = $2`,
[status, orderId]
);
}
}
// src/application/queries/OrderQueryService.ts
export class OrderQueryService {
constructor(private readDb: Pool) {}
async getOrder(query: GetOrderQuery): Promise<OrderReadModel | null> {
const result = await this.readDb.query(
'SELECT * FROM order_read_models WHERE id = $1',
[query.orderId]
);
return result.rows[0] || null;
}
async getUserOrders(query: GetUserOrdersQuery): Promise<{
orders: OrderReadModel[];
total: number;
}> {
const offset = (query.page - 1) * query.pageSize;
const [ordersResult, countResult] = await Promise.all([
this.readDb.query(
`SELECT * FROM order_read_models
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3`,
[query.userId, query.pageSize, offset]
),
this.readDb.query(
'SELECT COUNT(*) FROM order_read_models WHERE user_id = $1',
[query.userId]
)
]);
return {
orders: ordersResult.rows,
total: parseInt(countResult.rows[0].count)
};
}
}
// 订阅事件并投影
const projector = new OrderReadModelProjector(readDb);
eventBus.subscribe('OrderCreated', 'read-model-projector', async (event) => {
await projector.project(event);
});
eventBus.subscribe('OrderConfirmed', 'read-model-projector', async (event) => {
await projector.project(event);
});
eventBus.subscribe('OrderCancelled', 'read-model-projector', async (event) => {
await projector.project(event);
});
总结
事件驱动架构的核心价值:
- 领域事件:捕获业务中发生的重要事实,作为系统通信的基础
- 事件总线:实现组件间的松耦合通信,支持多个消费者
- 事件溯源:通过事件流重建状态,提供完整的审计追踪
- CQRS:分离命令和查询,优化读写性能
关键原则:
- 事件是不可变的事实记录
- 使用事件实现服务间的松耦合
- 事件溯源提供强大的审计和调试能力
- CQRS允许针对读写场景分别优化
- 实现幂等性处理重复事件
延伸阅读
- Domain-Driven Design - Eric Evans
- Event Sourcing - Martin Fowler
- CQRS - Martin Fowler
- Apache Kafka文档
- EventStoreDB
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。