RabbitMQ的延时队列

RabbitMQ的延时队列

实现方法有2种:

  1. 死信队列+TTL
  2. 延时队列插件

死信队列

  • 消息变成死信一般有一下3种情况:
    • 消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
    • 当前队列中的消息数量已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数量)。
    • 消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;
  • 死信队列其实就是一个正常的队列,当队列A绑定了队列B用来存放队列A中的死信时,队列B就充当了一个死信队列的角色
  • 流程:
    • 创建死信交换机B,创建死信队列B
    • 创建正常交换机A,创建普通队列A,并指定B为死信队列,通过指定x-dead-letter-exchange和x-dead-letter-routing-key
    • 通过RoutingKeyA绑定exchangeA和QueueA
    • 设置A的生产消息的过期时间,其过期时间就是想要延迟的时间。还有一种方式是声明正常队列的A的有效时间,过期之后队列删除,消息依然会转发到死信队列B
  • 注意事项:
    • 队列都是FIFO的,假如2个消息的过期时间分别为10s和5s,消费者先消费到的是10s的,而不是5s的。这跟FIFO的特性有关系,第一条不过期,后面的就会阻塞。
    • 解决办法:增加一个消费者对延时队列消费,不ack,把第一条消息放到队列尾部。一直让消息在流动,这样就能检测到了。

producer.php内容如下

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

  public function handle()
    {
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
            $this->vhost,false,'AMQPLAIN',null,'en_US',10,10);
        //创建信道
        $channel = $this->connection->channel();
        //声明死信队列和交换机
        $deadQueue = "dead_queue";
        $deadExchange = "dead_exchange";

        $ttl = 8000;

        $channel->queue_declare($deadQueue, false, true, false, false);
        $channel->exchange_declare($deadExchange, AMQPExchangeType::DIRECT);
        $channel->queue_bind($deadQueue, $deadExchange,'common_key');

        $args = new AMQPTable();
        $args->set('x-message-ttl', $ttl);//8s过期时间 这里是给整个消息队列设置统一过期时间
        $args->set('x-dead-letter-exchange', $deadExchange);
        $args->set('x-dead-letter-routing-key', 'common_key');

        //创建正常队列和交换机
        $exchange = "common_exchange";
        $queue = "common_queue";
        $common_key = "common_key";
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, false,false);
        //注意$args 此时传入
        $channel->queue_declare($queue, false,false,false,false,false,$args);
        $channel->queue_bind($queue, $exchange, $common_key);

        //生产5条消息
        for($i=0;$i<5;$i++) {
            $body = '消息发送时间:'.date('Y-m-d H:i:s').'_将在'.($ttl/1000)."秒后消费";
            echo "第{$i}条消息,{$body}\n";
            $message = new AMQPMessage($body);
            $channel->basic_publish($message, $exchange, $common_key);
        }
        echo " [x] 所有消息都已经发送完毕 :".date('Y-m-d H:i:s')."\n";
        $channel->close();
        $this->connection->close();
    }

rabbitmq 管理后台可以看到队列设置的时间,由于时间多次更改,截图是之前的7s

consumer.php


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

public function handle(){

        $exchange = 'dead_exchange';
        $queue = 'dead_queue';
        $consumerTag = 'consumer';
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
        //创建信道
        $channel = $this->connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT);
        $channel->queue_bind($queue, $exchange,'common_key');

        /**
         * @param \PhpAmqpLib\Message\AMQPMessage $message
         */
        $process_message = function($message)
        {
            echo "\n--------\n";
            echo "消费时间:".date("Y-m-d H:i:s")."消息体为:".$message->body;
            echo "\n--------\n";

            $message->ack();

            // Send a message with the string "quit" to cancel the consumer.
            if ($message->body === 'quit') {
                $message->getChannel()->basic_cancel($message->getConsumerTag());
            }
        };

//      $channel->basic_qos(null,1,null);//一条条处理
        $channel->basic_consume($queue, $consumerTag, false, false, false, false, $process_message);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $this->connection->close();
    }

执行结果

 生产者输出:

第0条消息,消息发送时间:2021-09-21 13:55:07_将在8秒后消费
第1条消息,消息发送时间:2021-09-21 13:55:07_将在8秒后消费
第2条消息,消息发送时间:2021-09-21 13:55:07_将在8秒后消费

消费者输出

--------
消费时间:2021-09-21 13:55:15消息体为:消息发送时间:2021-09-21 13:55:07_将在8秒后消费
--------
--------
消费时间:2021-09-21 13:55:15消息体为:消息发送时间:2021-09-21 13:55:07_将在8秒后消费
--------
--------
消费时间:2021-09-21 13:55:15消息体为:消息发送时间:2021-09-21 13:55:07_将在8秒后消费
--------

15-07 = 8 正好是8s

以上代码还有第二种方式,单独给每条消息设置不同的过期时间。

  //更改队列的名字,因为之前的队列已经设置了过期时间,直接使用会报错的
  在producer.php中找到如下代码

  $deadQueue = "dead_queue_msg_ttl";
  $deadExchange = "dead_exchange_msg_ttl";

   //创建正常队列和交换机【在consumer.php和producer.php中都要修改】
  $exchange = "common_exchange_msg_ttl";
  $queue = "common_queue_msg_ttl";

  #以下代码在producer.php中修改
  这行注释掉
  //$args->set('x-message-ttl', $ttl);//8s过期时间
  声明消息体的时候,增加第二个参数
   for($i=0;$i<5;$i++) {
            $i>2 && $ttl = 3000;
            $body = '消息发送时间:'.date('Y-m-d H:i:s').'_将在'.($ttl/1000)."秒后消费";
            echo "第{$i}条消息,{$body}\n";
            $message = new AMQPMessage($body,[
                'expiration' => $ttl //这里的ttl就每条消息的过期时间
            ]);
            $channel->basic_publish($message, $exchange, $common_key);
    }

  这样就能给每条消息设置不同的过期时间了。

这次看不到

x-message-ttl 设置项

因为过期时间在消息里面

但是问题又来了,仔细看上面代码的配置;前面3条是延时8s,后面3条是延时3s;
那总归是希望3s的先执行,然后执行8s的消息。
但由于消息队列先进先出的特性,等8s之后才会执行3s的;

说好的3s,结果3s之后又3s,3s之后又3s...

怪不得 梁朝伟等不及了
三年之后又三年,三年之后又三年
《无间道》。

不过不要担心,方法总比困难多

common_consumer.php

    public function handle()
    {

        $exchange = 'common_exchange_msg_ttl';
        $queue = 'common_queue_msg_ttl';

        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
        //创建信道
        $channel = $this->connection->channel();
        //声明死信队列和交换机
        $deadExchange = "dead_exchange_msg_ttl";
        $common_key = "common_key";

        $args = new AMQPTable();
        $args->set('x-dead-letter-exchange', $deadExchange);
        $args->set('x-dead-letter-routing-key', $common_key);

        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, false,false);
        //注意$args 此时传入
        $channel->queue_declare($queue, false,false,false,false,false,$args);
        $channel->queue_bind($queue, $exchange, $common_key);
        /**
         * @param \PhpAmqpLib\Message\AMQPMessage $message
         */
        $process_message = function ($message) use($channel) {
            echo "\n---这条是正常流转消息-----\n";
            echo "消费时间:" . date("Y-m-d H:i:s") . "消息体为:" . $message->body;
            echo "\n";

            //$message->ack(); 不ack
            //下面这条语句重新把消息放入队列
            $channel->basic_nack($message->getDeliveryTag(),false,true);
            echo "\n---这条是正常流转消息,已重新入队-----\n";
        };

//        $channel->basic_qos(null,1,null);
        $channel->basic_consume($queue, 'consumer', false, false, false, false, $process_message);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $this->connection->close();
    }

结果输出情况


producer.php

第0条消息,消息发送时间:2021-09-21 14:36:21_将在8秒后消费
第1条消息,消息发送时间:2021-09-21 14:36:21_将在8秒后消费
第2条消息,消息发送时间:2021-09-21 14:36:21_将在8秒后消费
第3条消息,消息发送时间:2021-09-21 14:36:21_将在3秒后消费
第4条消息,消息发送时间:2021-09-21 14:36:21_将在3秒后消费

consumer.php
--------
消费时间:2021-09-21 14:36:24消息体为:消息发送时间:2021-09-21 14:36:21_将在3秒后消费  # 这里看起来就正常了,3s的率先输出
--------

--------
消费时间:2021-09-21 14:36:24消息体为:消息发送时间:2021-09-21 14:36:21_将在3秒后消费 # 这里看起来就正常了,3s的率先输出
--------

--------
消费时间:2021-09-21 14:36:29消息体为:消息发送时间:2021-09-21 14:36:21_将在8秒后消费 # 8s的延后输出,消费时间仍然对的上
--------

--------
消费时间:2021-09-21 14:36:29消息体为:消息发送时间:2021-09-21 14:36:21_将在8秒后消费
--------

--------
消费时间:2021-09-21 14:36:29消息体为:消息发送时间:2021-09-21 14:36:21_将在8秒后消费
--------

common_consumer.php

---这条是正常流转消息,已重新入队-----

---这条是正常流转消息-----
消费时间:2021-09-21 14:36:29消息体为:消息发送时间:2021-09-21 14:36:21_将在8秒后消费

---这条是正常流转消息,已重新入队-----

---这条是正常流转消息-----
消费时间:2021-09-21 14:36:29消息体为:消息发送时间:2021-09-21 14:36:21_将在8秒后消费

---这条是正常流转消息,已重新入队-----

---这条是正常流转消息-----
消费时间:2021-09-21 14:36:29消息体为:消息发送时间:2021-09-21 14:36:21_将在8秒后消费

……
太多了省略部分输出

插件方式

具体是指

rabbitmq-delayed-message-exchange

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

根据你的RabbmitMQ的版本下载

我记录下我在本地的docker中的操作

docker exec -it 09e bash
apt update
apt install wget 

#进入rabbitmq的插件目录

cd /opt/rabbitmq/plugins

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange   

#查看是否启动成功
rabbitmq-plugins list|grep delayed_message_exchange

delay_producer.php


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

public function handle()
{
    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
        $this->vhost, false, 'AMQPLAIN', null, 'en_US', 10, 10);
    //创建信道
    $channel = $this->connection->channel();
    //声明死信队列和交换机
    $delayQueue = "delay_queue";
    $delayExchange = "delay_exchange";
    $delayKey = "delay_key";
    //交换机声明的时候需要添加args
    $args =  new AMQPTable(['x-delayed-type'=>'direct']);
    $channel->exchange_declare($delayExchange, 'x-delayed-message',false,
        true,false, false,false, $args);

    $channel->queue_declare($delayQueue,false, true,false,false,false);
    $channel->queue_bind($delayQueue, $delayExchange, $delayKey);

    $ttl = 5000;
    //生产消息
    for ($i = 0; $i < 5; $i++) {
        $i > 2 && $ttl = 3000;
        $body = '消息发送时间:' . date('Y-m-d H:i:s') . '_将在' . ($ttl / 1000) . "秒后消费";
        echo "第{$i}条消息,{$body}\n";
        $message = new AMQPMessage($body, [
            'application_headers'=> new AMQPTable(['x-delay' => $ttl]),//这里是重点,没有这个不会延迟消费
            'delivery_mode'=> 2 //持久化模式
        ]);
        $channel->basic_publish($message, $delayExchange, $delayKey);
    }
    echo " [x] 所有消息都已经发送完毕 :" . date('Y-m-d H:i:s') . "\n";
    $channel->close();
    $this->connection->close();
}

delay_consumer.php


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

public function handle()
{

    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
        $this->vhost, false, 'AMQPLAIN', null, 'en_US', 10, 10);
    //创建信道
    $channel = $this->connection->channel();
    //声明死信队列和交换机
    $delayQueue = "delay_queue";
    $delayExchange = "delay_exchange";

    $args = new AMQPTable();
    $args->set('x-delayed-type', 'direct');
    $channel->exchange_declare($delayExchange, 'x-delayed-message',false,
        true,false, false,false, $args);
    $channel->queue_declare($delayQueue, false, true, false, false);

    /**
     * @param \PhpAmqpLib\Message\AMQPMessage $message
     */
    $process_message = function ($message) {

        echo "消费时间:" . date("Y-m-d H:i:s") . "消息体为:" . $message->body."\n";
        $message->ack();
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->getChannel()->basic_cancel($message->getConsumerTag());
        }
    };

//        $channel->basic_qos(null,1,null);
    $channel->basic_consume($delayQueue, 'consumer', false, false, false, false, $process_message);

    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $this->connection->close();
}

此时去管理后台查看

结果输出:

producer:

第0条消息,消息发送时间:2021-09-22 10:40:37_将在5秒后消费
第1条消息,消息发送时间:2021-09-22 10:40:37_将在5秒后消费
第2条消息,消息发送时间:2021-09-22 10:40:37_将在5秒后消费
第3条消息,消息发送时间:2021-09-22 10:40:37_将在3秒后消费
第4条消息,消息发送时间:2021-09-22 10:40:37_将在3秒后消费
 [x] 所有消息都已经发送完毕 :2021-09-22 10:40:37

consumer:

消费时间:2021-09-22 10:40:40消息体为:消息发送时间:2021-09-22 10:40:37_将在3秒后消费
消费时间:2021-09-22 10:40:40消息体为:消息发送时间:2021-09-22 10:40:37_将在3秒后消费
消费时间:2021-09-22 10:40:42消息体为:消息发送时间:2021-09-22 10:40:37_将在5秒后消费
消费时间:2021-09-22 10:40:42消息体为:消息发送时间:2021-09-22 10:40:37_将在5秒后消费
消费时间:2021-09-22 10:40:42消息体为:消息发送时间:2021-09-22 10:40:37_将在5秒后消费

实现原理


第一种使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。目前资料介绍的不是很多)

这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。

总结:死信队列(DLX+TTL)是借助队列的形式来达到延时消息的目的,可以设置过期时间到消息或者队列,但存在时序问题,需要手动解决。
插件版是通过交换机来实现延时队列的,先保存至Mnesia,等到时间,再抛出给消息队列,这不存在时序问题,但有rabbitmq版本限制,需要 RabbitMQ 3.5.3 和更高版本。

参考链接:https://zhuanlan.zhihu.com/p/121083535

  1. 延时队列怎么实现的
  2. 消息持久化的流程是什么
  3. 消息的有序性与否

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注