您的当前位置:首页正文

tp5.1rabbitmq实现工作模式队列和订阅模式

2023-12-29 来源:易榕旅网
tp5.1rabbitmq实现⼯作模式队列和订阅模式

⼀个⽣产者,两个消费者 第⼀个消费者消费⼀条休眠0.02秒 第⼆个休眠1秒/**

* ⼯作队列

* @throws \\Exception * */

public function tastQueue(){

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();

$queue_name = \"task_queue\";

//$durable false 消息持久化 true 反之

$channel->queue_declare($queue_name, false, false, false, false);

// $data = implode(' ', array_slice($argv, 1)); for($i=0;$i<50;$i++){

$data = \"Hello World!\".$i; $msg = new AMQPMessage( $data,

array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );

$channel->basic_publish($msg, '', 'task_queue'); }

echo ' [x] Sent ', $data, \"\\n\"; $channel->close(); $connection->close(); }

/**

* 消费⼯作模式 * */

public function receiveTask(){ $queue='task_queue';

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();

$channel->queue_declare($queue, false, false, false, false); //能者多劳模式 //同⼀时刻服务器只会发⼀条消息 $channel->basic_qos(null,1,null);

//监听队列 ture表⾃动 false表⼿动返回完成状态 $no_ack=false;

$channel->basic_consume($queue, '', false, $no_ack, false, false, [$this,'callbackTask']); while ($channel->is_consuming()) { $channel->wait(); } }

/**

* 消费⼯作模式 * */

public function receiveTaskTwo(){ $queue='task_queue';

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();

$channel->queue_declare($queue, false, false, false, false); //能者多劳模式 //同⼀时刻服务器只会发⼀条消息 $channel->basic_qos(null,1,null);

//监听队列 ture表⾃动 false表⼿动返回完成状态 $no_ack=false;

$channel->basic_consume($queue, '', false, $no_ack, false, false, [$this,'callbackTaskTwo']); while ($channel->is_consuming()) { $channel->wait(); } } /**

* 回调⼯作模式 * @param $msg * */

public function callbackTask($msg){ //⼿动完成状态

logs(\"回调⼯作模式1:\". $msg->body,'pushMessage');

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); sleep(0.02); } /**

* 回调⼯作模式 * @param $msg * */

public function callbackTaskTwo($msg){ //⼿动完成状态

logs(\"回调⼯作模式2:\". $msg->body,'pushMessage');

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); sleep(1); }

执⾏结果 以上采⽤的是能者多劳模式

/****************************************⼴播模式start*************************************************************************//**

* ⽣产⼴播模式

* @throws \\Exception

* User: cwh DateTime:2021/8/20 1:08 */

public function SendFanout(){

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false); if (empty($data)) {

$data = \"info: Hello World!\"; }

$msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, \"\\n\"; $channel->close(); $connection->close();}

/**

* 消费⼴播模式

* @throws \\ErrorException

* User: cwh DateTime:2021/8/20 1:09 */

public function receiveFanout(){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false); $queue_name = \"fanout_queue\";

$channel->queue_declare($queue_name, false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo \" [*] Waiting for logs. To exit press CTRL+C\\n\";

$channel->basic_consume($queue_name, '', false, true, false, false, [$this,'callbackFanout']); while ($channel->is_open()) { $channel->wait(); }

$channel->close(); $connection->close();}

/**

* 消费⼴播模式

* @throws \\ErrorException

* User: cwh DateTime:2021/8/20 1:09 */

public function receiveFanoutTwo(){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false); $queue_name = \"fanout_queue_two\";

$channel->queue_declare($queue_name, false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo \" [*] Waiting for logs. To exit press CTRL+C\\n\";

$channel->basic_consume($queue_name, '', false, true, false, false, [$this,'callbackFanoutTwo']); while ($channel->is_open()) { $channel->wait(); }

$channel->close(); $connection->close();}

/** * 回调

* @param $msg

* User: cwh DateTime:2021/8/20 1:00 */

public function callbackFanout($msg){

logs(\"回调⼴播模式:\".$msg->body,'pushMessage');}/** * 回调

* @param $msg

* User: cwh DateTime:2021/8/20 1:00 */

public function callbackFanoutTwo($msg){

logs(\"回调⼴播模式2:\".$msg->body,'pushMessage');}

/****************************************⼴播模式end*************************************************************************/

因篇幅问题不能全部显示,请点此查看更多更全内容