限时 5折! 详情

hyperf 如何对AMQP消息进行手动消费?

博主推荐:yii2实战式教程是一套简单易学、幽默风趣、干货多多、众人大赞的高质量教程,囊括 yii2基础入门、yii2高级进阶的很多知识点,可以说是迄今最好的yii2系列教程,真的不容错过哦~

在使用 hyperf 官方自带的 AMQP 队列时你会发现,不需要我们再额外启动进程对消息进行消费。这是因为默认情况下,使用 @Consumer 注解时,hyperf 会为我们自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。

来看一个简单的例子。

1、创建producer

php bin/hyperf.php gen:amqp-producer DemoProducer

2、投递消息

namespace App\Controller;

use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Utils\ApplicationContext;

class IndexController extends AbstractController
{
    public function index()
    {
        $user = $this->request->input('user', 'Hyperf');

        $data = [
            'message' => "Hello {$user}.",
        ];
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $producer->produce($message);

        return 'ok';
    }
}

3、创建消费者

php bin/hyperf.php gen:amqp-consumer DemoConsumer

4、测试

启动项目,浏览器访问 http://127.0.0.1:9501/ ,我们可以在控制台看到打印的消息输出。

Array
(
    [method] => GET
    [message] => Hello Hyperf.
)
[DEBUG] 1 acked.

这个是 hyperf 自启动进程对消息进行消费。

现在我们想把消费者进程从项目中剥离出来,用其他机器单独进行消费。我们可以参考 hyperf 自启动的流程+command 实现。

下面先看看 hyperf 源码是怎么实现自启动消费者进程的。

首先我们注意到 Hyperf\Amqp\Listener\BeforeMainServerStartListener 这个监听器,在 BeforeMainServerStart 或者 MainCoroutineServerStart 事件被触发时,Hyperf\Amqp\ConsumerManager::run 方法被执行。

public function process(object $event)
{
    // Init the consumer process.
    $consumerManager = $this->container->get(ConsumerManager::class);
    $consumerManager->run();
}

Hyperf\Amqp\ConsumerManager::run 方法如下:

3e6d9374d4-hyperf-consumer.png

大致步骤都在图上进行了标注,可以看到整个过程都比较简单。

下面我们参考这个过程,根据 command 来手动创建消费进程。

1、原 consumer 先禁用

【App\Amqp\Consumer\DemoConsumer】enable 设置 false

public function isEnable(): bool
{
    return false;
}

2、手动创建一个新的 consumer 用于测试

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;

class DemoConsumer2 extends ConsumerMessage
{
    public $exchange = 'hyperf';
    public $routingKey = 'hyperf';
    public $queue = 'hyperf';

    public function consumeMessage($data, AMQPMessage $message): string
    {
        print_r($data);

        return Result::ACK;
    }

    public function isEnable(): bool
    {
        return false;
    }
}

在这个 consumer 中,我们手动指定了 exchange、routingKey 和 queue,同时禁止自启动(enable=false)。

3、构建 command

php bin/hyperf.php gen:command DemoConsumerCommand

【App\Command\DemoConsumerCommand】代码如下:
<?php

declare(strict_types=1);

namespace App\Command;

use App\Amqp\Consumer\DemoConsumer;
use App\Amqp\Consumer\DemoConsumer2;
use Hyperf\Amqp\Consumer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Di\Annotation\AnnotationCollector;
use Psr\Container\ContainerInterface;
use Hyperf\Amqp\Annotation\Consumer as ConsumerAnnotation;

/**
 * @Command
 */
#[Command]
class DemoConsumerCommand extends HyperfCommand
{
    /**
     * @var ContainerInterface
     */
    protected $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;

        parent::__construct('DemoConsumer:command');
    }

    public function configure()
    {
        parent::configure();
        $this->setDescription('手动启动消费进程测试');
    }

    public function handle()
    {
        $consumer = $this->container->get(Consumer::class);
        $consumer->consume(make(DemoConsumer2::class));

        $this->line('ok.', 'info');
    }
}

4、重新启动 hyperf 以及另起一个窗口启动 command

启动 hyperf : php bin/hyperf.php start 
启动 command: php bin/hyperf.php DemoConsumer:command

5、测试

浏览器访问 http://127.0.0.1:9501/ ,我们可以在启动 command 的窗口看到消息被成功输出。

e37dacfb9b-hyperf-consumer-command.png

作者 白狼
本文版权归作者,欢迎转载,但未经作者同意必须保留 此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。