实时使用Prometheus监控Watermill。
指标
可以通过使用发布者/订阅者的装饰器和处理程序的中间件来监控Watermill。我们提供了一个使用Go的官方Prometheus客户端的默认实现。
components/metrics
包导出PrometheusMetricsBuilder
,它提供了方便的函数来包装发布者、订阅者和处理程序,以便它们更新相关的Prometheus注册表:
完整源代码:github.com/ThreeDotsLabs/watermill/components/metrics/builder.go
// ...
// PrometheusMetricsBuilder提供了装饰发布者、订阅者和处理程序的方法。
type PrometheusMetricsBuilder struct {
// PrometheusRegistry可以填充预先存在的Prometheus注册表,或者为空以使用默认注册表。
PrometheusRegistry prometheus.Registerer
Namespace string
Subsystem string
}
// AddPrometheusRouterMetrics是一个方便的函数,用于在消息路由器上添加度量中间件到所有处理程序。同时,也会对处理程序的发布者和订阅者进行装饰。
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
// ...
包装发布者、订阅者和处理程序
如果您正在使用Watermill的路由器(在大多数情况下是推荐的),您可以使用一个方便的函数AddPrometheusRouterMetrics
,确保所有添加到此路由器的处理程序都被包装以更新Prometheus注册表,以及它们的发布者和订阅者:
完整源代码:github.com/ThreeDotsLabs/watermill/components/metrics/builder.go
// ...
// AddPrometheusRouterMetrics是一个方便的函数,用于在消息路由器上添加度量中间件到所有处理程序。同时,也会对处理程序的发布者和订阅者进行装饰。
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
r.AddPublisherDecorators(b.DecoratePublisher)
r.AddSubscriberDecorators(b.DecorateSubscriber)
r.AddMiddleware(b.NewRouterMiddleware().Middleware)
}
// ...
使用AddPrometheusRouterMetrics
的示例:
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go
// ...
// 我们将命名空间和子系统为空
metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
metricsBuilder.AddPrometheusRouterMetrics(router)
// ...
在上面的代码片段中,我们将namespace
和subsystem
参数留空。Prometheus客户端库 会使用这些参数 来为指标名称加上前缀。您可能想要使用命名空间或子系统,但请注意,这将影响指标名称,因此您需要相应地调整Grafana仪表板。
通过使用PrometheusMetricBuilder
的专用方法,还可以装饰独立的发布者和订阅者:
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go
// ...
subWithMetrics, err := metricsBuilder.DecorateSubscriber(pubSub)
if err != nil {
panic(err)
}
pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
if err != nil {
panic(err)
}
// ...
暴露 /metrics 接口
根据 Prometheus 的工作原理,服务需要暴露一个用于抓取数据的 HTTP 端点。按照惯例,它是一个 GET 端点,并且路径通常为 /metrics
。
为了提供该端点,有两个便捷函数可用,一个使用之前创建的 Prometheus Registry,另一个同时创建一个新的 Registry:
完整源代码:github.com/ThreeDotsLabs/watermill/components/metrics/http.go
// ...
// CreateRegistryAndServeHTTP 会在给定的地址上建立一个 HTTP 服务器来暴露 /metrics 接口给 Prometheus。
// 它返回一个新的 Prometheus Registry(用于注册指标)和一个用于关闭服务器的取消函数。
func CreateRegistryAndServeHTTP(addr string) (registry *prometheus.Registry, cancel func()) {
registry = prometheus.NewRegistry()
return registry, ServeHTTP(addr, registry)
}
// ServeHTTP 会在给定的地址上建立一个 HTTP 服务器来暴露 /metrics 接口给 Prometheus。
// 它接受一个已存在的 Prometheus Registry,并返回一个关闭服务器的取消函数。
func ServeHTTP(addr string, registry *prometheus.Registry) (cancel func()) {
// ...
下面是一个使用示例:
完整源代码:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go
// ...
prometheusRegistry, closeMetricsServer := metrics.CreateRegistryAndServeHTTP(*metricsAddr)
defer closeMetricsServer()
// 我们将命名空间(namespace)和子系统(subsystem)留空
metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
metricsBuilder.AddPrometheusRouterMetrics(router)
// ...
示例应用
要了解指标仪表板在实践中的工作原理,可以查看 metrics 示例。
按照示例中的 README 中的说明来运行它,并将 Prometheus 数据源添加到 Grafana 中。
Grafana 仪表板
我们准备了一个 Grafana 仪表板 与上述指标实现一起使用。它提供有关吞吐量、失败率和发布/处理持续时间的基本信息。
如果您想在本地查看该仪表板,可以使用示例应用。
欲了解有关导出到 Prometheus 的指标的更多信息,请参阅 Exported metrics。
导入仪表板
要导入 Grafana 仪表板,请从左侧菜单中选择 Dashboard/Manage,然后点击 +Import
。
输入仪表板的URL https://grafana.com/dashboards/9777(或仅输入 ID,9777),然后点击 Load。
然后选择用于抓取 /metrics
端点的 Prometheus 数据源。点击 Import
,完成!
导出的指标
下面列出了所有由PrometheusMetricsBuilder
在Prometheus注册表中注册的指标。
有关Prometheus指标类型的更多信息,请参考Prometheus文档。
对象 | 指标 | 描述 | 标签/值 |
---|---|---|---|
Subscriber | subscriber_messages_received_total |
一个Prometheus计数器。计算订阅者获取的消息数量。 | acked 为 "acked" 或 "nacked"。 |
如果订阅者在处理程序内操作,则设置 handler_name ;否则为" |
|||
subscriber_name 标识订阅者。如果它实现了fmt.Stringer 接口,则是String() 的结果;否则为package.structName 。 |
|||
Handler | handler_execution_time_seconds |
一个Prometheus直方图。记录中间件封装的处理程序函数的执行时间。 | handler_name 为处理程序的名称。 |
success 为 "true" 或 "false",取决于封装的处理程序函数是否返回错误。 |
|||
Publisher | publish_time_seconds |
一个Prometheus直方图。记录装饰的发布者的Publish函数的执行时间。 | success 为 "true" 或 "false",取决于装饰的发布者是否返回错误。 |
如果发布者在处理程序内操作,则设置 handler_name ;否则为" |
|||
publisher_name 标识发布者。如果它实现了fmt.Stringer 接口,则是String() 的结果;否则为package.structName 。 |
此外,每个指标都有node
标签,由Prometheus提供,并且其值对应于指标来源的实例,以及job
,它是在Prometheus配置文件中指定的作业名称。
注意:如上所述,使用非空的namespace
或subsystem
将导致指标名称前缀。您可能需要进行相应的调整,例如在Grafana仪表盘的面板定义中。
自定义化
如果您认为某个指标漏掉了,您可以很容易地扩展此基本实现。最佳方法是使用与ServeHTTP方法一起使用的Prometheus注册表,并根据Prometheus客户端的文档注册指标。
更新这些指标的一种简洁方法是通过使用修饰器:
完整源代码:github.com/ThreeDotsLabs/watermill/message/decorator.go
// ...
// MessageTransformSubscriberDecorator 创建一个订阅者修饰器,它在每个通过订阅者传递的消息上调用transform函数。
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator {
if transform == nil {
panic("transform函数为nil")
}
return func(sub Subscriber) (Subscriber, error) {
return &messageTransformSubscriberDecorator{
sub: sub,
transform: transform,
}, nil
}
}
// MessageTransformPublisherDecorator 创建一个发布者修饰器,它在每个通过发布者传递的消息上调用transform函数。
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator {
if transform == nil {
panic("transform函数为nil")
}
return func(pub Publisher) (Publisher, error) {
return &messageTransformPublisherDecorator{
Publisher: pub,
transform: transform,
}, nil
}
}
type messageTransformSubscriberDecorator struct {
// ...
和/或路由器中间件。
一种更简单的方法是仅在处理程序函数中更新所需的指标。