一架梯子,一头程序猿,仰望星空!
RabbitMQ教程 > 内容正文

RabbitMQ 延迟队列(延迟消息)


RabbitMQ原生不支持延迟消息,目前主要通过死信交换机 + 消息TTL方案或者rabbitmq-delayed-message-exchange插件实现。

延迟队列应用场景

  • 对消息生产和消费有时间窗口要求的场景。例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
  • 通过消息触发延时任务的场景。例如,在指定时间段之后向用户发送提醒消息。

死信交换机 + 消息TTL方案

这个方案核心思想就是,创建一个没有消费者的队列,借助消息过期时间(TTL),当一条消息过期后会成为死信,这条死信消息会投递到死信交换机,死信交换机将消息发给死信队列,我们只要消费死信队列即可。

在这个方案中,消息过期时间就是消息延迟时间,例如: 消息TTL=30秒,因为这个队列没有消费者,消息30秒后过期,这条消息就变成死信,会被死信队列处理。

实现方案结合下面两个章节的教程设置下属性即可:

延迟消息插件方案

1.安装插件

rabbitmq-delayed-message-exchange的Github地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

从github的release页面的assets, 下载rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez文件,把文件放到rabbitmq插件目录(plugins目录)

提示:版本号可能跟本教程不一样,如果你的rabbitmq就是最新版本,插件也选择最新版本就行。

2.激活插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.定义交换机

通过x-delayed-type设置自定义交换机属性,支持发送延迟消息

    props := make(map[string]interface{})
    //关键参数,支持发送延迟消息
    props["x-delayed-type"] = "direct"

    // 声明交换机
    err = ch.ExchangeDeclare(
        "delay.queue",   // 交换机名字
        "fanout", // 交换机类型
        true,     // 是否持久化
        false,    
        false,
        false, 
        props,      // 设置属性
    )

4.发送延迟消息

通过消息头(x-delay),设置消息延迟时间。

        msgHeaders := make(map[string]interface{})
        // 通过消息头,设置消息延迟时间,单位毫秒
        msgHeaders["x-delay"] = 6000

        err = ch.Publish(
            "delay.queue",     // 交换机名字
            "", // 路由参数
            false,
            false, 
            amqp.Publishing{
                Headers:msgHeaders, // 设置消息头
                ContentType: "text/plain",
                Body:        []byte(body),
            })

提示:如果你直接使用阿里云的RabbitMQ消息云服务,通过消息头属性(delay),设置延迟时间即可,不用安装插件,阿里云已经对rabbitmq扩展了。


推荐教程