• Redis - 保证数据库与缓存数据一致性 - 如何保证两步都执行成功?


    缓存和数据库一致性问题,看这篇就够了,中写道:无论是更新缓存还是删除缓存,只要第二步发生失败,那么就会导致数据库和缓存不一致。

    1.重试

    2.异步重试

    3.订阅binlog日志

            具体来讲就是,我们的业务应用在修改数据时,「只需」修改数据库,无需操作缓存。

    那什么时候操作缓存呢?这就和数据库的「变更日志」有关了。

    拿 MySQL 举例,当一条数据发生修改时,MySQL 就会产生一条变更日志(Binlog),我们可以订阅这个日志,拿到具体操作的数据,然后再根据这条数据,去删除对应的缓存。

    使用canal中间件,订阅数据库变更日志,将变更传递到下游消息队列.

    环境准备:

    mysql数据库:   1.开启binlog,row模式,server-id = 1不与canal的slaveId重复

                            2.账号授权

    canal下载安装配置:

                            1.下载安装

                            2.配置 - rocketMq为例

                                    conf/canal.properties

    1. #################################################
    2. ######### common argument #############
    3. #################################################
    4. # tcp bind ip
    5. canal.ip = 192.168.43.231
    6. # register ip to zookeeper
    7. #canal.register.ip =
    8. canal.port = 11111
    9. #canal.metrics.pull.port = 11112
    10. # canal instance user/passwd
    11. canal.user = canal
    12. canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
    13. # canal admin config
    14. #canal.admin.manager = 127.0.0.1:8089
    15. canal.admin.port = 11110
    16. canal.admin.user = admin
    17. canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    18. # admin auto register
    19. #canal.admin.register.auto = true
    20. #canal.admin.register.cluster =
    21. #canal.admin.register.name =
    22. #canal.zkServers =
    23. # flush data to zk
    24. #canal.zookeeper.flush.period = 1000
    25. #canal.withoutNetty = false
    26. # tcp, kafka, rocketMQ, rabbitMQ
    27. #需要修改的地方
    28. canal.serverMode = rocketMQ
    29. # flush meta cursor/parse position to file
    30. canal.file.data.dir = ${canal.conf.dir}
    31. canal.file.flush.period = 1000
    32. ## memory store RingBuffer size, should be Math.pow(2,n)
    33. canal.instance.memory.buffer.size = 16384
    34. ## memory store RingBuffer used memory unit size , default 1kb
    35. canal.instance.memory.buffer.memunit = 1024
    36. ## meory store gets mode used MEMSIZE or ITEMSIZE
    37. canal.instance.memory.batch.mode = MEMSIZE
    38. canal.instance.memory.rawEntry = true
    39. ## detecing config
    40. canal.instance.detecting.enable = false
    41. #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    42. canal.instance.detecting.sql = select 1
    43. canal.instance.detecting.interval.time = 3
    44. canal.instance.detecting.retry.threshold = 3
    45. canal.instance.detecting.heartbeatHaEnable = false
    46. # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    47. canal.instance.transaction.size = 1024
    48. # mysql fallback connected to new master should fallback times
    49. canal.instance.fallbackIntervalInSeconds = 60
    50. # network config
    51. canal.instance.network.receiveBufferSize = 16384
    52. canal.instance.network.sendBufferSize = 16384
    53. canal.instance.network.soTimeout = 30
    54. # binlog filter config
    55. canal.instance.filter.druid.ddl = true
    56. canal.instance.filter.query.dcl = false
    57. canal.instance.filter.query.dml = false
    58. canal.instance.filter.query.ddl = false
    59. canal.instance.filter.table.error = false
    60. canal.instance.filter.rows = false
    61. canal.instance.filter.transaction.entry = false
    62. canal.instance.filter.dml.insert = false
    63. canal.instance.filter.dml.update = false
    64. canal.instance.filter.dml.delete = false
    65. # binlog format/image check
    66. canal.instance.binlog.format = ROW,STATEMENT,MIXED
    67. canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    68. # binlog ddl isolation
    69. canal.instance.get.ddl.isolation = false
    70. # parallel parser config
    71. canal.instance.parser.parallel = true
    72. ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    73. #canal.instance.parser.parallelThreadSize = 16
    74. ## disruptor ringbuffer size, must be power of 2
    75. canal.instance.parser.parallelBufferSize = 256
    76. # table meta tsdb info
    77. canal.instance.tsdb.enable = true
    78. canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    79. canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    80. canal.instance.tsdb.dbUsername = canal
    81. canal.instance.tsdb.dbPassword = canal
    82. # dump snapshot interval, default 24 hour
    83. canal.instance.tsdb.snapshot.interval = 24
    84. # purge snapshot expire , default 360 hour(15 days)
    85. canal.instance.tsdb.snapshot.expire = 360
    86. #################################################
    87. ######### destinations #############
    88. #################################################
    89. #canal.destinations = example
    90. canal.destinations = example
    91. # conf root dir
    92. canal.conf.dir = ../conf
    93. # auto scan instance dir add/remove and start/stop instance
    94. canal.auto.scan = true
    95. canal.auto.scan.interval = 5
    96. # set this value to 'true' means that when binlog pos not found, skip to latest.
    97. # WARN: pls keep 'false' in production env, or if you know what you want.
    98. canal.auto.reset.latest.pos.mode = false
    99. canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    100. #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    101. canal.instance.global.mode = spring
    102. canal.instance.global.lazy = false
    103. canal.instance.global.manager.address = ${canal.admin.manager}
    104. #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    105. canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    106. #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    107. ##################################################
    108. ######### MQ Properties #############
    109. ##################################################
    110. # aliyun ak/sk , support rds/mq 要修改的
    111. canal.aliyun.accessKey = LTAIs4e56kBVE9
    112. canal.aliyun.secretKey = pJPGAtvJKGWsS
    113. canal.aliyun.uid=
    114. canal.mq.servers = 192.168.56.1:9876
    115. canal.mq.flatMessage = true
    116. canal.mq.canalBatchSize = 50
    117. canal.mq.canalGetTimeout = 100
    118. # Set this value to "cloud", if you want open message trace feature in aliyun.
    119. canal.mq.accessChannel = local
    120. canal.mq.database.hash = true
    121. canal.mq.send.thread.size = 30
    122. canal.mq.build.thread.size = 8
    123. ##################################################
    124. ######### Kafka #############
    125. ##################################################
    126. # kafka.bootstrap.servers = 127.0.0.1:9092
    127. # kafka.acks = all
    128. # kafka.compression.type = none
    129. # kafka.batch.size = 16384
    130. # kafka.linger.ms = 1
    131. # kafka.max.request.size = 1048576
    132. # kafka.buffer.memory = 33554432
    133. # kafka.max.in.flight.requests.per.connection = 1
    134. # kafka.retries = 0
    135. # kafka.kerberos.enable = false
    136. # kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
    137. # kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
    138. ##################################################
    139. ######### RocketMQ #############
    140. ##################################################
    141. rocketmq.producer.group = redis-test-producer
    142. rocketmq.enable.message.trace = false
    143. rocketmq.customized.trace.topic =
    144. rocketmq.namespace =
    145. #需要修改的地方,多个值用分号隔开
    146. rocketmq.namesrv.addr = 192.168.56.1:9876
    147. rocketmq.retry.times.when.send.failed = 0
    148. rocketmq.vip.channel.enabled = false
    149. rocketmq.tag = canal_tag
    150. ##################################################
    151. ######### RabbitMQ #############
    152. ##################################################
    153. # rabbitmq.host =
    154. # rabbitmq.virtual.host =
    155. # rabbitmq.exchange =
    156. # rabbitmq.username =
    157. # rabbitmq.password =
    158. # rabbitmq.deliveryMode =

    instance.properties

    1. ## mysql serverId
    2. canal.instance.mysql.slaveId = 5
    3. #position info,需要改成自己的数据库信息
    4. canal.instance.master.address = 127.0.0.1:3306
    5. canal.instance.master.journal.name =
    6. canal.instance.master.position =
    7. canal.instance.master.timestamp =
    8. #canal.instance.standby.address =
    9. #canal.instance.standby.journal.name =
    10. #canal.instance.standby.position =
    11. #canal.instance.standby.timestamp =
    12. #username/password,需要改成自己的数据库信息
    13. canal.instance.dbUsername = canal
    14. canal.instance.dbPassword = canal
    15. canal.instance.defaultDatabaseName =
    16. canal.instance.connectionCharset = UTF-8
    17. #table regex
    18. canal.instance.filter.regex =.*\\..*
    19. canal.mq.topic = canal_topic
    20. canal.mq.partition = 0

                                    

            3.启动

        rocket相关

    测试:

            
            
                com.alibaba.otter
                canal.client
                1.1.4
            
    package com.cloud.redis.consumer.binlog;
    
    rocket监听
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "canal_topic",
                            selectorExpression = "*",
                            consumerGroup = "canal_consumer",
    messageModel = MessageModel.CLUSTERING, //广播...
    consumeMode = ConsumeMode.ORDERLY) //同时,顺序
    public class MyRocketMQListener implements RocketMQListener {
        @Override
        public void onMessage(Object message) {
            System.out.println("MQListener1..."+message);
        }
    }
    
    问题
    1.mysql服务报错:MySQL启动失败提示:本地计算机上的Mysql服务启动后停止 - 查看data下.err日志解决2.连接问题:D:\software\canal\canal\logs\user_date - caching_sha2_password Auth failed -select host, user, authentication_string, plugin from userALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
    3.c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    4. The specified topic is blank - 设置canal.mq.topic
    5.Connection refused - canal.serverMode = tcp
    6.NoClassDefFoundError:org/apache/rocketmq/client/consumer/DefaultLitePullConsumer -  导包jar
    7.Error[UPDATE command denied to user 'canal'@'localhost' for table 'canal_node_server'] - 1. GRANT SELECT, INSERT, UPDATE, DELETE, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 2.flush privileges; 3.show grants for 'canal'@'%';

    QuickStart · alibaba/canal Wiki · GitHub

  • 相关阅读:
    Java预习43
    上传镜像到 docker hub 中
    【计网】链路层
    智能优化算法Matlab源码大礼包领取
    门控循环单元(GRU)【动手学深度学习v2】
    Android 判断当前线程是否是主线程的方法
    Python之第六章 内置容器 --- 字符串
    java-net-php-python-ssm儿童演出礼服租赁网站计算机毕业设计程序
    C语言中常见的逻辑错误
    ZigBee 3.0理论教程-通用-1-11:安全加密-网络层(NWK)安全
  • 原文地址:https://blog.csdn.net/xiaoxiamiqianqian/article/details/126555516