类似于cancel,都是监听mysql的binlog两者差异如下.
1,maxwell没有server-client模式,是由一个server把数据发送到kafka、redis中。
2,maxwell有一个bootstrap功能,可以引导出完整的历史数据进行初始化。canal就只能读取最新的数据。
3,maxwell支持断点还原,未来可能支持HA(高可用),canal支持HA,不支持断点还原。
4,maxwell只支持json的数据格式,canal可以自定义
5,maxwell比canal轻量。
JDK17(其他版本的也没问题)
需要安装的软件
1.maxwell 版本1.38.0
2.mysql 8.0
3.zookeeper 3.8.0
4.kafka 2.12-3.31
JDK可以使用ubuntu自带的apt安装,命令
sudo apt-get install openjdk-17-jre
1.需要创建maxwel库,并开启远程访问。如果有防火墙,建议关闭
- CREATE USER 'maxwell '@'%' IDENTIFIED BY '123456';
-
- GRANT ALL ON *.* TO 'maxwell'@'%' WITH GRANT OPTION;
-
- alter user 'maxwell'@'%' identified with mysql_native_password by '123456';
- GRANT ALL PRIVILEGES ON *.* TO 'maxwell'@'%'WITH GRANT OPTION;
- GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'WITH GRANT OPTION;
- FLUSH PRIVILEGES;
修改mysql配置,开启binlog
- ###mysql配置文件地址
- vim /etc/mysql/mysql.conf.d/mysqld.cnf
- ###注释掉,要不然外面不能访问
- ###bind-address = 127.0.0.1
- log.bin=ON
- binlog_format=ROW
- service-id=123456
-
- ##binlog二进制文件保存的天数
- expire_logs_days = 2
因为kafka依赖zk,所以需要安装zk
首先进入/var/lib目录下(ubuntu默认目录),创建目录zookeeper
- mkdir zooKeeper
-
- ###在zookeeper目录下执行
- wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
-
- ###解压
- tar -xzvf apache-zookeeper-3.8.0-bin.tar.gz
-
- ###进入解压后的目录,复制一个文件,修改配置
-
- cp zoo_sample.cfg zoo.cfg
-
- ##编辑
- sudo vim zoo.cfg
-
- ###修改zk的目录文件地址
- data/dir=
-
- ###进入zk的bin目录启动
- ./zkServer.sh start
-
-
下载
- wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
-
- tar -xzvf 文件名称
在config目录下修改service.properties配置
1.修改zk地址
2.配置自动生成topic: auto.create.topics.enable=true
启动kafka
- ##启动
- bin/kafka-server-start.sh config/server.properties
- ##停止
- bin/kafka-server-stop.sh config/server.properties
单独开一个终端,可以查看消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning
安装方式同上,下载到/var/lib目录下
修改配置文件
- ##配置
- producer=kafka
- kafka.bootstrap.servers=localhost:9092
-
- #需要添加 需要用到的topic
- kafka_topic=my_topic
-
- # mysql 连接信息,需要maxwell用户
- host=localhost
- user=maxwell
- password=123456
如果只是打印监听,不发kafka
启动命令
bin/maxwell --user='maxwell' --password='123456' --host='172.21.2.221' --producer=stdout
这时候修改mysql可以看到,打印出来
- {"database":"test","table":"user_test","type":"update","ts":1667443912,"xid":175812,"commit":true,"data":{"id":12,"name":"6","sex":"7"},"old":{"sex":"3333333333333333"}}
- {"database":"test","table":"user_test","type":"update","ts":1667443923,"xid":175845,"commit":true,"data":{"id":2,"name":"jacks","sex":"rrr"},"old":{"sex":"1"}}
- {"database":"test","table":"user_test","type":"update","ts":1667444624,"xid":177609,"commit":true,"data":{"id":3,"name":"111","sex":"333"},"old":{"sex":"1"}}
如果直接发送kafaka,启动命令
- ##只需要执行这个就行
- bin/maxwell
这时候需要修改mysql表数据,就可以看到
- ##启动消息打印
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
-
- ##监听到的消息
- {"database":"test","table":"user_test","type":"update","ts":1667445176,"xid":178548,"commit":true,"data":{"id":12,"name":"6","sex":"10"},"old":{"sex":"9"}}