If Maxwell's output destination is a Kafka cluster, make sure the Kafka cluster is up before starting Maxwell.
[summer@hadoop102 maxwell-1.29.2]$ bin/maxwell --config config.properties --daemon
[summer@hadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
[summer@hadoop102 bin]$ vim mxw.sh
#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell-1.29.2

# Report the number of running Maxwell processes via the exit status
status_maxwell() {
    result=$(ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l)
    return $result
}

start_maxwell() {
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "Starting Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell is already running"
    fi
}

stop_maxwell() {
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "Stopping Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell is not running"
    fi
}

case $1 in
    start)
        start_maxwell
        ;;
    stop)
        stop_maxwell
        ;;
    restart)
        stop_maxwell
        start_maxwell
        ;;
    *)
        echo "Usage: $0 {start|stop|restart}"
        ;;
esac
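The status check above leans on a common shell idiom: `grep -v grep` drops the grep command itself from the `ps` listing, so only real Maxwell processes are counted. A minimal standalone sketch of that idiom (the function name `count_procs` and the test pattern are ours, not part of the script):

```shell
#!/bin/bash
# Count processes whose command line matches a pattern.
# "grep -v grep" drops the grep command itself, which would otherwise
# match its own pattern and inflate the count by one.
count_procs() {
    ps -ef | grep "$1" | grep -v grep | wc -l
}

# A pattern built at runtime that no running process can match
pat="absent_${$}_${RANDOM}"
count_procs "$pat"
```

The script stores this count in the exit status (`return $result`), which is why `start_maxwell` and `stop_maxwell` can branch on `$?` immediately after calling `status_maxwell`.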
[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
[summer@hadoop102 bin]$ java -jar gmall2020-mock-db-2021-11-14.jar
In the previous section we implemented real-time incremental synchronization of MySQL change data with Maxwell. Sometimes, however, incremental data alone is not enough: we may need the complete dataset from MySQL, covering all history up to the present. In that case, before starting the incremental sync we first perform a one-time full sync of the historical data, which guarantees a complete dataset.
[summer@hadoop102 maxwell-1.29.2]$ /opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell-1.29.2/config.properties
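The command above bootstraps the entire user_info table. For reference, maxwell-bootstrap also accepts a --where option to bootstrap only a subset of rows; a hedged sketch (the filter clause below is illustrative, and this only runs against a live cluster):

```shell
[summer@hadoop102 maxwell-1.29.2]$ /opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table user_info --where "create_time >= '2020-06-14'" --config /opt/module/maxwell-1.29.2/config.properties
```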
Data synchronized via bootstrap is output in the following format:
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-start",
    "ts": 1667014630,
    "data": {}
}
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-insert",
    "ts": 1667014630,
    "data": {
        "id": 194,
        "login_name": "nwuckp5",
        "nick_name": "环雪",
        "passwd": null,
        "name": "鲍环雪",
        "phone_num": "13878128474",
        "email": "nwuckp5@126.com",
        "head_img": null,
        "user_level": "2",
        "birthday": "2002-12-14",
        "gender": "F",
        "create_time": "2020-06-14 10:31:20",
        "operate_time": null,
        "status": null
    }
}
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-insert",
    "ts": 1667014630,
    "data": {
        "id": 195,
        "login_name": "eu3kk9va08",
        "nick_name": "文辉",
        "passwd": null,
        "name": "齐文辉",
        "phone_num": "13771612693",
        "email": "eu3kk9va08@sina.com",
        "head_img": null,
        "user_level": "1",
        "birthday": "1984-06-14",
        "gender": null,
        "create_time": "2020-06-14 10:31:20",
        "operate_time": null,
        "status": null
    }
}
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-complete",
    "ts": 1667014630,
    "data": {}
}
Notes:
1) The first record (type bootstrap-start) and the last record (type bootstrap-complete) mark the beginning and end of the bootstrap and carry no data; only the bootstrap-insert records in between contain row data.
2) All records output by a single bootstrap share the same ts value, which is the time the bootstrap started.
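Since only the bootstrap-insert records carry rows, a downstream consumer typically filters on the type field. A minimal sketch with grep (the sample file path and its abbreviated contents are illustrative; Maxwell actually writes one compact JSON document per line to Kafka):

```shell
#!/bin/bash
# Sample lines mimicking one bootstrap run; only the middle
# record carries row data.
cat > /tmp/bootstrap_sample.jsonl <<'EOF'
{"database":"gmall","table":"user_info","type":"bootstrap-start","ts":1667014630,"data":{}}
{"database":"gmall","table":"user_info","type":"bootstrap-insert","ts":1667014630,"data":{"id":194,"login_name":"nwuckp5"}}
{"database":"gmall","table":"user_info","type":"bootstrap-complete","ts":1667014630,"data":{}}
EOF

# Keep only the records that actually contain data
grep '"type":"bootstrap-insert"' /tmp/bootstrap_sample.jsonl
```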
[summer@hadoop102 maxwell-1.29.2]$ vim config.properties
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
# Kafka topic
kafka_topic=topic_db
# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
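The config above routes every table's change events into the single topic topic_db, which is what the rest of this pipeline expects. For reference, Maxwell's kafka_topic setting also accepts %{database} and %{table} placeholders for per-table routing; a hedged fragment (left commented out, since this pipeline consumes one shared topic):

```properties
# Alternative: route events to per-table topics via runtime substitution
# kafka_topic=%{database}_%{table}
```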
[summer@hadoop102 maxwell-1.29.2]$ mxw.sh restart
[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db
[summer@hadoop102 bin]$ cd /opt/module/db_log/
[summer@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar