• flink中使用外部定时器实现定时刷新


    背景:

    我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

    外部定时器定时加载实现

    1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

    package wikiedits.schedule;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.ExecutorUtils;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {
    
        // 定时任务执行器
        private transient ScheduledExecutorService scheduledExecutorService;
        // 本地变量
        private int threshold;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 1.从db查询数据初始化本地变量
    //        threshold = DBManager.SELECTSQL.getConfig("threshold");
            // 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值
            scheduledExecutorService = Executors.newScheduledThreadPool(10);
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                // 2.1 定时任务更新本地内存配置项
                // List configList = DBManager.SELECTSQL.getConfigs();
    //            for(ConfigEntity entity : configList){
                    ConfigEntityLocalCache.getInstance().update("key", "value");
    //            }
                // 2.2 更新本地变量threshold的值
    //            threshold = DBManager.SELECTSQL.getConfig("threshold");
            }, 0, 100, TimeUnit.SECONDS);
    
        }
    
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
    
        }
    
        @Override
        public void close() throws Exception {
            ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);
        }
    
    
    }
    
    //本地缓存实现
    package wikiedits.schedule;
    
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    
    /**
     * 保存Config信息的本地缓存 ---定时同步DB配置表的数据
     */
    public class ConfigEntityLocalCache {
    
        private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();
    
        /**
         * 获取本地缓存实例
         */
        public static ConfigEntityLocalCache getInstance() {
            return instance;
        }
    
        /** 缓存内存配置项 */
        private static Cache<String, String> configCache =
                CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
    
    
        /**
         * 更新本地缓存数据
         */
        public boolean update(String key, String value){
            configCache.put(key, value);
            return true;
        }
    
    
        /**
         * 更新本地缓存数据
         */
        public  String getByKey(String key){
            return configCache.getIfPresent(key);
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    2.在静态类中通过static语句块创建定时器并定时加载,代码如下

    package wikiedits.schedule;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    
    /**
     * 静态类定时加载DB配置表到本地内存中
     */
    public class StaticLoadUtil {
    
        // 定时任务执行器
        private static transient ScheduledExecutorService scheduledExecutorService;
    
        public static final Cache<String, String> configCache =
                CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
    
        // 通过定时执行器定时同步本地缓存和DB配置表
        static {
            scheduledExecutorService = Executors.newScheduledThreadPool(10);
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                // 2.1 定时任务更新本地内存配置项
                // List configList = DBManager.SELECTSQL.getConfigs();
                // for(ConfigEntity entity : configList){
                configCache.put("key", "value");
                // }
                // 2.2 更新本地变量threshold的值
                // threshold = DBManager.SELECTSQL.getConfig("threshold");
            }, 0, 100, TimeUnit.SECONDS);
        }
    
        /**
         * 获取本地缓存
         */
        public static Cache<String, String> getConfigCache() {
            return configCache;
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    总结:

    1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

    2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行

  • 相关阅读:
    人工智能在无人驾驶领域有哪些方面的运用和应用
    设计模式 14 模板模式
    Arduino ESP32/ESP8266 +ST7735 1.8“tft中秋小时钟
    Html和Markdown中的空格, &nbsp; &ensp; &emsp; 以及 &thinsp; &zwnj; &zwj;三种Unicode空格
    视频SDK,高效视频解决方案
    动态计算图笔记
    C++ 进制转化入门知识(1)
    【667. 优美的排列 II】
    ChatGPT - 高效编写Prompt
    【Java】Object类及类中方法
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133716692