我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?
1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示
package wikiedits.schedule;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {
// 定时任务执行器
private transient ScheduledExecutorService scheduledExecutorService;
// 本地变量
private int threshold;
@Override
public void open(Configuration parameters) throws Exception {
// 1.从db查询数据初始化本地变量
// threshold = DBManager.SELECTSQL.getConfig("threshold");
// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值
scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
// 2.1 定时任务更新本地内存配置项
// List configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){
ConfigEntityLocalCache.getInstance().update("key", "value");
// }
// 2.2 更新本地变量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");
}, 0, 100, TimeUnit.SECONDS);
}
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
}
@Override
public void close() throws Exception {
ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);
}
}
//本地缓存实现
package wikiedits.schedule;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* 保存Config信息的本地缓存 ---定时同步DB配置表的数据
*/
public class ConfigEntityLocalCache {
private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();
/**
* 获取本地缓存实例
*/
public static ConfigEntityLocalCache getInstance() {
return instance;
}
/** 缓存内存配置项 */
private static Cache<String, String> configCache =
CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
/**
* 更新本地缓存数据
*/
public boolean update(String key, String value){
configCache.put(key, value);
return true;
}
/**
* 更新本地缓存数据
*/
public String getByKey(String key){
return configCache.getIfPresent(key);
}
}
2.在静态类中通过static语句块创建定时器并定时加载,代码如下
package wikiedits.schedule;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* 静态类定时加载DB配置表到本地内存中
*/
public class StaticLoadUtil {
// 定时任务执行器
private static transient ScheduledExecutorService scheduledExecutorService;
public static final Cache<String, String> configCache =
CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
// 通过定时执行器定时同步本地缓存和DB配置表
static {
scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
// 2.1 定时任务更新本地内存配置项
// List configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){
configCache.put("key", "value");
// }
// 2.2 更新本地变量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");
}, 0, 100, TimeUnit.SECONDS);
}
/**
* 获取本地缓存
*/
public static Cache<String, String> getConfigCache() {
return configCache;
}
}
1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行
2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行