• Flink Async I/O Dimension Join with HBase


    Main program

        public static void main(String[] args) throws Exception {
            //1. Get the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            // Parse the dynamic parameters passed on the command line
            ParameterTool propertiesargs = ParameterTool.fromArgs(args);
            String fileName = propertiesargs.get("CephConfPath");
            // Load the properties file with the dynamic configuration from HDFS
            org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
            FileSystem fs = FileSystem.get(URI.create(fileName), conf);
            ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(
                    fs.open(new org.apache.hadoop.fs.Path(fileName)).getWrappedStream());
            // Register the parameters as global job parameters (read back later by the HBase async function)
            env.getConfig().setGlobalJobParameters(propertiesFile);
            new CephConfig(propertiesFile);// presumably initializes the static config constants (FSSTATEBACKEND, KAFKA_*, ...) used below

            //2. Configure checkpointing & the state backend
            env.setStateBackend(new FsStateBackend(FSSTATEBACKEND));
            env.enableCheckpointing(10000);// start a checkpoint every 10,000 ms
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// exactly-once mode
            env.getCheckpointConfig().setCheckpointTimeout(100000);// a checkpoint is discarded if it does not finish within 100,000 ms
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);// allow up to two checkpoints in flight at the same time
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// leave at least 3,000 ms between consecutive checkpoints
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// tolerate up to 5 checkpoint failures
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// restart strategy: 3 attempts, 10 s apart

            //3. Read log lines from Kafka and convert each line into a JavaBean (main stream)
            DataStreamSource<String> dataStream = env.addSource(KafkaUtils.getKafkaSource(KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP));
            …………
            //8. Read the HBase user table asynchronously to enrich records with dimension data
            SingleOutputStreamOperator<CephAccessRecord> record = AsyncDataStream.unorderedWait(
                    validDS,
                    new DimAsyncFunction<CephAccessRecord>() {
                        @Override
                        public String getKey(CephAccessRecord record) {
                            return record.access_key;
                        }
                    },
                    60, TimeUnit.SECONDS);
            BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
            StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                    new Path(HDFS_FILE_PATH),
                    new SimpleStringEncoder<>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(TimeUnit.DAYS.toMillis(1))// roll the part file once it spans one day of data
                                    .withInactivityInterval(TimeUnit.DAYS.toMillis(1))// or after one day with no new data
                                    .withMaxPartSize(1024 * 1024 * 1024)// or when the part file reaches 1 GB
                                    .build())
                    .withBucketAssigner(assigner)
                    .build();
    
            // record --> filter for upload events --> convert to JSON string --> write to HDFS
    //        allDataDS.filter(log->log.event_type.equals("upload")).map(line->JSON.toJSONString(line)).addSink(fileSink);
            dataStream.map(line->JSON.toJSONString(line)).addSink(fileSink);
    
            //10. Execute the streaming job
            env.execute();
        }
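
    The KafkaUtils helper and the CephConfig constants (FSSTATEBACKEND, KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP, HDFS_FILE_PATH) are not shown in the post. A minimal sketch of what the Kafka source helper might look like, assuming a plain-string topic and the legacy FlinkKafkaConsumer API (the bootstrap-servers value is a placeholder):

        // Hypothetical sketch only -- the original KafkaUtils is not included in the post.
        import java.util.Properties;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

        public class KafkaUtils {
            // Placeholder address; the real value presumably comes from the HDFS properties file.
            private static final String BOOTSTRAP_SERVERS = "kafka01:9092,kafka02:9092";

            public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
                props.setProperty("group.id", groupId);
                // Each record is read as a raw UTF-8 string, matching DataStreamSource<String> above.
                return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
            }
        }

    Note also that StreamingFileSink only finalizes in-progress part files when a checkpoint completes, so the checkpoint settings above are required for the HDFS output to become readable.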
    

    Async join function

    package com.data.ceph.function;
    
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.security.UserGroupInformation;
    
    import java.util.Collections;
    import java.util.Map;
    
    public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimAsyncJoinFunction<T> {
    
        private org.apache.hadoop.hbase.client.Connection connection = null;
        private ResultScanner rs = null;
        private Table table = null;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // Disable ZooKeeper SASL authentication
            System.setProperty("zookeeper.sasl.client", "false");
            Map<String, String> stringStringMap = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
            String hbase = stringStringMap.get("hbase_zookeeper_quorum");
            org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
            hconf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.23.37,172.16.23.38,172.16.23.39");
    //        hconf.set(HConstants.ZOOKEEPER_QUORUM, hbase);
            hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
            hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
    
            // Access the HBase service as the remote user "hive"
            UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hive");
            connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
            table = connection.getTable(TableName.valueOf("cloud:user_info"));
        }
    
    
        @Override
        public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
            // Fetch the dimension row whose rowkey is supplied by the concrete subclass.
            // Note: table.get(...) blocks, so this lookup runs synchronously on the operator thread.
            Get get = new Get(Bytes.toBytes(getKey(input)));
            Result result = table.get(get);
            // Copy every returned column into the field of the input bean with the same name.
            for (Cell cell : result.rawCells()) {
                String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                BeanUtils.setProperty(input, column, value);
            }
            resultFuture.complete(Collections.singletonList(input));
        }
        @Override
        public void close() throws Exception {
            if (rs != null) rs.close();
            if (table != null) table.close();
            if (connection != null) connection.close();
        }
        @Override
        public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
            // On timeout just log the record. The future is not completed here;
            // consider resultFuture.complete(...) so the slot is released instead of blocking the queue.
            System.out.println("TimeOut:" + input);
        }
    }
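
    The DimAsyncJoinFunction interface that the class implements is not included in the post. Judging from the getKey override in the main program, its presumed shape is the following minimal sketch:

        // Assumption: the original interface is not shown; it only needs to declare
        // the rowkey-extraction hook that each concrete job overrides.
        public interface DimAsyncJoinFunction<T> {
            String getKey(T input);
        }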
    
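    One caveat about asyncInvoke above: table.get(...) is a synchronous HBase call, so the lookup blocks the async operator's thread and the job gains little from AsyncDataStream. A common alternative, sketched below under the same class layout (the executor field and pool size are assumptions, not part of the original post), is to run the lookup on a dedicated thread pool and complete the future from a callback:

        // Sketch of a truly asynchronous variant. Assumes an extra field
        //     private transient ExecutorService executor;
        // initialized in open(), e.g. executor = Executors.newFixedThreadPool(10);
        // plus imports for CompletableFuture, ExecutorService, Executors and IOException.
        @Override
        public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
            CompletableFuture.supplyAsync(() -> {
                try {
                    // Same blocking HBase lookup, but now off the operator thread.
                    return table.get(new Get(Bytes.toBytes(getKey(input))));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }, executor).thenAccept(result -> {
                try {
                    for (Cell cell : result.rawCells()) {
                        String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                        String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                        BeanUtils.setProperty(input, column, value);
                    }
                    resultFuture.complete(Collections.singletonList(input));
                } catch (Exception e) {
                    resultFuture.completeExceptionally(e);
                }
            });
        }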
  • Original post: https://blog.csdn.net/lck_csdn/article/details/136732987