• Maxwell: capturing MySQL binlog data into a Kafka topic


    Capturing MySQL binlog data:

    Create a properties file under the maxwell-1.29.2/conf/project/ directory and
    configure the databases/tables whose binlog data should be captured.
    

    Example: a complete properties file

    # tl;dr config
    log_level=info
    
    producer=kafka
    
    # host of the MySQL instance where Maxwell keeps its own state
    # (the `maxwell` schema that stores metadata about the source tables to capture)
    host=rm-rf***xx***nn.mysql.rds.aliyuncs.com
    # mysql user
    user=bigdata_xx
    # mysql password
    password=***1FjJc2XXXXXXV^1
    # mysql port
    port=3306
    # unique Maxwell instance (each properties file must use its own client_id)
    client_id = weixin_user_consume_prod
    
    # connection info of the MySQL server to replicate from
    # (the instance hosting the databases/tables to be monitored)
    replication_host=localhost
    replication_user=weixin_user
    replication_password=5g*****q$vxxxGM
    replication_port=3306
    
    #     *** kafka ***
    kafka.bootstrap.servers=192.xxx.xxx.xx:9092,192.yyy.yyy.yy:9092,192.zzz.zzz.zz:9092
    kafka.compression.type=snappy
    kafka.retries=5
    kafka.acks=all
    kafka.enable.idempotence=true
    #kafka_topic=qy_%{database}_%{table}
    # custom Kafka topic name
    kafka_topic=ods_weixin_db_binlog
    
    jdbc_options = useSSL=false&serverTimezone=Asia/Shanghai
    
    #filter=exclude: *.*,include: cps.admin
    # The MySQL instance contains many databases, and every one of them has the
    # tables consume, sign and user_recently_read, so the rules are written as
    # *.consume, *.sign, *.user_recently_read.
    # filter = exclude:*.* scans all tables and excludes them first; the include
    # rules then keep only the wanted consume, sign and user_recently_read tables.
    filter=exclude: *.*,include: *.consume,include: *.sign,include: *.user_recently_read,include: donate.wings_money_donate
    
    # This ID must not be the same as server_id in MySQL's my.cnf,
    # nor the same as the ID of any other CDC tool monitoring the same database
    # (the current time in epoch seconds works well as a unique value).
    # Each properties file needs its own replica_server_id.
    replica_server_id=1666767284
    
    
    #http_config=true
    #     *** general ***
    # choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
    #producer=kafka
    
    # set the log level.  note that you can configure things further in log4j2.xml
    #log_level=DEBUG # [DEBUG, INFO, WARN, ERROR]
    
    # if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
    #env_config_prefix=MAXWELL_
    
    #       *** output format ***
    
    # records include binlog position (default false)
    output_binlog_position=true
    
    # DML records include list of values that make up a row's primary key (default false)
    output_primary_keys=true
    
    # DML records include list of columns that make up a row's primary key (default false)
    output_primary_key_columns=true
    
    
    
    #           *** partitioning ***
    
    # What part of the data do we partition by?
    # [database, table, primary_key, transaction_id, thread_id, column]
    producer_partition_by=column
    
    # specify what fields to partition by when using producer_partition_by=column
    # column separated list.
    producer_partition_columns=id
    
    # when using producer_partition_by=column, partition by this when
    # the specified column(s) don't exist.
    producer_partition_by_fallback=table
    
    
    #     *** mysql ***
    
    # mysql host to connect to
    #host=hostname
    
    # mysql port to connect to
    #port=3306
    
    # mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
    # as well as full access to the `maxwell` (or schema_database) database
    #user=maxwell
    
    # mysql password
    #password=maxwell
    
    # options to pass into the jdbc connection, given as opt=val&opt2=val2
    #jdbc_options=opt1=100&opt2=hello
    
    # name of the mysql database where maxwell keeps its own state
    #schema_database=maxwell
    
    # whether to use GTID or not for positioning
    #gtid_mode=true
    
    # maxwell will capture an initial "base" schema containing all table and column information,
    # and then keep delta-updates on top of that schema.  If you have an inordinate amount of DDL changes,
    # the table containing delta changes will grow unbounded (and possibly too large) over time.  If you
    # enable this option Maxwell will periodically compact its tables.
    #max_schemas=10000
    
    # SSL/TLS options
    # To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
    #   -Djavax.net.ssl.trustStore= -Djavax.net.ssl.trustStorePassword=
    # or import the MySQL cert into the global Java cacerts.
    # MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
    #
    # turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
    #ssl=DISABLED
    # for binlog-connector
    #replication_ssl=DISABLED
    # for the schema-capture connection, if used
    #schema_ssl=DISABLED
    
    # maxwell can optionally replicate from a different server than where it stores
    # schema and binlog position info.  Specify that different server here:
    
    #replication_host=other
    #replication_user=username
    #replication_password=password
    #replication_port=3306
    
    # This may be useful when using MaxScale's binlog mirroring host.
    # Specifies that Maxwell should capture schema from a different server than
    # it replicates from:
    
    #schema_host=other
    #schema_user=username
    #schema_password=password
    #schema_port=3306
    
    
    #       *** output format ***
    
    # records include binlog position (default false)
    #output_binlog_position=true
    
    # records include a gtid string (default false)
    #output_gtid_position=true
    
    # records include fields with null values (default true).  If this is false,
    # fields where the value is null will be omitted entirely from output.
    #output_nulls=true
    
    # records include server_id (default false)
    #output_server_id=true
    
    # records include thread_id (default false)
    #output_thread_id=true
    
    # records include schema_id (default false)
    #output_schema_id=true
    
    # records include the row query; the binlog option "binlog_rows_query_log_events" must be enabled (default false)
    #output_row_query=true
    
    # DML records include list of values that make up a row's primary key (default false)
    #output_primary_keys=true
    
    # DML records include list of columns that make up a row's primary key (default false)
    #output_primary_key_columns=true
    
    # records include commit and xid (default true)
    #output_commit_info=true
    
    # This controls whether maxwell will output JSON information containing
    # DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
    # See also: ddl_kafka_topic
    
    
    
    #output_ddl=true
    
    
    
    # turns underscore naming style of fields to camel case style in JSON output
    # default is none, which means the field name in JSON is the exact name in MySQL table
    #output_naming_strategy=underscore_to_camelcase
    
    #       *** kafka ***
    
    # list of kafka brokers
    #kafka.bootstrap.servers=hosta:9092,hostb:9092
    
    # kafka topic to write to
    # this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
    # in the latter case 'database' and 'table' will be replaced with the values for the row being processed
    #kafka_topic=qy_%{database}_%{table}
    
    # alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
    #ddl_kafka_topic=maxwell_ddl
    
    # hash function to use.  "default" is just the JVM's 'hashCode' function.
    #kafka_partition_hash=default # [default, murmur3]
    
    # how maxwell writes its kafka key.
    #
    # 'hash' looks like:
    # {"database":"test","table":"tickets","pk.id":10001}
    #
    # 'array' looks like:
    # ["test","tickets",[{"id":10001}]]
    #
    # default: "hash"
    #kafka_key_format=hash # [hash, array]
    
    # extra kafka options.  Anything prefixed "kafka." will get
    # passed directly into the kafka-producer's config.
    
    # a few defaults.
    # These are 0.11-specific. They may or may not work with other versions.
    #kafka.compression.type=snappy
    #kafka.retries=5
    #kafka.acks=1
    #kafka.batch.size=16384
    
    
    # kafka+SSL example
    # kafka.security.protocol=SSL
    # kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    # kafka.ssl.truststore.password=test1234
    # kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    # kafka.ssl.keystore.password=test1234
    # kafka.ssl.key.password=test1234#
    
    # controls a heuristic check that maxwell may use to detect messages that
    # we never heard back from.  The heuristic check looks for "stuck" messages, and
    # will timeout maxwell after this many milliseconds.
    #
    # See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
    # if you really want to get into it.
    #producer_ack_timeout=120000 # default 0
    
    
    #           *** partitioning ***
    
    # What part of the data do we partition by?
    #producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
    
    # specify what fields to partition by when using producer_partition_by=column
    # column separated list.
    #producer_partition_columns=id,foo,bar
    
    # when using producer_partition_by=column, partition by this when
    # the specified column(s) don't exist.
    #producer_partition_by_fallback=database
    
    #            *** kinesis ***
    
    #kinesis_stream=maxwell
    
    # AWS places a 256 unicode character limit on the max key length of a record
    # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
    #
    # Setting this option to true enables hashing the key with the md5 algorithm
    # before we send it to kinesis so all the keys work within the key size limit.
    # Values: true, false
    # Default: false
    #kinesis_md5_keys=true
    
    #            *** sqs ***
    
    #sqs_queue_uri=aws_sqs_queue_uri
    
    # The sqs producer will need aws credentials configured in the default
    # root folder and file format. Please check below link on how to do it.
    # http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html
    
    #            *** pub/sub ***
    
    #pubsub_project_id=maxwell
    #pubsub_topic=maxwell
    #ddl_pubsub_topic=maxwell_ddl
    
    #            *** rabbit-mq ***
    
    #rabbitmq_host=rabbitmq_hostname
    #rabbitmq_port=5672
    #rabbitmq_user=guest
    #rabbitmq_pass=guest
    #rabbitmq_virtual_host=/
    #rabbitmq_exchange=maxwell
    #rabbitmq_exchange_type=fanout
    #rabbitmq_exchange_durable=false
    #rabbitmq_exchange_autodelete=false
    #rabbitmq_routing_key_template=%db%.%table%
    #rabbitmq_message_persistent=false
    #rabbitmq_declare_exchange=true
    
    #           *** redis ***
    
    #redis_host=redis_host
    #redis_port=6379
    #redis_auth=redis_auth
    #redis_database=0
    
    # name of pubsub/list/whatever key to publish to
    #redis_key=maxwell
    
    # this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
    #redis_pub_channel=maxwell
    # this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
    #redis_list_key=maxwell
    # this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
    # Valid values for redis_type = pubsub|lpush. Defaults to pubsub
    
    #redis_type=pubsub
    
    #           *** custom producer ***
    
    # the fully qualified class name for custom ProducerFactory
    # see the following link for more details.
    # http://maxwells-daemon.io/producers/#custom-producer
    #custom_producer.factory=
    
    # custom producer properties can be configured using the custom_producer.* property namespace
    #custom_producer.custom_prop=foo
    
    #          *** filtering ***
    
    # filter rows out of Maxwell's output.  Comma-separated list of filter-rules, evaluated in sequence.
    # A filter rule is:
    #   <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
    #  type    ::= [ "include" | "exclude" | "blacklist" ]
    #  db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
    #  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
    #  col_val ::= "column_name"
    #  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
    #
    # See http://maxwells-daemon.io/filtering for more details
    #
    #filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"
    
    
    # javascript filter
    # maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
    # See http://maxwells-daemon.io/filtering/#javascript_filters for more details
    #
    #javascript=/path/to/javascript_filter_file
    
    #       *** encryption ***
    
    # Encryption mode. Possible values are none, data, and all. (default none)
    #encrypt=none
    
    # Specify the secret key to be used
    #secret_key=RandomInitVector
    
    #       *** monitoring ***
    
    # Maxwell collects metrics via dropwizard. These can be exposed through the
    # base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
    # Options: [jmx, slf4j, http, datadog]
    # Supplying multiple is allowed.
    #metrics_type=jmx,slf4j
    
    # The prefix maxwell will apply to all metrics
    #metrics_prefix=MaxwellMetrics # default MaxwellMetrics
    
    # Enable (dropwizard) JVM metrics, default false
    #metrics_jvm=true
    
    # When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
    #metrics_slf4j_interval=60
    
    # When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
    #http_port=8080
    
    # When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
    #http_path_prefix=/some/path/
    
    # ** The following are Datadog specific. **
    # When metrics_type includes datadog this is the way metrics will be reported.
    # Options: [udp, http]
    # Supplying multiple is not allowed.
    #metrics_datadog_type=udp
    
    # datadog tags that should be supplied
    #metrics_datadog_tags=tag1:value1,tag2:value2
    
    # The frequency metrics are pushed to datadog, in seconds
    #metrics_datadog_interval=60
    
    # required if metrics_datadog_type = http
    #metrics_datadog_apikey=API_KEY
    
    # required if metrics_datadog_type = udp
    #metrics_datadog_host=localhost # default localhost
    #metrics_datadog_port=8125 # default 8125
    
    # Maxwell exposes http diagnostic endpoint to check below in parallel:
    # 1. binlog replication lag
    # 2. producer (currently kafka) lag
    
    # To enable Maxwell diagnostic
    #http_diagnostic=true # default false
    
    # Diagnostic check timeout in milliseconds, required if diagnostic = true
    #http_diagnostic_timeout=10000 # default 10000
    
    #    *** misc ***
    
    # maxwell's bootstrapping functionality has a couple of modes.
    #
    # In "async" mode, maxwell will output the replication stream while it
    # simultaneously outputs the database to the topic.  Note that it won't
    # output replication data for any tables it is currently bootstrapping -- this
    # data will be buffered and output after the bootstrap is complete.
    #
    # In "sync" mode, maxwell stops the replication stream while it
    # outputs bootstrap data.
    #
    # async mode keeps ops live while bootstrapping, but carries the possibility of
    # data loss (due to buffering transactions).  sync mode is safer but you
    # have to stop replication.
    #bootstrapper=async [sync, async, none]
    #bootstrapper=async
    
    
    # output filename when using the "file" producer
    #output_file=/path/to/file
    
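    Before pointing the producer at Kafka, it can help to sanity-check the configuration. A minimal sketch, assuming the command-line --producer flag overrides the properties file as in Maxwell's quickstart: run Maxwell in the foreground with the stdout producer and confirm that JSON change events show up when rows in the monitored tables are modified.

    # temporary test run: print change events to stdout instead of producing to Kafka
    ./bin/maxwell --config ./conf/project/config_weixin_wings_donate_pord.properties --producer=stdout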

    Start the incremental binlog synchronization:

    ./bin/maxwell --config ./conf/project/config_weixin_wings_donate_pord.properties --daemon
    
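    To confirm that change events are actually arriving in the topic, one option is Kafka's console consumer (a sketch; the Kafka bin/ directory is assumed to be on the PATH, and the broker address is the masked one from the properties file):

    # read a few records from the target topic to verify the pipeline end to end
    kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xx:9092 --topic ods_weixin_db_binlog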

    Synchronize historical binlog data (bootstrap):

    ./bin/maxwell-bootstrap --config ./conf/project/config_weixin_wings_donate_pord.properties --database donate --table wings_money_donate --client_id weixin_user_consume_prod
    
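    maxwell-bootstrap works by inserting a request row into the bootstrap table of Maxwell's own state schema, which the running daemon then processes. A rough way to watch progress, assuming the default schema name maxwell and the usual bootstrap columns (names may differ between versions):

    # check bootstrap progress in Maxwell's state database
    mysql -h rm-rf***xx***nn.mysql.rds.aliyuncs.com -P 3306 -u bigdata_xx -p \
      -e "SELECT database_name, table_name, is_complete, inserted_rows, total_rows FROM maxwell.bootstrap ORDER BY id DESC LIMIT 5;"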

    If the MySQL instance contains 256 databases and each of them has the
    consume, sign and user_recently_read tables, the bootstrap command
    would have to be run 256 times for each table.

    Write a shell script to loop over the databases and bootstrap the historical binlog data:
    #!/bin/bash
    # 1. Check the number of arguments; exit if fewer than two were given
    if [ $# -lt 2 ]; then
        echo "Usage: $0 <database_prefix> <table_name>"
        exit 1
    fi

    # optional time filter, e.g. --where "createtime>=1652025600"

    for ((i = 0; i <= 255; i++))
    do
        data_base=$1'_'$i
        /opt/maxwell-1.29.2/bin/maxwell-bootstrap --config /opt/maxwell-1.29.2/conf/project/config_weixin_wings_donate_pord.properties --database $data_base --table $2 --client_id weixin_user_consume_prod
        echo $data_base'.'$2' finished'
    done
    

    Run the script (two arguments required: p1 = database name prefix, p2 = table name):

    # consume table
    sh zfb_history_data_cps_n_user_consume.sh cps_user consume
    
    # sign table
    sh zfb_history_data_cps_n_user_consume.sh cps_user sign
    
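    Since each run of the script handles a single table, the three tables can also be driven from a small wrapper loop (a convenience sketch reusing the script above):

    # bootstrap all three tables for the cps_user_* databases in one go
    for tbl in consume sign user_recently_read
    do
        sh zfb_history_data_cps_n_user_consume.sh cps_user $tbl
    done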

    To bootstrap only part of the historical data (when the full history is not needed), add a --where filter:

    # created_at is a timestamp column of the table; it is used as the filter condition when bootstrapping the binlog data
    ./bin/maxwell-bootstrap --config ./conf/project/config_weixin_wings_donate_pord.properties --database donate --table wings_money_donate --client_id weixin_user_consume_prod --where "created_at>='2022-08-18 00:00:00'"
    

    Business expansion: adding capture tasks for more business tables under a MySQL instance whose corresponding properties file is already running.

    First run:
    
    ps -ef | grep maxwell
    

    and locate the Maxwell process that was started with the corresponding properties file,

    then kill it (do not use kill -9); a sketch follows below.
    # After a plain kill and a restart with the same properties file,
    # the tables that were already being monitored continue to be monitored.
    
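    A minimal sketch for stopping only the instance that was started with this properties file (the grep pattern on the file name is an assumption about how the process was launched):

    # find the Maxwell process started with this properties file and stop it gracefully
    PID=$(ps -ef | grep maxwell | grep config_weixin_wings_donate_pord.properties | grep -v grep | awk '{print $2}')
    kill $PID    # plain SIGTERM, never kill -9, so Maxwell can persist its binlog position cleanly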

    Add the newly monitored table(s) to the filter in the properties file.

    First run the incremental sync command again,
    
    then bootstrap the historical data of the newly added tables separately (see the sketch below).
    
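    Putting the last two steps together, a sketch that reuses the commands shown earlier, with the newly added donate.wings_money_donate table as the example:

    # 1. restart the incremental sync as a daemon with the updated properties file
    ./bin/maxwell --config ./conf/project/config_weixin_wings_donate_pord.properties --daemon
    
    # 2. bootstrap only the newly added table's historical data
    ./bin/maxwell-bootstrap --config ./conf/project/config_weixin_wings_donate_pord.properties --database donate --table wings_money_donate --client_id weixin_user_consume_prod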
  • Original article: https://blog.csdn.net/weixin_46609492/article/details/127814511