用Flume采集Kafka写入到Hive的ODS层在HDFS路径下的JSON数据,需要在DWD层进行解析并清洗
create external table if not exists ods_queue(
queue_json string
)
comment '静态排队数据表——静态分区'
partitioned by (day string)
row format delimited fields terminated by '\x001'
lines terminated by '\n'
stored as SequenceFile
tblproperties("skip.header.line.count"="1");


JSON数据字段queue_json中除了单个字段如deviceNo外,还包括json数组queueList
{
"deviceNo": "radar-1020",
"createTime": "2023-10-16 10:18:55",
"laneNum": 5,
"queueList": [
{
"laneNo": 8,
"queueLen": 28.0,
"queueHead": 24.0,
"queueTail": 23.0,
"queueCount": 88
},
{
"laneNo": 5,
"queueLen": 12.0,
"queueHead": 45.0,
"queueTail": 2.0,
"queueCount": 91
},
{
"laneNo": 7,
"queueLen": 79.0,
"queueHead": 1.0,
"queueTail": 78.0,
"queueCount": 71
},
{
"laneNo": 7,
"queueLen": 87.0,
"queueHead": 99.0,
"queueTail": 38.0,
"queueCount": 42
},
{
"laneNo": 14,
"queueLen": 51.0,
"queueHead": 41.0,
"queueTail": 52.0,
"queueCount": 36
},
{
"laneNo": 1,
"queueLen": 10.0,
"queueHead": 81.0,
"queueTail": 27.0,
"queueCount": 57
},
{
"laneNo": 5,
"queueLen": 40.0,
"queueHead": 81.0,
"queueTail": 19.0,
"queueCount": 42
},
{
"laneNo": 5,
"queueLen": 80.0,
"queueHead": 96.0,
"queueTail": 34.0,
"queueCount": 100
}
]
}
除了用到get_json_object函数或json_tuple函数外,json数组还需要用到explode函数、regexp_replace函数以及lateral view函数
select a.timestamp, get_json_object(a.appevents, '$.eventid'), get_json_object(a.appenvets, '$.eventname') from log a;
json_tuple(json_string, k1, k2 ...)2、说明:解析json的字符串json_string,可指定多个json数据中的key,返回对应的value。如果输入的json字符串无效,那么返回NULL。json_tuple函数不能加$.,否则会解析不到select a.timestamp, b.*
from log a lateral view json_tuple(a.appevent, 'eventid', 'eventname') b as f1, f2;
(三)explode函数explode(Array OR Map)在用UDTF比如explode的时候,由于SELECT 只支持一个字段,因此需要lateral view函数!
lateral view函数一般用于和split、explode等UDTF一起使用的,它能将一行数据拆分成多行数据,在并此基础上可以对拆分的数据进行聚合。
select col_A,col_B,tmp_table.tmp_col
from test_table
lateral view explode(split(col_C,'分隔符')) tmp_table as tmp_col
where partition_name='xxx';
col_A,col_B,col_C: 都是原表 test_table 的列(字段);
tmp_table:explode形成的新虚拟表,可以不写;
tmp_col:explode 形成的新列(字段);
with t1 as(
select
get_json_object(queue_json,'$.deviceNo') device_no,
get_json_object(queue_json,'$.createTime') create_time,
get_json_object(queue_json,'$.laneNum') lane_num,
get_json_object(queue_json,'$.queueList') queue_list,
day
from hurys_dc_ods.ods_queue)
select
t1.device_no,
t1.lane_num,
t1.create_time,
get_json_object(list_json,'$.laneNo') lane_no,
get_json_object(list_json,'$.queueCount') queue_count,
get_json_object(list_json,'$.queueLen') queue_len,
get_json_object(list_json,'$.queueHead') queue_head,
get_json_object(list_json,'$.queueTail') queue_tail,
date(t1.create_time) day
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,'\\[|\\]','') , --将json数组两边的中括号去掉
'\\}\\,\\{','\\}\\;\\{'), --将json数组元素之间的逗号换成分号
'\\;') --以分号作为分隔符(split函数以分号作为分隔)
)list_queue as list_json
;

json字段解析成功!
