RabbitMQ 6种模式 php代码实现

  • 1.simple简单模式
  • 2.work-queues工作队列模式
  • 3.pub/sub 发布订阅模式
  • 4.Routing模式
  • 5.Topics模式
  • 6.Rpc模式

第一种 simple简单模式

特点:不声明交换机,队列进 队列出

生产者代码

$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
    $this->vhost,false,'AMQPLAIN',null,'en_US',10,10);
    //创建信道
    $channel = $this->connection->channel();

    $simpleQueue = "simple";
    $channel->queue_declare($simpleQueue, false, true, false, false);
    //生产消息
    for($i=0;$i<5;$i++) {
        $body = '消息发送时间:'.date('Y-m-d H:i:s');
        echo "第{$i}条消息,{$body}\n";
        $message = new AMQPMessage($body);
        $channel->basic_publish($message,'','simple');//这里routingkey 的值为队列名字,第二个参数为''
    }
    echo " [x] 所有消息都已经发送完毕 :".date('Y-m-d H:i:s')."\n";
    $channel->close();
    $this->connection->close();

    #消费者代码

    $queue = 'simple';
    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
    //创建信道
    $channel = $this->connection->channel();
    $channel->queue_declare($queue, false, true, false, false);
    $process_message = function($message)
    {
        echo "\n--------\n";
        echo "消费时间:".date("Y-m-d H:i:s")."消息体为:".$message->body;
        $message->ack();

        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->getChannel()->basic_cancel($message->getConsumerTag());
        }
    };

//   $channel->basic_qos(null,1,null);
    $channel->basic_consume($queue, '', false, false, false, false, $process_message);
    while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $this->connection->close();
    }

第二种 work-queues

特点:有多个消费队列,2种消费方式:
1 轮询(默认)
2 公平分发

RabbitMQ 它不看消费者的未确认消息的数量,它仅仅不加思考地分发每一个 n-th消息到 n-th 消费者。

为了改变这种分发策略! 可以使用 basic_qos 设置 prefetch_count=1;

含义是:在消费者已经处理并确认了之前的一个消息之前,不要分发一个新消息给它。

producer.php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'fanout_example_exchange';

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

/*
    name: $exchange
    type: fanout
    passive: false // don't check is an exchange with the same name exists
    durable: false // the exchange won't survive server restarts
    auto_delete: true //the exchange will be deleted once the channel is closed.
*/

$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, true);

$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain'));
$channel->basic_publish($message, $exchange);

$channel->close();
$connection->close();

consumer_fanout1.php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$exchange = 'fanout_example_exchange';
$queue = 'fanout_group_1';
$consumerTag = 'consumer' . getmypid();

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

/*
    name: $queue    // should be unique in fanout exchange.
    passive: false  // don't check if a queue with the same name exists
    durable: false // the queue will not survive server restarts
    exclusive: false // the queue might be accessed by other channels
    auto_delete: true //the queue will be deleted once the channel is closed.
*/
$channel->queue_declare($queue, false, false, false, true);

/*
    name: $exchange
    type: direct
    passive: false // don't check if a exchange with the same name exists
    durable: false // the exchange will not survive server restarts
    auto_delete: true //the exchange will be deleted once the channel is closed.
*/

$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, true);

$channel->queue_bind($queue, $exchange);

/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();

    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    }
}

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait: don't wait for a server response. In case of error the server will raise a channel
            exception
    callback: A PHP Callback
*/
$channel->basic_qos(null,1,null);# 加上这句就是公平分发
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

/**
 * @param \PhpAmqpLib\Channel\AMQPChannel $channel
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

register_shutdown_function('shutdown', $channel, $connection);

// Loop as long as the channel has callbacks registered
while ($channel->is_consuming()) {
    $channel->wait();
}

consumer_fanout2.php


consumer_fanout1和fanout2 要同时开启

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$exchange = 'fanout_example_exchange';
$queue = 'fanout_group_2'; // Let RabbitMQ create a queue name
$consumerTag = 'consumer' . getmypid();

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

/*
    name: $queue    // should be unique in fanout exchange.
    passive: false  // don't check if a queue with the same name exists
    durable: false // the queue will not survive server restarts
    exclusive: false // the queue might be accessed by other channels
    auto_delete: true //the queue will be deleted once the channel is closed.
*/
$channel->queue_declare($queue, false, false, false, true);

/*
    name: $exchange
    type: direct
    passive: false // don't check if a exchange with the same name exists
    durable: false // the exchange will not survive server restarts
    auto_delete: true //the exchange will be deleted once the channel is closed.
*/

$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, true);

$channel->queue_bind($queue, $exchange);

/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();

    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    }
}

/*
    queue: Queue from where to get the messages
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
    exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
    nowait: don't wait for a server response. In case of error the server will raise a channel
            exception
    callback: A PHP Callback
*/

$channel->basic_qos(null,1,null);# 加上这句就是公平分发
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

/**
 * @param \PhpAmqpLib\Channel\AMQPChannel $channel
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

register_shutdown_function('shutdown', $channel, $connection);

// Loop as long as the channel has callbacks registered
while ($channel->is_consuming()) {
    $channel->wait();
}
  • pub/sub 发布订阅模式
  • 特点:引入了交换机,可以把一个消息分给多个消费者。可以用来做广播,或者聊天室。==rabbitmq 所有的消息都不是直接到队列的,都是先经过交换机==。前面的代码没有交换机声明操作,使用一个默认的 exchange, 它被定义为空字符串“”。
生产者代码.php

 $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
            $this->vhost,false);
        //创建信道
        $channel = $this->connection->channel();
        $exchange = 'pubSubExchange';
        $channel->exchange_declare($exchange,AMQPExchangeType::FANOUT,false,false,false);

        //生产消息
        for($i=0;$i<9;$i++) {
            $body ="第".$i."条消息";
            echo $body."\n";
            $message = new AMQPMessage($body);
            $channel->basic_publish($message,$exchange);
        }
        echo " [x] 所有消息都已经发送完毕 :".date('Y-m-d H:i:s')."\n";
        $channel->close();
        $this->connection->close();
注意:没有声明队列

消费者代码.php

    $exchange = 'pubSubExchange';
    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
    //创建信道
    $channel = $this->connection->channel();
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);# 这里是关键,队列的名字由rabbitmq自动生成。每次生成的都不一样

    $channel->queue_bind($queue_name, $exchange);
    echo '[x] queue\'s name'.$queue_name;
    /**
     * @param \PhpAmqpLib\Message\AMQPMessage $message
     */
    $callback = function ($msg) {
        echo ' [x] ', $msg->body, "\n";
    };

    //        $channel->basic_qos(null,1,null);
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $this->connection->close();

启动一个生产者,2个消费者,可以看到 2个消费者都同时收到了所有的消息。
  • Routing模式
    特点:可以根据路由键的不同,自动处理消息分发到哪个队列。bind了响应的路由键的队列可以消费到信息。fanout是一股脑的全抛出去,而direct不一样是按路由来得。

    
    生产者代码.php
    
    $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
            $this->vhost,false);
    //创建信道
    $channel = $this->connection->channel();
    
    $exchange = 'routingExchange';
    $channel->exchange_declare($exchange,AMQPExchangeType::DIRECT,false,false,false); # 注意这里的type是direct
    
    $prefix = "black";
        //生产消息
    for($i=0;$i<9;$i++) {
            $body ="第".$i."条消息";
            $i <= 5 && $prefix = "green";
            echo $body."\n";
            $message = new AMQPMessage($body);
            $channel->basic_publish($message,$exchange,$prefix.'.msg');
            #第3个参数是路由键,只有bind的相应路由键的队列才能收到消息。前6条给green队列,后面几条给black队列
    }
    
    echo " [x] 所有消息都已经发送完毕 :".date('Y-m-d H:i:s')."\n";
    $channel->close();
    $this->connection->close();

消费者代码.php

$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
//创建信道
$channel = $this->connection->channel();
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$exchange = 'routingExchange';
$channel->queue_bind($queue_name, $exchange,'green'); # 注意这里的第3个参数,设置了green才能收到消息。设置为black就只能收到routingKey为black的消息了

echo '[x] queue\'s name'.$queue_name;
/**

  • @param \PhpAmqpLib\Message\AMQPMessage $message
    */
    $callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
    };

// $channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
}

    $channel->close();
    $this->connection->close();
}

这个就像路由一样,什么路由的请求就会路由到哪个队列。


5.Topics模式 通配符模式
 特点:这个就像正则表达式一样,* 代表多个字符 #代表一个字符 路由键最长255个字符
  ```php
 生产者代码.php

 $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass,
        $this->vhost,false);
    //创建信道
    $channel = $this->connection->channel();

    $exchange = 'topicExchange';
    $channel->exchange_declare($exchange,AMQPExchangeType::TOPIC,false,false,false);

    $prefix = "black";
    //生产消息
    for($i=0;$i<9;$i++) {
        $body ="第".$i."条消息";
        $i > 5 && $prefix = "green.colour";
        echo "routingkey_".$prefix.'.msg'."\n";
        echo $body."\n";
        $message = new AMQPMessage($body);
        $channel->basic_publish($message,$exchange,$prefix.'.msg');
    }
    echo " [x] 所有消息都已经发送完毕 :".date('Y-m-d H:i:s')."\n";
    $channel->close();
    $this->connection->close();

    消费者1.php
        $exchange = 'topicExchange';
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
        //创建信道
        $channel = $this->connection->channel();
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

        $channel->queue_bind($queue_name, $exchange,'*.msg'); # 这里是*代表多个字符,所以它可以收到所有的消息

    消费者2.php
     跟消费者1唯一不同的点
       $channel->queue_bind($queue_name, $exchange,'#.msg'); #从结果上看它只能收到前6条数据。

6.Rpc模式
如果我想在远端的计算机上运行一个函数并等待函数的返回结果。这个模式通常叫RPC模式。md有点困了,草(一种植物),看片不困,一写博客就困。2021年09月30日00:12:31

服务端代码
  $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
        //创建信道
        $channel = $this->connection->channel();
        //创建队列
        $channel->queue_declare('rpc_queue', false, false, false, false);

        echo " [x] Awaiting RPC requests\n";

        //创建回调函数,意图把消息回传给原队列
        $callback = function($req) {
            $n = intval($req->body);
            echo '[.]正在计算 fib(', $n, ")\n";
            echo "消息体{$n}".PHP_EOL;
            $msg = new AMQPMessage(
                (string) $this->fib($n),#计算出来的值当做消息体返回去
                array('correlation_id' => $req->get('correlation_id'))
            );

            echo "原来的correlation_id".$req->get('correlation_id').PHP_EOL;

            #var_dump($req->get('reply_to')); 这里打印出来就是原来的队列
            $req->delivery_info['channel']->basic_publish(
                $msg,
                '',
                $req->get('reply_to')#原队列
            );

            $req->ack();#确认收到消息
        };

        #一条一条处理消息
        $channel->basic_qos(null, 1, null);
        #消费rpc_queue
        $channel->basic_consume('rpc_queue',
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($channel->is_open()) {
            $channel->wait();
        }
        $channel->close();
        $this->connection->close();

   /**
     * @描述: 计算fibonacci
     * @param $n
     * @return int
     * @Author: jack lee
     * @Date  : 2021/9/30   8:01 下午
     */
    protected function fib($n)
    {
        if ($n == 0) {
            return 0;
        }
        if ($n == 1) {
            return 1;
        }
        return $this->fib($n - 1) + $this->fib($n - 2);
    }

#客户端代码

$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost, false);
        //创建信道
        $channel = $this->connection->channel();
        #创建一个随机队列
        list($queue_name, ,) = $channel->queue_declare('', false, false, false, false);

        $this->response = null;
        $this->corr_id = uniqid();

        echo "发送的correlation_id". $this->corr_id.PHP_EOL;
        $callback = function ($req) {
            if ($req->get('correlation_id') == $this->corr_id) {
                echo "收到的消息体".$req->body.PHP_EOL;
                $this->response = $req->body;
            }
        };
       #这一步是因为server端会把消息传到这个队列里来,所以进行消费
        $channel->basic_consume($queue_name,
            '',
            false,
            true,
            false,
            false,
            $callback
        );
        #生产一条消息,求30的斐波那契数列的值
        $msg = new AMQPMessage(
            "30",
            array(
                'correlation_id' => $this->corr_id,
                'reply_to' => $queue_name
            )
        );
        #消息发到rpc_queue路由键
        $channel->basic_publish($msg, '', 'rpc_queue');

        while (!$this->response) {
            $channel->wait();
        }

        echo '[x] Requesting fib(30)'.PHP_EOL;
        $channel->close();
        $this->connection->close();

        return $this->response;

操作方法:生产者一个terminal,多个消费者的多个terminal操作。

总结:写了rabbitmq 6种官网的php代码实现,增加了对rabbitmq的认知。

感想:每个代码都是手敲的,写了好几天,中间断断停停很开心。一开始写了一部分,自己甚至觉得没有必要写这篇文章。还因为大半夜的感冒了。明天就10.1了,希望自己这次回家,不要跟爸妈吵架。


以下无正文

参考链接:https://www.rabbitmq.com/tutorials/tutorial-six-php.html

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注