官方文档里面有详细说明,安装和配置不再赘述,这里只是记录下实际中使用Redis异步队列的具体使用。
工作原理
ConsumerProcess 是异步消费进程,会根据用户创建的 Job 或者使用 @AsyncQueueMessage 的代码块,执行消费逻辑。 Job 和 @AsyncQueueMessage 都是需要投递和执行的任务,即数据、消费逻辑都会在任务中定义。
1)Job 类中成员变量即为待消费的数据,handle() 方法则为消费逻辑。
2)@AsyncQueueMessage 注解的方法,构造函数传入的数据即为待消费的数据,方法体则为消费逻辑。
1、配置异步消费进程
config/autoload/processes.php
1 <?php 2 3 return [ 4 5 Hyperf\AsyncQueue\Process\ConsumerProcess::class, 6 ];
2、定义Job
Job 类中成员变量即为待消费的数据,handle() 方法则为消费逻辑
app/Job/AnalyseJob.php
1 <?php 2 3 declare(strict_types=1); 4 5 namespace App\Job; 6 7 use App\Log; 8 use App\Service\V1\Order\OrderService; 9 use Hyperf\AsyncQueue\Job; 10 11 class AnalyseJob extends Job 12 { 13 public $params; 14 15 /** 16 * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次 17 * @var int 18 */ 19 protected $maxAttempts = 2; 20 21 public function __construct($params) 22 { 23 $this->params = $params; 24 } 25 26 public function handle() 27 { 28 // 这里的逻辑会在 ConsumerProcess 进程中执行 29 try { 30 var_dump(">>> 监听到队列消息(晒单数据):"); 31 var_dump($this->params); 32 // 分析晒单数据 33 $orderService = new OrderService(); 34 $res = $orderService->analyseOne($this->params); 35 Log::get('AnalyseJob')->debug('监听到队列消息(晒单数据),进行分析:', $res); 36 } catch (\Exception $e) { 37 Log::get('AnalyseJob')->debug('监听到队列消息(晒单数据),进行分析:' . $e->getMessage()); 38 } 39 } 40 }
3、写一个专门投递消息的service
QueueService.php
1 <?php 2 3 declare(strict_types=1); 4 5 namespace App\Service\Producer; 6 7 use App\Job\AnalyseJob; 8 use Hyperf\AsyncQueue\Driver\DriverFactory; 9 use Hyperf\AsyncQueue\Driver\DriverInterface; 10 11 class QueueService 12 { 13 /** 14 * @var DriverInterface 15 */ 16 protected $driver; 17 18 public function __construct(DriverFactory $driverFactory) 19 { 20 $this->driver = $driverFactory->get('default'); 21 } 22 23 /** 24 * 生产消息 25 * @param $params 26 * @param int $delay 27 * @return bool 28 */ 29 public function push($params, int $delay = 0): bool 30 { 31 return $this->driver->push(new AnalyseJob($params), $delay); 32 } 33 }
4、投递消息
业务中调用service,来投递消息
1 <?php 2 3 declare(strict_types=1); 4 5 namespace App\Controller; 6 7 use App\Service\QueueService; 8 use Hyperf\Di\Annotation\Inject; 9 use Hyperf\HttpServer\Annotation\AutoController; 10 11 /** 12 * @AutoController 13 */ 14 class UserOrderController extends AbstractController 15 { 16 /** 17 * @Inject 18 * @var QueueService 19 */ 20 protected $service; 21 22 /** 23 * 注解模式投递消息 24 */ 25 public function example() 26 { 27 // 创建/编辑成功后,推送队列消息,消费晒单消息用于比对原始订单 28 if ($ret['code'] == 0) { 29 $userOrderRes = $this->orderService->getUserOrderNativeInfo($ret['data']); 30 $this->queueService->push($userOrderRes['data']); 31 } 32 33 // ... 34 35 } 36 }
参考链接:
https://hyperf.wiki/2.1/#/zh-cn/async-queue