• 用户画像知识点补充——多数据源


    引入

    针对用户画像项目来说(产品)必须要支持从多种数据源加载业务数据,构建用户标签。

    在之前的标签模型开发中,主要是为了简化开发复杂度,业务数据统一存储到HBase表中。

            数据源包含如下几个方面:

            存储HDFS文件系统

            存储Hive表

            存储HBase表

            存储MySQL表

            存储NoSQL数据库:Redis数据库、MongoDB数据库

            存储Elasticsearch索引库

            存储Kafka分布式队列

            封装数据为RDD:

    1. val offsetRanges = Array(
    2. // topic, partition, inclusive starting offset, exclusive ending offset
    3. OffsetRange(“test”, 0, 0, 100),
    4. OffsetRange(“test”,1, 0, 100),
    5. )
    6. val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

            封装数据为DataFrame:

    1. // Subscribe to multiple topics, specifyingexplicit Kafka offsets
    2. val df = spark
    3. .read
    4. .format(“kafka”)
    5. .option(“kafka.bootstrap.servers”, “host1:port1, host2:port2”)
    6. .option(“subscribe”, “topic1, topic2”)
    7. .option(“startingOffsets”, “””{“topic1”:{“0”:23,”1”:-2},”topic2”:{“0”:-2}}”””)
    8. .option(“endingOffsets”, “””{“topic1”:{“0”:50,”1”:-1},”topic2”:{“0”:-1}}”””)
    9. .load()
    10. df.selectExpr(“CAST(key AS STRING)”, “CAST(value AS STRING)”)
    11. .as[(String, String)]

    用户数据多种数据源

    进入正题,先说明一下我们的用户画像:

    1)、面向业务的用户标签及用户画像管理中台

    统一规范的标签可视化管理中台,业务人员可自助生产和维护标签,适应营销策略变化。

    2)全端采集用户行为数据,整合业务数据等多种数据源,帮助企业构建体系化用户标签图书馆,输出用户画像,赋能业务实现用户精细化运营和精准营销。

    具体功能说明

    1)、构建用户价值体系

    实现用户召回等精准营销目标

    用户在平台消费几次后,一段时间内没有再次访问平台进行消费,我们需要对其进行流失召回。

    2)、输出全景用户画像

    管理客户全生命周期、有效提升用户体验

    用户群体的特征属性和偏好概况,使用产品的方式是否和预期一致;为精准营销做有效的数据支撑

    3)、利用用户标签形成用户分层

    提供个性化推荐内容,持续提升用户转化

    用户在平台的消费金额达到一定的级别时,根据其消费能力不同,投其所好的推荐不同价格定位的商品。

    4)、利用智能算法,快速找到相似似人群

    补充标签定制的不足,高效锁走目标人群

    选定种子人群,根据特征在更大范围内为每个用户计算相似度,精准找到相似人群,挖掘更多潜在客户

    数据源概述

    针对用户画像标签系统来说,不同标签(业务标签,4级标签)来源于不同的业务数据(订单相关数据、搜搜数据、广告点击数据等)、用户行为数据以及第三方数据(社交数据、信用数据等)构建而来。

    不同类型数据采集存储在不同的存储引擎系统(比如HDFS、HBase、Hive、Elasticsearch、MYSQL数据库等),因此需要用户画像标签系统可以支持从不同的数据源读取业务数据,进行构建标签,恰好Spark SQL支持多数据源的加载与保存。

     

    加载HBase表

    前面的标签开发中,无论是加载注册会员信息表tbl_tag_users还是订单数据表tbl_tag_orders,都是从HBase数据库中读取,自己依据Spark SQL实现外部数据源接口,在标签管理平台构建标签时,通过标签规则rule传递参数,开发标签模型时:解析标签规则获取业务数据,逻辑如下:

     

    重构代码(加载数据)

    将上述代码抽象为两个方法:

    其一:解析标签规则rule为Map集合

    其二:依据规则Map集合中inType判断具体数据源,加载业务数据

    编写MetaParse对象object,创建方法parseRuleToMap和parseMetaToData

    1. 、解析规则rule为参数ParamsMap

    获取业务标签规则rule,按照分隔符分割数据,具体实现代码如下:

    1. import org.apache.spark.internal.Logging
    2. import org.apache.spark.sql.{DataFrame, SparkSession}
    3. /**
    4. * 加载业务数据工具类:
    5. * 解析业务标签规则rule,依据规则判断数段数据源,加载业务数据
    6. */
    7. object MetaParse extends Logging {
    8. /**
    9. * 依据标签数据,获取业务标签规则rule,解析转换为Map集合
    10. * @param tagDF 标签数据
    11. * @return Map集合
    12. */
    13. def parseRuleToParams(tagDF: DataFrame): Map[String, String] = {
    14. import tagDF.sparkSession.implicits._
    15. // 1. 4级标签规则rule
    16. val tagRule: String = tagDF
    17. .filter($"level" === 4)
    18. .head()
    19. .getAs[String]("rule")
    20. logInfo(s"==== 业务标签数据规则: {$tagRule} ====")
    21. // 2. 解析标签规则,先按照换行\n符分割,再按照等号=分割
    22. /*
    23. inType=hbase
    24. zkHosts=bigdata-cdh01.itcast.cn
    25. zkPort=2181
    26. hbaseTable=tbl_tag_logs
    27. family=detail
    28. selectFieldNames=global_user_id,loc_url,log_time
    29. whereCondition=log_time#day#30
    30. */
    31. val paramsMap: Map[String, String] = tagRule
    32. .split("\n")
    33. .map{ line =>
    34. val Array(attrName, attrValue) = line.trim.split("=")
    35. (attrName, attrValue)
    36. }
    37. .toMap
    38. // 3. 返回集合Map
    39. paramsMap
    40. }
    41. /**
    42. * 依据inType判断数据源,封装元数据Meta,加载业务数据
    43. * @param spark SparkSession实例对象
    44. * @param paramsMap 业务数据源参数集合
    45. * @return
    46. */
    47. def parseMetaToData(spark: SparkSession,
    48. paramsMap: Map[String, String]): DataFrame = {
    49. // 1. 从inType获取数据源
    50. val inType: String = paramsMap("inType")
    51. // 2. 判断数据源,封装Meta,获取业务数据
    52. val businessDF: DataFrame = inType.toLowerCase match {
    53. case "hbase" =>
    54. // 解析map集合,封装Meta实体类中
    55. val hbaseMeta = HBaseMeta.getHBaseMeta(paramsMap)
    56. // 加载业务数据
    57. spark.read
    58. .format("hbase")
    59. .option("zkHosts", hbaseMeta.zkHosts)
    60. .option("zkPort", hbaseMeta.zkPort)
    61. .option("hbaseTable", hbaseMeta.hbaseTable)
    62. .option("family", hbaseMeta.family)
    63. .option("selectFields", hbaseMeta.selectFieldNames)
    64. .option("filterConditions", hbaseMeta.filterConditions)
    65. .load()
    66. case "mysql" =>
    67. // 解析Map集合,封装MySQLMeta对象中
    68. val mysqlMeta = MySQLMeta.getMySQLMeta(paramsMap)
    69. // 从MySQL表加载业务数据
    70. spark.read
    71. .format("jdbc")
    72. .option("driver", mysqlMeta.driver)
    73. .option("url", mysqlMeta.url)
    74. .option("user", mysqlMeta.user)
    75. .option("password", mysqlMeta.password)
    76. .option("dbtable", mysqlMeta.sql)
    77. .load()
    78. case "hive" =>
    79. // Map集合,封装HiveMeta对象
    80. val hiveMeta: HiveMeta = HiveMeta.getHiveMeta(paramsMap)
    81. // 从Hive表加载数据, TODO:此时注意,如果标签模型业务数从Hive表加载,创建SparkSession对象时,集成Hive
    82. spark.read
    83. .table(hiveMeta.hiveTable)
    84. // def select(cols: Column*): DataFrame, selectFieldNames: _* -> 将数组转换可变参数传递
    85. .select(hiveMeta.selectFieldNames: _*)
    86. //.filter(hiveMeta.whereCondition)
    87. case "hdfs" =>
    88. // 解析Map集合,封装HdfsMeta对象中
    89. val hdfsMeta: HdfsMeta = HdfsMeta.getHdfsMeta(paramsMap)
    90. // 从HDFS加载CSV格式数据
    91. spark.read
    92. .option("sep", hdfsMeta.sperator)
    93. .option("header", "true")
    94. .option("inferSchema", "true")
    95. .csv(hdfsMeta.inPath)
    96. .select(hdfsMeta.selectFieldNames: _*)
    97. case "es" =>
    98. null
    99. case _ =>
    100. // 如果未获取到数据,直接抛出异常
    101. new RuntimeException("业务标签规则未提供数据源信息,获取不到业务数据,无法计算标签")
    102. null
    103. }
    104. // 3. 返回加载业务数据
    105. businessDF
    106. }
    107. }

    加载Hive表

    1. import org.apache.spark.sql.Column
    2. /**
    3. * 从Hive表中加载数据,SparkSession创建时与Hive集成已配置
    4. inType=hive
    5. hiveTable=tags_dat.tbl_logs
    6. selectFieldNames=global_user_id,loc_url,log_time
    7. ## 分区字段及数据范围
    8. whereCondition=log_time#day#30
    9. */
    10. case class HiveMeta(
    11. hiveTable: String,
    12. selectFieldNames: Array[Column],
    13. whereCondition: String
    14. )
    15. object HiveMeta{
    16. /**
    17. * 将Map集合数据解析到HiveMeta中
    18. * @param ruleMap map集合
    19. * @return
    20. */
    21. def getHiveMeta(ruleMap: Map[String, String]): HiveMeta = {
    22. // 此处省略依据分组字段值构建WHERE CAUSE 语句
    23. // val whereCondition = ...
    24. // 将选择字段构建为Column对象
    25. import org.apache.spark.sql.functions.col
    26. val fieldColumns: Array[Column] = ruleMap("selectFieldNames")
    27. .split(",")
    28. .map{field => col(field)}
    29. // 创建HiveMeta对象并返回
    30. HiveMeta(
    31. ruleMap("hiveTable"), //
    32. fieldColumns, //
    33. null
    34. )
    35. }
    36. }

    加载HDFS表

    1. import org.apache.spark.sql.Column
    2. /**
    3. * 从HDFS文件系统读取数据,文件格式为csv类型,首行为列名称
    4. inType=hdfs
    5. inPath=/apps/datas/tbl_logs
    6. sperator=\t
    7. selectFieldNames=global_user_id,loc_url,log_time
    8. */
    9. case class HdfsMeta(
    10. inPath: String,
    11. sperator: String,
    12. selectFieldNames: Array[Column]
    13. )
    14. object HdfsMeta{
    15. /**
    16. * 将Map集合数据解析到HdfsMeta中
    17. * @param ruleMap map集合
    18. * @return
    19. */
    20. def getHdfsMeta(ruleMap: Map[String, String]): HdfsMeta = {
    21. // 将选择字段构建为Column对象
    22. import org.apache.spark.sql.functions.col
    23. val fieldColumns: Array[Column] = ruleMap("selectFieldNames")
    24. .split(",")
    25. .map{field => col(field)}
    26. // 创建HdfsMeta对象并返回
    27. HdfsMeta(
    28. ruleMap("inPath"), //
    29. ruleMap("sperator"), //
    30. fieldColumns
    31. )
    32. }
    33. }

    (叠甲:大部分资料来源于黑马程序员,这里只是做一些自己的认识、思路和理解,主要是为了分享经验,如果大家有不理解的部分可以私信我,也可以移步【黑马程序员_大数据实战之用户画像企业级项目】https://www.bilibili.com/video/BV1Mp4y1x7y7?p=201&vd_source=07930632bf702f026b5f12259522cb42,以上,大佬勿喷)

  • 相关阅读:
    mysql经典问题(可用于面试提问)
    Windows 源码编译 nginx (加入nginx-http-flv-module)
    LLM之Prompt(一):5个Prompt高效方法在文心一言3.5的测试对比
    使用python查找指定文件夹下所有xml文件中带有指定字符的xml文件
    5.13一行代码就能解决的算法题
    Java面试题简答(整理)
    gpt-4o看图说话-根据图片回答问题
    XCTF1-web Robots
    5种在TypeScript中使用的类型保护
    Git查询某次提交属于哪个分支
  • 原文地址:https://blog.csdn.net/qq_41680016/article/details/139380392