• 7. dws 层建设(大宽表)


    7. dws 层建设(大宽表)

    需求:

    dws层的层结构如下:

    7.1注意事项:

    学习两个新的函数 named_struct() 将字段和字段的值变成一个kv格式的类型,数据和结果如下:

    D71FBB5BF8922508B8FEA72BFA556294 {"r_fwzl":"富强路东侧壹号公馆A*******","htydjzmj":"117.2900","tntjzmj":"96.8700","ftmj":"20.4200","time_tjba":"2011-11-04 10:19:26","htzj":"499670.0000"}

    collect_list() 将很多的数据转变成一个集合, to_json()将这个集合转变成为一个json格式

    7.2 在dws层的建表语句如下:

    -- DDL for the dws-layer wide table: one row per person (id) with
    -- flag / count / JSON-detail columns for housing, company, shareholder,
    -- social-insurance and housing-fund records, partitioned by ds.
    CREATE TABLE IF NOT EXISTS dws.dws_population_property_info_msk_d (
        id          STRING COMMENT '身份证号码',
        sfyfc       STRING COMMENT '是否有房产【1-是,0-否】',
        sfyfc_num   BIGINT COMMENT '房产数量',
        sfyfc_info  STRING COMMENT '房产信息',
        sfygc       STRING COMMENT '是否法定代表人【1-是,0-否】',
        sfygc_num   BIGINT COMMENT '公司数量',
        sfygc_info  STRING COMMENT '公司信息',
        sywgd       STRING COMMENT '是否为股东【1-是,0-否】',
        sywgd_num   BIGINT COMMENT '为股东次数',
        sfysb       STRING COMMENT '是否有社保【1-是,0-否】',
        sfysb_num   BIGINT COMMENT '交社保的数量',
        sfysb_info  STRING COMMENT '社保信息',
        sfygjj      STRING COMMENT '是否有公积金【1-是,0-否】',
        sfygjj_num  BIGINT COMMENT '交公积金次数',
        sfygjj_info STRING COMMENT '公积金信息'
    )
    PARTITIONED BY (
        ds STRING COMMENT '分区'
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    LOCATION '/daas/motl/dws/dws_population_property_info_msk_d';

    7.3 构建大宽表的语句如下

    -- Build the wide table: table a is the full population (distinct ids);
    -- each LEFT JOIN attaches one per-person aggregate. A NULL join key on
    -- b/c/d/e/f means the person has no record of that kind, so the flag and
    -- count columns are coalesced to 0 via CASE.
    -- FIX: the last output column previously selected f.sfygjj_num (the count)
    -- into sfygjj_info; it must select f.sfygjj_info (the JSON detail string).
    insert overwrite table dws.dws_population_property_info_msk_d partition(ds=${ds})
    select
    -- /*+ broadcast(b),broadcast(c),broadcast(d),broadcast(e),broadcast(f) */
        a.id,
        case when b.id is null then 0 else 1 end sfyfc,
        case when b.id is null then 0 else b.sfyfc_num end sfyfc_num,
        b.sfyfc_info as sfyfc_info,
        case when c.id is null then 0 else 1 end sfygc,
        case when c.id is null then 0 else c.sfygc_num end sfygc_num,
        c.sfygc_info as sfygc_info,
        case when d.id is null then 0 else 1 end sywgd,
        case when d.id is null then 0 else d.sywgd_num end sywgd_num,
        case when e.id is null then 0 else 1 end sfysb,
        case when e.id is null then 0 else e.sfysb_num end sfysb_num,
        e.sfysb_info as sfysb_info,
        case when f.id is null then 0 else 1 end sfygjj,
        case when f.id is null then 0 else f.sfygjj_num end sfygjj_num,
        f.sfygjj_info as sfygjj_info  -- was f.sfygjj_num: wrong column (count, not JSON info)
    from
    (
        -- full population: one row per id
        select id from dwd.dwd_ga_hjxx_czrkjbxx_msk_d
        group by id
    ) as a
    left join
    (
        -- housing: count of properties plus all property records merged
        -- into one JSON string per person
        select id,
               count(1) as sfyfc_num,
               to_json(collect_list(named_struct(
                   "r_fwzl", r_fwzl,
                   "htydjzmj", htydjzmj,
                   "tntjzmj", tntjzmj,
                   "ftmj", ftmj,
                   "time_tjba", time_tjba,
                   "htzj", htzj
               ))) as sfyfc_info
        from dwd.dwd_fcj_nwrs_sellbargain_msl_d
        where ds=${ds}
        group by id
    ) as b
    on a.id = b.id
    left join
    (
        -- legal-representative records, merged into one JSON string per person
        select id,
               count(1) as sfygc_num,
               to_json(collect_list(named_struct(
                   "position", position,
                   "tel", tel,
                   "appounit", appounit,
                   "accdside", accdside,
                   "posbrmode", posbrmode,
                   "offhfrom", offhfrom,
                   "offhto", offhto,
                   "stufftype", stufftype
               ))) as sfygc_info
        from dwd.dwd_gsj_reg_legrepre_msk_d
        where ds=${ds}
        group by id
    ) as c
    on a.id = c.id
    left join
    (
        -- shareholder: only a count is needed, no detail JSON
        select id, count(1) as sywgd_num
        from dwd.dwd_gsj_reg_investor_msk_d
        where ds=${ds}
        group by id
    ) as d
    on a.id = d.id
    left join
    (
        -- social insurance: source is deduplicated first (select distinct *)
        -- before counting / collecting
        select id,
               count(1) as sfysb_num,
               to_json(collect_list(named_struct(
                   "citty_id", citty_id,
                   "ss_id", ss_id,
                   "fkrq", fkrq,
                   "yxqz", yxqz,
                   "aaz502", aaz502,
                   "aae008", aae008,
                   "aae008b", aae008b,
                   "aae010", aae010,
                   "aae010a", aae010a,
                   "aae010b", aae010b
               ))) as sfysb_info
        from
        (
            select distinct *
            from dwd.dwd_rs_zhcs_az01bsbkxx_msk_d
            where ds=${ds}
        ) as t
        group by id
    ) as e
    on a.id = e.id
    left join
    (
        -- housing fund: count plus all account fields merged into one JSON
        -- string per person; `status` is backticked because it is reserved
        select id, count(1) as sfygjj_num,
               to_json(collect_list(named_struct(
                   "spcode", spcode,
                   "hjstatus", hjstatus,
                   "sncode", sncode,
                   "spname", spname,
                   "spcard", spcard,
                   "sppassword", sppassword,
                   "zjfdm", zjfdm,
                   "spkhrq", spkhrq,
                   "spperm", spperm,
                   "spgz", spgz,
                   "spsingl", spsingl,
                   "spjcbl", spjcbl,
                   "spmfact", spmfact,
                   "spmfactzg", spmfactzg,
                   "spjym", spjym,
                   "ncye", ncye,
                   "splast", splast,
                   "dwbfye", dwbfye,
                   "grbfye", grbfye,
                   "spmend", spmend,
                   "splastlx", splastlx,
                   "spout", spout,
                   "spin", spin,
                   "bnlx", bnlx,
                   "nclx", nclx,
                   "dwhjny", dwhjny,
                   "zghjny", zghjny,
                   "btyje", btyje,
                   "btye", btye,
                   "btbl", btbl,
                   "bthjny", bthjny,
                   "spxh", spxh,
                   "spzy", spzy,
                   "spxhrq", spxhrq,
                   "splr", splr,
                   "spoldbankno", spoldbankno,
                   "spdk", spdk,
                   "spdy", spdy,
                   "zhdj", zhdj,
                   "spnote", spnote,
                   "modifytime", modifytime,
                   "status", `status`,
                   "cbank", cbank,
                   "bcyje", bcyje,
                   "bcye", bcye,
                   "bcbl", bcbl,
                   "bchjny", bchjny,
                   "zjzl", zjzl
               ))) as sfygjj_info
        from dwd.dwd_gjj_sspersons_msk_d
        where ds=${ds}
        group by id
    ) as f
    on a.id = f.id

    脚本如下:

    在提交spark任务时,必须指定executor 的数量,然后指定核数和内存,然后就是分区数(避免生成很多的小文件)

    #!/usr/bin/env bash
    # Submit the dws wide-table build for one partition.
    # Usage: ./dws_population_property_info_msk_d.sh <ds>
    #   ds - partition value, substituted into the SQL file via -d ds=...

    # 分区 (partition value from the first CLI argument)
    ds=$1
    # FIX: fail fast instead of silently running with an empty partition value
    if [ -z "$ds" ]; then
        echo "usage: $0 <ds>" >&2
        exit 1
    fi

    # 执行任务
    # num-executor 在项目现场一般50-100个 (typically 50-100 executors on site)
    # spark.sql.shuffle.partitions kept small to avoid many small HDFS files
    # (default 200 would produce ~200 output files)
    spark-sql \
        --master yarn-client \
        --num-executors=1 \
        --executor-cores=2 \
        --executor-memory=4G \
        --conf spark.sql.shuffle.partitions=2 \
        -f dws_population_property_info_msk_d.sql \
        -d ds="$ds"

    7.4 提交任务

    因为在dwd层我的分区搞错了,所以有两张表没有数据,如下:

    当我把分区统一一致后,重新跑数据之后,再次执行任务。如下:

    将小表广播出去,提高效率

    还有一个就是分区问题,如果不指定分区默认就是200 个分区,到时候在hdfs中就会生成200个左右的小文件,但是指定分区数后,如下:

  • 相关阅读:
    C++、c语法基础【1】
    flutter开发实战-应用更新apk下载、安装apk、启动应用实现
    python+opencv神经网络风格迁移--你也可以拥有梵高一样的画作
    weblogic CVE-2024-20931分析
    SCI一区 | Matlab实现PSO-TCN-LSTM-Attention粒子群算法优化时间卷积长短期记忆神经网络融合注意力机制多变量时间序列预测
    大唐杯题目
    java毕业设计商场会员管理系统源码+lw文档+mybatis+系统+mysql数据库+调试
    基于ABP实现DDD--领域服务、应用服务和DTO实践
    Warmup小记
    【应用层协议】初始Http,fiddler的使用
  • 原文地址:https://blog.csdn.net/weixin_48370579/article/details/126238250