本次操作是以docker为基础进行操作
1.在docker上pull rabbitmq
docker pull rabbitmq:management
docker run -d --hostname rabbit-host --restart always --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3.8.9-management-alpine
Thinkphp5.0引入RabbitMq
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
在命令行执行
composer install
或者 直接运行包引入
composer require php-amqplib/php-amqplib
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use think\Log; class Mq1 { protected $consumerTag = 'consumer'; // 消费者标签 protected $exchange = 'router'; // 交换机名 protected $queue = 'msgs';// 的队列名 protected $connection; //amqp 链接 protected $channel;//开启通道 public function __construct(){ // 连接rabbitMQ $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'user', '123456', '/'); // 开启一个信道 $channel = $connection->channel(); // 声明一个队列 // queue 队列名 // passive 检测队列是否存在 true 只检测不创建 false 创建 // durable 是否持久化队列 true 为持久化 // exclusive 私有队列 不允许其它用户访问 设置true 将会变成私有 // auto_delete 当所有消费客户端连接断开后,是否自动删除队列 $channel->queue_declare($this->queue, false, true, false, false); // exchange 交换机名称 // type 交换器类型 // passive 检测交换机是否存在 true 只检测不创建 false 创建 // durable 是否持久化队列 true 为持久化 // auto_delete 当所有绑定队列都不在使用时,是否自动删除交换器 true:删除false:不删除 $channel->exchange_declare($this->exchange, 'direct', false, true, false); // 绑定队列和交换机 $channel->queue_bind($this->queue, $this->exchange); $this->connection = $connection; $this->channel = $channel; } /** * 推入消息到队列中 */ public function pushMessage() { $data = [ "data" => '测试一下' ]; // 写入队列的消息 $messageBody = json_encode($data) ; try { // 消息内容 // delivery_mode 投递模式 delivery mode 设置为 2标记持久化 $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); // $message 消息内容 // $exchange 交换器名称 // routing_key 路由键 (routing key) 主题交换机会用到 $this->channel->basic_publish($message, $this->exchange,''); }catch (\Exception $exception){ echo $exception->getMessage(); } //关闭通道和MQ链接 $this-shutdown(); return "ok"; } /** * @Notes (备注) : 数据回调 * @Author (作者) : Seven * @Date (开发时间) : 2021/10/25 15:16 * @Interface (方法名称) : process_message * @param $message */ function process_message($message) { if ($message->body !== 'quit') { $obj = json_decode($message->body); if (!isset($obj->id)) { echo 'error data\n'; // 消费成功会在 日志里面写入一条数据 $this->recordErrorLog("error data1111:" . $message->body); } else { try { $this->recordErrorLog("data222:" . $message->body); } catch (\Think\Exception $e) { $this->recordErrorLog("data3333:" .$e->getMessage()); $this->recordErrorLog("data4444:" .$message->body); } catch (\PDOException $pe) { $this->recordErrorLog("data5555:" .$pe->getMessage()); $this->recordErrorLog("data6666:" .$message->body); } } } //ack 手动确认 确认后移除数据 $message->delivery_info['delivery_tag'] 值:1 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); if ($message->body === 'quit') { //取消消费者对队列的订阅关系 $message->delivery_info['consumer_tag'] 值:consumer $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } /** * 启动 * * @return \think\Response */ public function start() { // queue 队列名称 // consumer_tag 消费者标签 // no_ack 在设置了 no_ack=false 的情况下)只要 consumer 手动应答了 Basic.Ack ,就算其“成功”处理了 // no_ack=true (此时为自动应答) // exclusive 是否是私有队列 设置true 将会变成私有 // callback = null, 回调函数 $this->channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this, 'process_message')); // 不管你的php代码执行是否成功,最后都会执行 shutdown方法,关闭信道和连接 // register_shutdown_function(array($this, 'shutdown'), $channel, $connection); while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->recordErrorLog('starting'); } /** * @Notes (备注) : 关闭信道 * @Author (作者) : Seven * @Date (开发时间) : 2021/10/25 15:17 * @Interface (方法名称) : shutdown * @param $channel * @param $connection */ function shutdown() { $this->channel->close(); $this->connection->close(); $this->recordErrorLog('closed'); } /* * 将异常写入日志 */ private function recordErrorLog($msg) { Log::init([ 'type' => 'File', 'path' => LOG_PATH, 'level' => ['error'], ]); Log::record([ '错误消息' => $msg, ], 'error'); } }