PHP AMQP Message Queue: RabbitMQ Exchange Types, Direct (Part 3)
1. AMQP_EX_TYPE_DIRECT: the direct exchange type
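<?php
// Called once for every message delivered to the queue.
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    // ack (rather than nack) confirms the message was handled successfully.
    $queue->ack($envelope->getDeliveryTag());
}

$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange(); // spelled declare() in older php-amqp releases

$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declareQueue(); // spelled declare() in older php-amqp releases
$queue->bind('exchange', 'logs'); // bind with routing key 'logs'

// consume() blocks and invokes the callback for each message.
$queue->consume('callback');
$connect->disconnect();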
The sender, send.php, looks like this:
<?php
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange(); // spelled declare() in older php-amqp releases

// Publish with routing key 'logs', which matches the receiver's binding.
$exchange->publish('direct type test', 'logs');
var_dump("Send Message OK");
$connect->disconnect();
Running it gives the result shown in the figure.
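A direct exchange delivers a message only to queues whose binding key equals the message's routing key exactly. The following is a minimal toy model of that rule in plain PHP, with no broker or amqp extension involved. The class `DirectExchangeModel` and its methods are made up for illustration and are not part of php-amqp.

```php
<?php
// Toy model of direct-exchange routing. DirectExchangeModel, bind() and
// publish() are hypothetical names, not the php-amqp API.
final class DirectExchangeModel
{
    /** @var array<string, list<string>> routing key => names of bound queues */
    private array $bindings = [];

    /** @var array<string, list<string>> queue name => messages delivered to it */
    public array $queues = [];

    public function bind(string $queue, string $routingKey): void
    {
        $this->bindings[$routingKey][] = $queue;
        $this->queues[$queue] ??= [];
    }

    /** Returns how many queues received the message. */
    public function publish(string $message, string $routingKey): int
    {
        // Direct exchange: only an exact routing-key match delivers.
        $targets = $this->bindings[$routingKey] ?? [];
        foreach ($targets as $queue) {
            $this->queues[$queue][] = $message;
        }
        return count($targets);
    }
}

$exchange = new DirectExchangeModel();
$exchange->bind('logs', 'logs');

$delivered = $exchange->publish('direct type test', 'logs');    // key matches the binding
$dropped   = $exchange->publish('never arrives', 'logs.error'); // no exact match

var_dump($delivered, $dropped, $exchange->queues['logs']);
```

In this model the second message reaches no queue, because `logs.error` is not identical to the binding key `logs`.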
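<?php
// Called once for every message delivered to the queue.
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    // ack (rather than nack) confirms the message was handled successfully.
    $queue->ack($envelope->getDeliveryTag());
}

$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange(); // spelled declare() in older php-amqp releases

$queue = new AMQPQueue($channel);
$queue->setName('logs');
@$queue->declareQueue(); // @ silences warnings raised by the declaration
$queue->bind('exchange', 'logs'); // bind with routing key 'logs'

// consume() blocks and invokes the callback for each message.
$queue->consume('callback');
$connect->disconnect();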
<?php
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange(); // spelled declare() in older php-amqp releases

for ($index = 1; $index < 5; $index++) {
    $exchange->publish((string) $index, 'logs'); // the payload must be a string
    var_dump("Send:$index");
}
// Note: deleting the exchange also removes its bindings.
$exchange->delete();
$connect->disconnect();
Running this produces the following result.
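When several consumers share one queue, RabbitMQ by default hands messages out in turn (round-robin). Below is a rough sketch of that behaviour as a toy model in plain PHP, assuming two consumers are attached to the `logs` queue. The function name `dispatchRoundRobin` and the consumer labels are illustrative assumptions only.

```php
<?php
// Toy model of round-robin dispatch; dispatchRoundRobin() and the consumer
// labels are hypothetical, and no broker is involved.

/**
 * @param list<string> $messages
 * @param list<string> $consumers
 * @return array<string, list<string>> consumer => messages it receives
 */
function dispatchRoundRobin(array $messages, array $consumers): array
{
    $assigned = array_fill_keys($consumers, []);
    foreach ($messages as $i => $message) {
        // The i-th message goes to consumer (i mod number of consumers).
        $assigned[$consumers[$i % count($consumers)]][] = $message;
    }
    return $assigned;
}

$messages = array_map('strval', range(1, 4)); // same payloads as the send loop
$result = dispatchRoundRobin($messages, ['receive.php', 'receive_two.php']);
var_dump($result);
```

The model ignores how long each consumer takes per message, which is exactly why a slow consumer can end up with a backlog under this scheme.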
for ($index = 1; $index < 50; $index++) {
    $exchange->publish((string) $index, 'logs'); // the payload must be a string
    var_dump("Send:$index");
}
In receive_two.php, add sleep(3) to the callback:
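function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    sleep(3); // simulate a slow consumer
    // ack (rather than nack) confirms the message was handled successfully.
    $queue->ack($envelope->getDeliveryTag());
}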
Running the programs gives the following result.
// Before:
$channel = new AMQPChannel($connect);
// After: allow at most one unacknowledged message per consumer at a time.
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);
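A prefetch count of 1 tells the broker not to give a consumer a new message until it has acknowledged the previous one, so a slow consumer stops accumulating messages it cannot yet process. The toy model below sketches the effect in plain PHP. The function `dispatchWithPrefetchOne`, the consumer labels, and the per-message costs (1 second for the fast consumer, 3 seconds for the one with `sleep(3)`) are assumptions chosen for illustration.

```php
<?php
// Toy model of dispatch with a prefetch count of 1. Function name, consumer
// labels and per-message costs are hypothetical; no broker is involved.

/**
 * @param list<string> $messages
 * @param array<string, int> $costPerMessage consumer => seconds per message
 * @return array<string, list<string>> consumer => messages it handles
 */
function dispatchWithPrefetchOne(array $messages, array $costPerMessage): array
{
    $handled = array_fill_keys(array_keys($costPerMessage), []);
    $freeAt  = array_fill_keys(array_keys($costPerMessage), 0);

    foreach ($messages as $message) {
        // A consumer gets its next message only after acking the previous
        // one, so each message goes to whichever consumer is free first
        // (ties go to the consumer listed first).
        $next = array_keys($freeAt, min($freeAt), true)[0];
        $handled[$next][] = $message;
        $freeAt[$next] += $costPerMessage[$next];
    }
    return $handled;
}

$messages = array_map('strval', range(1, 8));
$result = dispatchWithPrefetchOne(
    $messages,
    ['receive.php' => 1, 'receive_two.php' => 3]
);
var_dump(array_map('count', $result));
```

In this model the fast consumer ends up handling most of the messages, whereas plain round-robin would split them evenly regardless of speed.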