• 详解数据挖掘


            数据挖掘(Data Mining),又译为资料探勘、数据采矿,是数据库知识发现(Knowledge-Discovery in Databases,简称:KDD)中的一个步骤。数据挖掘主要是指从大量的数据中,通过算法自动搜索隐藏于其中的特殊关系型信息的过程。在技术层面上,数据挖掘涉及从大量的、不完全的、有噪声的、模糊的和随机的数据中,提取出隐含的、事先未知的但具有潜在价值和有用信息的过程。

            数据挖掘通常与计算机科学紧密相关,它利用统计、在线分析处理、情报检索、机器学习、专家系统(依靠过去的经验法则)和模式识别等多种方法来实现其目标。数据挖掘过程涵盖了从定义目标、获取数据、数据探索(包括数据质量分析和数据特征分析)到数据预处理(包括数据清洗、数据集成、数据变换和数据规约)等一系列步骤。

            数据挖掘目前是人工智能和数据库领域研究的热点问题,其应用领域广泛,包括情报检索、情报分析、模式识别等。通过高度自动化地分析企业的数据,数据挖掘能够做出归纳性的整理,从中挖掘出潜在的模式,从而帮助决策者调整市场策略,减少风险。

            数据挖掘技术融合了统计学、人工智能、模式识别、机器学习、数据库和可视化技术等多个领域的知识和方法,使得人们能够更有效地从海量数据中提取有价值的信息和知识。随着技术的不断发展,数据挖掘将在更多领域发挥重要作用,为决策提供更加科学、准确的支持。  

            而在大数据的比赛中数据挖掘占有非常中要的地位,但也是最为困难的模块,通常数据挖掘模块与能否进入国赛有很大的关联。        

            以下是相应的代码:

    1. 根据Hive的dwd库中相关表或MySQL中shtd_store中相关表(order_detail、sku_info),计算出与用户id为6708的用户所购买相同商品种类最多的前10位用户(只考虑他俩购买过多少种相同的商品,不考虑相同的商品买了多少次),将10位用户id进行输出,若与多个用户购买的商品种类相同,则输出结果按照用户id升序排序,输出格式如下,将结果截图粘贴至客户端桌面【Release\任务C提交结果.docx】中对应的任务序号下;

    结果格式如下:

    -------------------相同种类前10的id结果展示为:--------------------

    1,2,901,4,5,21,32,91,14,52

    1. package com.rmx.gz.spark.gz2.excavate
    2. import org.apache.log4j.{Level, Logger}
    3. import org.apache.spark.ml.Pipeline
    4. import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler, VectorAssembler}
    5. import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
    6. import org.apache.spark.rdd.RDD
    7. import org.apache.spark.sql.expressions.UserDefinedFunction
    8. import org.apache.spark.sql.functions.{asc, avg, col, desc, udf}
    9. import org.apache.spark.sql.types.DoubleType
    10. import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
    11. import java.util.Properties
    12. import scala.+:
    13. /**
    14. * 数据挖掘
    15. */
    16. object Excavate {
    17. val mysqlUrl = "jdbc:mysql://bigdata1:3306/ds_pub?useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useUnicode=true"
    18. System.setProperty("HADOOP_USER_NAME", "root")
    19. Logger.getLogger("org").setLevel(Level.ERROR)
    20. def main(args: Array[String]): Unit = {
    21. val sparkSession = SparkSession.builder()
    22. .master("local[*]")
    23. .appName("extraction")
    24. .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
    25. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    26. .enableHiveSupport()
    27. .getOrCreate()
    28. val mysqlProperties: Properties = new Properties()
    29. mysqlProperties.setProperty("user", "root")
    30. mysqlProperties.setProperty("password", "123456")
    31. val order_info = sparkSession.read.jdbc(mysqlUrl, "order_info", properties = mysqlProperties)
    32. val sku_info = sparkSession.read.jdbc(mysqlUrl, "sku_info", properties = mysqlProperties)
    33. val order_detail = sparkSession.read.jdbc(mysqlUrl, "order_detail", properties = mysqlProperties)
    34. val user_info = sparkSession.read.jdbc(url = mysqlUrl, table = "user_info", properties = mysqlProperties)
    35. /**
    36. * 宽表,普通join连接,剔除事实表中不存在现有维表中的记录
    37. */
    38. val width_table = order_info
    39. .join(order_detail, order_detail("order_id") === order_info("id"))
    40. .join(sku_info, sku_info("id") === order_detail("sku_id"))
    41. .join(user_info, order_info("user_id") === user_info("id"))
    42. /**
    43. * 与用户id为6708的用户所购买相同商品最多的前10位用户
    44. */
    45. // 用户8880购买过的商品(去重)
    46. val user8880BuySkuIds = width_table
    47. .filter(order_info("user_id") === 8880L)
    48. .select(order_detail("sku_id"))
    49. .dropDuplicates("sku_id")
    50. .collect()
    51. .map(row => row.getLong(0))
    52. // 与用户id为8880的用户所购买相同商品最多的前10位用户
    53. val top10User = width_table
    54. .filter(order_info("user_id") =!= 8880L && order_detail("sku_id").isin(user8880BuySkuIds: _*))
    55. .select(order_info("user_id"), order_detail("sku_id"))
    56. .dropDuplicates("user_id", "sku_id")
    57. .groupBy("user_id")
    58. .agg(functions.count("*").as("count"))
    59. .orderBy(desc("count"), asc("user_id"))
    60. .limit(10)
    61. .collect()
    62. .map(row => row.getLong(0))
    63. println("-------------------相同种类前10的id结果展示为:--------------------")
    64. println(top10User.mkString(","))
    65. /**
    66. * 商品表信息预处理
    67. */
    68. val sku_dimension_table = width_table
    69. .select(sku_info("*"),order_info("user_id"))
    70. .drop("sku_name", "sku_desc", "sku_default_img")
    71. .filter(col("id").isin(user8880BuySkuIds: _*) || col("user_id").isin(top10User: _*))
    72. .dropDuplicates( "id")
    73. val assemblerPrice = new VectorAssembler().setInputCols(Array("price")).setOutputCol("vector_price")
    74. val scalerPrice = new StandardScaler().setInputCol("vector_price").setOutputCol("standard_price")
    75. val assemblerWeight = new VectorAssembler().setInputCols(Array("weight")).setOutputCol("vector_weight")
    76. val scalerWeight = new StandardScaler().setInputCol("vector_weight").setOutputCol("standard_weight")
    77. val oneHotEncoder = new OneHotEncoder()
    78. .setInputCols(Array("spu_id", "tm_id", "category3_id"))
    79. .setOutputCols(Array("one_hot_spu_id", "one_hot_tm_id", "one_hot_category3_id"))
    80. .setDropLast(false)
    81. // 处理管道
    82. val pipeline = new Pipeline()
    83. val pretreatment_sku_info = pipeline
    84. .setStages(Array(assemblerPrice, assemblerWeight, scalerPrice, scalerWeight, oneHotEncoder))
    85. .fit(sku_dimension_table)
    86. .transform(sku_dimension_table)
    87. // 稀疏向量转值数组
    88. val sparseVectorToArray = udf { x: SparseVector => x.toArray }
    89. // 稠密向量转数组
    90. val denseVectorToArray = udf { x: DenseVector => x.toArray }
    91. import sparkSession.implicits._
    92. val task2Show = pretreatment_sku_info
    93. .orderBy("id")
    94. .limit(1)
    95. .select(
    96. col("id").cast(DoubleType) +:
    97. denseVectorToArray(col("standard_price"))(0).as("price") +:
    98. denseVectorToArray(col("standard_weight"))(0).as("weight") +:
    99. ((0 until 12).map((i: Int) => sparseVectorToArray(col("one_hot_spu_id"))(i) as s"spu_id#${i + 1}") ++
    100. (0 until 7).map((i: Int) => sparseVectorToArray(col("one_hot_tm_id"))(i) as s"tm_id#${i + 1}") ++
    101. (0 until 803).map((i: Int) => sparseVectorToArray(col("one_hot_category3_id"))(i) as s"category3_id#${i + 1}")): _*
    102. )
    103. val elements = task2Show
    104. .limit(1)
    105. .select(task2Show.columns.slice(0, 10).map(col(_: String)): _*)
    106. .collect()
    107. .head
    108. println("--------------------第一条数据前10列结果展示为:---------------------")
    109. println(elements.mkString(","))
    110. /**
    111. * 推荐
    112. */
    113. val cosineSimilarity: UserDefinedFunction = udf((v1: SparseVector, v2: SparseVector) => {
    114. //分子
    115. val member: Double = v2.toArray.zip(v1.toArray).map((num: (Double, Double)) => num._1 * num._2).sum
    116. // 分母
    117. val temp1: Double = math.sqrt(v2.toArray.map((num: Double) => math.pow(num, 2)).sum)
    118. val temp2: Double = math.sqrt(v1.toArray.map((num: Double) => math.pow(num, 2)).sum)
    119. val deno: Double = temp1 * temp2
    120. //余弦
    121. member / deno
    122. })
    123. val sku_vector: DataFrame = new VectorAssembler()
    124. .setInputCols(Array("standard_price", "standard_weight", "one_hot_spu_id", "one_hot_tm_id", "one_hot_category3_id"))
    125. .setOutputCol("vector")
    126. .transform(pretreatment_sku_info)
    127. .select("id", "vector")
    128. val user_8880_sku_info = sku_vector
    129. .filter(col("id").isin(user8880BuySkuIds: _*))
    130. .select(col("vector").as("v1"))
    131. val result = sku_vector
    132. .filter(!col("id").isin(user8880BuySkuIds: _*))
    133. .crossJoin(user_8880_sku_info)
    134. .withColumn("cosineSimilarity", cosineSimilarity(col("vector"), col("v1")))
    135. .groupBy("id")
    136. .agg(avg("cosineSimilarity") as "cosineSimilarity")
    137. .orderBy(desc("cosineSimilarity"))
    138. val tuples: Array[(String, String)] = result
    139. .collect()
    140. .map((r: Row) => (r(0).toString, r(1).toString))
    141. .slice(0, 6)
    142. println("------------------------推荐Top5结果如下------------------------")
    143. for (i <- tuples.indices) {
    144. println(s"相似度top${i + 1}(商品id:${tuples(i)._1},平均相似度:${tuples(i)._2})")
    145. }
    146. }
    147. }

            近年来,数据挖掘引起了信息产业界的极大关注,其主要原因是存在大量数据,可以广泛使用,并且迫切需要将这些数据转换成有用的信息和知识。获取的信息和知识可以广泛用于各种应用,包括商务管理、生产控制、市场分析、工程设计和科学探索等。数据挖掘利用了来自如下一些领域的思想:①来自统计学的抽样、估计和假设检验;②人工智能、模式识别和机器学习的搜索算法建模技术学习理论。数据挖掘也迅速地接纳了来自其他领域的思想,这些领域包括最优化、进化计算信息论信号处理、可视化和信息检索。一些其他领域也起到重要的支撑作用。特别地,需要数据库系统提供有效的存储、索引和查询处理支持。源于高性能(并行)计算的技术在处理海量数据集方面常常是重要的。分布式技术也能帮助处理海量数据,并且当数据不能集中到一起处理时更是至关重要。

    数据挖掘的系统结构

  • 相关阅读:
    记某信漏洞复现
    docker命令介绍,镜像制作,容器启动,进入容器操作等
    PyQt,PySide-槽函数被执行了两次
    【动手学深度学习】CNN浅记
    矩阵分析与应用-06-概率密度函数01
    windows 中, bash: conda: command not found(已解决)
    MemoryCache 缓存 实用
    jwt原理及使用
    linux调试技巧
    一幅长文细学华为MRS大数据开发(五)——MapReduce和Yarn
  • 原文地址:https://blog.csdn.net/2301_77578187/article/details/136634217