⼀个⽣产者,两个消费者 第⼀个消费者消费⼀条休眠0.02秒 第⼆个休眠1秒/**
* ⼯作队列
* @throws \\Exception * */
public function tastQueue(){
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();
$queue_name = \"task_queue\";
//$durable false 消息持久化 true 反之
$channel->queue_declare($queue_name, false, false, false, false);
// $data = implode(' ', array_slice($argv, 1)); for($i=0;$i<50;$i++){
$data = \"Hello World!\".$i; $msg = new AMQPMessage( $data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
$channel->basic_publish($msg, '', 'task_queue'); }
echo ' [x] Sent ', $data, \"\\n\"; $channel->close(); $connection->close(); }
/**
* 消费⼯作模式 * */
public function receiveTask(){ $queue='task_queue';
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();
$channel->queue_declare($queue, false, false, false, false); //能者多劳模式 //同⼀时刻服务器只会发⼀条消息 $channel->basic_qos(null,1,null);
//监听队列 ture表⾃动 false表⼿动返回完成状态 $no_ack=false;
$channel->basic_consume($queue, '', false, $no_ack, false, false, [$this,'callbackTask']); while ($channel->is_consuming()) { $channel->wait(); } }
/**
* 消费⼯作模式 * */
public function receiveTaskTwo(){ $queue='task_queue';
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();
$channel->queue_declare($queue, false, false, false, false); //能者多劳模式 //同⼀时刻服务器只会发⼀条消息 $channel->basic_qos(null,1,null);
//监听队列 ture表⾃动 false表⼿动返回完成状态 $no_ack=false;
$channel->basic_consume($queue, '', false, $no_ack, false, false, [$this,'callbackTaskTwo']); while ($channel->is_consuming()) { $channel->wait(); } } /**
* 回调⼯作模式 * @param $msg * */
public function callbackTask($msg){ //⼿动完成状态
logs(\"回调⼯作模式1:\". $msg->body,'pushMessage');
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); sleep(0.02); } /**
* 回调⼯作模式 * @param $msg * */
public function callbackTaskTwo($msg){ //⼿动完成状态
logs(\"回调⼯作模式2:\". $msg->body,'pushMessage');
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); sleep(1); }
执⾏结果 以上采⽤的是能者多劳模式
/****************************************⼴播模式start*************************************************************************//**
* ⽣产⼴播模式
* @throws \\Exception
* User: cwh DateTime:2021/8/20 1:08 */
public function SendFanout(){
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin'); $channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false); if (empty($data)) {
$data = \"info: Hello World!\"; }
$msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, \"\\n\"; $channel->close(); $connection->close();}
/**
* 消费⼴播模式
* @throws \\ErrorException
* User: cwh DateTime:2021/8/20 1:09 */
public function receiveFanout(){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false); $queue_name = \"fanout_queue\";
$channel->queue_declare($queue_name, false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo \" [*] Waiting for logs. To exit press CTRL+C\\n\";
$channel->basic_consume($queue_name, '', false, true, false, false, [$this,'callbackFanout']); while ($channel->is_open()) { $channel->wait(); }
$channel->close(); $connection->close();}
/**
* 消费⼴播模式
* @throws \\ErrorException
* User: cwh DateTime:2021/8/20 1:09 */
public function receiveFanoutTwo(){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false); $queue_name = \"fanout_queue_two\";
$channel->queue_declare($queue_name, false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo \" [*] Waiting for logs. To exit press CTRL+C\\n\";
$channel->basic_consume($queue_name, '', false, true, false, false, [$this,'callbackFanoutTwo']); while ($channel->is_open()) { $channel->wait(); }
$channel->close(); $connection->close();}
/** * 回调
* @param $msg
* User: cwh DateTime:2021/8/20 1:00 */
public function callbackFanout($msg){
logs(\"回调⼴播模式:\".$msg->body,'pushMessage');}/** * 回调
* @param $msg
* User: cwh DateTime:2021/8/20 1:00 */
public function callbackFanoutTwo($msg){
logs(\"回调⼴播模式2:\".$msg->body,'pushMessage');}
/****************************************⼴播模式end*************************************************************************/
因篇幅问题不能全部显示,请点此查看更多更全内容