生产者
$connConfig = array( 'host' => $this->host, 'port' => $this->port, 'login' => $this->user, 'password' => $this->password, 'vhost'=>'/' //mq的虚拟机 类似mysql里面的数据库 ); //建立RabbitMQ连接 $conn = new \AMQPConnection($connConfig); if(!$conn->connect()){ echo '链接失败'; exit(); } echo "连接名 ".$this->host." 成功建立连接!\n"; //创建channel(信道或者叫通道) $channel = new \AMQPChannel($conn); echo "创建通道完毕!\n"; //创建一个交换机 消息是根据交换机发送的mq中的 $ex = new \AMQPExchange($channel); //声明一个路由键 $routingKey = $this->routing_key; //声明交换机名称 $exchangeName = $this->exchange_name; //设置交换机名称 $ex->setName($exchangeName); //设置交换机类型 一共四种类型 $ex->setType(AMQP_EX_TYPE_DIRECT); //设置交换机持久类型 1:不持久在磁盘 2:持久化在磁盘 $ex->setFlags(AMQP_DURABLE); //交换机设置完后需要一个代码 $ex->declareExchange(); echo "创建通道完毕!\n"; //推送消息 推送10条 for ($i=1;$i<=10;$i++){ echo '插入消息成功:'."\n"; $msg = [ 'data'=>'消息:'.$i, ]; $ex->publish(json_encode($msg),$routingKey,AMQP_NOPARAM,['delivery_mode'=>2]); }
在浏览器或者命令行执行该方法
消费者 目前有个小问题一直没有解决 如果有小伙伴知道解决方法可以联系我 大家一起进步 感谢~!
/** * @Notes (备注) : 消费者 * @Author (作者) : Seven * @Date (开发时间) : 2021/10/20 16:57 * @Interface (方法名称) : consumption */ public function consumption(){ $connConfig = array( 'host' => $this->host, 'port' => $this->port, 'login' => $this->user, 'password' => $this->password, 'vhost'=>'/' //mq的虚拟机 类似mysql里面的数据库 ); //建立RabbitMQ连接 $conn = new \AMQPConnection($connConfig); if(!$conn->connect()){ echo '链接失败'; exit(); } echo "连接名 ".$this->host." 成功建立连接!\n"; //创建channel(信道或者叫通道) $channel = new \AMQPChannel($conn); echo "创建通道完毕!\n"; //创建一个交换机 消息是根据交换机发送的mq中的 $ex = new \AMQPExchange($channel); //声明一个路由键 $routingKey = $this->routing_key; //声明交换机名称 $exchangeName = $this->exchange_name; //设置交换机名称 $ex->setName($exchangeName); //设置交换机类型 一共四种类型 $ex->setType(AMQP_EX_TYPE_DIRECT); //设置交换机持久类型 1:不持久在磁盘 2:持久化在磁盘 $ex->setFlags(AMQP_DURABLE); //交换机设置完后需要一个代码 $ex->declareExchange(); echo "创建通道完毕!\n"; //创建消息队列 参数是通道的返回的对象 $query = new \AMQPQueue($channel); //设置消息队列名称 $query->setName($this->queue_name); //设置队列的持久 $query->setFlags(AMQP_DURABLE); //队列设置完后需要一个代码 $query->declareQueue(); //把交换机绑定在路由键里面 参数1:交换机返回的name 参数2:路由键 $query->bind($ex->getName(),$routingKey); echo "创建消息队列完毕!\n"; //网上资料都是第一个方法,但是我在执行的时候会返回找不到该方法所以用的第二个方法,但是第二个返回我无法打印出返回的结果 //方法一:参数为回调方法 $query->consume('receive'); //方法二:队列 consume 监听的意思 参数为方法名 消费者代码 $query->consume(function($envelope,$queue){ sleep(1); //消息接收器 监听消息并进行处理 回调方法 echo $envelope->getBody()."\n"; $this->recordErrorLog(json_encode($envelope->getBody())); }); } //消息接收器 监听消息并进行处理 回调方法 public function receive($envelope){ echo $envelope->getBody()."\n"; } /* * 将异常写入日志 */ private function recordErrorLog($msg) { Log::init([ 'type' => 'File', 'path' => LOG_PATH, 'level' => ['error'], ]); Log::record([ '错误消息' => $msg, ], 'error'); }