一架梯子,一头程序猿,仰望星空!
Asynq任务队列教程 > 内容正文

Go Asynq 快速入门


Asynq是一个Go语言异步任务框架,它以Redis作为消息队列,具备可伸缩性和简易性。

Asynq异步任务解决方案:

  • 客户端将任务放入队列
  • 服务器从队列中提取任务并为每个任务启动一个工作线程(协程)
  • 多个工作协程并行处理任务

任务队列是一种将工作分配到多台机器的机制。系统可以由多个工作服务器和代理组成,实现高可用性和水平扩展。

任务队列图

特性

  • 保证至少执行一次任务
  • 任务调度
  • 重试失败的任务
  • 在工作线程崩溃时自动恢复任务
  • 加权优先级队列
  • 严格优先级队列
  • 由于Redis中的写操作快速,添加任务的延迟低
  • 使用唯一选项对任务进行去重
  • 允许为每个任务设置超时和截止时间
  • 允许聚合一组任务以批量执行多个连续操作
  • 灵活的处理程序接口,支持中间件
  • 允许暂停队列以停止从队列中处理任务
  • 周期性任务
  • 支持Redis Cluster以自动分片和实现高可用性
  • 支持Redis Sentinel实现高可用性
  • 与Prometheus集成,以收集和可视化队列指标
  • Web界面,用于检查和远程控制队列和任务
  • 命令行界面,用于检查和远程控制队列和任务

入门指南

在本教程中,我们将编写两个程序,clientworkers

  • client.go将创建并安排任务,以异步地由后台工作线程处理。
  • workers.go将启动多个并发的工作线程,处理由客户端创建的任务。

本指南假设您在localhost:6379上运行Redis服务器。在开始之前,请确保已安装并运行Redis。

让我们首先创建我们的两个主要文件。

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

然后安装asynq包。

go get -u github.com/hibiken/asynq

在开始编写代码之前,让我们回顾一下在这两个程序中将使用的一些核心类型。

Redis连接选项

Asynq使用Redis作为消息代理。client.goworkers.go都需要连接到Redis进行读写操作。我们将使用RedisClientOpt来指定与本地运行的Redis服务器的连接。

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // 如果不需要密码,可省略
    Password: "mypassword",
    // 为asynq使用一个专用的数据库编号。
    // 默认情况下,Redis提供16个数据库(0..15)
    DB: 0,
}

任务

asynq中,工作单元封装在称为Task的类型中,它概念上具有两个字段:TypePayload

// Type是一个字符串值,用于指示任务的类型。
func (t *Task) Type() string

// Payload是任务执行所需的数据。
func (t *Task) Payload() []byte

现在我们已经看了核心类型,让我们开始编写我们的程序。

客户端程序

client.go中,我们将创建一些任务,并使用asynq.Client对它们进行入队。

要创建一个任务,可以使用NewTask函数并传入任务的类型和有效负载。

Enqueue方法接受一个任务和任意数量的选项。使用ProcessInProcessAt选项来安排未来处理的任务。

// 与电子邮件相关任务的有效负载。
type EmailTaskPayload struct {
    // 电子邮件接收者的ID。
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // 创建带有类型名称和有效负载的任务。
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // 立即处理任务。
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 成功将任务加入队列:%+v", info)

    // 在24小时后处理任务。
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 成功将任务加入队列:%+v", info)
}

这就是我们客户端程序所需要的全部内容。

Workers程序

workers.go中,我们将创建一个asynq.Server实例来启动workers。

NewServer函数接受RedisConnOptConfig作为参数。

Config用于调整服务器的任务处理行为。 你可以查看Config文档以了解所有可用的配置选项。

为了简单起见,我们在这个例子中只指定并发数。

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // 注意: 在下面的部分中,我们将介绍`handler`是什么。
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Run方法的参数是一个接口asynq.Handler,它有一个方法ProcessTask

type Handler interface {
    // 如果任务成功处理,ProcessTask应返回nil。
    // 如果ProcessTask返回一个非nil错误或导致panic,任务将稍后重试。
    ProcessTask(context.Context, *Task) error
}

实现一个handler最简单的方法是定义一个具有相同签名的函数,并在将其传递给Run时使用asynq.HandlerFunc适配器类型。

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] 给用户 %d 发送提醒邮件", p.UserID)

    default:
        return fmt.Errorf("意外的任务类型:%s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // 使用asynq.HandlerFunc适配器来处理函数
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

我们可以继续为这个handler函数添加switch case,但在一个实际的应用中,将每个case的逻辑定义在一个单独的函数中会更方便。

为了重构我们的代码,让我们使用ServeMux来创建我们的handler。就像来自"net/http"包的ServeMux一样,你可以通过调用HandleHandleFunc来注册一个handler。ServeMux满足Handler接口,所以可以将其传递给(*Server).Run

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 给用户 %d 发送提醒邮件", p.UserID)
    return nil
}

现在,我们已将每个任务类型的处理函数提取出来,代码看起来更加有组织性。 然而,代码还是有点太隐晦了,我们有这些任务类型和负载类型的字符串值,应该将它们封装在一个有机的包中。让我们重构我们的代码,编写一个封装任务创建和处理的包。我们简单地创建一个名为task的包。

mkdir task && touch task/task.go
package task

import (
    "context"
    "fmt"
   
    "github.com/hibiken/asynq"
)

// 任务类型的列表。
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// 与任何与电子邮件相关的任务相关的任务负载。
type emailTaskPayload struct {
    // 电子邮件收件人的ID。
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(emailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(emailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload  
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 向用户 %d 发送欢迎电子邮件", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload  
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 向用户 %d 发送提醒电子邮件", p.UserID)
    return nil
}

现在我们可以在client.goworkers.go中导入这个包了。

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // 立即处理任务。
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 任务成功入队列:%+v", info)

    // 24小时后处理任务。
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] 任务成功入队列:%+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

现在代码看起来更好了!

运行程序

现在我们有了clientworkers,我们可以运行这两个程序。让我们先运行client程序来创建和调度任务。

go run client/client.go

这将创建两个任务:一个立即处理的任务和一个在24小时后处理的任务。

让我们使用asynq的命令行界面来检查任务。

asynq dash

您应该能够看到一个任务处于Enqueued状态,另一个任务处于Scheduled状态。

注意:要了解每个状态的含义,请查看任务的生命周期

最后,让我们启动workers程序来处理任务。

go run workers/workers.go

注意:该程序不会退出,直到您发送一个信号来终止程序。有关如何安全地终止后台工作者的最佳实践,请参阅信号Wiki页面

您应该能够在终端中看到一些文本输出,表示任务已成功处理。

您可以再次运行client程序,看看工作者如何接受任务并处理它们。

任务重试

一个任务在第一次尝试时无法成功处理并不罕见。默认情况下,失败的任务将使用指数退避重试25次。让我们更新我们的处理程序,返回一个错误以模拟一个不成功的情况。

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 尝试发送欢迎邮件给用户 %d...", p.UserID)
    return fmt.Errorf("无法发送邮件给用户") 
}

让我们重新启动我们的workers程序并加入一个任务。

go run workers/workers.go

go run client/client.go

如果你正在运行asynq dash,你应该能够看到一个任务处于Retry状态(通过导航到队列详情视图并高亮“retry”选项卡)。

要检查哪些任务处于重试状态,您还可以运行

asynq task ls --queue=default --state=retry

这将列出所有将来将被重试的任务。输出包括任务下一次执行的预计时间。

一旦一个任务耗尽它的重试次数,任务将转为Archived状态,并且将不会再次重试(您仍然可以使用CLI或WebUI工具手动运行存档的任务)。

在结束本教程之前,让我们修复我们的处理程序。

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)
    return nil 
}

现在我们修复了处理程序,在下一次尝试中任务将被成功处理 :)