from kafka import KafkaProducer
import json
def push_kafka(sqlstring, valuelist):
# logging.info("kafka string ----- [%s]" % (sqlstring % valuelist))
producer = KafkaProducer(bootstrap_servers=["ip1:9092", "ip2:9092", "ip3:9092"])
try:
msg_dict = {
"data": {
'sqlstring': sqlstring,
'paramslist': valuelist
}
}
msg = json.dumps(msg_dict)
producer.send("xx3_ccccc", value=msg.encode("utf-8"))
except Exception as e:
logging.exception('push_kafka Exception[%s]', str(e))
def consumer_session():
consumer = KafkaConsumer("xx3_ccccc",
bootstrap_servers=["ip1:9092", "ip2:9092", "ip3:9092"])
while True:
c_msgs = consumer.poll(max_records=300).values()
for m_bo in c_msgs:
for msg in m_bo:
c_msg = msg.value.decode("utf-8")
d_msg = json.loads(c_msg)
try:
cur.execute(d_msg['data']["sqlstring"], d_msg['data']["paramslist"])
conn.commit()
except Exception as e:
logging.error("insert error:%s", e)
# 关闭游标和连接
cur.close()
conn.close()
未完待续…