• 光说不练假把式,一起Kafka业务实战。


    在这里插入图片描述

    7.1 顺序性场景

    7.1.1 场景概述

    假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

    已下单 → 已支付 → 已确认

    不允许错乱!

    7.1.2 顺序级别

    1)全局有序:

    串行化。每条经过kafka的消息必须严格保障有序性。

    这就要求kafka单通道,每个groupid下单消费者

    极大的影响性能,现实业务下几乎没必要

    2)局部有序:

    业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

    充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

    7.1.3 实现方案

    1)发送端:

    指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

    2)发送中:

    给队列配置多分区保障并发性。

    3)读取端:

    单消费者:显然不合理

    吞吐量显然上不去,kafka开多个分区还有何意义?

    所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

    但是,这个吞吐量依然有限,那如何处理呢?

    方案:多线程

    在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

    参考下图:thread处理后,会将data变成 2-1-3

    file
    改进:接收后分发二级内存队列

    消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

    再开启多个线程,每个队列分配一个线程处理。提升吞吐量

    file

    7.1.4 代码验证

    1)新建一个sort队列,2个分区

    2)启动order项目

    源码参考:

    SortedProducer(顺序性发送端)

    SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

    SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

    3)通过swagger请求

    file

    先按不同的id发送,查看控制台日志,id被正确分发到对应的队列

    file

    同一个key分配到同一个queue,顺序性得到保障

    file

    7.2 海量同步场景

    假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

    这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

    7.2.1 常规架构

    在下单完成mysql后,通过程序代码打印,直接进入kafka

    或者logback和kafka集成,通过log输送

    优点:

    更符合常规的思维。将数据送给想要的部门

    缺点:

    耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

    下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

    7.2.2 解耦合

    借助canal,监听订单表的数据变化,不再影响主业务。

    file

    7.2.3 部署实现

    1)mysql部署

    注意,需要打开binlog,8.0 默认处于开启状态

    #启动mysql8
    docker run --name mysql8 -v /opt/data/mysql8:/var/lib/mysql -p 3389:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=thisisprizemysql8db -d daocloud.io/mysql:8.0
    
    • 1
    • 2

    连上mysql,执行以下sql,添加canal用户

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
    
    • 1
    • 2
    • 3
    • 4

    创建订单表

    CREATE TABLE `orders` (
      `id` int unsigned NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2)canal部署

    #canal.properties
    #附带资料里有,放到服务器 /opt/data/canal/ 目录下
    #修改servers为你的kafka的机器地址
    canal.serverMode = kafka
    kafka.bootstrap.servers = 52.82.98.209:10903,52.82.98.209:10904
    
    • 1
    • 2
    • 3
    • 4
    • 5
    #docker-compose.yml
    #附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
    #修改mysql的链接信息的链接信息
    #然后在当前目录下执行 docker-compose up -d
    version: '2'
    services:
        canal:
            image: canal/canal-server
            container_name: canal
            restart: always
            ports:
                - "10908:11111"
            environment:
            		#mysql的链接信息
                canal.instance.master.address: 52.82.98.209:3389
                canal.instance.dbUsername: canal
                canal.instance.dbPassword: canal
                #投放到kafka的哪个主题?要提前准备好!
                canal.mq.topic: canal
            volumes:
                - "/opt/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3)数据通道验证

    进入kafka容器,用上面3.2.4里的命令行方式监听canal队列

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal
    
    • 1

    在mysql上创建orders表,增删数据试一下

    mysql> insert into orders (name) values ('张三');
    Query OK, 1 row affected (0.03 sec)
    
    • 1
    • 2

    在kafka控制台,可以看到同步的消息

    {"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}
    
    • 1

    数据通道已打通,还缺少的是druid作为消费端来接收消息

    4)druid部署

    #druid.yml
    #在附带资料里有
    #随便找个目录,执行
    docker-compose -f druid.yml up -d
    
    • 1
    • 2
    • 3
    • 4

    5)验证

    配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

    file

    注:

    关于druid的详细使用,在大数据篇章里会详细讲解。

    7.3 kafka监控

    7.3.1 eagle简介

    Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

    file

    7.3.2 部署

    推荐docker-compose启动

    将配备的资料中 eagle.yml , 拷贝到服务器任意目录

    修改对应的ip地址为你服务器的地址

    #注意ip地址:52.82.98.209,全部换成你自己服务器的
    
    version: '3'
    services:
        zookeeper:
            image: zookeeper:3.4.13
    
        kafka-1:
            container_name: kafka-1
            image: wurstmeister/kafka:2.12-2.2.2
            ports:
                - 10903:9092
                - 10913:10913
            environment:
                KAFKA_BROKER_ID: 1 
                HOST_IP: 52.82.98.209
                KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
                #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
                KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
                KAFKA_ADVERTISED_PORT: 10903 
                KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=52.82.98.209 -Dcom.sun.management.jmxremote.rmi.port=10913"
                JMX_PORT: 10913
            volumes:
                - /etc/localtime:/etc/localtime
            depends_on:
                - zookeeper           
        kafka-2:
            container_name: kafka-2
            image: wurstmeister/kafka:2.12-2.2.2
            ports:
                - 10904:9092
                - 10914:10914
            environment:
                KAFKA_BROKER_ID: 2 
                HOST_IP: 52.82.98.209
                KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
                KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
                KAFKA_ADVERTISED_PORT: 10904 
                KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=52.82.98.209 -Dcom.sun.management.jmxremote.rmi.port=10914"
                JMX_PORT: 10914
            volumes:
                - /etc/localtime:/etc/localtime
            depends_on:
                - zookeeper 
        eagle:
            image: gui66497/kafka_eagle
            container_name: ke
            restart: always
            depends_on:
                - kafka-1
                - kafka-2
            ports:
                - "10907:8048"
            environment:
                ZKSERVER: "zookeeper:2181"
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    执行 docker-compose -f eagle.yml up -d

    7.3.3 使用说明

    访问 : http://52.82.98.209:10907/ke/

    默认用户名密码: admin / 123456

    如果要删除topic等操作,需要管理token: keadmin

    file

    与km到底选哪个呢?根据自己习惯,个人认为:

    • 界面美观程度和监控曲线优于km,有登录权限控制
    • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

    专注Java技术干货分享,欢迎志同道合的小伙伴,一起交流学习

  • 相关阅读:
    vue axios封装并发请求
    改变光标形状的多种方式
    递归解决换零钱问题--回顾总结之递归的表达能力
    力扣在线OJ——栈和队列
    昨天阅读量创记录3万
    git常用的几条命令介绍
    Vue 项目结构介绍
    学习 MongoDB5 这一篇就够了
    《GB/T 8566-2022/ISO/IEC/IEEE:系统与软件工程生存周期过程》国家标准解读,附下载地址
    Java学习笔记2 变量的命名
  • 原文地址:https://blog.csdn.net/bxg_kyjgs/article/details/126364882