This approach intertwines the incremental-update logic with a large amount of join processing.
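At its core, the merge below is a full outer join between the rows already sitting in the DWD table and the newly arrived ODS rows, keeping the new value whenever it exists and falling back to the old one otherwise. A minimal sketch of that pattern (local session, made-up table and column names rather than the real schema):

import org.apache.spark.sql.SparkSession

// Sketch only: full-outer-join merge, prefer the "new" side whenever it is not null.
object merge_pattern_sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge_pattern_sketch").getOrCreate()
    import spark.implicits._

    // rows already in the DWD layer vs. the latest ODS batch (toy data)
    Seq(("o1", "a"), ("o2", "b")).toDF("orderid", "payload").createOrReplaceTempView("dwd_old")
    Seq(("o2", "b2"), ("o3", "c")).toDF("orderid", "payload").createOrReplaceTempView("ods_new")

    spark.sql(
      """
        |select
        |  if(new.orderid is null, old.orderid, new.orderid) as orderid,
        |  if(new.payload is null, old.payload, new.payload) as payload
        |from dwd_old old
        |full outer join ods_new new
        |on old.orderid = new.orderid
        |""".stripMargin).show()
    // expected: o1 -> a (old only), o2 -> b2 (new wins), o3 -> c (new only)

    spark.stop()
  }
}

The production query below applies the same if(new.x is null, old.x, new.x) guard to every column, and additionally restricts the old side to just the partitions touched by the current ODS batch.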
import org.apache.spark.sql.SparkSession
object dwd_order_info_abi {
def main(args: Array[String]): Unit = {
var dt = "2023-07-01"
var dt1 = "2023-07-17"
System.setProperty("HADOOP_USER_NAME", "root")
val builder = SparkUtils.getBuilder
if (System.getProperties.getProperty("os.name").contains("Windows")) {
builder.master("local[*]")
} else {
dt = args(0)
dt1 = args(1)
}
val spark = builder.appName("dwd_order_info_abi").getOrCreate()
HiveUtil.openDynamicPartition(spark)
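// Assumption: openDynamicPartition is a project helper not shown here; it presumably enables
// Hive dynamic partitioning, roughly: set hive.exec.dynamic.partition=true and
// set hive.exec.dynamic.partition.mode=nonstrict, which the insert overwrite ... partition(dt) below relies on.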
spark.sql("set spark.sql.shuffle.partitions=1")
new IDate {
override def onDate(dt: String): Unit = {
process(spark, dt)
}
}.invoke(dt, dt1)
spark.stop()
}
/**
* insert overwrite table ${hive_db}.dwd_order_info_abi partition(dt)
*/
private def process(spark: SparkSession, do_date: String) = {
val hive_db = "paascloud"
spark.sql(s"use $hive_db")
val frame = spark.sql(
s"""
|
|
|with t_org_flat as(select * from ods_t_org_flat),
|
|detail as (select * from ${hive_db}.tbworkorder_config_detail)
|
|insert overwrite table ${hive_db}.dwd_order_info_abi partition(dt)
|
|select
|
|if(new.orderid is null,old.orderid,new.orderid) as orderid
|,if(new.ordercreatetime is null,old.ordercreatetime,new.ordercreatetime) as ordercreatetime
|,if(new.create_by is null,old.create_by,new.create_by) as create_by
|,if(new.ct_name is null,old.ct_name,new.ct_name) as ct_name
|,if(new.ct_phone is null,old.ct_phone,new.ct_phone) as ct_phone
|,if(new.splicing_name is null,old.splicing_name,new.splicing_name) as splicing_name
|
|,if(new.operatelifecycle is null,old.operatelifecycle,new.operatelifecycle) as operatelifecycle
|
|,if(new.stars is null,old.stars,new.stars) as stars
|,if(new.isfinished is null,old.isfinished,new.isfinished) as isfinished
|
|,if(new.AUDIT_ORDER_star is null,old.AUDIT_ORDER_star,new.AUDIT_ORDER_star) as AUDIT_ORDER_star
|,if(new.EVALUATE_ORDER_star is null,old.EVALUATE_ORDER_star,new.EVALUATE_ORDER_star) as EVALUATE_ORDER_star
|,if(new.RETURN_VISIT_star is null,old.RETURN_VISIT_star,new.RETURN_VISIT_star) as RETURN_VISIT_star
|,if(new.overall_score is null,old.overall_score,new.overall_score) as overall_score
|,if(new.overall_score_level is null,old.overall_score_level,new.overall_score_level) as overall_score_level
|
|
|,if(new.operateUsers is null,old.operateUsers,new.operateUsers) as operateUsers
|,if(new.evaluate_operator is null,old.evaluate_operator,new.evaluate_operator) as evaluate_operator
|,if(new.revisit_operator is null,old.revisit_operator,new.revisit_operator) as revisit_operator
|
|,if(new.corcode_f3 is null,old.corcode_f3,new.corcode_f3) as corcode_f3
|,if(new.name_f3 is null,old.name_f3,new.name_f3) as name_f3
|,if(new.sort_f3 is null,old.sort_f3,new.sort_f3) as sort_f3
|,if(new.corcode_f2 is null,old.corcode_f2,new.corcode_f2) as corcode_f2
|,if(new.name_f2 is null,old.name_f2,new.name_f2) as name_f2
|,if(new.sort_f2 is null,old.sort_f2,new.sort_f2) as sort_f2
|,if(new.corcode_f1 is null,old.corcode_f1,new.corcode_f1) as corcode_f1
|,if(new.name_f1 is null,old.name_f1,new.name_f1) as name_f1
|,if(new.sort_f1 is null,old.sort_f1,new.sort_f1) as sort_f1
|,if(new.project_star is null,old.project_star,new.project_star) as project_star
|,if(new.project_nature is null,old.project_nature,new.project_nature) as project_nature
|
|,if(new.companyId is null,old.companyId,new.companyId) as companyId
|,if(new.areasId is null,old.areasId,new.areasId) as areasId
|,if(new.institutionId is null,old.institutionId,new.institutionId) as institutionId
|,if(new.platfromFiledCode is null,old.platfromFiledCode,new.platfromFiledCode) as platfromFiledCode
|,if(new.orderLargerType is null,old.orderLargerType,new.orderLargerType) as orderLargerType
|,if(new.orderSecondType is null,old.orderSecondType,new.orderSecondType) as orderSecondType
|,if(new.orderThirdlyType is null,old.orderThirdlyType,new.orderThirdlyType) as orderThirdlyType
|,if(new.serviceFlowAlias is null,old.serviceFlowAlias,new.serviceFlowAlias) as serviceFlowAlias
|,if(new.orderSource is null,old.orderSource,new.orderSource) as orderSource
|,if(new.orderSourceName is null,old.orderSourceName,new.orderSourceName) as orderSourceName
|,if(new.orderStatus is null,old.orderStatus,new.orderStatus) as orderStatus
|,if(new.orderStatusName is null,old.orderStatusName,new.orderStatusName) as orderStatusName
|
|,if(new.actualhour is null,old.actualhour,new.actualhour) as actualhour
|,if(new.second_hour is null,old.second_hour,new.second_hour) as second_hour
|,if(new.third_hour is null,old.third_hour,new.third_hour) as third_hour
|,if(new.man_hour is null,old.man_hour,new.man_hour) as man_hour
|,if(new.istimeout is null,old.istimeout,new.istimeout) as istimeout
|
|,if(new.isurgent is null,old.isurgent,new.isurgent) as isurgent
|,if(new.isimportance is null,old.isimportance,new.isimportance) as isimportance
|,if(new.issupervise is null,old.issupervise,new.issupervise) as issupervise
|,if(new.reworknum is null,old.reworknum,new.reworknum) as reworknum
|,if(new.isrework is null,old.isrework,new.isrework) as isrework
|,if(new.isexception is null,old.isexception,new.isexception) as isexception
|
|,if(new.deal_user_ids is null,old.deal_user_ids,new.deal_user_ids) as deal_user_ids
|,if(new.dealUserOrgIds is null,old.dealUserOrgIds,new.dealUserOrgIds) as dealUserOrgIds
|
|,if(new.comefrom is null,old.comefrom,new.comefrom) as comefrom
|,if(new.address_id is null,old.address_id,new.address_id) as address_id
|,if(new.flow_subsystem_code is null,old.flow_subsystem_code,new.flow_subsystem_code) as flow_subsystem_code
|,if(new.floor_id is null,old.floor_id,new.floor_id) as floor_id
|,if(new.is_over_time is null,old.is_over_time,new.is_over_time) as is_over_time
|,if(new.out_source is null,old.out_source,new.out_source) as out_source
|,if(new.send_user_ids is null,old.send_user_ids,new.send_user_ids) as send_user_ids
|
|,date_format(if(new.ordercreatetime is null,old.ordercreatetime,new.ordercreatetime),'yyyy-MM-dd') as dt
|from
|
|(
|
|select
|
| *
|from
| ${hive_db}.dwd_order_info_abi
|where
| dt in (
| select
| date_format(ordercreatetime, 'yyyy-MM-dd')
| from
| ${hive_db}.ods_order
| where
| dt = '$do_date'
| group by date_format(ordercreatetime, 'yyyy-MM-dd')
| )
|
|) old
|
|full outer join
|(
|
|select
| info.orderid,
| info.ordercreatetime,
| info.create_by,
| user.user_name as ct_name,
| user.telephone as ct_phone,
| build.splicing_name,
| trace.operatelifecycle,
| trace.stars,
|
| if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0') as isfinished,
|
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['AUDIT_ORDER'],'0'),'-1') as AUDIT_ORDER_star,-- -1 marks an invalid score (the order is not finished yet); if finished but unscored, default to 0
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['RETURN_VISIT'],'4'),'-1') as RETURN_VISIT_star,-- -1 marks an invalid score (the order is not finished yet); if finished but unscored, default to 4
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['EVALUATE_ORDER'],'4'),'-1') as EVALUATE_ORDER_star,-- -1 marks an invalid score (the order is not finished yet); if finished but unscored, default to 4
|
| round((if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['AUDIT_ORDER'],'0'),'-1')*0.2+
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['RETURN_VISIT'],'4'),'-1')*0.1+
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['EVALUATE_ORDER'],'4'),'-1')*0.7),1) as overall_score,
|
| round((if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['AUDIT_ORDER'],'0'),'-1')*0.2+
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['RETURN_VISIT'],'4'),'-1')*0.1+
| if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['EVALUATE_ORDER'],'4'),'-1')*0.7),0) as overall_score_level,
|
| trace.operateUsers,
| nvl(trace.operateUsers['RETURN_VISIT'],'-1') as revisit_operator,
| nvl(trace.operateUsers['EVALUATE_ORDER'],'-1') as evaluate_operator,
|
| t_org_flat.corcode_f3,
| t_org_flat.name_f3,t_org_flat.sort_f3,
| t_org_flat.corcode_f2,
| t_org_flat.name_f2,t_org_flat.sort_f2,
| t_org_flat.corcode_f1,
| t_org_flat.name_f1,t_org_flat.sort_f1,
| t_org_flat.project_star,
| t_org_flat.project_nature,
|
| info.companyId,
| info.areasId,
| info.institutionId ,
| info.platfromFiledCode ,
| info.orderLargerType,
| info.orderSecondType,
| info.orderThirdlyType,
| info.serviceFlowAlias ,
| info.orderSource ,
| info.orderSourceName ,
| info.orderStatus ,
| info.orderStatusName ,
|
| info.actualhour ,
| detail.man_hour as second_hour ,
| detail2.man_hour as third_hour ,
| case
| when orderThirdlyType is not null then detail2.man_hour
| when orderThirdlyType is null and orderSecondType is not null then detail.man_hour
| when orderThirdlyType is null and orderSecondType is null then '9999'
| end as man_hour ,
| -- the defined man-hours for this order's service type
| case
| when if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='0' then '-1'
| when orderThirdlyType is not null then if(info.actualHour>detail2.man_hour, 1, 0)
| when orderThirdlyType is null and orderSecondType is not null then if(info.actualHour>detail.man_hour, 1, 0)
| when orderThirdlyType is null and orderSecondType is null then '0' -- neither the third- nor the second-level type is set, so default to on time
| end as istimeout,
|
| if(info.urgent=1 or info.urgent=2,1,0) as isurgent,
| info.importance as isimportance,
| if(info.superviseNum>0,1,0) as issupervise,
| info.reworknum as reworknum, -- number of times this order was reworked
| if(info.reworkNum>0,1,0) as isrework,
|
| if(info.orderStatus = 'ORDER_YQX' or info.orderstatus = 'ORDER_YCGB'
| or info.superviseNum>0
| or info.reworkNum>0
| or nvl(trace.stars['AUDIT_ORDER'],'100')<3
| or nvl(trace.stars['RETURN_VISIT'],'100')<3
| or nvl(trace.stars['EVALUATE_ORDER'],'100')<3 , '1', '0') as isexception,
|
| info.dealUserIds as deal_user_ids,
| info.dealUserOrgIds as dealUserOrgIds,
|
| info.comefrom as comefrom,
| if(info.home_address is null,info.floor_id,info.home_address) as address_id
| ,info.flow_subsystem_code as flow_subsystem_code
| ,info.floor_id as floor_id
| ,info.is_over_time as is_over_time
| ,info.out_source as out_source
| ,info.send_user_ids as send_user_ids
|from
| (
| select
| *
| from
| ${hive_db}.ods_order
| where
| dt = '$do_date' ) info
|left join (
| select
| workOrderId
| ,str_to_map(concat_ws(',', collect_set(concat(operateType, '=>', createTime))), ',', '=>') operatelifecycle
| ,str_to_map(concat_ws(',', collect_set(concat(operateType, '=>', if(evaluateLevel is not null,evaluateLevel,'-1')))), ',', '=>') stars
| ,str_to_map(concat_ws(',', collect_set(concat(operateType, '=>', if(operateUser is not null,operateUser,'-1')))), ',', '=>') operateUsers
| from
| ${hive_db}.ods_order_track
| where dt in(
| select
| date_format(ordercreatetime, 'yyyy-MM-dd')
| from
| ${hive_db}.ods_order
| where
| dt = '$do_date'
| group by date_format(ordercreatetime, 'yyyy-MM-dd')
|
| )
|
| group by workOrderId ) trace on
| info.orderid = trace.workOrderId
|
| left join
| detail on
| info.orderSecondType=detail.id
| left join
| detail detail2 on
| info.orderThirdlyType=detail2.id
|
| left join t_org_flat on
| info.institutionId=t_org_flat.corcode_f1
| or info.institutionid=t_org_flat.corcode_f2
|
| left join(
|
|
| select * from ${hive_db}.t_uac_user
| ) user
| on info.create_by=user.user_id
|
| left join(
|
| select * from ${hive_db}.t_uac_building_room
| ) build
| on info.home_address=build.id
|
| ) new
|on old.orderid=new.orderid
|
|
|""".stripMargin)
frame
}
}
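For reference, a quick check of the 0.2 / 0.1 / 0.7 weighting behind overall_score, on literal values only (to be run in an existing session such as spark-shell; the defaults follow the comments in the query: 0 for a missing audit score, 4 for missing return-visit and evaluation scores, -1 across the board while the order is unfinished):

// Sanity check of the overall_score weighting on literal values (not real data).
spark.sql(
  """
    |select
    |  round(5*0.2 + 4*0.1 + 5*0.7, 1)    as finished_all_scored,    -- 1.0 + 0.4 + 3.5 = 4.9
    |  round(0*0.2 + 4*0.1 + 4*0.7, 1)    as finished_defaults_only, -- 0.0 + 0.4 + 2.8 = 3.2
    |  round(-1*0.2 + -1*0.1 + -1*0.7, 1) as not_finished            -- stays -1.0, the "invalid" marker
    |""".stripMargin).show()

overall_score_level applies the same expression with round(..., 0), so the 3.2 case above lands on level 3; the real query feeds the stars in as strings and relies on implicit numeric casts.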
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.storage.StorageLevel
object ods_record_detailed_update {
val hive_db = "tcm"
def main(args: Array[String]): Unit = {
var dt = "2023-06-30"
var dt1 = "2023-06-30"
System.setProperty("HADOOP_USER_NAME", "root")
val builder = SparkUtils.getBuilder
if (System.getProperties.getProperty("os.name").contains("Windows")) {
builder.master("local[*]")
} else {
dt = args(0)
dt1 = args(1)
}
val spark = builder
.appName(this.getClass.getSimpleName).getOrCreate()
HiveUtil.openDynamicPartition(spark)
spark.sql("set spark.sql.shuffle.partitions=1")
spark.sql(s"use $hive_db")
new IDate {
override def onDate(dt: String): Unit = {
/**
* the invoked range may cover several dates (dt1, dt2, dt3...)
*/
val f_new = spark.sql(
s"""
|
|select * from ods_record_detailed where dt='${dt}'
|
|""".stripMargin)
.select(
"record_id"
, "record_detailed_id"
, "record_content"
, "create_datetime"
, "update_datetime"
, "record_detailed_state"
, "dt"
)
val f_old = spark.sql(
s"""
|
|select * from ods_record_detailed_update2 where dt in (
|
|
|SELECT date_format(create_datetime,'yyyy-MM-dd') from ods_record_detailed
|WHERE dt='$dt' GROUP BY date_format(create_datetime, 'yyyy-MM-dd')
|
|
|) and dt !='$dt'
|
|""".stripMargin)
.select(
"record_id"
, "record_detailed_id"
, "record_content"
, "create_datetime"
, "update_datetime"
, "record_detailed_state"
, "dt"
)
val f_all = f_new.unionByName(f_old)
/**
* A record_detailed_id that has been updated will appear multiple times; keep the row with the largest dt.
* Note: the update time cannot be used here, because after a reload the two rows from different dt partitions carry an identical updateTime.
*/
val windowSpec = Window.partitionBy("record_detailed_id").orderBy(col("dt").desc)
val f_rank = f_all
.withColumn("rank", row_number.over(windowSpec))
.persist(StorageLevel.MEMORY_ONLY)
/**
* Re-derive dt from create_datetime, so the data is partitioned by the business creation time.
*/
val r = f_rank
.where("rank<=1")
.withColumn("dt", to_date(col("create_datetime"), "yyyy-MM-dd").cast(StringType))
.select(
"record_id"
, "record_detailed_id"
, "record_content"
, "create_datetime"
, "update_datetime"
, "record_detailed_state"
, "rank"
, "dt"
)
println("r show===>")
r.show(30, false)
val r_ck = SparkUtils.persistDataFrame(spark, r)
r_ck._1.createOrReplaceTempView("r_ck")
/**
* needs a Hive table, not a Spark table
*
*
* create table tcm.ods_record_detailed_update2(
*
* record_id string,
* record_detailed_id string,
*
* record_content string,
* create_datetime string,
* update_datetime string,
* record_detailed_state string
*
* )
* partitioned by (dt string)
* stored as parquet
* location '/user/hive/warehouse/tcm.db/ods_record_detailed_update2'
*/
spark.sql(
"""
|
|insert overwrite table tcm.ods_record_detailed_update2 partition(dt)
|
|select
|
|record_id,
|record_detailed_id,
|record_content,
|create_datetime,
|update_datetime,
|record_detailed_state,
|
|dt as dt
|from r_ck
|
|
|""".stripMargin)
SparkUtils.unPersistDataFrame(spark, r_ck._2)
}
}.invoke(dt, dt1)
spark.stop()
}
}
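The deduplication above reduces to row_number() over a window partitioned by record_detailed_id and ordered by dt descending. A self-contained sketch of that step on toy data (column names follow the job above, the values are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sketch only: keep the most recent dt per record_detailed_id, as the job above does.
object dedupe_sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dedupe_sketch").getOrCreate()
    import spark.implicits._

    val f_all = Seq(
      ("d1", "v1",  "2023-06-29"), // older load of d1
      ("d1", "v1b", "2023-06-30"), // newer load of d1 -> should win
      ("d2", "v2",  "2023-06-30")
    ).toDF("record_detailed_id", "record_content", "dt")

    val windowSpec = Window.partitionBy("record_detailed_id").orderBy(col("dt").desc)

    f_all
      .withColumn("rank", row_number().over(windowSpec))
      .where("rank = 1") // only the latest dt survives for each id
      .show(false)

    spark.stop()
  }
}

The job above then rewrites dt from create_datetime before the dynamic-partition insert overwrite into ods_record_detailed_update2.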
get_changed_dt.sh
#!/bin/bash
db=paascloud
hive=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/hive
if [ $1 ];
then
dt=$1
else
dt=`date -d "-1 day" +%F`
fi
if [ $2 ];
then
dt1=$2
else
dt1=`date -d "-1 day" +%F`
fi
flag=""
f(){
do_date=$1
#echo "=== get_changed_dt.f() executing for date $do_date ==="
sql="
use $db;
set hive.cli.print.header=false;
set HIVE_SKIP_SPARK_ASSEMBLY=true;
select
ct
from (
select
date_format(ordercreatetime, 'yyyy-MM-dd') as ct
from
ods_order
where
dt = '$do_date'
) t group by ct
order by ct desc
;
"
WORK_DIR=$(cd "$(dirname "$0")";pwd)
LOG_PATH="$WORK_DIR/log/$do_date"
mkdir -p $LOG_PATH
FILE_NAME="get_changed_dt"
FILE_NAME1="get_changed_dt_result"
flag=`$hive -e "$sql" 2>&1 | tee $LOG_PATH/${FILE_NAME}.log | grep -v "WARN"`
while read -r line
do
# echo "$line"
error="FAILED"
if [[ $line == *$error* ]]
then
#echo "HIVE JOB EXECUTION FAILED AND DATE IS 【${do_date}】......"
#exit 1
sleep 1m
f ${do_date}
fi
done < ${LOG_PATH}/${FILE_NAME}.log
#sink
arr=()
arr+=($do_date)
for i in $flag
do
if [[ $i == "20"* ]] && [[ ${#i} == 10 ]] && [ $i \< $do_date ]
then
arr+=($i)
fi
done
echo ${arr[*]} 2>&1 | tee $LOG_PATH/${FILE_NAME1}
}
start_day=$dt
end_day=$dt1
dt=$start_day
while [[ $dt < `date -d "+1 day $end_day" +%Y-%m-%d` ]]
do
#echo "while process:"$dt
f $dt
dt=`date -d "+1 day $dt" +%Y-%m-%d`
done
hive_to_olap_4_tcm_parse.sh
#!/bin/bash
source /root/bin/common_config/db_config.properties
hive_table=$1
target_table=$2
if [ $3 ];
then
dt=$3
else
dt=`date -d "-1 day" +%F`
fi
if [ $4 ];
then
dt1=$4
else
dt1=`date -d "-1 day" +%F`
fi
echo "起始日期为$dt"
echo "结束日期为$dt1"
f(){
do_date=$1
echo "===函数执行日期为 $do_date==="
/root/bin/hive_to_mysql.sh "$olap_host" tcm "$hive_table" "$olap_db" "$target_table" "$olap_user" "$olap_pwd" $1 $1
}
if [[ $dt == $dt1 ]]
then
echo "dt = dt1......"
for i in `/mnt/db_file/tcm/get_changed_dt.sh $dt`
do
echo "同步变化的日期======================>$i"
f $i
done
else
echo "batch process..."
start_day=$dt
end_day=$dt1
dt=$start_day
while [[ $dt < `date -d "+1 day $end_day" +%Y-%m-%d` ]]
do
echo "批处理===>"$dt
f $dt
dt=`date -d "+1 day $dt" +%Y-%m-%d`
done
fi