from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
def produce(dic):
producer = KafkaProducer(bootstrap_servers='kmr-171a1031-gn-d65a753b-broker-1-1:9092')
msg_dic = dic
msg_str = json.dumps(msg_dic)
msg_bytes = msg_str.encode('utf-8')
producer.send('geodata_vehicle_upload_receiver',msg_bytes)
producer.close()
def consume():
consumer = KafkaConsumer('geodata_vehicle_upload_receiver',bootstrap_servers=['kmr-171a1031-gn-d65a753b-broker-1-1:9092'],group_id=None,auto_offset_reset='smallest')
n = 1
for msg in consumer:
recv = "num:%s topic:%s partition:%s offset:%s key:%s value:%s" %(n,msg.topic,msg.partition,msg.offset,msg.key,msg.value)
print(recv)
n += 1
if __name__ == '__main__':
consume()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29