一架梯子,一头程序猿,仰望星空!
Asynq任务队列教程 > 内容正文

任务聚合


本页面介绍了Asynq的任务聚合功能。

概述

任务聚合允许您将多个任务连续排队,而不是一个个地传递给“Handler”。该功能允许您将多个连续操作批量处理成一个,以节省成本、优化缓存或批量通知等。

工作原理

为了使用任务聚合功能,您需要将任务以相同的组名排队到同一个队列中。使用相同的(queue, group)对排队的任务将由您提供的**GroupAggregator**聚合为一个任务,并将聚合后的任务传递给处理程序。

创建聚合任务时,Asynq服务器将等待更多的任务,直到可配置的宽限期到期。每次使用相同的(queue, group)排队新任务时,宽限期都会更新。

宽限期有可配置的上限:您可以设置最大聚合延迟时间,在此之后,Asynq服务器将无视剩余的宽限期并聚合任务。

您还可以设置可以一起聚合的最大任务数。如果达到该数目,Asynq服务器将立即聚合任务。

注意:任务的调度和聚合是冲突的功能,调度优先于聚合。

快速示例

在客户端端,使用QueueGroup选项将任务排队到相同的组中。

// 将三个任务排队到同一组。
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

在服务器端,提供GroupAggregator以启用任务聚合功能。您可以选择配置GroupGracePeriodGroupMaxDelayGroupMaxSize来自定义聚合策略。

// 此函数用于将多个任务聚合为一个任务。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... 您的逻辑以聚合给定的任务并返回聚合后的任务。
    // ... 如果需要,使用NewTask(typename, payload, opts...)创建一个新任务并设置选项。
    // ... (注意)将会忽略Queue选项,并且聚合后的任务将始终排队到组所属的同一个队列中。
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

教程

在本节中,我们提供了一个简单的程序,以展示聚合功能的使用。

首先,创建一个带有以下代码的客户端程序:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis服务器地址")
       flagMessage = flag.String("message", "hello", "任务处理时要打印的消息")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("无法加入任务队列:%v", err)
        }
        log.Printf("成功加入任务队列:%s", info.ID)
}

您可以运行此程序多次:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

现在,如果您通过CLI或Web UI检查队列,您可以看到您在队列中有正在聚合的任务。

接下来,创建一个带有以下代码的服务器程序:

// server.go
package main

import (
        "context"
        "flag"
        "log"
        "strings"
        "time"

        "github.com/hibiken/asynq"
)

var (
        flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis服务器地址")
        flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "组的优雅延迟时间")
        flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "组的最大延迟时间")
        flagGroupMaxSize = flag.Int("max-size", 20, "组的最大尺寸")
)

// 简单的聚合函数。
// 将所有任务的消息组合在一起,每个消息占一行。
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
        log.Printf("从组 %q 聚合了 %d 个任务", len(tasks), group)
        var b strings.Builder
        for _, t := range tasks {
                b.Write(t.Payload())
                b.WriteString("\n")
        }
        return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
        log.Print("处理程序收到聚合的任务")
        log.Printf("聚合的消息:%s", task.Payload())
        return nil
}

func main() {
        flag.Parse()

        srv := asynq.NewServer(
                asynq.RedisClientOpt{Addr: *flagRedisAddr},
                asynq.Config{
                        Queues:           map[string]int{"tutorial": 1},
                        GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
                        GroupGracePeriod: *flagGroupGracePeriod,
                        GroupMaxDelay:    *flagGroupMaxDelay,
                        GroupMaxSize:     *flagGroupMaxSize,
                },
        )

        mux := asynq.NewServeMux()
        mux.HandleFunc("aggregated-task", handleAggregatedTask)

        if err := srv.Run(mux); err != nil {
                log.Fatalf("无法启动服务器:%v", err)
        }
}

您可以运行此程序并观察输出:

$ go build -o server server.go
$ ./server --redis-addr=

您应该能够在输出中看到服务器已经在组中聚合了任务并且处理程序处理了聚合的任务。可以随意尝试在上述程序中更改--grace-period--max-delay--max-size标志,以查看它们如何影响聚合策略。