目录
前言
composer安装扩展
遇到的问题
代码展示
1.api接口,进行消息发布
2.生产者类库
3.消费者类库
4.目录结构
5.PHP中 register_shutdown_function 函数的基础介绍与用法详解
windows上运行消费者类库
1.项目根目录下执行
2.执行后代码展示
3.修改下config/console.php
4.项目根目录下执行
5.效果展示
6.tp6.0自定义指令
昨天用树莓派搭建的rabbitmq今天就迫不及待的上手试试了!哈哈哈,(附上树莓派搭建步骤树莓派(Raspberry Pi)上安装RabbitMQ(一)_zk_jy520的博客-CSDN博客)赶紧熬夜把操作过程整理了一遍分享给大家,哪个小伙伴想试试的话,也可以按照这个这个步骤来,能帮到的话,记得点赞收藏哦,没有帮到,也可以点赞收藏,给我点鼓励呀!!!
1.composer安装 composer require php-amqplib/php-amqplib
//RabbitMQ使用 public function test4(){ $client = Producer::getInstance(); $res = $client->publishMsg('exchange_name',['queue_name'],'111111','',''); dump($res); }
发送成功
<?php namespace services\rabbitmq; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exception\AMQPIOException; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; /**生产者类 * Class Producer * @package services\rabbitmq */ class Producer { private $host; private $port; private $user; private $pwd; private $vhost; private static $client; private static $instance; private function __construct($host,$port,$user,$pwd,$vhost) { if (empty($host) || empty($port) || empty($user) || empty($pwd) || empty($vhost)){ $info = config('config')['RabbitMQ']; $this->host = $info['address']; $this->port = $info['port']; $this->user = $info['user']; $this->pwd = $info['pwd']; $this->vhost = $info['vhost']; }else{ $this->host = $host; $this->port = $port; $this->user = $user; $this->pwd = $pwd; $this->vhost = $vhost; } self::$client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost); } public static function getInstance($host = '',$port = '',$user = '',$pwd = '',$vhost = '') { if (!(self::$instance instanceof self)) { self::$instance = new self($host,$port,$user,$pwd,$vhost); } return self::$instance; } public function publishMsg($exchange,$QueueArr,$msg,$message_id,$route_key = '',$expiration = 3600 * 90){ $channel = self::$client->channel(); /**交换机声明 * $exchange 交换机名称 * AMQPExchangeType::DIRECT 路由模式 * passive: false * durable: true 持久化 交换器将在服务器重启后继续存在 * auto_delete: false 一旦通道关闭,交换器将不会被删除 */ $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, [], null); //绑定多个对列 foreach ($QueueArr as $key=>$value){ /**声明队列(设置队列的时间必须设置一次,如要修改需要删除这个队列)new AMQPTable(['x-message-ttl'=>10000]) * $value 队列名称 * passive false * 持久durable true 队列将在服务器重启后继续存在 * 互斥exclusive false 队列可以通过其他渠道访问 * auto_delete false 通道关闭后,队列不会被删除 */ $channel->queue_declare($value,false,true,false,false,false); //队列和交换机绑定 $channel->queue_bind($value, $exchange,$route_key); } //发送消息 $message = new AMQPMessage($msg,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>$expiration * 1000,'message_id'=>$message_id)); $channel->basic_publish($message,$exchange,$route_key); $channel->close(); self::$client->close(); return true; } }
<?php namespace services\rabbitmq; use PhpAmqpLib\Connection\AMQPStreamConnection; class Consumer { private $host; private $port; private $user; private $pwd; private $vhost; private $client; private $channel; public function __construct() { $info = config('config')['RabbitMQ']; $this->host = $info['address']; $this->port = $info['port']; $this->user = $info['user']; $this->pwd = $info['pwd']; $this->vhost = $info['vhost']; $this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost); $this->channel = $this->client->channel(); } public function start(){ /** * * queue: queue_name // 被消费的队列名称 * consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端 * no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现 * no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费 * exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下 * nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错 * callback: $callback // 回调逻辑处理函数 * */ $this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']); register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client); while (count($this->channel->callbacks)) { $this->channel->wait(); } } public function shutdown($channel, $connection){ $channel->close(); $connection->close(); save_log('close'); } public function process_message($message){ echo $message->body."\n"; //手动发送ack $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } }
考虑到大家会不理解这个函数哈,下面这链接详解,通俗易懂
参考链接:https://www.jb51.net/article/129213.htm
php think make:command Consumer
<?php declare (strict_types = 1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; /**项目根目录下执行 php think make:command Consumer 会在command目录下生成Consumer.php文件 * Class Consumer * @package app\command */ class Consumer extends Command { protected function configure() { // 指令配置 $this->setName('consumer') ->setDescription('the consumer command'); } protected function execute(Input $input, Output $output) { $consumer = new \services\rabbitmq\Consumer(); $consumer->start(); // 指令输出 $output->writeln('consumer'); } }
添加下面代码
<?php // +---------------------------------------------------------------------- // | 控制台配置 // +---------------------------------------------------------------------- return [ // 指令定义 'commands' => [ 'consumer' => 'app\command\Consumer', ], ];
php think consumer
参考链接:自定义指令 · ThinkPHP6.0完全开发手册 · 看云