一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

CQRS


Golang实现的Watermill中的CQRS。

CQRS机制

CQRS意味着“命令查询职责分离”。我们将命令(写请求)和查询(读请求)的责任分离开。写请求和读请求由不同的对象处理。

这就是CQRS。我们还可以进一步分割数据存储,拥有单独的读和写存储。一旦这样做,可能会有许多读存储,针对处理不同类型的查询或跨越许多有界上下文进行优化。尽管单独的读/写存储通常是与CQRS相关讨论的主题,但这不是CQRS本身。CQRS仅是命令和查询的第一次分割。

CQRS架构图

cqrs组件提供了一些有用的抽象,构建在Pub/Sub和Router的基础之上,有助于实现CQRS模式。

您不需要实现整个CQRS。通常只使用该组件的事件部分构建事件驱动的应用程序。

构建模块

事件

事件表示已经发生的事情。事件是不可变的。

事件总线

完整源代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus将事件传输到事件处理程序。
type EventBus struct {
// ...

完整源代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
	// GeneratePublishTopic用于生成发布事件的主题名称。
	GeneratePublishTopic GenerateEventPublishTopicFn

	// OnPublish在发送事件之前调用。
	// 可以修改*message.Message。
	//
	// 此选项不是必需的。
	OnPublish OnEventSendFn

	// Marshaler用于编码和解码事件。
	// 这是必需的。
	Marshaler CommandEventMarshaler

	// 用于记录的Logger实例。
	// 如果未提供,则使用watermill.NopLogger。
	Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

事件处理器

完整代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor 用于确定应该处理从事件总线接收到的事件的 EventHandler。
type EventProcessor struct {
// ...

完整代码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic 用于生成订阅事件的主题。
	// 如果事件处理器使用处理程序组,则使用 GenerateSubscribeTopic。
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor 用于为 EventHandler 创建订阅者。
	//
	// 此函数对每个 EventHandler 实例调用一次。
	// 如果要为多个处理程序重用一个订阅者,请使用 GroupEventProcessor。
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle 在处理事件之前被调用。
	// OnHandle 的工作方式类似于中间件:您可以在处理事件之前和之后注入其他逻辑。
	//
	// 因此,您需要显式调用 params.Handler.Handle() 来处理事件。
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // 处理之前的逻辑
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // 处理之后的逻辑
	//      //  (...)

	//      return err
	//  }
	//
	// 此选项不是必需的。
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent 用于确定是否应在事件没有定义处理程序时确认消息。
	AckOnUnknownEvent bool

	// Marshaler 用于编组和解组事件。
	// 必需的。
	Marshaler CommandEventMarshaler

	// 用于日志记录的 Logger 实例。
	// 如果未提供,将使用 watermill.NopLogger。
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers 是用于保持向后兼容性的。
	// 当使用 NewEventProcessor 创建 EventProcessor 时,将设置该值。
	// 已弃用:请迁移到 NewEventProcessorWithConfig。
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

事件组处理器

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor 决定应该由哪个事件处理程序处理从事件总线接收到的事件。
// 与 EventProcessor 相比,EventGroupProcessor 允许具有共享同一订阅者实例的多个处理程序。
type EventGroupProcessor struct {
// ...

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic 用于生成订阅处理程序组事件的主题。
	// 如果使用处理程序组,则此选项对于 EventProcessor 是必需的。
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor 用于为 GroupEventHandler 创建订阅者。
	// 此函数在每个事件组中调用一次 - 这样可以为每个组创建一个订阅。
	// 当我们要按顺序处理来自一个流的事件时,它非常有用。
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle 在处理事件之前调用。
	// OnHandle 类似于中间件:您可以在处理事件之前和之后注入其他逻辑。
	//
	// 因此,您需要显式调用 params.Handler.Handle() 来处理事件。
	//
	//  func(params EventGroupProcessorOnHandleParams) (err error) {
	//      // 处理之前的逻辑
	//      //  (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // 处理之后的逻辑
	//      //  (...)
	//
	//      return err
	//  }
	//
	// 此选项不是必需的。
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent 用于决定是否应该确认如果事件没有定义处理程序。
	AckOnUnknownEvent bool

	// Marshaler 用于编码和解码事件。
	// 这是必需的。
	Marshaler CommandEventMarshaler

	// 用于记录的 Logger 实例。
	// 如果未提供,则使用 watermill.NopLogger。
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

了解更多关于事件组处理器的信息。

事件处理程序

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler 接收由 NewEvent 定义的事件,并使用其 Handle 方法处理它们。
// 如果使用 DDD,事件处理程序可以修改和持久化聚合。
// 它还可以调用流程管理器、saga 或只是构建读模型。
//
// 与命令处理程序不同,每个事件可以有多个事件处理程序。
//
// 在处理消息期间,使用一个 EventHandler 实例。
// 当同时传递多个事件时,Handle 方法可以同时执行多次。
// 因此,Handle 方法需要是线程安全的!
type EventHandler interface {
// ...

命令

命令是一个简单的数据结构,表示执行某些操作的请求。

命令总线

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus是将命令传输给命令处理程序的组件。
type CommandBus struct {
// ...

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic用于生成发布命令的主题。
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend在发布命令之前调用。
	// 可以修改 *message.Message。
	//
	// 该选项不是必需的。
	OnSend CommandBusOnSendFn

	// Marshaler用于序列化和反序列化命令。
	// 必需的。
	Marshaler CommandEventMarshaler

	// 用于记录日志的Logger实例。
	// 如果未提供,将使用watermill.NopLogger。
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

命令处理器

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn为CommandHandler创建订阅者。
// 它允许您为每个命令处理程序创建单独的自定义订阅者。
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic用于生成订阅命令的主题。
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor用于为CommandHandler创建订阅者。
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle在处理命令之前调用。
	// OnHandle的工作方式与中间件类似:您可以在处理命令之前和之后注入附加的逻辑。
	//
	// 由于这一点,您需要显式调用params.Handler.Handle()来处理命令。
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // 逻辑在处理之前
	//      //  (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // 逻辑在处理之后
	//      //  (...)
	//
	//      return err
	//  }
	//
	// 该选项不是必需的。
	OnHandle CommandProcessorOnHandleFn

	// Marshaler用于序列化和反序列化命令。
	// 必需的。
	Marshaler CommandEventMarshaler

	// 用于记录日志的Logger实例。
	// 如果未提供,将使用watermill.NopLogger。
	Logger watermill.LoggerAdapter

	// 如果为true,即使CommandHandler返回错误,CommandProcessor也会ack消息。
	// 如果RequestReplyBackend不为null并且发送回复失败,消息仍将被否认。
	//
	// 警告:在使用requestreply组件(requestreply.NewCommandHandler或requestreply.NewCommandHandlerWithResult)时,不建议使用此选项,
	// 因为当发送回复失败时,它可能会ack命令。
	//
	// 当使用requestreply时,您应该使用requestreply.PubSubBackendConfig.AckCommandErrors。
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers用于保持向后兼容性。
	// 当由NewCommandProcessor创建CommandProcessor时设置它。
	// 已弃用:请迁移到NewCommandProcessorWithConfig。
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

命令处理器

完整的源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler接收由NewCommand定义的命令,并使用Handle方法处理它。
// 如果使用DDD,CommandHandler可能会修改和持久化聚合。
//
// 与EventHandler不同,每个Command只能有一个CommandHandler。
//
// 在处理消息期间,使用一个CommandHandler实例。
// 当多个命令同时被投递时,Handle方法可以同时执行多次。
// 因此,Handle方法需要是线程安全的!
type CommandHandler interface {
// ...

命令和事件编组器

完整的源码:github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler将命令和事件编组为Watermill的消息,反之亦然。
// 命令的有效负载需要被编组为[]bytes。
type CommandEventMarshaler interface {
	// Marshal将命令或事件编组为Watermill的消息。
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal将Watermill的消息解码为v命令或事件。
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name返回命令或事件的名称。
	// 名称可用于确定所接收到的命令或事件是否是我们要处理的事件。
	Name(v interface{}) string

	// NameFromMessage从Watermill的消息中返回命令或事件的名称(由Marshal生成)。
	//
	// 当我们有编组为Watermill的消息的命令或事件时,
	// 我们应该使用NameFromMessage而不是Name,以避免不必要的解码。
	NameFromMessage(msg *message.Message) string
}
// ...

使用方法

示例领域

以一个简单的领域为例,该领域负责在酒店中处理房间预订。

我们将使用Event Storming符号来展示此领域的模型。

符号说明:

  • 蓝色便笺是命令
  • 橙色便笺是事件
  • 绿色便笺是从事件异步生成的读取模型
  • 紫色便笺是由事件触发并生成命令的策略
  • 粉色便笺是热点区域;我们标记经常出现问题的地方

CQRS Event Storming

领域很简单:

  • 客户可以预订房间
  • 每当预订一间房,我们就为客户订购一瓶啤酒(因为我们热爱我们的客人)。
    • 我们知道有时啤酒不够
  • 我们会基于预订生成一份财务报告

发送命令

首先,我们需要模拟客户的动作。

完整的源码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

命令处理器

BookRoomHandler将处理我们的命令。

完整源代码: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler是一个命令处理器,处理BookRoom命令并发出RoomBooked事件。
//
// 在CQRS中,一种命令必须由一个处理器处理。
// 当添加另一个处理此命令的处理器到命令处理器时,将返回错误。
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand返回此处理器应处理的命令类型。它必须是一个指针。
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c始终是由`NewCommand`返回的类型,强制转换始终是安全的
	cmd := c.(*BookRoom)

	// 一些随机价格,在实际生产中可能会以更明智的方式计算
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"预订了%s,从%s到%s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked将由OrderBeerOnRoomBooked事件处理器处理,
	// 将来RoomBooked可以由多个事件处理器处理
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked是一个事件处理器,处理RoomBooked事件并发出OrderBeer命令。
// ...

事件处理器

如前所述,我们希望每次订房时都能订一瓶啤酒(*“当房间被预订时”*贴纸)。我们通过使用OrderBeer命令来实现。

完整源代码: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked是一个事件处理器,处理RoomBooked事件并发出OrderBeer命令。
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// 这个名称传给EventsSubscriberConstructor用于生成队列名称
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler是一个命令处理器,处理OrderBeer命令并发出BeerOrdered事件。
// ...

OrderBeerHandlerBookRoomHandler非常相似。唯一的区别是,当啤酒不足时,它有时会返回错误,导致命令被重新传递。您可以在示例源代码中找到完整的实现。

事件处理程序组

默认情况下,每个事件处理程序都有一个单独的订阅者实例。如果只有一个事件类型发送到主题,那么这种方式可以正常工作。

在主题上存在多个事件类型的情况下,有两种选择:

  1. 您可以将 EventConfig.AckOnUnknownEvent 设置为 true - 这将确认所有未由处理程序处理的事件。
  2. 您可以使用事件处理程序组机制。

要使用事件组,您需要在 EventConfig 中设置 GenerateHandlerGroupSubscribeTopicGroupSubscriberConstructor 选项。

然后,您可以在 EventProcessor 上使用 AddHandlersGroup

完整源码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

GenerateHandlerGroupSubscribeTopicGroupSubscriberConstructor 都在函数参数中接收有关组名称的信息。

通用处理程序

从 Watermill v1.3 开始,可以使用通用处理程序来处理命令和事件。当您有大量命令/事件且不想为每个命令/事件创建一个处理程序时,这非常有用。

完整源码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

在幕后,它创建了 EventHandler 或 CommandHandler 实现。它适用于所有类型的处理程序。

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler 根据提供的函数和从函数参数中推断出的命令类型创建一个新的 CommandHandler 实现。
func NewCommandHandler[Command any](
// ...

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler 根据提供的函数和从函数参数中推断出的事件类型创建一个新的 EventHandler 实现。
func NewEventHandler[T any](
// ...

完整源码:github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler 根据提供的函数和从函数参数中推断出的事件类型创建一个新的 GroupEventHandler 实现。
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

使用事件处理程序构建读模型

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport是一个读模型,它计算我们可以从预订中赚取多少钱。
// 就像在发生RoomBooked事件时,它监听RoomBooked事件。
//
// 这个实现只是写入内存。在生产环境中,您可能会使用某种持久存储。
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// 此名称传递给EventsSubscriberConstructor并用于生成队列名称
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle可能会并发调用,因此需要线程安全。
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// 当我们使用不提供精确一次交付语义的Pub/Sub时,我们需要对消息进行去重。
	// GoChannel Pub/Sub提供了精确一次交付,
	// 但是让我们为其他Pub/Sub实现准备这个示例。
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> 已经预订了价值%d美元的房间\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

连接一切

我们已经拥有了构建CQRS应用所需的所有组件。

我们将使用AMQP(RabbitMQ)作为我们的消息代理:AMQP。

在底层,CQRS使用了Watermill的消息路由器。如果您对此不熟悉,并且想要了解其工作原理,您应该查看入门指南。它还将向您展示如何使用某些标准的消息模式,如度量、毒消息队列、限流、关联和其他每个消息驱动应用程序所使用的工具。这些工具已经内置在Watermill中。

让我们回到CQRS。正如您已经知道的,CQRS由多个组件构成,如命令或事件总线、处理器等。

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

那就是全部了。我们有一个可运行的CQRS应用程序。