• 基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】


    基于chunjun纯钧的增量数据同步

    目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步

    chunjun的官网文档对增量同步已经做出了一定的说明

    纯钧官方
    根据文档我编写了一个SQL脚本

    create table `source` (
            `sfzh` STRING COMMENT '',
            `xm` STRING COMMENT '',
            `xb` STRING COMMENT '',
            `xbdm` STRING COMMENT '',
            `jzdz` STRING COMMENT '',
            `fzrq` DATE COMMENT '',
            `dsc_biz_record_id` STRING COMMENT ''
    ) with (
            'connector' = 'mysql-x',
            'url' = 'jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
            'table-name' = '',
            'username' = '',
            'password' = '',
            'scan.fetch-size' = '1024',
            'scan.increment.column' = 'fzrq',
            --'scan.increment.column-type' = 'date',
            'scan.start-location' = '1659974400000'
    );
    
    create table `sink` (
            `sfzh` STRING COMMENT '',
            `xm` STRING COMMENT '',
            `xb` STRING COMMENT '',
            `xbdm` STRING COMMENT '',
            `jzdz` STRING COMMENT '',
            `fzrq` DATE COMMENT '',
            `dsc_biz_record_id` STRING COMMENT '',
            PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED
    ) with (
            'connector' = 'stream-x'
    );
    
    

    然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!

    在本地调试源码解决问题的大致过程

    在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量

    /** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
    protected transient CustomReporter customReporter;
    

    该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

        @Override
        public void openInputFormat() throws IOException {
            Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
            if (vars != null) {
                jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
                jobId = vars.get(Metrics.JOB_NAME);
                indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
            }
    
            LOG.info("是否使用自定义报告 {}", useCustomReporter());
            if (useCustomReporter()) {
    
                customReporter =
                        DataSyncFactoryUtil.discoverMetric(
                                config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
                customReporter.open();
                LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
            }
    
            startTime = System.currentTimeMillis();
        }
    

    通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true

     /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
        @Override
        protected boolean useCustomReporter() {
            return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
        }
    
        /** 增量同步或者间隔轮询时,是否初始化外部存储 */
        protected Boolean initReporter = true;
    

    经过查找 initReporter 属性的set方法调用,找到了下面的问题
    在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式

    尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了

    那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

    后续问题

    打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader

    public void openInputFormat() throws IOException {
            Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
            if (vars != null) {
                jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
                jobId = vars.get(Metrics.JOB_NAME);
                indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
            }
    
            LOG.info("是否使用自定义报告 {}", useCustomReporter());
            if (useCustomReporter()) {
    
                customReporter =
                        DataSyncFactoryUtil.discoverMetric(
                                config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
                customReporter.open();
                LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
            }
    
            startTime = System.currentTimeMillis();
        }
    
        public static CustomReporter discoverMetric(
                ChunJunCommonConf commonConf,
                RuntimeContext context,
                boolean makeTaskFailedWhenReportFailed) {
            try {
                String pluginName = commonConf.getMetricPluginName();
                // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
                String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);
                MetricParam metricParam =
                        new MetricParam(
                                context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());
    
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class clazz = classLoader.loadClass(pluginClassName);
                Constructor constructor = clazz.getConstructor(MetricParam.class);
    
                return (CustomReporter) constructor.newInstance(metricParam);
            } catch (Exception e) {
                throw new ChunJunRuntimeException(e);
            }
        }
    

    在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!

  • 相关阅读:
    【leetcode10-21】子串、普通数组、矩阵
    MySQL数据库管理DDL语言和数据库管理
    5个精美的wordpress中文企业主题模板
    Mac电脑输入正确密码后提示密码错误
    TS流分析
    在Ubuntu20.04单机部署Doris1.1
    AudioLM: 音频生成的革命性模型
    nacos配置中心使用教程
    超详细带你用Java实现QQ的聊天功能
    React 全栈体系(九)
  • 原文地址:https://www.cnblogs.com/sxxs/p/17309745.html