hyperf 如何对AMQP消息进行手动消费?
在使用 hyperf 官方自带的 AMQP 队列时你会发现,不需要我们再额外启动进程对消息进行消费。这是因为默认情况下,使用 @Consumer 注解时,hyperf 会为我们自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。
来看一个简单的例子。
1、创建producer
php bin/hyperf.php gen:amqp-producer DemoProducer
2、投递消息
namespace App\Controller; use App\Amqp\Producer\DemoProducer; use Hyperf\Amqp\Producer; use Hyperf\Utils\ApplicationContext; class IndexController extends AbstractController { public function index() { $user = $this->request->input('user', 'Hyperf'); $data = [ 'message' => "Hello {$user}.", ]; $message = new DemoProducer($data); $producer = ApplicationContext::getContainer()->get(Producer::class); $producer->produce($message); return 'ok'; } }
3、创建消费者
php bin/hyperf.php gen:amqp-consumer DemoConsumer
4、测试
启动项目,浏览器访问 http://127.0.0.1:9501/ ,我们可以在控制台看到打印的消息输出。
Array ( [method] => GET [message] => Hello Hyperf. ) [DEBUG] 1 acked.
这个是 hyperf 自启动进程对消息进行消费。
现在我们想把消费者进程从项目中剥离出来,用其他机器单独进行消费。我们可以参考 hyperf 自启动的流程+command 实现。
下面先看看 hyperf 源码是怎么实现自启动消费者进程的。
首先我们注意到 Hyperf\Amqp\Listener\BeforeMainServerStartListener 这个监听器,在 BeforeMainServerStart 或者 MainCoroutineServerStart 事件被触发时,Hyperf\Amqp\ConsumerManager::run 方法被执行。
public function process(object $event) { // Init the consumer process. $consumerManager = $this->container->get(ConsumerManager::class); $consumerManager->run(); }
Hyperf\Amqp\ConsumerManager::run 方法如下:
大致步骤都在图上进行了标注,可以看到整个过程都比较简单。
下面我们参考这个过程,根据 command 来手动创建消费进程。
1、原 consumer 先禁用
【App\Amqp\Consumer\DemoConsumer】enable 设置 false public function isEnable(): bool { return false; }
2、手动创建一个新的 consumer 用于测试
<?php declare(strict_types=1); namespace App\Amqp\Consumer; use Hyperf\Amqp\Result; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; use PhpAmqpLib\Message\AMQPMessage; class DemoConsumer2 extends ConsumerMessage { public $exchange = 'hyperf'; public $routingKey = 'hyperf'; public $queue = 'hyperf'; public function consumeMessage($data, AMQPMessage $message): string { print_r($data); return Result::ACK; } public function isEnable(): bool { return false; } }
在这个 consumer 中,我们手动指定了 exchange、routingKey 和 queue,同时禁止自启动(enable=false)。
3、构建 command
php bin/hyperf.php gen:command DemoConsumerCommand 【App\Command\DemoConsumerCommand】代码如下: <?php declare(strict_types=1); namespace App\Command; use App\Amqp\Consumer\DemoConsumer; use App\Amqp\Consumer\DemoConsumer2; use Hyperf\Amqp\Consumer; use Hyperf\Command\Command as HyperfCommand; use Hyperf\Command\Annotation\Command; use Hyperf\Di\Annotation\AnnotationCollector; use Psr\Container\ContainerInterface; use Hyperf\Amqp\Annotation\Consumer as ConsumerAnnotation; /** * @Command */ #[Command] class DemoConsumerCommand extends HyperfCommand { /** * @var ContainerInterface */ protected $container; public function __construct(ContainerInterface $container) { $this->container = $container; parent::__construct('DemoConsumer:command'); } public function configure() { parent::configure(); $this->setDescription('手动启动消费进程测试'); } public function handle() { $consumer = $this->container->get(Consumer::class); $consumer->consume(make(DemoConsumer2::class)); $this->line('ok.', 'info'); } }
4、重新启动 hyperf 以及另起一个窗口启动 command
启动 hyperf : php bin/hyperf.php start 启动 command: php bin/hyperf.php DemoConsumer:command
5、测试
浏览器访问 http://127.0.0.1:9501/ ,我们可以在启动 command 的窗口看到消息被成功输出。