• maxwell读取mysql的binlog并发送kafka


    一.Maxwell简介

    类似于cancel,都是监听mysql的binlog两者差异如下.

          1,maxwell没有server-client模式,是由一个server把数据发送到kafka、redis中。

            2,maxwell有一个bootstrap功能,可以引导出完整的历史数据进行初始化。canal就只能读取最新的数据。

            3,maxwell支持断点还原,未来可能支持HA(高可用),canal支持HA,不支持断点还原。

            4,maxwell只支持json的数据格式,canal可以自定义

            5,maxwell比canal轻量。
     

    二.环境ubuntu(win10自带的Linux子系统)

    JDK17(其他版本的也没问题)

    需要安装的软件

    1.maxwell  版本1.38.0

    2.mysql  8.0

    3.zookeeper 3.8.0  

    4.kafka 2.12-3.31

    JDK可以使用ubuntu自带的apt安装,命令

    sudo apt-get install openjdk-17-jre

    三.安装

    1.mysql 

    1.需要创建maxwel库,并开启远程访问。如果有防火墙,建议关闭

    1. CREATE USER 'maxwell '@'%' IDENTIFIED BY '123456';
    2. GRANT ALL ON *.* TO 'maxwell'@'%' WITH GRANT OPTION;
    3. alter user 'maxwell'@'%' identified with mysql_native_password by '123456';
    4. GRANT ALL PRIVILEGES ON *.* TO 'maxwell'@'%'WITH GRANT OPTION;
    5. GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'WITH GRANT OPTION;
    6. FLUSH PRIVILEGES;

    修改mysql配置,开启binlog

    1. ###mysql配置文件地址
    2. vim /etc/mysql/mysql.conf.d/mysqld.cnf
    3. ###注释掉,要不然外面不能访问
    4. ###bind-address = 127.0.0.1
    5. log.bin=ON
    6. binlog_format=ROW
    7. service-id=123456
    8. ##binlog二进制文件保存的天数
    9. expire_logs_days = 2

    2.zookeeper

    因为kafka依赖zk,所以需要安装zk

    首先进入/var/lib目录下(ubuntu默认目录),创建目录zookeeper

    1. mkdir zooKeeper
    2. ###在zookeeper目录下执行
    3. wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
    4. ###解压
    5. tar -xzvf apache-zookeeper-3.8.0-bin.tar.gz
    6. ###进入解压后的目录,复制一个文件,修改配置
    7. cp zoo_sample.cfg zoo.cfg
    8. ##编辑
    9. sudo vim zoo.cfg
    10. ###修改zk的目录文件地址
    11. data/dir=
    12. ###进入zk的bin目录启动
    13. ./zkServer.sh start

    3.kafka

    下载

    Apache Kafka

    1. wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
    2. tar -xzvf 文件名称

    在config目录下修改service.properties配置

    1.修改zk地址 

    2.配置自动生成topic:   auto.create.topics.enable=true

    启动kafka

    1. ##启动
    2. bin/kafka-server-start.sh config/server.properties
    3. ##停止
    4. bin/kafka-server-stop.sh config/server.properties

    单独开一个终端,可以查看消息

    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my_topic --from-beginning

    4.maxwell

    安装方式同上,下载到/var/lib目录下

    修改配置文件

    1. ##配置
    2. producer=kafka
    3. kafka.bootstrap.servers=localhost:9092
    4. #需要添加 需要用到的topic
    5. kafka_topic=my_topic
    6. # mysql 连接信息,需要maxwell用户
    7. host=localhost
    8. user=maxwell
    9. password=123456

    如果只是打印监听,不发kafka

    启动命令

    bin/maxwell --user='maxwell' --password='123456' --host='172.21.2.221' --producer=stdout

    这时候修改mysql可以看到,打印出来

    1. {"database":"test","table":"user_test","type":"update","ts":1667443912,"xid":175812,"commit":true,"data":{"id":12,"name":"6","sex":"7"},"old":{"sex":"3333333333333333"}}
    2. {"database":"test","table":"user_test","type":"update","ts":1667443923,"xid":175845,"commit":true,"data":{"id":2,"name":"jacks","sex":"rrr"},"old":{"sex":"1"}}
    3. {"database":"test","table":"user_test","type":"update","ts":1667444624,"xid":177609,"commit":true,"data":{"id":3,"name":"111","sex":"333"},"old":{"sex":"1"}}

    如果直接发送kafaka,启动命令

    1. ##只需要执行这个就行
    2. bin/maxwell

    这时候需要修改mysql表数据,就可以看到

    1. ##启动消息打印
    2. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
    3. ##监听到的消息
    4. {"database":"test","table":"user_test","type":"update","ts":1667445176,"xid":178548,"commit":true,"data":{"id":12,"name":"6","sex":"10"},"old":{"sex":"9"}}

  • 相关阅读:
    ‘element-plus/lib/el-loading/src/loading.type‘找不到
    MyEclipse,一个支持技术全面的现代Web开发工具
    安装部署 Kubernetes 仪表板(Dashboard)
    linux golang安装
    windows 常用命令字典
    简单php反序列化实现执行代码
    【HMS core】【FAQ】push kit、AR Engine、广告服务、扫描服务典型问题合集2
    .net 6 api 修改URL为小写
    IEEE754 标准存储浮点数
    【scikit-learn基础】--『监督学习』之 支持向量机回归
  • 原文地址:https://blog.csdn.net/csgarten/article/details/127669419