Kafka扩展包的安装与使用

1. 安装Kafka扩展包

使用Composer进行安装:

composer require hyperf/kafka

2. 发布配置

执行以下命令发布配置:

php bin/hyperf.php vendor:publish hyperf/kafka

3. Kafka配置文件

app\config\目录下,您会找到kafka.php配置文件,内容如下:

<?php

declare(strict_types=1);
use Hyperf\Kafka\Constants\KafkaStrategy;

return [
    'default' => [
        'connect_timeout'               => -1,
        'send_timeout'                  => -1,
        'recv_timeout'                  => -1,
        'client_id'                     => env('APP_NAME', ''),
        'max_write_attempts'            => 3,
        'bootstrap_servers'             => [
            'xxx:xxx',
            'xxx:xxx',
			'xxx:xxx',
        ],
        'acks'                          => -1,
        'producer_id'                   => -1,
        'producer_epoch'                => -1,
        'partition_leader_epoch'        => -1,
        'interval'                      => 0,
        'session_timeout'               => 60,
        'rebalance_timeout'             => 60,
        'replica_id'                    => -1,
        'rack_id'                       => '',
        'group_retry'                   => 5,
        'group_retry_sleep'             => 1,
        'group_heartbeat'               => 3,
        'offset_retry'                  => 5,
        'auto_create_topic'             => true,
        'partition_assignment_strategy' => KafkaStrategy::RANGE_ASSIGNOR,
        'sasl'                          => [
            'type'     => \longlang\phpkafka\Sasl\PlainSasl::class,
            'username' => 'xxxxx',
            'password' => 'xxxxx',
        ],
        'ssl'                           => [
            'open' => true,
            // 'compression'     => false,
            // 'allowSelfSigned' => true,
        ],
        'pool'                          => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout'    => 3.0,
            'heartbeat'       => -1,
            'max_idle_time'   => 60.0,
        ],
    ],
];

注意事项:

- GroupId唯一性:多个消费者同时消费同一个Topic时,`GroupId`不能重复。

- 预创建GroupId和Topic:使用新的`GroupId`或`Topic`前,请确保在阿里云消息队列Kafka版控制台中已创建。

- 消费者文件存放路径:创建消费者文件时,应存放于`app/Queue/Kafka/`目录下,如该路径不存在,请先创建。

4. 生产者投递示例

以下是一个在控制器中使用生产者投递消息的示例:

<?php

declare(strict_types=1);
namespace App\Controller;
use Hyperf\Kafka\Producer;

class TestController extends AbstractController

{

    public function index(Producer $producer)

    {

        try {

            $producer->send('lafeng-payment-notice', '测试', 'key');

        } catch (\Exception $exception) {

            return Output::build()->setMsg($exception->getMessage())->send();

        }

        return Output::build()->setMsg('生成成功')->send();

    }

}

5. 消费者订阅示例

新建消费者:请在app\Kafka\Consumer目录下新建KafkaConsumer.php文件,内容如下:

<?php

declare(strict_types=1);
namespace App\Kafka\Consumer;
use Hyperf\Kafka\AbstractConsumer;
use Hyperf\Kafka\Annotation\Consumer;
use longlang\phpkafka\Consumer\ConsumeMessage;

/**

 * @Consumer(topic="payment-notice", name="KafkaConsumer", groupId="user-order", autoCommit=true, nums=1)

 */

class KafkaConsumer extends AbstractConsumer

{

    public function consume(ConsumeMessage $message)

    {

        var_dump($message->getTopic() . ':' . $message->getKey() . ':' . $message->getValue());

        // TODO 这里写你的业务逻辑....

    }

}