使用kafka之前先学习下基础知识:引用:细说 Kafka Partition 分区_性能与架构的博客-CSDN博客_kafka partition
1、首先安装kafka,可以通过安装包的形式
2、进入到kafka安装的目录下的bin目录下
启动kafka,但是启动kafka需要依赖zookeeper 所以先启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties 不要关闭窗口
2、启动Kafka
./kafka-server-start.sh ../config/server.properties
期间启动kafka失败 java.net.ConnectException: Connection refused
缺少java环境 No Java runtime present, requesting install.
3、下载安装java环境jdk
安装户,在重新启动kafka即可
4、开始写脚本
生产者
- #!/bin/env python
- # encoding=utf-8
- from kafka import KafkaProducer;
- import json;
-
- producer = KafkaProducer(
- value_serializer=lambda v: json.dumps(v).encode('utf-8'),
- bootstrap_servers=['127.0.0.1:9092']
- )
- msg_dict = {
- "operatorId": "test", # 公交公司ID
- "terminalId": "123", # 设备Id
- "terminalCode": "123", # 设备编码(使用车辆ID)
- "terminalNo": "1", # 同一车辆内terminal序号从1开始
- }
-
- producer.send("text1", msg_dict)
- producer.close()
消费者
- #!/bin/env python
- # encoding=utf-8
-
- from kafka import KafkaConsumer;
-
- consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')
- for msg in consumer:
- print(msg.value.decode())
-
执行生产者的脚本时,要保证消费者脚本是启动状态
安装python插件kafka
pip install kafka
执行结果
- localhost:kafka zhangweiwei$ python consumer.py
- {"terminalNo": "1", "terminalCode": "123", "operatorId": "test", "terminalId": "123"}
- {"terminalNo": "1", "terminalCode": "123", "operatorId": "test", "terminalId": "123"}
- {"terminalNo": "1", "terminalCode": "123", "operatorId": "test", "terminalId": "123"}
- {"terminalNo": "1", "terminalCode": "123", "operatorId": "test", "terminalId": "123"}