在本教程中,我们将详细介绍如何在Hyperf框架中使用RocketMQ消息队列。我们将使用Losingbattle/rocketmq-http这个Composer包来实现这一目标。

前提条件

  • 已经安装了Hyperf框架

  • 已经安装了Composer

  • 对RocketMQ有基础了解

目录

  1. 安装RocketMQ扩展包

  2. 发布配置文件

  3. RocketMQ基础配置

  4. 注意事项

  5. 生产者投递实例

  6. 消费者订阅实例


1. 安装RocketMQ扩展包

步骤1:打开终端并进入项目目录

cd your_hyperf_project_directory

步骤2:使用Composer引入RocketMQ扩展包

composer require Losingbattle/rocketmq-http

2. 发布配置文件

步骤1:在终端中执行以下命令

php bin/hyperf.php vendor:publish losingbattle/rocketmq-http

这将会把RocketMQ的配置文件发布到app\config\autoload目录下。


3. RocketMQ基础配置

步骤1:打开配置文件

使用文本编辑器打开app\config\autoload\rocketmq-http.php

步骤2:配置文件内容

配置文件的内容应该如下所示:

<?php

declare(strict_types=1);

return [
    'host' => env('ROCKETMQ_HTTP_HOST'),
    'access_key_id' => env('ROCKETMQ_HTTP_ACCESS_KEY_ID'),
    'access_key_secret' => env('ROCKET_MQ_HTTP_ACCESS_KEY_SECRET'),
    'instance_id' => env('ROCKET_MQ_HTTP_INSTANCE_ID'),
    'concurrent' => [
        'limit' => 15,
    ],
];

这里,hostaccess_key_idaccess_key_secretinstance_id都是通过环境变量来设置的。您需要在.env文件中设置这些变量。


4. 注意事项

  1. 在多个消费者同时消费同一个Topic时,GroupId不能重复。

  2. 使用新GroupId或者Topic前,请先前往阿里云消息队列RocketMQ版控制台提前创建。

  3. 创建消费者时,请将文件放在此路径:app/Queue/RocketMQ/Consumer/下,不存在则新建。


5. 生产者投递实例

控制器或者任意业务模块代码处,您可以使用以下代码来投递消息到RocketMQ。

// ...(省略其他代码)

use Losingbattle\RocketMqHttp\Producer;

public function index(Producer $producer)
{
    // ...(省略其他代码)

    $producer = $this->container->get(Producer::class);
    $paymentNotifyMessage = new PaymentNotifyMessage();
    foreach ($queue_data as $key => $val) {
        $paymentNotifyMessage->setMessageBody($key, $val);
    }
    $producer->produce($paymentNotifyMessage);

    // ...(省略其他代码)
}

6. 消费者订阅实例

步骤1:新建消费者文件

app\RocketMQ\Consumer下新建Consumer.php文件。

步骤2:消费者代码

消费者代码应该如下所示:

// ...(省略其他代码)

use Losingbattle\RocketMqHttp\Annotation\Consumer;
use Losingbattle\RocketMqHttp\Message\ConsumerMessage;
use Losingbattle\RocketMqHttp\Result;

/**
 * @Consumer(groupId="GID_tim_push", topic="driver_task_order_match", numOfMessages=16, waitSeconds=1, maxConsumption=0)
 */
class OrderMatchTaskPushConsumer extends ConsumerMessage
{
    // ...(省略其他代码)

    public function consumeMessage($data)
    {
        // ...(省略其他代码)

        return Result::ACK;
    }
}

恭喜,您现在已经成功地在Hyperf框架中集成了RocketMQ!这个教程应该为您提供了一个很好的起点,以进一步探索RocketMQ的高级功能。如需更多信息,强烈建议您查阅RocketMQ和Hyperf的官方文档。