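Table functions (UDTFs) are "one in, many out" (explode) functions. A table function extends TableFunction, implements an eval method that returns void, and emits each output row by calling collect. Because the output of a table function is a table, it has to be joined with the original table to produce the final result. This is done with a lateral table join (see LATERAL TABLE in the Flink documentation if this is unfamiliar).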
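To make the eval/collect contract concrete without a Flink dependency, here is a plain-Java sketch that only mimics it. `MiniTableFunction`, `ExplodeApps`, `AppUsage` and the `"pkg:time,pkg:time"` input format are hypothetical names invented for illustration, not Flink API. In a real job you extend `org.apache.flink.table.functions.TableFunction<T>`, and Flink supplies the collector and performs the join.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TableFunction<T>: eval() returns void and
// each output row is emitted through collect().
abstract class MiniTableFunction<T> {
    private final List<T> emitted = new ArrayList<>();

    // Plays the role of TableFunction.collect(T): one call emits one output row.
    protected final void collect(T row) {
        emitted.add(row);
    }

    // Returns the rows emitted since the last drain and clears the buffer.
    final List<T> drain() {
        List<T> out = new ArrayList<>(emitted);
        emitted.clear();
        return out;
    }
}

// Hypothetical output row, mirroring ROW<package STRING, activetime INT>.
record AppUsage(String pkg, int activeTime) {}

// "One in, many out": one input string such as "com.browser:120000,com.qq:80000"
// (a made-up format for this sketch) explodes into one AppUsage row per item.
final class ExplodeApps extends MiniTableFunction<AppUsage> {
    public void eval(String packed) {
        if (packed == null || packed.isEmpty()) {
            return; // zero output rows for this input
        }
        for (String item : packed.split(",")) {
            String[] kv = item.split(":");
            collect(new AppUsage(kv[0], Integer.parseInt(kv[1])));
        }
    }
}

final class TableFunctionSketch {
    public static void main(String[] args) {
        ExplodeApps fn = new ExplodeApps();
        fn.eval("com.browser:120000,com.qq:80000");
        // One input produced two rows; a lateral join would pair each with its source row.
        fn.drain().forEach(System.out::println);
    }
}
```

An input that makes eval emit nothing is exactly the case where an inner lateral join and a left outer lateral join behave differently.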
2. Dataset format
The test dataset of user app-usage records is shown below (one JSON object per line):
{"userId":9527,"day":"2022-05-12","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.browser","activetime":120000},{"package":"com.qq","activetime":80000}]}
{"userId":9528,"day":"2022-05-13","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.weixin","activetime":150000},{"package":"com.youdao","activetime":60000}]}
{"userId":9529,"day":"2022-05-14","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.ebay","activetime":40000},{"package":"com.baidu","activetime":120000}]}
{"userId":9530,"day":"2022-05-15","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.huawei","activetime":180000},{"package":"com.leshi","activetime":20000}]}
{"userId":9531,"day":"2022-05-16","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.xiaomi","activetime":150000},{"package":"com.wangyi","activetime":60000}]}
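In each JSON object, userId is the user ID, day is the date, begintime is the start time and endtime is the end time (both are epoch timestamps in milliseconds). data is the list of the user's app usage, where package is the app's package name and activetime is how long the app was used.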
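A typed view of one record can help when reading the functions that follow. This is only a sketch: `UserBehavior` and `AppUsage` are hypothetical Java records that mirror the JSON fields (the Flink job below does not use them), and treating `activetime` as milliseconds is an assumption based on the sample values. The 13-digit `begintime`/`endtime` values overflow `int`, so they need `long`.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical typed mirror of one element of the "data" array.
record AppUsage(String pkg, int activeTimeMs) {}

// Hypothetical typed mirror of one JSON line; fields follow the JSON keys.
record UserBehavior(long userId, String day, long beginTime, long endTime, List<AppUsage> data) {

    // Session length derived from the two epoch-millisecond timestamps.
    Duration session() {
        return Duration.ofMillis(endTime - beginTime);
    }

    // Sum of per-app active time (assumed to be milliseconds).
    long totalActiveMs() {
        return data.stream().mapToLong(AppUsage::activeTimeMs).sum();
    }
}

final class DatasetModelSketch {
    public static void main(String[] args) {
        // First sample line, transcribed by hand.
        UserBehavior u = new UserBehavior(9527, "2022-05-12", 1620785058833L, 1620785217511L,
                List.of(new AppUsage("com.browser", 120000), new AppUsage("com.qq", 80000)));
        System.out.println(u.session().toMillis()); // 158678
        System.out.println(u.totalActiveMs());      // 200000
    }
}
```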
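3. Custom UDTF
The following code parses the user behavior data with two custom functions: JsonFunction, a scalar UDF that extracts a top-level field, and explodeFunction, a UDTF that explodes the data array into rows. It demonstrates four ways of calling them. JSONObject and JSONArray come from the org.json package.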
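import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.json.JSONArray;
import org.json.JSONObject;

public class FlinkSQLBaseTableFunction {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        senv.setParallelism(1);
        // 2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        // 3. Read the data: one JSON string per line from the nc server on hadoop1:8888
        DataStream<String> lines = senv.socketTextStream("hadoop1", 8888);
        // 4. Convert the stream into a dynamic table with a single STRING column named "line"
        Table table = tEnv.fromDataStream(lines, $("line"));
        tEnv.createTemporaryView("userbehavior", table);

        // Note: execute().print() blocks for as long as the socket source keeps running,
        // so only the first query below is reached. Comment out the others to try
        // approaches 5.1 to 5.4 one at a time.

        // 5.1 Approach 1: Table API, inner lateral join
        tEnv.from("userbehavior")
                .joinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                        call(JsonFunction.class, $("line"), "userId"),
                        call(JsonFunction.class, $("line"), "day"),
                        call(JsonFunction.class, $("line"), "begintime"),
                        call(JsonFunction.class, $("line"), "endtime"),
                        $("package"),
                        $("activetime")
                ).execute().print();

        // 5.2 Approach 2: Table API, left outer lateral join
        tEnv.from("userbehavior")
                .leftOuterJoinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                        call(JsonFunction.class, $("line"), "userId"),
                        call(JsonFunction.class, $("line"), "day"),
                        call(JsonFunction.class, $("line"), "begintime"),
                        call(JsonFunction.class, $("line"), "endtime"),
                        $("package"),
                        $("activetime")
                ).execute().print();

        // Register the functions by name so that SQL queries can call them
        tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);
        tEnv.createTemporarySystemFunction("explodeFunction", explodeFunction.class);

        // 5.3 Approach 3: SQL, inner lateral join (comma join with LATERAL TABLE)
        tEnv.sqlQuery("select " +
                "JsonFunction(line,'userId')," +
                "JsonFunction(line,'day')," +
                "JsonFunction(line,'begintime')," +
                "JsonFunction(line,'endtime')," +
                "package," +
                "activetime " +
                "from userbehavior, " +
                "lateral table(explodeFunction(line,'data'))"
        ).execute().print();

        // 5.4 Approach 4: SQL, left outer lateral join (the join condition must be ON TRUE)
        tEnv.sqlQuery("select " +
                "JsonFunction(line,'userId')," +
                "JsonFunction(line,'day')," +
                "JsonFunction(line,'begintime')," +
                "JsonFunction(line,'endtime')," +
                "package," +
                "activetime " +
                "from userbehavior " +
                "left join lateral table(explodeFunction(line,'data')) as sc(package,activetime) on true"
        ).execute().print();
    }

    /**
     * Custom scalar UDF: returns the value of a top-level JSON key as a string,
     * or an empty string if the key is absent.
     */
    public static class JsonFunction extends ScalarFunction {
        public String eval(String line, String key) {
            // Parse the line as a JSON object
            JSONObject baseJson = new JSONObject(line);
            // optString also converts numeric values (userId, begintime, endtime) to strings;
            // getString throws for non-string values in current org.json releases
            return baseJson.optString(key, "");
        }
    }

    /**
     * Custom UDTF: explodes the JSON array stored under key into one
     * (package, activetime) row per element.
     */
    @FunctionHint(output = @DataTypeHint("ROW<package STRING, activetime INT>"))
    public static class explodeFunction extends TableFunction<Row> {
        public void eval(String line, String key) {
            // Parse the line as a JSON object
            JSONObject baseJson = new JSONObject(line);
            // Extract the JSON array stored under key (here "data")
            JSONArray jsonArray = baseJson.getJSONArray(key);
            // Emit one row per array element; an empty array emits no rows
            for (int i = 0; i < jsonArray.length(); i++) {
                JSONObject item = jsonArray.getJSONObject(i);
                String pkg = item.getString("package");
                int activeTime = item.getInt("activetime");
                collect(Row.of(pkg, activeTime));
            }
        }
    }
}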
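Approaches 5.1/5.3 (inner lateral join) and 5.2/5.4 (left outer lateral join) differ only when the table function emits no rows for an input row, for example when `data` is an empty array. The plain-Java sketch below models that difference. `LateralJoinSketch`, `Input` and `Output` are hypothetical names, and the code mimics only the row-level semantics, not how Flink executes the join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical input row: the user id plus its already-parsed package list.
record Input(long userId, List<String> packages) {}

// Hypothetical joined row; pkg is null when the left join pads a missing match.
record Output(long userId, String pkg) {}

final class LateralJoinSketch {

    // Inner lateral join (joinLateral / ", LATERAL TABLE(...)"):
    // an input row for which the function emits nothing is dropped.
    static List<Output> innerJoinLateral(List<Input> rows, Function<Input, List<String>> fn) {
        List<Output> out = new ArrayList<>();
        for (Input r : rows) {
            for (String pkg : fn.apply(r)) {
                out.add(new Output(r.userId(), pkg));
            }
        }
        return out;
    }

    // Left outer lateral join (leftOuterJoinLateral / "LEFT JOIN LATERAL TABLE(...) ON TRUE"):
    // an input row with no emitted rows is kept once, padded with null.
    static List<Output> leftOuterJoinLateral(List<Input> rows, Function<Input, List<String>> fn) {
        List<Output> out = new ArrayList<>();
        for (Input r : rows) {
            List<String> emitted = fn.apply(r);
            if (emitted.isEmpty()) {
                out.add(new Output(r.userId(), null));
            }
            for (String pkg : emitted) {
                out.add(new Output(r.userId(), pkg));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Input> rows = List.of(
                new Input(9527, List.of("com.browser", "com.qq")),
                new Input(9999, List.of())); // hypothetical row with an empty "data" array
        Function<Input, List<String>> explode = Input::packages;
        System.out.println(innerJoinLateral(rows, explode).size());     // 2
        System.out.println(leftOuterJoinLateral(rows, explode).size()); // 3
    }
}
```

Every line in the sample dataset has two entries in `data`, so all four approaches print the same rows for it.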
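4. Enter the test data
On the hadoop1 node of the Linux VM, start an nc server with the following command before launching the Flink job (socketTextStream connects to it as a client), then paste the test dataset into it: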
[root@hadoop1 ~]# nc -lk 8888
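If nc is not installed, a small Java program can feed the same data. This is a hypothetical helper, not part of the original setup: `DatasetServer`, its arguments, and reading the Section 2 lines from a file are assumptions. It must run on hadoop1 (or you change the host in socketTextStream), and unlike nc it sends the whole file to each client instead of forwarding what you type.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical replacement for "nc -lk 8888" when nc is not available.
final class DatasetServer {

    // Accepts one client (e.g. Flink's socket source), writes every line
    // terminated by '\n' (the default delimiter of socketTextStream), then
    // closes that connection.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
            }
            out.flush();
        }
    }

    // Usage: java DatasetServer <file-with-json-lines> [port]
    public static void main(String[] args) throws IOException {
        List<String> lines = Files.readAllLines(Path.of(args[0]), StandardCharsets.UTF_8);
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8888;
        try (ServerSocket server = new ServerSocket(port)) {
            // Like nc's -k flag: keep accepting new connections after one closes.
            while (true) {
                serveOnce(server, lines);
            }
        }
    }
}
```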
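5. Run results
With the custom functions JsonFunction and explodeFunction in place, the job parses the user behavior data as follows: every input line is exploded into one row per element of its data array (two rows per line for this dataset), and each row repeats the line's userId, day, begintime and endtime next to its package and activetime.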