- create table student(
- sno char(5),
- sname char(10),
- ssex char(2),
- sage int
- );
- insert into student values(‘95001’,’John’,’M’,23);
- insert into student values(‘95002’,’Tom’,’M’,23);
- from kafka import KafkaProducer
- import json
- import pymysql.cursors
-
- producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))
-
- connect=pymysql.Connect(
- host='localhost',
- port=3306,
- user='root',
- passwd='123456',
- db='zhangna',
- charset='utf8'
- )
- cursor=connect.cursor()
- sql="select sno,sname,ssex,sage from student;"
- cursor.execute(sql)
- data=cursor.fetchall()
- connect.commit()
-
- for message in data:
- zn={}
- zn['sno']=message[0]
- zn['sname']=message[1]
- zn['sex']=message[2]
- zn['age']=message[3]
- producer.send('mysql_topic',zn)
-
- connect.close()
- producer.close()
- from kafka import KafkaConsumer
- import json
- import pymysql.cursors
-
- consumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
- for msg in consumer:
- msg1=str(msg.value,encoding="utf-8")
- data=json.loads(msg1)
- print(data)
终于出来了,出错的原因是encoding,我写成了encodings的缘故
为什么我会出现两条重复记录,原因是我生产者程序运行了多次,生产者多运行一次,消费者程序就会多一次查询