类封装
定义一个名为 rabbitmq.php的类文件
<?php class RabbitMQ { private $connect;//连接对象 private $channel;// private $exchange;//交换机对象 private $exchange_name;//交换机名称 private $exchange_type;//交换机类型 private $queue;//对列对象 private $queue_name;//队列名称 private $queue_type;//对列类型 private $call_back_fnc;//回调函数 private $is_ack;//是否确认收到消息 public function __construct($params=[], $persistent_flag=false) { try { $con['host'] = $params['host'] ?? '127.0.0.1'; $con['vhost'] = $params['vhost'] ?? '/'; $con['port'] = $params['port'] ?? 5672; $con['login'] = $params['login'] ?? 'guest'; $con['password'] = $params['password'] ?? 'guest'; $this -> exchange_name = $params['exchange_name'] ?? ''; $this -> exchange_type = $params['exchange_type'] ?? AMQP_EX_TYPE_DIRECT; $this -> queue_name = $params['queue_name'] ?? ''; $this -> queue_type = $params['queue_type'] ?? AMQP_EX_TYPE_DIRECT; //连接初始化 $this -> connect = $this -> connectInit($con, $persistent_flag); if(!$con) { throw new Exception('Cannot connect to the broker!'); } $this -> channel = new AMQPChannel($this -> connect); } catch(Exception $ex) { $error_msg = "Exception error connect: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; writeLog('msg:'. $error_msg); return false; } } /** * 连接初始化 * @param $config array 配置信息 * @param persistent_flag bool 持久化 * @return object */ public function connectInit($config, $persistent_flag) { $connect = new AMQPConnection($config); if (!$connect->pconnect()) { return false; } if($persistent_flag) { $connect->isPersistent(1); } return $connect; } /** * 设置交换机 * @param $params['exchange_name'] string 交换机名称 * @param $params['exchange_type'] string 交换机类型 * @param $params['flag'] int 交换机标志 * @return object | bool */ public function setExchange($params) { $error_msg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setExchange", 1); } $this->exchange = new \AMQPExchange($this->channel); if($params['exchange_name']){ $this->exchange_name = $params['exchange_name']; // 交换机名称 } if (empty($this->exchange_name)) return false; $this->exchange->setName($this->exchange_name); // 设置名称 if($params['exchange_type']){ $this->exchange_type = $params['exchange_type']; } $this->exchange->settype($this->exchange_type); // 设置交换机类型 if($params['flag']){ $this->exchange->setFlags($params['flag']); //交换机标志 } if($this -> exchange_type && $params['flag']){ $this->exchange->declareExchange(); // 创建 } } catch(AMQPQueueException $ex) { $error_msg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $error_msg = "Exception error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($error_msg){ writeLog($error_msg); return false; } return $this; } /** * 设置队列 * @param $params['queue_name'] string 对列名称 * @param $params['flag'] int 对列标记 * @param $params['arguments'] array 参数配置 * @param $params['routing_key'] string 要绑定的模式或路由键 * @return object | bool */ public function setQueue($params){ $error_msg = ''; try{ $this->queue = new \AMQPQueue($this->channel); if (!empty($params['exchange_name'])) { $this -> exchange_name = $params['exchange_name']; } if(!empty($params['queue_name'])) { $this->queue_name = $params['queue_name']; // 队列名称 } $this->queue->setName($this->queue_name); if($params['flag']){ $this->queue->setFlags($params['flag']); // 队列标志。与消息持久化有关。 这篇文字不涉及这一块的说明 } if(is_array($params['arguments']) && !empty($params['arguments'])){ $this->queue->setArguments($params['arguments']); // 参数配置 } $this->queue->declareQueue(); // 创建一个队列 $routing_key = !empty($params['routing_key']) ? $params['routing_key'] : $this->queue_name; if($this -> exchange_name && $routing_key){ $this->queue->bind($this ->exchange_name, $routing_key); // 交换机和队列的绑定操作 } } catch(AMQPQueueException $ex) { $error_msg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $error_msg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($error_msg){ writeLog($error_msg); return false; } return $this; } /** * 发布消息 * @param $params['message'] string 消息 * @param $params['routing_key'] string 要绑定的模式或路由键 * @param $params['flag'] int 参数配置 AMQP_NOPARAM | AMQP_IMMEDIATE * @param $params['attributes'] array * @return bool */ public function publishMessage($params=[]){ if(!$params['message']) return false; $routing_key = $params['routing_key'] ? $params['routing_key'] : $this->queue_name; $params['attributes'] = $params['attributes'] ?? []; // 发布消息,带有路由key。如果需要,则会用于关联。 $this->exchange->publish($params['message'], $routing_key, $params['flag'], $params['attributes']); return true; } /** * 消费 * @param $params['callback'] arguments 回调函数 * @param $params['qos'] int 要预取的消息数 * @param $params['is_ack'] bool 是否确认收到消息 */ public function consumeMessage($params){ $params['callback'] = $params['callback'] ?? null; $params['qos'] = $params['qos'] ?? 0; $params['is_ack'] = $params['is_ack'] ?? true; if($params['qos']){ $this->channel->qos(0, $params['qos']); } $error_msg = ''; try{ if(!$this->queue){ throw new \AMQPQueueException("Error queue on method consume", 1); } $this->call_back_fnc = $params['callback']; $this->is_ack = $params['is_ack']; $params['callback'] = function($envelope, $queue){ if(is_callable($this->call_back_fnc)){ call_user_func($this->call_back_fnc, $envelope->getBody()); if($this->is_ack){ $queue->ack($envelope->getDeliveryTag()); }else{ $queue->nack($envelope->getDeliveryTag()); } } }; $this->queue->consume($params['callback']); } catch(AMQPQueueException $ex) { $error_msg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $error_msg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($error_msg){ writeLog($error_msg); } } } function writeLog($msg){ $logFile = './rabbitmq_w_' . date('N') . '.log'; $fp = fopen($logFile,"a"); $requestdir = dirname($logFile); if (!is_dir($requestdir)) { mkdir($requestdir, 0777, 1); } flock($fp, LOCK_EX); fwrite($fp,date("Y-m-d H:i:s",time()).": ".$msg."\n"); flock($fp, LOCK_UN); fclose($fp); }
生产消息
定义一个名称为sender.php的文件
<?php require_once './rabbitmq.php'; $config = [ 'host' => '127.0.0.1', //IP地址 'vhost' => '/', //主机 'port' => 5672, //端口号 'login' => 'guest', //用户名 'password' => 'guest', //密码 ]; $mq = new RabbitMQ($config); // 初始化(rmq连接操作); if ($mq) { $mq_route = 'push_data_to_routing'; // 路由 $mq_exchange = 'push_data_to_exchange'; // 交换机 $mq_queue = 'push_data_to_queue'; // 队列 // 建立连接,设置交换机,设置队列 $exchange_params = [ 'exchange_name' => $mq_exchange, 'exchange_type' => AMQP_EX_TYPE_DIRECT, 'flag' => AMQP_DURABLE, ]; $queue_params = [ 'queue_name' => $mq_queue, 'exchange_name' => $mq_exchange, 'routing_key' => $mq_route, 'flag' => AMQP_DURABLE, ]; $publish_params = [ 'routing_key' => $mq_route, ]; $data_list = ['iphone','honor','vivo','xiaomi','oppo']; $mq->setExchange($exchange_params)->setQueue($queue_params); foreach ($data_list as $item){ $publish_params['message'] = $item; $mq->publishMessage($publish_params); //推送 } }
接收消息
定义一个名称为receiver.php的文件
<?php require_once './rabbitmq.php'; $config = [ 'host' => '127.0.0.1', //IP地址 'vhost' => '/', //主机 'port' => 5672, //端口号 'login' => 'guest', //用户名 'password' => 'guest', //密码 ]; $mq = new RabbitMQ($config); $mq_route = 'push_data_to_routing'; $mq_exchange = 'push_data_to_exchange'; $mq_queue = 'push_data_to_queue'; $exchange_params = [ 'exchange_name' => $mq_exchange, 'exchange_type' => '', 'flag' => AMQP_PASSIVE, ]; $queue_params = [ 'queue_name' => $mq_queue, 'flag' => AMQP_PASSIVE, ]; $mq->setExchange($exchange_params)->setQueue($queue_params); $mq->consumeMessage([ 'callback'=>function($msg){ sleep(2); var_dump($msg); return true; } ]);
执行
win+R 调出cmd,切换至代码所在目录,一个窗口执行sender.php 一个窗口执行receiver.php
结果如下: