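🚀 Author: “大数据小禅”
🚀 About this article: [Flink in Practice] User statistics: counting new and returning users by province
🚀 Likes 👍, bookmarks ⭐, and comments 💬 are welcome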
{
"deviceType":"iPhone 10",
"uid":"user_1",
"product":{
"name":"宝马",
"category":"车"
},
"os":"iOS",
"ip":"171.11.85.21",
"nu":1,
"channel":"华为商城",
"time":1735419335423,
"event":"browse",
"net":"WiFi",
"device":"4759947c-cd47-433c-ac8f-ae923a6d38b6",
"version":"V1.2.0"
}
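The Flink job later in this article deserializes each log line into an Access object, but the class itself is not listed. The following is a minimal sketch of what it might look like, assuming public fields named after the JSON keys in the sample record (the job reads access.event, access.nu, and access.province directly). The province and city fields, the nested Product class, and the isNewUser() helper are assumptions for illustration. Reading nu = 1 as "new user" is inferred from the topic of the article, not stated in the log itself.

```java
// Hypothetical sketch of the Access POJO; the real class is not shown in the article.
class Access {
    public String deviceType;
    public String uid;
    public Product product;
    public String os;
    public String ip;
    public int nu;        // assumed: 1 = new user, 0 = returning user
    public String channel;
    public long time;     // epoch milliseconds, e.g. 1735419335423
    public String event;  // e.g. "browse" or "startup"
    public String net;
    public String device;
    public String version;

    // Not part of the raw log: assumed to be filled in later from the IP lookup.
    public String province;
    public String city;

    // Nested object for the "product" key.
    public static class Product {
        public String name;
        public String category;
    }

    // Convenience helper (an assumption, not from the article).
    public boolean isNewUser() {
        return nu == 1;
    }

    public static void main(String[] args) {
        Access a = new Access();
        a.uid = "user_1";
        a.nu = 1;
        a.event = "browse";
        a.ip = "171.11.85.21";
        System.out.println(a.uid + " newUser=" + a.isNewUser()); // prints "user_1 newUser=true"
    }
}
```

Public fields with a default constructor keep the class usable both by a JSON mapper and as a Flink POJO; getters and setters would work as well.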
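Keywords
IP test class: add the Maven dependency for HTTP requests
Apache HttpComponents is an open-source project of the Apache Software Foundation. It provides a set of Java components for working with HTTP, covering the client side, the server side, proxies, caching, authentication, cookie management, and low-level protocol handling. This example only needs its client module, httpclient, to send a GET request to an IP lookup API.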
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>
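// JSON classes are assumed to come from Alibaba fastjson, which matches the calls used below
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class IPRequest {
    public static void main(String[] args) {
        String ip = "120.79.75.140";
        // "-" is the placeholder used when the lookup fails
        String province = "-";
        String city = "-";
        // Key.key holds the API key for the juhe.cn IP lookup service (class not shown here)
        String url = "https://apis.juhe.cn/ip/ipNew?ip=" + ip + "&key=" + Key.key;
        System.out.println(url); // note: the printed URL includes the API key
        // try-with-resources closes the response and the client even if the request fails
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(new HttpGet(url))) {
            int code = response.getStatusLine().getStatusCode();
            if (code == 200) {
                HttpEntity entity = response.getEntity();
                String result = EntityUtils.toString(entity, "UTF-8");
                // Sample response:
                // {"resultcode":"200","reason":"查询成功","result":{"Country":"中国","Province":"广东省","City":"深圳市","Isp":"阿里云"},"error_code":0}
                JSONObject jsonData = JSON.parseObject(result);
                JSONObject data = jsonData.getJSONObject("result");
                if (data != null) {
                    province = data.getString("Province");
                    city = data.getString("City");
                }
                System.out.println(province + city);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}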
Result:
广东省深圳市
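The Flink job that follows passes every record through an IPMapFunction, which is not listed in this article. Presumably it runs the same lookup as IPRequest once per record and writes the result into the province and city fields. The plain-Java sketch below shows only that enrichment step, under stated assumptions: the cut-down Access class, the enrich method, and the lookup function standing in for the HTTP call are all hypothetical names. The "-" placeholder mirrors the defaults used in IPRequest.

```java
import java.util.function.Function;

// Hypothetical sketch of the per-record enrichment an IPMapFunction might perform.
class IpEnrichSketch {
    // Cut-down stand-in for the Access POJO (only the fields this step touches).
    static class Access {
        public String ip;
        public String province;
        public String city;
    }

    // lookup is assumed to return {province, city}; it stands in for the HTTP request.
    static Access enrich(Access access, Function<String, String[]> lookup) {
        String province = "-";
        String city = "-";
        try {
            String[] found = lookup.apply(access.ip);
            if (found != null && found.length == 2) {
                province = found[0];
                city = found[1];
            }
        } catch (RuntimeException e) {
            // Lookup failed: keep the "-" placeholders so the record is still counted.
        }
        access.province = province;
        access.city = city;
        return access;
    }

    public static void main(String[] args) {
        Access ok = new Access();
        ok.ip = "120.79.75.140";
        // Stub lookup with romanized sample values instead of a real API call.
        enrich(ok, ip -> new String[]{"Guangdong", "Shenzhen"});
        System.out.println(ok.province + " " + ok.city); // prints "Guangdong Shenzhen"

        Access failed = new Access();
        failed.ip = "0.0.0.0";
        enrich(failed, ip -> { throw new IllegalStateException("lookup failed"); });
        System.out.println(failed.province + " " + failed.city); // prints "- -"
    }
}
```

In an actual Flink job this logic would typically sit in the map() method of a RichMapFunction, with the HTTP client created once in open() and released in close() rather than once per record.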
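import com.alibaba.fastjson.JSON; // assumed: Alibaba fastjson
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProvinceUserCntV1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 makes the output easy to follow; set it before creating the source
        // so that it applies to every operator
        environment.setParallelism(1);
        DataStreamSource<String> stream = environment.readTextFile("data/access.json");
        SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
            @Override
            public Access map(String s) throws Exception {
                // Convert the JSON line into an Access object
                try {
                    return JSON.parseObject(s, Access.class);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        // Keep only non-null records, which drops the lines that failed to parse above
        }).filter(x -> x != null).filter(
            new FilterFunction<Access>() {
                @Override
                public boolean filter(Access access) throws Exception {
                    // Keep only records whose event is "startup"
                    return "startup".equals(access.event);
                }
            });
        // IPMapFunction is a separate class (a Rich function) that implements the map step
        SingleOutputStreamOperator<Access> result = filter.map(new IPMapFunction());
        DataStreamSink<Tuple3<String, Integer, Integer>> user = result.map(new MapFunction<Access, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(Access access) throws Exception {
                // (province, new-user flag, 1): the trailing 1 is the value that gets summed
                return Tuple3.of(access.province, access.nu, 1);
            }
        }).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                // Group by (province, new-user flag)
                return Tuple2.of(value.f0, value.f1);
            }
        // The print label means "new and returning users by province"
        }).sum(2).print("按照省份维度统计新老用户");
        environment.execute("OsUserCntAppV1");
    }
}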
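To make the effect of keyBy on (province, nu) followed by sum(2) easier to picture, here is a plain-Java sketch that mimics the semantics on a small in-memory list. It is not Flink code: the class name, the method name, and the romanized sample records are assumptions for illustration. As in a keyed rolling sum, every incoming record produces one output carrying the updated count for its key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java imitation of keyBy((province, nu)).sum(2).
class KeyedRunningSum {
    // Each record is {province, nu}; returns one "(province,nu,count)" line per input record.
    static List<String> runningCounts(List<String[]> records) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] record : records) {
            String province = record[0];
            String nu = record[1];
            String key = province + "|" + nu;                 // composite key
            int updated = counts.merge(key, 1, Integer::sum); // add 1 to this key's count
            out.add("(" + province + "," + nu + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
            new String[]{"Guangdong", "1"},
            new String[]{"Guangdong", "0"},
            new String[]{"Guangdong", "1"},
            new String[]{"Henan", "1"});
        // Prints, one per line:
        // (Guangdong,1,1)
        // (Guangdong,0,1)
        // (Guangdong,1,2)
        // (Henan,1,1)
        runningCounts(records).forEach(System.out::println);
    }
}
```

The output is a running total rather than a single final number per key, so the last line printed for each (province, nu) pair holds the count for that group.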
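In practice, a large share of the work is statistical analysis along various dimensions
A large number of dimensions
Statistical problems you will run into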