一架梯子,一头程序猿,仰望星空!
Watermill Go事件驱动框架教程 > 内容正文

Watermill 监控


实时使用Prometheus监控Watermill。

指标

可以通过使用发布者/订阅者的装饰器和处理程序的中间件来监控Watermill。我们提供了一个使用Go的官方Prometheus客户端的默认实现。

components/metrics包导出PrometheusMetricsBuilder,它提供了方便的函数来包装发布者、订阅者和处理程序,以便它们更新相关的Prometheus注册表:

完整源代码:github.com/ThreeDotsLabs/watermill/components/metrics/builder.go

// ...
// PrometheusMetricsBuilder提供了装饰发布者、订阅者和处理程序的方法。
type PrometheusMetricsBuilder struct {
	// PrometheusRegistry可以填充预先存在的Prometheus注册表,或者为空以使用默认注册表。
	PrometheusRegistry prometheus.Registerer

	Namespace string
	Subsystem string
}

// AddPrometheusRouterMetrics是一个方便的函数,用于在消息路由器上添加度量中间件到所有处理程序。同时,也会对处理程序的发布者和订阅者进行装饰。
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
// ...

包装发布者、订阅者和处理程序

如果您正在使用Watermill的路由器(在大多数情况下是推荐的),您可以使用一个方便的函数AddPrometheusRouterMetrics,确保所有添加到此路由器的处理程序都被包装以更新Prometheus注册表,以及它们的发布者和订阅者:

完整源代码:github.com/ThreeDotsLabs/watermill/components/metrics/builder.go

// ...
// AddPrometheusRouterMetrics是一个方便的函数,用于在消息路由器上添加度量中间件到所有处理程序。同时,也会对处理程序的发布者和订阅者进行装饰。
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
	r.AddPublisherDecorators(b.DecoratePublisher)
	r.AddSubscriberDecorators(b.DecorateSubscriber)
	r.AddMiddleware(b.NewRouterMiddleware().Middleware)
}
// ...

使用AddPrometheusRouterMetrics的示例:

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
	// 我们将命名空间和子系统为空
	metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
	metricsBuilder.AddPrometheusRouterMetrics(router)
// ...

在上面的代码片段中,我们将namespacesubsystem参数留空。Prometheus客户端库 会使用这些参数 来为指标名称加上前缀。您可能想要使用命名空间或子系统,但请注意,这将影响指标名称,因此您需要相应地调整Grafana仪表板。

通过使用PrometheusMetricBuilder的专用方法,还可以装饰独立的发布者和订阅者:

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
	subWithMetrics, err := metricsBuilder.DecorateSubscriber(pubSub)
	if err != nil {
		panic(err)
	}
	pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
	if err != nil {
		panic(err)
	}
// ...

暴露 /metrics 接口

根据 Prometheus 的工作原理,服务需要暴露一个用于抓取数据的 HTTP 端点。按照惯例,它是一个 GET 端点,并且路径通常为 /metrics

为了提供该端点,有两个便捷函数可用,一个使用之前创建的 Prometheus Registry,另一个同时创建一个新的 Registry:

完整源代码:github.com/ThreeDotsLabs/watermill/components/metrics/http.go

// ...
// CreateRegistryAndServeHTTP 会在给定的地址上建立一个 HTTP 服务器来暴露 /metrics 接口给 Prometheus。
// 它返回一个新的 Prometheus Registry(用于注册指标)和一个用于关闭服务器的取消函数。
func CreateRegistryAndServeHTTP(addr string) (registry *prometheus.Registry, cancel func()) {
	registry = prometheus.NewRegistry()
	return registry, ServeHTTP(addr, registry)
}

// ServeHTTP 会在给定的地址上建立一个 HTTP 服务器来暴露 /metrics 接口给 Prometheus。
// 它接受一个已存在的 Prometheus Registry,并返回一个关闭服务器的取消函数。
func ServeHTTP(addr string, registry *prometheus.Registry) (cancel func()) {
// ...

下面是一个使用示例:

完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
	prometheusRegistry, closeMetricsServer := metrics.CreateRegistryAndServeHTTP(*metricsAddr)
	defer closeMetricsServer()

	// 我们将命名空间(namespace)和子系统(subsystem)留空
	metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
	metricsBuilder.AddPrometheusRouterMetrics(router)
// ...

示例应用

要了解指标仪表板在实践中的工作原理,可以查看 metrics 示例

按照示例中的 README 中的说明来运行它,并将 Prometheus 数据源添加到 Grafana 中。

Grafana 仪表板

我们准备了一个 Grafana 仪表板 与上述指标实现一起使用。它提供有关吞吐量、失败率和发布/处理持续时间的基本信息。

如果您想在本地查看该仪表板,可以使用示例应用。

欲了解有关导出到 Prometheus 的指标的更多信息,请参阅 Exported metrics。

导入仪表板

要导入 Grafana 仪表板,请从左侧菜单中选择 Dashboard/Manage,然后点击 +Import

输入仪表板的URL https://grafana.com/dashboards/9777(或仅输入 ID,9777),然后点击 Load。

导入仪表板

然后选择用于抓取 /metrics 端点的 Prometheus 数据源。点击 Import,完成!

导出的指标

下面列出了所有由PrometheusMetricsBuilder在Prometheus注册表中注册的指标。

有关Prometheus指标类型的更多信息,请参考Prometheus文档

对象 指标 描述 标签/值
Subscriber subscriber_messages_received_total 一个Prometheus计数器。计算订阅者获取的消息数量。 acked 为 "acked" 或 "nacked"。
如果订阅者在处理程序内操作,则设置 handler_name;否则为""。
subscriber_name 标识订阅者。如果它实现了fmt.Stringer接口,则是String()的结果;否则为package.structName
Handler handler_execution_time_seconds 一个Prometheus直方图。记录中间件封装的处理程序函数的执行时间。 handler_name 为处理程序的名称。
success 为 "true" 或 "false",取决于封装的处理程序函数是否返回错误。
Publisher publish_time_seconds 一个Prometheus直方图。记录装饰的发布者的Publish函数的执行时间。 success 为 "true" 或 "false",取决于装饰的发布者是否返回错误。
如果发布者在处理程序内操作,则设置 handler_name;否则为""。
publisher_name 标识发布者。如果它实现了fmt.Stringer接口,则是String()的结果;否则为package.structName

此外,每个指标都有node标签,由Prometheus提供,并且其值对应于指标来源的实例,以及job,它是在Prometheus配置文件中指定的作业名称。

注意:如上所述,使用非空的namespacesubsystem将导致指标名称前缀。您可能需要进行相应的调整,例如在Grafana仪表盘的面板定义中。

自定义化

如果您认为某个指标漏掉了,您可以很容易地扩展此基本实现。最佳方法是使用与ServeHTTP方法一起使用的Prometheus注册表,并根据Prometheus客户端的文档注册指标。

更新这些指标的一种简洁方法是通过使用修饰器:

完整源代码:github.com/ThreeDotsLabs/watermill/message/decorator.go

// ...
// MessageTransformSubscriberDecorator 创建一个订阅者修饰器,它在每个通过订阅者传递的消息上调用transform函数。
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator {
	if transform == nil {
		panic("transform函数为nil")
	}
	return func(sub Subscriber) (Subscriber, error) {
		return &messageTransformSubscriberDecorator{
			sub:       sub,
			transform: transform,
		}, nil
	}
}

// MessageTransformPublisherDecorator 创建一个发布者修饰器,它在每个通过发布者传递的消息上调用transform函数。
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator {
	if transform == nil {
		panic("transform函数为nil")
	}
	return func(pub Publisher) (Publisher, error) {
		return &messageTransformPublisherDecorator{
			Publisher: pub,
			transform: transform,
		}, nil
	}
}

type messageTransformSubscriberDecorator struct {
// ...

和/或路由器中间件。

一种更简单的方法是仅在处理程序函数中更新所需的指标。