• 【Flink实战】新老用户分析:按照操作系统维度进行新老用户的分析


    🚀 作者 :“大数据小禅”

    🚀 文章简介 :新老用户分析:按照操作系统维度进行新老用户的分析

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    同类产品参考

    在这里插入图片描述

    日志的数据格式

    {
    "deviceType":"iPhone 10",
    "uid":"user_1",
    "product":{
    "name":"宝马",
    "category":"车"
    },
    "os":"iOS",
    "ip":"171.11.85.21",
    "nu":1,
    "channel":"华为商城",
    "time":1735419335423,
    "event":"browse",
    "net":"WiFi",
    "device":"4759947c-cd47-433c-ac8f-ae923a6d38b6",
    "version":"V1.2.0"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    需求:按照操作系统维度进行新老用户的分析

    • 关键字:操作系统 OS 老用户nu

    • 维度先从单一的开始 扩展:操作系统 省份的维度

    • 写入数据到Redis 官方文档https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

    •     <dependency>
              <groupId>org.apache.bahir</groupId>
              <artifactId>flink-connector-redis_2.11</artifactId>
              <version>1.0</version>
          </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 代码

    public class OsUserCntAppV1 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment.readTextFile("data/access.json");
            environment.setParallelism(1); //设置并行度为1方便观察
            SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
                @Override
                public Access map(String s) throws Exception {
                    // json 转 Access
                    try {
                        return JSON.parseObject(s, Access.class);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null;
                    }
                }
                //这里是只要不为空的数据  x != null等于把上面的空的数据过滤掉
            }).filter(x -> x != null).filter(new FilterFunction<Access>() {
                @Override
                public boolean filter(Access access) throws Exception {
                    //只过滤出来 event='startup'的数据
                    return "startup".equals(access.event);
                }
            });
            //Access{device='6d27244c-b5e5-4520-9c6d-c4e17e2391fe', deviceType='iPhone 9', os='iOS', event='startup', net='4G', channel='华为商城', uid='user_36', nu=0, nu2=0, ip='123.232.241.201', time=1735419335573, version='V1.2.0', province='null', city='null', product=null}
            // 操作系统维度  新老用户
            //返回三元组 (操作系统 新老用户 数字1)
            SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sum = filter.map(new MapFunction<Access, Tuple3<String, Integer, Integer>>() {
                @Override
                public Tuple3<String, Integer, Integer> map(Access access) throws Exception {
                    return Tuple3.of(access.os, access.nu, 1);
                }
                //根据OS操作系统 与 字段nu新老用户分组  传入三元组返回二元组  
                //因为是按照Tuple2这两个进行keyby 所以进入三元组出去二元组
            }).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                    return Tuple2.of(value.f0, value.f1);
                }
                /**
                 * (Android,1,1)
                 * (iOS,1,1)
                 * (iOS,0,1)
                 */
                //根据第三个字段进行聚合
            }).sum(2);
            //结果
            /**
             * (Android,1,23)
             * (Android,0,13)
             * (iOS,0,20)
             */
    
            //单机redis  出现连接超时->修改redis.conf配置文件 bing IP
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.192.100").build();
    
            sum.addSink(new RedisSink<>(conf,new RedisSinkForV1()));
            environment.execute("OsUserCntAppV1");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    分析结果

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    LED灯实验--汇编
    Windows下Docker搭建Flink集群
    细说spring IOC三种开发模式
    基于SpringMVC+html5+BootStrap的图书销售智能推荐系统
    flink的regular join/window join/interval join/temporal join/lookup join
    React 基础案例
    vc++ mfc 操作注册表
    浅析std::vector的底层实现机制
    Python 完美解决 Import “模块“ could not be resolved ...
    【深入解析spring cloud gateway】08 Reactor 知识扫盲
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132858135