RabbitMQ 持久化

如果RabbitMQ服务异常导致重启,将会导致消息丢失,怎么样让消息持久化呢?RabbitMQ持久化是怎么样的呢?带着问题来学习(baidu)一下

回顾持久化机制,在以往的持久化机制中,redis持久化,kafka消息持久化,mysql 事务持久化,都无一例外是落盘(磁盘)。那rabbitmq也不例外,就是把队列和消息都放在磁盘里面。

消息持久化

  • 设置消息的头部
  • $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
  • delivery_mode 消息传输模式一共2种 1非持久化 2 持久化 这里设置成2 代表消息是持久化的。
  • 消息是放在队列里面的,如果队列没有持久化,皮之不存 毛将焉附?

队列持久化

  • 设置队列为持久化队列

    name: $queue
    passive: false
    durable: true // the queue will survive server restarts
    exclusive: false // the queue can be accessed in other channels
    auto_delete: false //the queue won't be deleted once the channel is closed.
    $channel->queue_declare($queue, false, true, false, false);
  • 第3个参数为true 代表它是一个持久化队列,服务重启之后队列依然坚挺

  • 注意:队列持久化和消息持久化是各自持久化,二者互不影响。但要二者同时设置durable=true

  • 消息什么时候刷到磁盘?

    写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
    有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
    每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:
    使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作

交换机的持久化

如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。这里建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:

/*
    name: $exchange
    type: direct
    passive: false
    durable: true // the exchange will survive server restarts
    auto_delete: false //the exchange won't be deleted once the channel is closed.
*/

$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
  • 当发布一条消息到交换机上时,Rabbit会先把消息写入持久化日志,然后才向生产者发送响应。
  • 一旦从队列中消费了一条消息的话并且做了确认,RabbitMQ会在持久化日志中移除这条消息。
  • 在消费消息前,如果RabbitMQ重启的话,服务器会自动重建交换机和队列,加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。

所有的持久化做完了就万无一失了么?

  • 然而并不是
  • 从客户端角度来说,如果no_ack机制为true 消费者拿到了消息,但还没处理就gg了,消息会自动ack。这时这条数据就丢了;
  • 解决方案: 那可通过把no_ack的参数设置为false来解决这个问题。
  • 从服务器角度来说,生产者把消息投递过来之后,并不是立马就fsync刷到磁盘。就类似于mysql的事务机制一样,它会先写在一个buffer中,那么恰好在刷盘之前server down掉了,这些消息也会丢失。
  • 解决方案: 镜像队列(mirrored-queue),这个有点相当于kafka的replica机制了

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait:
    callback: A PHP Callback
*/

$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

以下无正文

参考链接:https://zhuanlan.zhihu.com/p/410440404
https://blog.csdn.net/yongche_shi/article/details/51500534
https://blog.csdn.net/weixin_30391339/article/details/95013185
https://www.cnblogs.com/kancy/p/13401554.html
https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer.php
https://www.cnblogs.com/aspirant/p/9451239.html
https://www.rabbitmq.com/confirms.html

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注