• Flink流式框架过程问题


    注:博主使用的版本就是:<flink.version>1.16.1

    前提环境:

    因公司业务需要,使用Flink对数据进行流式处理,具体处理流程就是,从kafka接到数据,然后连续请求十多个接口(算法)对数据进行打标;

    主程序:
    在这里插入图片描述
    具体的异步IO代码(随便找一个展示):

    package com.wenge.datagroup.storage.process;
    
    import com.alibaba.fastjson.JSONObject;
    import com.wenge.datagroup.storage.bean.ParamConfig;
    import com.wenge.datagroup.storage.common.ArgsConstants;
    import com.wenge.datagroup.storage.process.base.BaseETL;
    import com.wenge.datagroup.storage.service.YaYiService.YaYiPolarityService;
    import com.wenge.datagroup.storage.utils.ConfigUtil;
    import com.wenge.datagroup.storage.utils.Funnel;
    import com.wenge.datagroup.storage.utils.YaYiUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    
    import java.util.Collections;
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;
    
    @Slf4j
    public class AnalyerAsyncIOProcessPolarity {
    
        public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {
    
            log.error("----------------------------开始异步IO处理----------------");
            String topic = Funnel.contains(ArgsConstants.TOPIC) ? Funnel.getString(ArgsConstants.TOPIC) : "";
            String configFile = Funnel.contains(ArgsConstants.CONFIG) ? Funnel.getString(ArgsConstants.CONFIG) : "config.properties";
            int asyncNum = Funnel.contains(ArgsConstants.ASYNC_NUM) ? Funnel.getInt(ArgsConstants.ASYNC_NUM) : ConfigUtil.getInteger(ArgsConstants.ASYNC_NUM);
            int mapParallelism = Funnel.contains(ArgsConstants.MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.MAP_PARALLELISM);
            int filterParallelism = Funnel.contains(ArgsConstants.FILTER_PARALLELISM) ? Funnel.getInt(ArgsConstants.FILTER_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.FILTER_PARALLELISM);
            int TranslateParallelism = (Funnel.contains(ArgsConstants.Translate_MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.Translate_MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.Translate_MAP_PARALLELISM));
    
            // 异步IO
            RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {
                private transient ExecutorService executorService;
                private ParamConfig paramConfig;
                private YaYiUtil yaYiUtil;
    
                @Override
                public void open(Configuration parameters) {
                    // 重新加载配置文件
                    log.error("重新加载配置文件");
                    ConfigUtil.setConfigFile(configFile);
                    ConfigUtil.setTopic(topic);
                    ConfigUtil.init();
                    this.executorService = Executors.newFixedThreadPool(asyncNum);
                    paramConfig = new ParamConfig(ConfigUtil.getString("YaYiappKey"), ConfigUtil.getString("YaYiappSecret"));
                    yaYiUtil = new YaYiUtil(paramConfig);
                }
    
                @Override
                public void close() throws Exception {
                    // 关闭线程池
                    if (executorService != null) {
                        executorService.shutdown();
                    }
                    log.error("----------------------------情感分析-线程池关闭----------------------");
                }
    
                @Override
                public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
                    JSONObject data = input;
                    String uuid = data.getString("uuid");
                    log.error("-----------------------数据超时----------------------:{}", uuid);
                    //对超时数据进行处理
                    resultFuture.complete(Collections.singleton(data));
                }
    
                @Override
                public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {
    
                    CompletableFuture.supplyAsync(new Supplier<JSONObject>() {
    
                        @Override
                        public JSONObject get() {
                            String uuid = json.getString("uuid");
                            long start =System.currentTimeMillis();
                            try {
                                //TODO: 根据业务逻辑进行处理
                                String title = json.getString("title");
                                String content = json.getString("content");
                                String translate_title = json.getString("translate_title");
                                String translate_content = json.getString("translate_content");
                                String languageRecognition = json.getJSONObject("analysis").getString("language");
                                String dataSourceType = json.getJSONObject("platform").getString("data_source_type");
    
                                
                                if (StringUtils.isNotBlank(translate_content)) {
                                    String polarity = new String();
                                    Integer polaritySum = 0;
    								//具体算法调用
                                    YaYiPolarityService yaYiPolarityService = new YaYiPolarityService();
                                    polarity = yaYiPolarityService.yaYiPolarity(translate_content);
                                    if (StringUtils.isNotBlank(polarity)) {
                                        polaritySum = StringUtils.equals(polarity, "A") ? 0 : StringUtils.equals(polarity, "B") ? 1 : 2;
                                        JSONObject analysis = json.getJSONObject("analysis");
                                        if (Objects.nonNull(analysis)) {
                                            analysis.put("polarity", polaritySum);
                                            json.put("analysis", analysis);
                                        } else {
                                            JSONObject analysisJson = new JSONObject();
                                            analysisJson.put("polarity", polaritySum);
                                            json.put("analysis", analysisJson);
                                        }
                                    }
                                    log.error("uuid:{},分析后数据:{}", uuid, polarity);
                                    long end =System.currentTimeMillis();
                                    log.error("uuid:{},分析,耗时:{} ms", uuid,(end-start));
                                }
                                return json;
                            } catch (Exception e) {
                                log.error("--------分析异常:{},数据:{}",uuid, e);
                                return json;
                            }
                        }
                    }, executorService).thenAccept((JSONObject dbResult) -> {
                        resultFuture.complete(Collections.singleton(dbResult));
                    });
                }
            };
            DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
                    dataStream,
                    richAsyncFunction,
                    50000,
                    TimeUnit.MILLISECONDS,
                    asyncNum).name("qinggan").setParallelism(TranslateParallelism);
    
            return downloadStream;
        }
    
    
    }
    
    

    问题

    1:异步io访问接口直接引发关闭

    在这里插入图片描述
    在这里插入图片描述

    解决方案:

    方法中传递参数不要使用 Set

    具体的原因我没深究,只是经过验证在异步IO中使用如下就会导致线程关闭:
    在这里插入图片描述
    改成如下就行:
    在这里插入图片描述

    2:数据格式问题直接引发关闭

    在这里插入图片描述
    解决方案:
    在这里插入图片描述
    在这里插入图片描述

    整体来说,flink中如果数据格式传输导致错误,就会引发线程关闭,
    所以 DataStream 改为DataStream 一定要通过map和filter 筛选

    3:flink 消费kafka不提交offset

    虽然在new FlinkKafkaConsumer<> 中设置了自动提交间隔,如下:

     properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交的时间间隔
     properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");//自动提交的时间间隔
    

    但是在实际应用过程中发现,到了设置的20000ms既20s,依然不提交offset,以为flink读取kafka失败。
    实际上是因为我们再代码中开启了Checkpoint,就会覆盖kafka的配置,所以是经过规定的Checkpoint时间后才会提交offset
    在这里插入图片描述

    4:flink本地运行,发现默认并行度为6

    当我们在本地中可以通过一下开启本地webui模式

    // 使用本地模式并开启WebUI
    Configuration configuration = new Configuration();
    configuration.setInteger("rest.port", 8083);
    streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    

    当时我们本地打开web UI发现,默认的并行度是6
    在这里插入图片描述
    这样有时候影响我们异步调用外部接口的qps设置,

    解决方案:
    在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数(巨坑!)
    所以我们需要自己手动指定每个算子的并行度,不要使用默认的,可以通过.setParallelism(2)来指定某一个算子的并行度

    5:java代码的jar包,本地运行可以,flink框架上运行异常

    问题:
    我写的flink处理程序,在我本地和我打包放在flink框架都可以正常运行,
    但是我的朋友打包成功后,放在flink框架上运行就异常。

    解决:
    经过各种尝试后,我的同事将自己maven的版本由3.5.3升级到了3.6.1只有就正常了
    具体原因未知。

  • 相关阅读:
    汉兰达汽车发动机怠速抖动故障诊断方案设计
    AI题目整理
    PNPM 高效入门:安装配置一本通
    Qt QDateTime计算时间差
    Docker的四种网络模式
    红队隧道应用篇之Netsh端口转发
    SpringCloud-Sleuth服务追踪
    如何写一个中间件的springboot的starter?
    Android Studio入门——页面跳转
    Linux发布Spring Boot项目
  • 原文地址:https://blog.csdn.net/qq_41694906/article/details/140925724