• 【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X


    🚀 作者 :“大数据小禅”

    🚀 文章简介Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    Flink怎么操作Redis

    • Flink怎么操作redis?

      • 方式一:自定义sink
      • 方式二:使用connector
    • Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

      • getCommandDescription 选择对应的数据结构和key名称配置
      • getKeyFromData 获取key
      • getValueFromData 获取value
    • 使用

      • 添加依赖
      <dependency>
          <groupId>org.apache.bahirgroupId>
          <artifactId>flink-connector-redis_2.11artifactId>
          <version>1.0version>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 编码

      public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
          @Override
          public RedisCommandDescription getCommandDescription() {
              return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
          }
      
          @Override
          public String getKeyFromData(Tuple2<String, Integer> value) {
              return value.f0;
          }
      
          @Override
          public String getValueFromData(Tuple2<String, Integer> value) {
              return value.f1.toString();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

    Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

    • Redis环境说明 redis6

      • 使用docker部署redis6.x 看个人主页docker相关文章

        docker run -d  -p 6379:6379 redis
        
        • 1
    • 编码实战

    数据源

    public class VideoOrderSource extends RichParallelSourceFunction {
    
    
        private volatile Boolean flag = true;
    
        private Random random = new Random();
    
        private static List list = new ArrayList<>();
        static {
            list.add("spring boot2.x课程");
            list.add("微服务SpringCloud课程");
            list.add("RabbitMQ消息队列");
            list.add("Kafka课程");
            list.add("小滴课堂面试专题第一季");
            list.add("Flink流式技术课程");
            list.add("工业级微服务项目大课训练营");
            list.add("Linux课程");
        }
    
    
        /**
         * run 方法调用前 用于初始化连接
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("-----open-----");
        }
    
        /**
         * 用于清理之前
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            System.out.println("-----close-----");
        }
    
    
        /**
         * 产生数据的逻辑
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext ctx) throws Exception {
    
            while (flag){
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(10);
                int money = random.nextInt(100);
                int videoNum = random.nextInt(list.size());
                String title = list.get(videoNum);
                VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
    
                ctx.collect(videoOrder);
            }
    
    
        }
    
        /**
         * 控制任务取消
         */
        @Override
        public void cancel() {
    
            flag = false;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    保存的格式与存取的方法

    public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {
    
    
        /***
         * 选择需要用到的命令,和key名称
         * @return
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
        }
    
        /**
         * 获取对应的key或者filed
         *
         * @param data
         * @return
         */
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
    
            System.out.println("getKeyFromData=" + data.f0);
            return data.f0;
        }
    
        /**
         * 获取对应的值
         *
         * @param data
         * @return
         */
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            System.out.println("getValueFromData=" + data.f1.toString());
            return data.f1.toString();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    落地

    public class Flink07RedisSinkApp {
    
        /**
         * source
         * transformation
         * sink
         *
         * @param args
         */
        public static void main(String[] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
    
            //数据源 source
    //        DataStream ds = env.fromElements(
    //                new VideoOrder("21312","java",32,5,new Date()),
    //                new VideoOrder("314","java",32,5,new Date()),
    //                new VideoOrder("542","springboot",32,5,new Date()),
    //                new VideoOrder("42","redis",32,5,new Date()),
    //                new VideoOrder("4252","java",32,5,new Date()),
    //                new VideoOrder("42","springboot",32,5,new Date()),
    //                new VideoOrder("554232","flink",32,5,new Date()),
    //                new VideoOrder("23323","java",32,5,new Date())
    //        );
            DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
    
    
    
            //transformation
           DataStream<Tuple2<String,Integer>> mapDS =  ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
                @Override
                public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                    return new Tuple2<>(value.getTitle(),1);
                }
            });
    
    
    
    //        DataStream> mapDS = ds.flatMap(new FlatMapFunction>() {
    //            @Override
    //            public void flatMap(VideoOrder value, Collector> out) throws Exception {
    //                out.collect(new Tuple2<>(value.getTitle(),1));
    //            }
    //        });
    
    
           //分组
            KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    return value.f0;
                }
            });
    
            //统计每组有多少个
            DataStream<Tuple2<String,Integer>> sumDS =  keyByDS.sum(1);
    
            //控制台打印
            sumDS.print();
    
            //单机redis
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
    
            sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));
    
    
            //DataStream需要调用execute,可以取个名称
            env.execute("custom redis sink job");
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    在这里插入图片描述

  • 相关阅读:
    JVM内存模型
    编程艺术之源:深入了解设计模式和设计原则
    kubectl 本地远程链接k8s多个集群,远程管控多集群,查看日志 部署服务(windows版)
    CVE-2023-38831漏洞实例
    数字先锋 | 打造城市“一朵云”,天翼云推动芜湖新型智慧城市建设
    【C++】vector的介绍 | 常见接口的使用
    当Map的值为NULL
    【英语:基础高阶_全场景覆盖表达】K11.口语主题陈述——事物类
    翻译软件有哪些-翻译软件大家都推荐使用的有哪些
    Sentinel vs Hystrix 限流到底怎么选?(荣耀典藏版)
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132857544