飘易博客(作者:Flymorn)
订阅《飘易博客》RSS,第一时间查看最新文章!
飘易首页 | 留言本 | 关于我 | 订阅Feed

【跟我一起搭建物联网平台】5、安装kafka之PHP扩展

Author:飘易 Source:飘易
Categories:物联网 PostTime:2019-6-26 21:48:02
正 文:

1、安装kafka的扩展之前,在安装php-rdkafka之前,需要先安装librdkafka

wget https://github.com/edenhill/librdkafka/archive/v1.0.1.tar.gz
tar xvzf v1.0.1.tar.gz
cd  librdkafka-1.0.1
./configure
make && make install

2、安装rdkafka

wget https://github.com/arnaud-lb/php-rdkafka/archive/3.1.0.tar.gz
tar xvzf v1.0.1.tar.gz
cd php-rdkafka-3.1.0
phpize
./configure --with-php-config=/usr/local/php/bin/php-config  ###你的php-config路径
make && make install

3、在php.ini中添加行

# /usr/local/php/etc/php.ini 编辑
extension=rdkafka.so

 

接下来就可以利用PHP编写消费者了,有2种消费者类型:High-level consumer、Low-level consumer,区别是kafka服务器不会记录低级别消费者,但不影响它们的消费。


使用Low Level Consumer的主要原因是,用户希望比Consumer Group更好的控制数据的消费, 如

  •     *同一条消息读多次,方便Replay

  •     *只消费某个Topic的部分Partition

  •     *管理事务,从而确保每条消息被处理一次(Exactly once)

与High Level Consumer相对,Low Level Consumer要求用户做大量的额外工作

  •     *在应用程序中跟踪处理offset,并决定下一条消费哪条消息

  •     *获知每个Partition的Leader

  •     *处理Leader的变化

  •     *处理多Consumer的协作

 


kafka生产者

<?php header( 'Content-Type: text/html; charset=UTF-8' );

// 文档参考 https://github.com/arnaud-lb/php-rdkafka
// kafka配置
$conf = new RdKafka\Conf();
// 当 RdKafka\ProducerTopic::produce() 生产消息时触发回调
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./kafka_dr.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./kafka_err.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

// 生产者
$producer = new RdKafka\Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers("localhost:9092");

// 主题配置
$cf = new RdKafka\TopicConf();
// -1:必须等所有brokers同步完成的确认, 1:当前服务器确认, 0:不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 1);

// 创建主题
$topic = $producer->newTopic("test", $cf);

// 生产消息
for ($i = 1; $i <= 5; $i++) {
    //第1个参数:RD_KAFKA_PARTITION_UA 自动选择分区;第2个参数:消息flag,固定为0;第3个参数:payload;第4个参数:可选
    //public RdKafka\ProducerTopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] ) : void
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "key . $i", "key_1");
}


while ($len = $producer->getOutQLen()) {
    echo "producer getOutQLen: $len <br>";
    $producer->poll(50);// 轮询事件,阻塞多少毫秒
}


kafka低级别消费者

<?php header( 'Content-Type: text/html; charset=UTF-8' );
// Low Level消费者 - kafka服务器不记录Low Level消费者

// 文档参考 https://github.com/arnaud-lb/php-rdkafka
// kafka配置
$conf = new RdKafka\Conf();
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./kafka_err.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});


// 设置消费组 Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'testGroup');
//$conf->set('client.id', 'consumer'.time());//kafka服务器不记录Low Level消费者

// 消费者
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("localhost:9092");

// 主题配置
$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
// 自动提交时间间隔,默认5000ms
$topicConf->set('auto.commit.interval.ms', 1000);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
//$topicConf->set('offset.store.path', __DIR__);
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');

//smallest:简单理解为从头开始消费
//largest:简单理解为从最新的开始消费
$topicConf->set('auto.offset.reset', 'smallest');

// 创建主题
$topic = $consumer->newTopic("test", $topicConf);

// 第1个参数:消费分区0
// 第2个参数:
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    //参数1:表示消费分区,这里是分区0
    //参数2:表示同步阻塞多久
    $message = $topic->consume(0, 120 * 1000);
    if(is_null($message)){
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}


kafka高级别消费者

<?php header( 'Content-Type: text/html; charset=UTF-8' );
// High LEVEL消费者 - kafka服务器会记录High Level消费者

// 文档参考 https://github.com/arnaud-lb/php-rdkafka
// kafka配置
$conf = new RdKafka\Conf();
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./kafka_err.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

// Set a rebalance callback to log partition assignments (optional)
// 当有新的消费者加入或者退出消费组时,kafka 会自动重新分配分区给消费者,这里注册了一个回调函数,当分区被重新分配时触发
$conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// 设置消费组 Configure the group.id. All consumer with the same group.id will consume different partitions.
$conf->set('group.id', 'testGroup');
$conf->set('client.id', 'consumer'.time());

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', 'localhost:9092');

// 主题配置
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

// 高级别消费者
$consumer = new \RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['test']);

echo "Waiting for partition assignment... (may take some time when quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(120*1000);
    if(is_null($message)){
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}



PHP消费者收到的消息对象如下:

object(RdKafka\Message)#7 (9) {
 ["err"]=>
 int(0)
 ["topic_name"]=>
 string(4) "test"
 ["timestamp"]=>
 int(1560928893402)
 ["partition"]=>
 int(0)
 ["payload"]=>
 string(25)"{"type":1,"data":"hello"}"
 ["len"]=>
 int(25)
 ["key"]=>
 NULL
 ["offset"]=>
 int(242)
 ["headers"]=>
 NULL
}

我们只需要拿到 payload 信息进行处理入库(MYSQL)即可。

 


作者:飘易
来源:飘易
版权所有。转载时必须以链接形式注明作者和原始出处及本声明。
上一篇:【跟我一起搭建物联网平台】6、EMQX之Kafka插件编译安装
下一篇:【跟我一起搭建物联网平台】4、Kafka消息中间件
1条评论 “【跟我一起搭建物联网平台】5、安装kafka之PHP扩展”
1 初学者
2019-9-19 15:48:56
可以把MYSQL的安装配置,还有php连接kafka和MYSQL的部分一起写完整吗?感谢您的分享!
发表评论
名称(*必填)
邮件(选填)
网站(选填)

记住我,下次回复时不用重新输入个人信息
© 2007-2019 飘易博客 Www.Piaoyi.Org 原创文章版权由飘易所有