如果RabbitMQ服务异常导致重启,将会导致消息丢失,怎么样让消息持久化呢?RabbitMQ持久化是怎么样的呢?带着问题来学习(baidu)一下
回顾持久化机制,在以往的持久化机制中,redis持久化,kafka消息持久化,mysql 事务持久化,都无一例外是落盘(磁盘)。那rabbitmq也不例外,就是把队列和消息都放在磁盘里面。
消息持久化
- 设置消息的头部
- $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
- delivery_mode 消息传输模式一共2种 1非持久化 2 持久化 这里设置成2 代表消息是持久化的。
- 消息是放在队列里面的,如果队列没有持久化,皮之不存 毛将焉附?
队列持久化
-
设置队列为持久化队列
name: $queue passive: false durable: true // the queue will survive server restarts exclusive: false // the queue can be accessed in other channels auto_delete: false //the queue won't be deleted once the channel is closed. $channel->queue_declare($queue, false, true, false, false);
-
第3个参数为true 代表它是一个持久化队列,服务重启之后队列依然坚挺
-
注意:队列持久化和消息持久化是各自持久化,二者互不影响。但要二者同时设置durable=true
-
消息什么时候刷到磁盘?
写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘: 使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作
交换机的持久化
如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。这里建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:
/*
name: $exchange
type: direct
passive: false
durable: true // the exchange will survive server restarts
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
- 当发布一条消息到交换机上时,Rabbit会先把消息写入持久化日志,然后才向生产者发送响应。
- 一旦从队列中消费了一条消息的话并且做了确认,RabbitMQ会在持久化日志中移除这条消息。
- 在消费消息前,如果RabbitMQ重启的话,服务器会自动重建交换机和队列,加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。
所有的持久化做完了就万无一失了么?
- 然而并不是
- 从客户端角度来说,如果no_ack机制为true 消费者拿到了消息,但还没处理就gg了,消息会自动ack。这时这条数据就丢了;
- 解决方案: 那可通过把no_ack的参数设置为false来解决这个问题。
- 从服务器角度来说,生产者把消息投递过来之后,并不是立马就fsync刷到磁盘。就类似于mysql的事务机制一样,它会先写在一个buffer中,那么恰好在刷盘之前server down掉了,这些消息也会丢失。
- 解决方案: 镜像队列(mirrored-queue),这个有点相当于kafka的replica机制了
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
以下无正文
参考链接:https://zhuanlan.zhihu.com/p/410440404
https://blog.csdn.net/yongche_shi/article/details/51500534
https://blog.csdn.net/weixin_30391339/article/details/95013185
https://www.cnblogs.com/kancy/p/13401554.html
https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_consumer.php
https://www.cnblogs.com/aspirant/p/9451239.html
https://www.rabbitmq.com/confirms.html