• 【Flink实战】用户统计:按照省份维度统计新老用户


    🚀 作者 :“大数据小禅”

    🚀 文章简介 :【Flink实战】用户统计:按照省份维度统计新老用户

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    数据源JSON格式数据

    {
    "deviceType":"iPhone 10",
    "uid":"user_1",
    "product":{
    "name":"宝马",
    "category":"车"
    },
    "os":"iOS",
    "ip":"171.11.85.21",
    "nu":1,
    "channel":"华为商城",
    "time":1735419335423,
    "event":"browse",
    "net":"WiFi",
    "device":"4759947c-cd47-433c-ac8f-ae923a6d38b6",
    "version":"V1.2.0"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    统计分析

    • 关键字

      • 省份 :ip ==>省份,城市,运行商,经纬度…
      • 解决方案: 1)请求商业接口 ,高德百度 2)开源版 github.com ipparse 3) 从缓存中找 ,请求
      • https://www.free-api.com/doc/90 免费接口
    • IP测试类: 导入maven依赖http请求

    Apache HttpComponents是Apache软件基金会的开源项目,它提供了一系列的高性能,高可用性的Java组件,用于实现HTTP协议,包括客户端,服务器,代理,缓存,身份验证,Cookie管理和HTTP协议处理。它的目标是提供一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。HttpComponents是一个基于Java的客户端/服务器HTTP协议实现,它提供了一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。
    
            
                org.apache.httpcomponents
                httpclient
                4.5.6
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    IP提取测试

    public class IPRequest {
        public static void main(String[] args) {
            String ip="120.79.75.140";
            String province="-";
            String city="-";
            String url="https://apis.juhe.cn/ip/ipNew?ip="+ip+"&key="+ Key.key;
            CloseableHttpResponse response=null;
            System.out.println(url);
            CloseableHttpClient httpClient = HttpClients.createDefault();
            try {
                HttpGet httpGet = new HttpGet(url);
                response = httpClient.execute(httpGet);
                int code = response.getStatusLine().getStatusCode();
                if(code==200){
                    HttpEntity entity = response.getEntity();
                    String result = EntityUtils.toString(entity, "UTF-8");
                    //{"resultcode":"200","reason":"查询成功","result":{"Country":"中国","Province":"广东省","City":"深圳市","Isp":"阿里云"},"error_code":0}
                    JSONObject jsonData = JSON.parseObject(result);
                    JSONObject data = jsonData.getJSONObject("result");
                    province=data.getString("Province");
                    city=data.getString("City");
                    System.out.println(province+city);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    }
    
    结果:
    广东省深圳市
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 总体代码 写入Redis复用上一篇文章的代码修改即可
    
    public class ProvinceUserCntV1 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> stream = environment.readTextFile("data/access.json");
            environment.setParallelism(1); //设置并行度为1方便观察
            SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
                @Override
                public Access map(String s) throws Exception {
                    // json 转 Access
                    try {
                        return JSON.parseObject(s, Access.class);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null;
                    }
                }
                //这里是只要不为空的数据  x != null等于把上面的空的数据过滤掉
            }).filter(x -> x != null).filter(
                    new FilterFunction<Access>() {
                @Override
                public boolean filter(Access access) throws Exception {
                    //只过滤出来 event='startup'的数据
                    return "startup".equals(access.event);
                }
            });
            //使用Rich额外定义一个类来实现map
            SingleOutputStreamOperator<Access> result = filter.map(new IPMapFunction());
            DataStreamSink<Tuple3<String, Integer, Integer>> user = result.map(new MapFunction<Access, Tuple3<String, Integer, Integer>>() {
                @Override
                public Tuple3<String, Integer, Integer> map(Access access) throws Exception {
                    return Tuple3.of(access.province, access.nu, 1);
                }
            }).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                    return Tuple2.of(value.f0, value.f1);
                }
            }).sum(2).print("按照省份维度统计新老用户");
    
    
            environment.execute("OsUserCntAppV1");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    在这里插入图片描述

    • 关于flink异步IO的一篇文章 https://zhuanlan.zhihu.com/p/365232338
    • 上面的操作写入是同步的 IO 这样会存在问题,出现问题的时候后面的数据就阻塞了**(流处理吞吐量问题)**
    • 日志中是商品与相关信息是记录的ID,需要去连表查询数据库补全数据,考虑与外部数据连通的性能
    • 需要支持异步的数据库 只需要按照Flink给的模板实现一个RichAsync就可以了
    • 后面有单独的一篇文章完成Flink异步IO在这里插入图片描述
    思考:上面的代码还存在哪些问题?
    • 工作中:很大程度都是各种维度的统计分析

      • 离线数仓
      • 实时数仓
    • 较多的维度

      • 操作系统 + 新老用户
      • 新老用户
      • 省份 + 新老用户
      • 操作系统 + 省份 + 新老用户
      • 运营商 + 省份 + 新老用户
      • 运营商 + 新老用户
      • ==> KeyBy(…).sum(index)
    • 会遇到的统计问题

      • 每N(小时/分钟)统计一次
      • 每10分钟统计一次
      • 从xxxx==>xxxx事件段内的各种维度(…) 统计
  • 相关阅读:
    基于BP神经网络预测电力负荷(Matlab代码实现)
    FL Studio 21.2.3大更新,免费下载(FL激活码)带主题皮肤/音频分离功能
    擎创动态 | 开箱即用!擎创科技联合中科可控推出大模型一体机
    软件测试简历项目经验怎么写,没有项目经验?
    igraph load 无法读取保存的graph attr
    idea中maven plugin提示not found
    【C++ 学习 ㉝】- C++11 使用 using 定义别名
    压缩包过大,如何使用split命令拆分压缩包
    java对象传递给前端vue表格从上插入的效果
    B树(B-tree)
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132859054