• Flink dimension table joins


    1. Real-time dimension table lookup

    Real-time lookup means accessing the external database directly inside a Flink operator, for example joining against MySQL. The call is synchronous, so the dimension data is guaranteed to be current. However, once the volume of streaming data grows, this puts enormous pressure on the external system: if a connection failure or a full thread pool occurs, the synchronous call blocks the thread and the task sits waiting for the response, hurting the overall throughput of the job. The approach also demands a high QPS from the external system; in large-scale real-time computing the QPS is far higher than for an ordinary back-end service, with peaks of one hundred thousand to several hundred thousand requests per second, so the bottleneck of the whole job shifts to the external system.

    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class DimSync extends RichMapFunction<fplOverview, String> {
        private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
        private Connection conn = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // One JDBC connection per parallel subtask, opened when the task starts
            conn = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");
        }

        @Override
        public String map(fplOverview fplOverview) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());

            String dp_id = jsonObject.getString("dp_id");

            // Look up last week's fpl_amount and yw for this dp_id
            PreparedStatement pst = conn.prepareStatement("select max(fpl_amount) as fpl_amount, max(yearweek(datatime)) as yw \n" +
                                                          "from fpl_overview \n" +
                                                          "where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # first day of last week\n" +
                                                          "and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # last day of last week + 1\n" +
                                                          "and dp_id = ? \n" +
                                                          "group by dp_id");
            pst.setString(1, dp_id);
            ResultSet resultSet = pst.executeQuery();
            String fpl_amount = null;
            String yw = null;
            while (resultSet.next()) {
                fpl_amount = resultSet.getString(1);
                yw = resultSet.getString(2);
            }
            resultSet.close();
            pst.close();

            jsonObject.put("lastweek_fpl_amount", fpl_amount);
            jsonObject.put("lastweek_yw", yw);
            return jsonObject.toString();
        }

        @Override
        public void close() throws Exception {
            super.close();
            conn.close();
        }
    }
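
    A minimal wiring sketch for this operator is shown below; the source is a placeholder (a name such as FplOverviewSource is illustrative and not part of the original code — in practice it would be a Kafka or other source emitting fplOverview records):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DimSyncJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical source of fplOverview events (e.g. a Kafka source defined elsewhere)
            DataStream<fplOverview> events = env.addSource(new FplOverviewSource());

            // Each record triggers one synchronous MySQL round trip inside the map operator
            events.map(new DimSync())
                  .print();

            env.execute("dim-table-sync-lookup");
        }
    }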
    

    2. LRU cache (Flink async I/O)

    Use Flink's RichAsyncFunction to load MySQL data into a cache: when joining the dimension table we first look in the cache, and only if the record is not there does the client query MySQL and insert the result into the cache (a sketch of the cache lookup follows the code below).

    
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.jdbc.JDBCClient;
    import io.vertx.ext.sql.SQLClient;
    import io.vertx.ext.sql.SQLConnection;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class JDBCAsyncFunction extends RichAsyncFunction<fplOverview, JsonObject> {
        private SQLClient client;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Vert.x supplies the asynchronous JDBC client and its worker/event-loop threads
            Vertx vertx = Vertx.vertx(new VertxOptions()
                    .setWorkerPoolSize(10)
                    .setEventLoopPoolSize(10));

            JsonObject config = new JsonObject()
                    .put("url", "jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncoding=UTF-8&useSSL=false")
                    .put("driver_class", "com.mysql.cj.jdbc.Driver")
                    .put("max_pool_size", 10)
                    .put("user", "root")
                    .put("password", "");

            client = JDBCClient.createShared(vertx, config);
        }

        @Override
        public void close() throws Exception {
            client.close();
        }

        @Override
        public void asyncInvoke(fplOverview fplOverview, ResultFuture<JsonObject> resultFuture) throws Exception {
            client.getConnection(conn -> {
                if (conn.failed()) {
                    // Always complete the future, otherwise the async operator will time out
                    resultFuture.complete(Collections.emptyList());
                    return;
                }

                final SQLConnection connection = conn.result();
                // Run the lookup; dp_id is concatenated into the SQL as in the original
                // (binding it via queryWithParams would be safer against SQL injection)
                connection.query("select max(fpl_amount) as fpl_amount, max(yearweek(datatime)) as yw \n" +
                                 "from fpl_overview \n" +
                                 "where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # first day of last week\n" +
                                 "and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # last day of last week + 1\n" +
                                 "and dp_id = '" + fplOverview.getDp_id() + "' " +
                                 "group by dp_id ", res2 -> {
                    List<JsonObject> rows = new ArrayList<>();
                    if (res2.succeeded()) {
                        rows.addAll(res2.result().getRows());
                    } else {
                        System.out.println("dimension table query failed: " + res2.cause());
                    }
                    connection.close();
                    resultFuture.complete(rows);
                });
            });
        }
    }
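
    The async function only takes effect once the stream is wrapped with Flink's AsyncDataStream. A minimal wiring sketch, assuming the same fplOverview stream as in the first example (the timeout and capacity values are illustrative):

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;

    import java.util.concurrent.TimeUnit;

    // events is the DataStream<fplOverview> coming from the source
    DataStream<JsonObject> enriched = AsyncDataStream.unorderedWait(
            events,                       // input stream
            new JDBCAsyncFunction(),      // the async lookup defined above
            1000, TimeUnit.MILLISECONDS,  // timeout per async request
            100);                         // max number of in-flight requests

    Also note that, as written, JDBCAsyncFunction issues a query for every record; the "LRU cache" part of this approach is normally a local cache checked before the async lookup. A minimal sketch of such a cache using Guava (this is an assumption for illustration — the original code does not include it):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // Inside the RichAsyncFunction: a bounded, time-expiring local cache (LRU-style eviction)
    private transient Cache<String, JsonObject> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        // ... create the Vert.x JDBC client as above ...
        cache = CacheBuilder.newBuilder()
                .maximumSize(10_000)                    // bound the number of cached dp_ids
                .expireAfterWrite(10, TimeUnit.MINUTES) // let stale dimension rows expire
                .build();
    }

    @Override
    public void asyncInvoke(fplOverview value, ResultFuture<JsonObject> resultFuture) {
        JsonObject hit = cache.getIfPresent(value.getDp_id());
        if (hit != null) {
            // Cache hit: answer without touching MySQL
            resultFuture.complete(Collections.singleton(hit));
            return;
        }
        // Cache miss: run the async query as above, and cache.put(dp_id, row) on success
    }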
    
    

    3. Preload the full MySQL dimension data

    Preload the entire MySQL dimension table and use a ScheduledExecutorService to re-pull it every 5 minutes. This approach suits scenarios where the freshness requirement is relaxed and the dimension table is small.

    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class WholeLoad extends RichMapFunction<fplOverview, String> {
        private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
        // Result map acting as the cache; the key is the join field (dp_id).
        // ConcurrentHashMap because the reload thread writes while map() reads.
        private static Map<String, String> cache;
        private transient ScheduledExecutorService executor;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            cache = new ConcurrentHashMap<>();
            executor = Executors.newScheduledThreadPool(2);
            // Reload the dimension table every 5 minutes, starting now
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        load();
                    } catch (Exception e) {
                        LOGGER.error("failed to reload dimension table", e);
                    }
                }
            }, 0, 5, TimeUnit.MINUTES);
        }

        @Override
        public String map(fplOverview fplOverview) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());
            String dp_id = jsonObject.getString("dp_id");
            // Look up the cached result for this id; it may be missing right after startup
            String rs = cache.get(dp_id);
            if (rs != null) {
                jsonObject.putAll(JSONObject.parseObject(rs));
            }
            return jsonObject.toString();
        }

        @Override
        public void close() throws Exception {
            super.close();
            executor.shutdown();
        }

        public void load() throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            Connection con = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");
            // Query last week's aggregates for every dp_id
            PreparedStatement statement = con.prepareStatement("select dp_id, max(fpl_amount) as fpl_amount, max(yearweek(datatime)) as yw \n" +
                    "from fpl_overview \n" +
                    "where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # first day of last week\n" +
                    "and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # last day of last week + 1\n" +
                    "group by dp_id");
            ResultSet rs = statement.executeQuery();
            while (rs.next()) {
                // Put each row into the cache
                String dp_id = rs.getString("dp_id");
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("lastweek_fpl_amount", rs.getString("fpl_amount"));
                jsonObject.put("lastweek_yw", rs.getString("yw"));
                cache.put(dp_id, jsonObject.toString());
            }
            LOGGER.info("dimension cache refreshed: {}", cache);
            con.close();
        }
    }
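
    Wiring is the same plain map as in the first approach (a sketch; events is again the placeholder DataStream<fplOverview>). Note that the cache is a static field, so it is shared by all subtasks running in the same TaskManager JVM, and each subtask schedules its own reload:

    // events is the DataStream<fplOverview> from the source
    events.map(new WholeLoad())
          .print();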
    
    
  • Original article: https://blog.csdn.net/weixin_43823423/article/details/134011696