飘易博客(作者:Flymorn)
订阅《飘易博客》RSS,第一时间查看最新文章!
飘易首页 | 留言本 | 关于我 | 订阅Feed

【跟我一起搭建物联网平台】6、EMQX之Kafka插件编译安装

Author:飘易 Source:飘易
Categories:物联网 PostTime:2019-6-26 21:57:33
正 文:

EMQX服务器搭建好之后,我们需要将MQTT消息持久化保存到mysql等数据库里了,这个时候就需要借助kafka这个消息中间件了,下面飘易来介绍下Kafka插件编译安装。


Kafka插件github上有好几个:

https://github.com/bob403/emqx_kafka_bridge  (我们选择这个,支持emqx-v3.0版本)

https://github.com/iotblue/emqx-kafka-bridge/tree/emqx30 

https://github.com/shsjdcbg/emq-kafka 

 

修改 emqx-rel目录下的Makefile:

DEPS += $(foreachdep,$(MAIN_APPS),$(call app_name,$(dep)))
DEPS += emqx_kafka_bridge 【增加这行】
...
$(foreach dep,$(MAIN_APPS),$(evaldep_$(call app_name,$(dep)) = $(CLONE_METHOD) https://github.com/emqx/$(dep)$(call app_vsn,$(dep))))
dep_emqx_kafka_bridge = git https://github.com/bob403/emqx_kafka_bridge.git master 【增加这行】


在relx.config里增加:

{emqx_kafka_bridge, load}


重新编译emqx:

cd emqx-relx
rm -rf _rel                 //删除之前编译的结果
make clean
make


修改kafka插件配置:

/emq/emqx-rel/_rel/emqx/etc/plugins/emqx_kafka_bridge.conf

注意这个插件的配置文件在好几个地方都存在,我们需要找到编译后的【_rel】目录下的配置,否则不生效


根据实际情况调整配置:

##--------------------------------------------------------------------
## kafka Bridge
##--------------------------------------------------------------------
 
## The Kafka loadbalancer node hostthat bridge is listening on.
##
## Value: 127.0.0.1, localhost
kafka.host = localhost
 
## The kafka loadbalancer node portthat bridge is listening on.
##
## Value: Port
kafka.port = 9092
 
## The kafka loadbalancer nodepartition strategy.
##
## Value: random, sticky_round_robin,strict_round_robin, custom
kafka.partitionstrategy =strict_round_robin
 
## Each worker represents aconnection to a broker + topic + partition combination.
## You can decide how many workers tostart for each partition.
##
## Value:
kafka.partitionworkers = 8
 
## payload topic.
## kafka上要提前建立一个名为Processing的Topic
## Value: string
kafka.payloadtopic = Processing


启用插件

列出所有插件:

./emqx_ctl plugins list

看到:

Plugin(emqx_kafka_bridge,version=3.0, description=EMQ X Kafka Bridge, active=false)

 

启动kafka插件:

./emqx_ctl plugins load emqx_kafka_bridge

返回:

Start apps: [emqx_kafka_bridge]
Plugin emqx_kafka_bridge loadedsuccessfully.

 


测试步骤MQTT消息是否转发到kafka

1,kafka上要提前建立一个名为Processing的Topic(分区为3,方便多个客户端消费)

./bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor 1 --partitions 3  --topic Processing


2,打开MQTT客户端,我使用的是chrome浏览器应用MQTTBox:

随便发布消息到一个MQTT主题,比如test,这里的MQTT 的topic和kafka的topic毫无关系,不要搞混了。


3,Kafka消费终端,消费主题Processing:

./bin/kafka-console-consumer.sh--bootstrap-server localhost:9092  --topic Processing

收到反馈如下:

{"action":"connected","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561539149561","username":"undefined","ts":1561539149}
{"action":"message_publish","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561539149561","username":"undefined","topic":"test","payload":"a1","ts":1561539155}
{"action":"message_publish","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561539149561","username":"undefined","topic":"test","payload":"a2","ts":1561539157}
{"action":"disconnected","device_id":"3fbf56e4-3396-47b0-a99f-5ecf0964e23b1561537211825","username":"undefined","ts":1561539143}

可以收到MQTT转发的事件,比如connected、disconnected、message_publish等。

 

剩下的事我们就可以利用PHP、java等客户端去消费Processing主题,将需要的信息存入MYSQL等数据库了。

 

作者:飘易
来源:飘易
版权所有。转载时必须以链接形式注明作者和原始出处及本声明。
Tag:Kafka 浏览(次) 我要评论(0条)
上一篇:没有了
下一篇:【跟我一起搭建物联网平台】5、安装kafka之PHP扩展
0条评论 “【跟我一起搭建物联网平台】6、EMQX之Kafka插件编译安装”
No Comment .
发表评论
名称(*必填)
邮件(选填)
网站(选填)

记住我,下次回复时不用重新输入个人信息
目 录
飘易搜索
最新文章
相关文章
随机文章
© 2007-2019 飘易博客 Www.Piaoyi.Org 原创文章版权由飘易所有