• 2022年最新《谷粒学院开发教程》:11 - 统计分析




    一、统计分析功能

    在这里插入图片描述
    在这里插入图片描述


    1.1、数据库设计

    CREATE TABLE `statistics_daily` (
      `id` char(19) NOT NULL COMMENT '主键',
      `date_calculated` varchar(20) NOT NULL COMMENT '统计日期',
      `register_num` int(11) NOT NULL DEFAULT '0' COMMENT '注册人数',
      `login_num` int(11) NOT NULL DEFAULT '0' COMMENT '登录人数',
      `video_view_num` int(11) NOT NULL DEFAULT '0' COMMENT '每日播放视频数',
      `course_num` int(11) NOT NULL DEFAULT '0' COMMENT '每日新增课程数',
      `gmt_create` datetime NOT NULL COMMENT '创建时间',
      `gmt_modified` datetime NOT NULL COMMENT '更新时间',
      PRIMARY KEY (`id`),
      KEY `statistics_day` (`date_calculated`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='网站统计日数据';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1.2、搭建微服务

    1、在 service 模块新建 service_statistics

    2、配置类

    # 服务端口
    server.port=8008
    # 服务名
    spring.application.name=service-statistics
    # mysql数据库连接
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/guli?useSSL=false&useUnicode=true&characterEncoding=utf-8&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
    spring.datasource.username=root
    spring.datasource.password=*****
    # 返回json的全局格式
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    # 配置mybatis xml文件的路径
    mybatis-plus.mapper-locations=/mapper/*.xml
    # mybatis日志
    mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
    # nacos服务地址
    spring.cloud.nacos.discovery.server-addr=localhost:8848
    # 开启熔断机制
    feign.hystrix.enabled=true
    # 设置hystrix超时时间,默认1000ms
    hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=3000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3、生成代码

    gc.setOutputDir("D:\\MyCode\\IdeaCode\\project\\gulicollege\\guli_parent\\service\\service_statistics" + "/src/main/java"); //输出目录
    
    //生成包:com.laptoy.eduservice
    pc.setModuleName("staservice"); //模块名
    
    // 5、策略配置
    strategy.setInclude("statistics_daily");//根据数据库哪张表生成,有多张表就加逗号继续填写
    strategy.setTablePrefix("statistics" + "_"); //生成实体时去掉表前缀
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4、启动类

    @SpringBootApplication
    @EnableFeignClients
    @MapperScan("com.laptoy.staservice.mapper")
    @EnableDiscoveryClient //开启服务发现
    @ComponentScan("com.laptoy")
    public class StaServiceMain8008 {
        public static void main(String[] args) {
            SpringApplication.run(StaServiceMain8008.class,args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1.3、远程接口

    1、在service_ucenter模块创建接口,统计某一天的注册人数

    // 根据日期获取当天注册人数
    @GetMapping("/countRegister/{day}")
    public R countRegister(@PathVariable String day) {
        Integer count = memberService.getCountRegister(day);
        return R.ok().data("countRegister", count);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Override
    public Integer getCountRegister(String day) {
        return baseMapper.getCountRegister(day);
    }
    
    • 1
    • 2
    • 3
    • 4
    <select id="getCountRegister" resultType="java.lang.Integer">
        select count(*)
        from ucenter_member
        where date (gmt_create) = #{day}
    select>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、在service_statistics模块创建远程调用接口

    @FeignClient(value = "service-ucenter", fallback = UCenterClientImpl.class)
    public interface UCenterClient {
        @GetMapping("/ucenter/member/countRegister/{day}")
        R countRegister(@PathVariable String day);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    @Component
    public class UCenterClientImpl implements UCenterClient {
        @Override
        public R countRegister(String day) {
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.4、接口

    //统计某一天注册人数
    @PostMapping("/createStatisticsByDay/{day}")
    public R createStatisticsByDay(@PathVariable String day){
        dailyService.createStatisticsByDay(day);
        return R.ok();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Autowired
    UCenterClient uCenterClient;
    
    @Override
    public void createStatisticsByDay(String day) {
        // 1、添加之前删除表相同的数据
        QueryWrapper<Daily> wrapper = new QueryWrapper<>();
        wrapper.eq("date_calculated", day);
        baseMapper.delete(wrapper);
    
        // 2、远程调用得到当天注册人数
        R r = uCenterClient.countRegister(day);
        Integer countRegister = (Integer) r.getData().get("countRegister");
    
        // 3、封装数据
        Daily daily = new Daily();
        daily.setRegisterNum(countRegister);
        daily.setCourseNum(RandomUtils.nextInt(100,200));       // 每日新增课程数
        daily.setLoginNum(RandomUtils.nextInt(200,300));        // 登录数
        daily.setVideoViewNum(RandomUtils.nextInt(200,300));    // 视频流量数
        daily.setDateCalculated(day);							//统计日期
    	
        this.save(daily);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    当天注册人

    在这里插入图片描述

    记录

    在这里插入图片描述


    二、添加定时任务

    在特定的时间,让程序自动执行,闹钟一样

    七子表达式,总共有7位,但是springboot整合只整合了前面6位,最后一位的年没有整合,直接默认是每年

    1、启动类添加注解

    在这里插入图片描述

    2、创建定时任务类 corn表达式

    @Component
    public class ScheduleTask {
        @Autowired
        DailyService dailyService;
    
        // 在每天凌晨1点执行方法,把前一天的数据查询进行添加
        @Scheduled(cron = "0 0 1 * * ? ") // 指定cron表达式规则
        public void createStatisticsByDay() {
            dailyService.createStatisticsByDay(DateUtil.formatDate(DateUtil.addDays(new Date(), -1)));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    三、统计数据前端整合

    1、添加路由并新建页面

    // 统计分析路由
    {
      path: '/sta',
      component: Layout,
      redirect: '/sta/create',
      name: '统计分析',
      meta: { title: '统计分析', icon: 'nested' },
      children: [
        {
          path: 'create',
          name: '生成数据',
          component: () => import('@/views/sta/create.vue'),
          meta: { title: '生成数据', icon: 'table' }
        },
        {
          path: 'show',
          name: '图表显示',
          component: () => import('@/views/sta/show.vue'),
          meta: { title: '图表显示', icon: 'nested' }
        }
      ]
    },
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2、create.vue

    <template>
      <div class="app-container">
        
        <el-form :inline="true" class="demo-form-inline">
          <el-form-item label="日期">
            <el-date-picker v-model="day" type="date" placeholder="选择要统计的日期" value-format="yyyy-MM-dd" />
          el-form-item>
          <el-button :disabled="btnDisabled" type="primary" @click="create()">生成el-button>
        el-form>
      div>
    template>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    <script>
    import staApi from "@/api/sta";
    export default {
      data() {
        return {
          day: "",
          btnDisabled: false,
        };
      },
      created() { },
      methods: {
        create() {
          staApi.createStaByDay(this.day).then((resp) => {
            //提示
            this.$message({
              type: "success",
              message: "生成成功!",
            });
            //跳转页面到show
            this.$router.push({ path: '/sta/show' })
          });
        },
      },
    };
    script>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    添加成功跳转到图标显示页面


    四、统计数据图表显示ECharts

    官方网站

    ECharts是百度的一个项目,后来百度把Echart捐给apache,用于图表展示,提供了常规的折线图、柱状图、散点图、饼图、K线图,用于统计的盒形图,用于地理数据可视化的地图、热力图、线图,用于关系数据可视化的关系图、treemap、旭日图,多维数据可视化的平行坐标,还有用于 BI 的漏斗图,仪表盘,并且支持图与图之间的混搭。


    4.1、项目集成ECharts

    1、安装

    npm install --save echarts@4.1.0
    
    • 1

    2、show.vue 初始化页面静态数据

    <template>
      <div class="app-container">
        
        <el-form :inline="true" class="demo-form-inline">
          <el-form-item>
            <el-select v-model="searchObj.type" clearable placeholder="请选择">
              <el-option label="学员登录数统计" value="login_num" />
              <el-option label="学员注册数统计" value="register_num" />
              <el-option label="课程播放数统计" value="video_view_num" />
              <el-option label="每日课程数统计" value="course_num" />
            el-select>
          el-form-item>
          <el-form-item>
            <el-date-picker v-model="searchObj.begin" type="date" placeholder="选择开始日期" value-format="yyyy-MM-dd" />
          el-form-item>
          <el-form-item>
            <el-date-picker v-model="searchObj.end" type="date" placeholder="选择截止日期" value-format="yyyy-MM-dd" />
          el-form-item>
          <el-button :disabled="btnDisabled" type="primary" icon="el-icon-search" @click="showChart()">查询el-button>
        el-form>
        <div class="chart-container">
          <div id="chart" class="chart" style="height: 500px; width: 100%" />
        div>
      div>
    template>
    <script>
    import echarts from "echarts";
    export default {
      data() {
        return {
          searchObj: {
            begin: "",
            end: "",
            type: "",
          },
          btnDisabled: false,
          chart: null,
          title: "",
          xData: [],
          yData: [],
        };
      },
      methods: {
        showChart() {
          // 基于准备好的dom,初始化echarts实例
          this.chart = echarts.init(document.getElementById("chart"));
          // 指定图表的配置项和数据
          var option = {
            // x轴是类目轴(离散数据),必须通过data设置类目数据
            xAxis: {
              type: "category",
              data: ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"],
            },
            // y轴是数据轴(连续数据)
            yAxis: {
              type: "value",
            },
            // 系列列表。每个系列通过 type 决定自己的图表类型
            series: [
              {
                // 系列中的数据内容数组
                data: [820, 932, 901, 934, 1290, 1330, 1320],
                // 折线图
                type: "line",
              },
            ],
          };
          this.chart.setOption(option);
        },
      },
    };
    script>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    在这里插入图片描述


    4.2、后端接口

    图表显示,根据传入的类型和日期范围返回两部分数据,日期json数组,数量json数组

    // 图表显示,返回两部分数据,日期json数组,数量json数组
    @GetMapping("/showData/{type}/{begin}/{end}")
    public R showData(@PathVariable String type, @PathVariable String begin, @PathVariable String end) {
        Map<String, Object> map = dailyService.getShowData(type, begin, end);
        return R.ok().data(map);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    //图表显示,返回两部分数据,日期json数组,数量json数组
    @Override
    public Map<String, Object> getShowData(String type, String begin, String end) {
        // 根据条件查询对应的数据
        QueryWrapper<Daily> wrapper = new QueryWrapper<>();
        wrapper.select("date_calculated", type);
        wrapper.between("date_calculated", begin, end);
    
        List<Daily> dailyList = baseMapper.selectList(wrapper);
    
        // 前端要求数组json结果,对应后端为List集合
        // 创建两个list集合,一个放日期X轴,一个放数量Y轴
        List<String> xlist = new ArrayList<>();
        List<Integer> ylist = new ArrayList<>();
    
        for (Daily daily : dailyList) {
            xlist.add(daily.getDateCalculated());
    
            //判断查询的哪个字段
            if ("register_num".equals(type)) {
                ylist.add(daily.getRegisterNum());
            }
            if ("login_num".equals(type)) {
                ylist.add(daily.getLoginNum());
            }
            if ("video_view_num".equals(type)) {
                ylist.add(daily.getVideoViewNum());
            }
            if ("course_num".equals(type)) {
                ylist.add(daily.getCourseNum());
            }
    
        }
        HashMap<String, Object> map = new HashMap<>();
        map.put("xlist", xlist);
        map.put("ylist", ylist);
    
        return map;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    4.3、前端展示

    1、api

    //图表显示
    getShowData(searchObj) {
        return request({
            url: `/staservice/daily/showData/${searchObj.type}/${searchObj.begin}/${searchObj.end}`,
            method: 'get'
        })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2、js

    <script>
    import echarts from "echarts";
    import staApi from "@/api/sta"
    export default {
      data() {
        return {
          searchObj: {
            begin: "",
            end: "",
            type: "",
          },
          btnDisabled: false,
          chart: null,
          title: "",
          xData: [],
          yData: [],
        };
      },
      methods: {
        showChart() {
          staApi.getShowData(this.searchObj).then(resp => {
            // x轴 时间
            this.xData = resp.data.xlist
            // y轴 数据
            this.yData = resp.data.ylist
            //调用下面生成图表方法,改变值
            this.setChart();
          })
        },
        setChart() {
          // 基于准备好的dom,初始化echarts实例
          this.chart = echarts.init(document.getElementById("chart"));
          // 指定图表的配置项和数据
          var option = {
            // x轴是类目轴(离散数据),必须通过data设置类目数据
            xAxis: {
              type: "category",
              data: this.xData,
            },
            // y轴是数据轴(连续数据)
            yAxis: {
              type: "value",
            },
            // 系列列表 每个系列通过 type 决定自己的图表类型
            series: [
              {            
                data: this.yData,   // 系列中的数据内容数组
                type: "line",       // 折线图
              },
            ],
          };
          this.chart.setOption(option);
        },
      },
    };
    script>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    3、展示

    在这里插入图片描述


    4.4、样式补充

    参考手册

    1、x坐标轴触发提示

    var option = {
      tooltip: {
        trigger: 'axis'
      },
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    2、显示标题

    var option = {
      title: {
          text: "数据统计"
      },
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述


    五、Canal数据同步工具

    在前面的统计分析功能中,我们采取了服务调用获取统计数据,这样耦合度高,效率相对较低。

    目前我采取另一种实现方式,通过实时同步数据库表的方式实现,例如我们要统计每天注册与登录人数,我们只需把会员表同步到统计库中,实现本地统计就可以了,这样效率更高,耦合度更低,Canal就是一个很好的数据库步工具。

    canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL

    远程库里的内容同步到本地库中,他做数据更新,同步到本地库的数据也更新

    在这里插入图片描述

    要求两个库里面有相同的数据库名、和数据库结构

    在这里插入图片描述


    5.1、配置linux

    需求:linux的数据发生改变,window的数据也同步改变

    1、在window和linux分别创建数据库 guli 及创建数据表 member

    create table member(
       id int(11),
       username varchar(20),
       age int(11)
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、linux 开启 binlog - 修改 my.cnf

    log-bin=mysql-bin	# binlog文件名
    binlog-format=ROW   # 选择row模式
    server_id=1			# mysql实例id,不能和canal的slaveId重复
    
    • 1
    • 2
    • 3

    3、linux 检查 binlog 功能是否开启

    mysql> show variables like 'log_bin';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4、linux 查看 binlog 日志文件

    mysql> show binary logs;
    +------------------+-----------+
    | Log_name         | File_size |
    +------------------+-----------+
    | mysql-bin.000001 |       154 |
    +------------------+-----------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    5、查看正在写入的 binlog 文件

    mysql> show master status;
    +------------------+----------+--------------+------------------+-------------------+
    | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
    +------------------+----------+--------------+------------------+-------------------+
    | mysql-bin.000001 |      154 |              |                  |                   |
    +------------------+----------+--------------+------------------+-------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    6、创建用户并授权

    # 新建用户 用户名:canal  密码:canal 
    CREATE USER canal IDENTIFIED by 'canal';
    # 授权
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    # 刷新MySQL的系统权限相关表
    FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    5.2、安装Canal

    官网下载地址

    这里用的docker容器运行Canal,基于docker 自定义网络 guli 进行连接,通过服务名代替ip地址进行连接,如下的步骤3所用的即是mysql的服务名 guli_mysql(需要linux有1g以上空闲内存,否则无法成功运行)


    1、启动测试容器,用于复制properties配置文件

    docker run -p 11111:11111 --name test -d canal/canal-server:latest
    
    • 1

    2、将 instance.properties 文件复制到宿主机,用于后续挂载使用

    docker cp canal:/home/admin/canal-server/conf/example/instance.properties  /mydata/guli/canal/conf/
    
    • 1

    3、修改instance.properties

    在这里插入图片描述

    4、删除旧的canal容器、再创建新容器(使用挂载)

    docker rm -f test
    
    docker run -p 11111:11111 --name guli_canal \
    -v /mydata/guli/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
    -v /mydata/guli/canal/logs:/home/admin/canal-server/logs \
    --network guli \
    -d canal/canal-server:latest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    5.3、搭建微服务

    1、在父路径创建 canal_client 模块

    2、POM

    <dependencies>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
        
        <dependency>
            <groupId>mysqlgroupId>
            <artifactId>mysql-connector-javaartifactId>
        dependency>
        <dependency>
            <groupId>commons-dbutilsgroupId>
            <artifactId>commons-dbutilsartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-jdbcartifactId>
        dependency>
        <dependency>
            <groupId>com.alibaba.ottergroupId>
            <artifactId>canal.clientartifactId>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3、配置文件(这里连接的是window的mysql)

    # 服务端口号
    server.port=12000
    # 服务名
    spring.application.name=canal-client
    # 环境设置:dev,test,prod
    spring.profiles.active=dev
    # mysql数据库连接
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/guli?useSSL=false&useUnicode=true&characterEncoding=utf-8&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
    spring.datasource.username=root
    spring.datasource.password=root
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4、编写客户端类

    com.laptoy.canal.client.CanalClient

    package com.laptoy.canal.client;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.commons.dbutils.DbUtils;
    import org.apache.commons.dbutils.QueryRunner;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.net.InetSocketAddress;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    @Component
    public class CanalClient {
        @Resource
        private DataSource dataSource;
    
        // sql队列 下面判断拼接后的sql会加入这个队列
        private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    
        /**
         * canal入库方法
         * 指定要监听的ip地址的canal端口号,默认开启为11111
         */
        public void run() {
            CanalConnector connector = CanalConnectors.newSingleConnector(new
                    InetSocketAddress("120.76.55.55", 11111), "example", "", "");
            int batchSize = 1000;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                try {
                    while (true) {
                        //尝试从master那边拉去数据batchSize条记录,有多少取多少
                        //监控上面ip的数据库是否变化
                        Message message = connector.getWithoutAck(batchSize);
                        long batchId = message.getId();
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            //没变化就睡
                            Thread.sleep(1000);
                        } else {
                            //有变化就同步
                            dataHandle(message.getEntries());
                        }
                        connector.ack(batchId);
                        //当队列里面堆积的sql大于一定数值的时候就模拟执行
                        //队列里如果有sql语句就执行
                        if (SQL_QUEUE.size() >= 1) {
                            executeQueueSql();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            } finally {
                connector.disconnect();
            }
        }
    
        /**
         * 模拟执行队列里面的sql语句
         */
        public void executeQueueSql() {
            int size = SQL_QUEUE.size();
            for (int i = 0; i < size; i++) {
                String sql = SQL_QUEUE.poll();
                System.out.println("[sql]----> " + sql);
                this.execute(sql.toString());
            }
        }
    
        /**
         * 数据处理
         */
        private void dataHandle(List<Entry> entrys) throws
                InvalidProtocolBufferException {
            for (Entry entry : entrys) {
                if (EntryType.ROWDATA == entry.getEntryType()) {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    EventType eventType = rowChange.getEventType();
                    //判断当前是什么操作:删除、更新、插入
                    if (eventType == EventType.DELETE) {
                        saveDeleteSql(entry);
                    } else if (eventType == EventType.UPDATE) {
                        saveUpdateSql(entry);
                    } else if (eventType == EventType.INSERT) {
                        saveInsertSql(entry);
                    }
                }
            }
        }
    
        // 保存更新语句
        private void saveUpdateSql(Entry entry) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                List<RowData> rowDatasList = rowChange.getRowDatasList();
                for (RowData rowData : rowDatasList) {
                    List<Column> newColumnList = rowData.getAfterColumnsList();
                    StringBuffer sql = new StringBuffer("update " +
                            entry.getHeader().getTableName() + " set ");
                    for (int i = 0; i < newColumnList.size(); i++) {
                        sql.append(" " + newColumnList.get(i).getName()
                                + " = '" + newColumnList.get(i).getValue() + "'");
                        if (i != newColumnList.size() - 1) {
                            sql.append(",");
                        }
                    }
                    sql.append(" where ");
                    List<Column> oldColumnList = rowData.getBeforeColumnsList();
                    for (Column column : oldColumnList) {
                        if (column.getIsKey()) {
                            //暂时只支持单一主键
                            sql.append(column.getName() + "=" + column.getValue());
                            break;
                        }
                    }
                    SQL_QUEUE.add(sql.toString());
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    
        // 保存删除语句
        private void saveDeleteSql(Entry entry) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                List<RowData> rowDatasList = rowChange.getRowDatasList();
                for (RowData rowData : rowDatasList) {
                    List<Column> columnList = rowData.getBeforeColumnsList();
                    StringBuffer sql = new StringBuffer("delete from " +
                            entry.getHeader().getTableName() + " where ");
                    for (Column column : columnList) {
                        if (column.getIsKey()) {
                            //暂时只支持单一主键
                            sql.append(column.getName() + "=" + column.getValue());
                            break;
                        }
                    }
                    SQL_QUEUE.add(sql.toString());
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    
        // 保存插入语句
        private void saveInsertSql(Entry entry) {
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                List<RowData> rowDatasList = rowChange.getRowDatasList();
                for (RowData rowData : rowDatasList) {
                    List<Column> columnList = rowData.getAfterColumnsList();
                    StringBuffer sql = new StringBuffer("insert into " +
                            entry.getHeader().getTableName() + " (");
                    for (int i = 0; i < columnList.size(); i++) {
                        sql.append(columnList.get(i).getName());
                        if (i != columnList.size() - 1) {
                            sql.append(",");
                        }
                    }
                    sql.append(") VALUES (");
                    for (int i = 0; i < columnList.size(); i++) {
                        sql.append("'" + columnList.get(i).getValue() + "'");
                        if (i != columnList.size() - 1) {
                            sql.append(",");
                        }
                    }
                    sql.append(")");
                    SQL_QUEUE.add(sql.toString());
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    
        // 入库 与数据库连接,并执行队列里面取出的sql语句
        public void execute(String sql) {
            Connection con = null;
            try {
                if (null == sql) return;
                con = dataSource.getConnection();
                QueryRunner qr = new QueryRunner();
                int row = qr.execute(con, sql);
                System.out.println("update: " + row);
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                DbUtils.closeQuietly(con);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205

    5、启动类

    @SpringBootApplication
    public class CanalMain12000 implements CommandLineRunner {
    
        @Autowired
        private CanalClient canalClient;
    
        public static void main(String[] args) {
            SpringApplication.run(CanalMain12000.class,args);
        }
    
        //只要程序在执行状态,他就会一直执行里面的方法
        @Override
        public void run(String... args) throws Exception {
            //项目启动,执行canal客户端监听
            canalClient.run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    5.4、运行测试

    1、linux的mysql添加一条数据

    2、控制台输出

    在这里插入图片描述

    3、查看

    在这里插入图片描述

  • 相关阅读:
    KLOOK客路旅行基于Apache Hudi的数据湖实践
    SpringCore完整学习教程4,入门级别
    JAVA:在IDEA引入本地jar包的方法并解决打包scope为system时发布无法打包进lib的方案
    关于微信阅读协议的分析
    Spring Security 单点登出
    Cadence OrCAD Capture 绘制总线的方法
    jq工具及其常用用法
    【MySQL调优(二)】性能监控分析 - Show Profile
    kafka-生产者拦截器(SpringBoot整合Kafka)
    vue3 实现pdf预览
  • 原文地址:https://blog.csdn.net/apple_53947466/article/details/126236988