一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

消息组件


消息

消息是Watermill的核心部分之一。消息通过“发布者”发布并由“订阅者”接收。当处理消息时,如果处理失败,您应该发送一个Ack()(表示成功处理)或一个Nack()(表示处理失败)。

消息的AckNack由订阅者进行处理(在默认实现中,订阅者会等待AckNack)。

完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go

// ...
type Message struct {
	// UUID是消息的唯一标识符。
	//
	// 它仅用于Watermill进行调试。
	// UUID可以为空。
	UUID string

	// Metadata包含消息的元数据。
	//
	// 可用于存储不需要解码整个有效载荷的数据。
	// 它类似于HTTP请求的标头。
	//
	// Metadata会被编组并保存到PubSub中。
	Metadata Metadata

	// Payload是消息的有效载荷。
	Payload Payload

	// ack在接收到确认时关闭。
	ack chan struct{}
	// noACk在接收到否定确认时关闭。
	noAck chan struct{}

	ackMutex    sync.Mutex
	ackSentType ackType

	ctx context.Context
}

// ...

Ack

发送Ack

完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go

// ...
// Ack发送消息的确认。
//
// Ack不会阻塞。
// Ack具有幂等性。
// 如果已经发送了Nack,则返回false。
func (m *Message) Ack() bool {
// ...

Nack

完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go

// ...
// Nack发送消息的否定确认。
//
// Nack不会阻塞。
// Nack具有幂等性。
// 如果已经发送了Ack,则返回false。
func (m *Message) Nack() bool {
// ...

接收Ack/Nack

完整源代码:github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go

// ...
	select {
	case 

Context

消息包含标准库的上下文(Context),就像HTTP请求一样。

完整源代码:github.com/ThreeDotsLabs/watermill/message/message.go

// ...
// Context返回消息的上下文。要更改上下文,请使用SetContext。
//
// 返回的上下文始终非nil;默认为background上下文。
func (m *Message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext将提供的上下文设置为消息的上下文。
func (m *Message) SetContext(ctx context.Context) {
	m.ctx = ctx
}
// ...