Hyperf框架内使用Kafka扩展包集成指南:从安装到应用实例
Kafka扩展包的安装与使用
1. 安装Kafka扩展包
使用Composer进行安装:
composer require hyperf/kafka
2. 发布配置
执行以下命令发布配置:
php bin/hyperf.php vendor:publish hyperf/kafka
3. Kafka配置文件
在app\config\
目录下,您会找到kafka.php
配置文件,内容如下:
<?php
declare(strict_types=1);
use Hyperf\Kafka\Constants\KafkaStrategy;
return [
'default' => [
'connect_timeout' => -1,
'send_timeout' => -1,
'recv_timeout' => -1,
'client_id' => env('APP_NAME', ''),
'max_write_attempts' => 3,
'bootstrap_servers' => [
'xxx:xxx',
'xxx:xxx',
'xxx:xxx',
],
'acks' => -1,
'producer_id' => -1,
'producer_epoch' => -1,
'partition_leader_epoch' => -1,
'interval' => 0,
'session_timeout' => 60,
'rebalance_timeout' => 60,
'replica_id' => -1,
'rack_id' => '',
'group_retry' => 5,
'group_retry_sleep' => 1,
'group_heartbeat' => 3,
'offset_retry' => 5,
'auto_create_topic' => true,
'partition_assignment_strategy' => KafkaStrategy::RANGE_ASSIGNOR,
'sasl' => [
'type' => \longlang\phpkafka\Sasl\PlainSasl::class,
'username' => 'xxxxx',
'password' => 'xxxxx',
],
'ssl' => [
'open' => true,
// 'compression' => false,
// 'allowSelfSigned' => true,
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
],
],
];
注意事项:
- GroupId唯一性:多个消费者同时消费同一个Topic时,`GroupId`不能重复。
- 预创建GroupId和Topic:使用新的`GroupId`或`Topic`前,请确保在阿里云消息队列Kafka版控制台中已创建。
- 消费者文件存放路径:创建消费者文件时,应存放于`app/Queue/Kafka/`目录下,如该路径不存在,请先创建。
4. 生产者投递示例
以下是一个在控制器中使用生产者投递消息的示例:
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Kafka\Producer;
class TestController extends AbstractController
{
public function index(Producer $producer)
{
try {
$producer->send('lafeng-payment-notice', '测试', 'key');
} catch (\Exception $exception) {
return Output::build()->setMsg($exception->getMessage())->send();
}
return Output::build()->setMsg('生成成功')->send();
}
}
5. 消费者订阅示例
新建消费者:请在app\Kafka\Consumer
目录下新建KafkaConsumer.php
文件,内容如下:
<?php
declare(strict_types=1);
namespace App\Kafka\Consumer;
use Hyperf\Kafka\AbstractConsumer;
use Hyperf\Kafka\Annotation\Consumer;
use longlang\phpkafka\Consumer\ConsumeMessage;
/**
* @Consumer(topic="payment-notice", name="KafkaConsumer", groupId="user-order", autoCommit=true, nums=1)
*/
class KafkaConsumer extends AbstractConsumer
{
public function consume(ConsumeMessage $message)
{
var_dump($message->getTopic() . ':' . $message->getKey() . ':' . $message->getValue());
// TODO 这里写你的业务逻辑....
}
}
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,完整转载请注明来自 [ 谷溪雨 ] 。
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果