• Java应用|使用Apache Spark MLlib构建机器学习模型


    如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!

    一、引言

    1.1 Spark MLlib简介

    Apache Spark MLlib(Machine Learning library)是一个开源机器学习框架,建立在Apache Spark之上,支持分布式计算和大规模数据处理。它提供了许多经典机器学习算法和工具,如分类、回归、聚类、协同过滤、特征提取和数据预处理等。

    Spark MLlib使用基于DataFrame的API,提供了一个易于使用的高级API,使得用户能够快速构建、训练和调整机器学习模型,而无需担心底层分布式计算的复杂性。它还支持分布式模型选择和调整,以及与其他Apache Spark组件的集成,如Spark SQL、Spark Streaming和GraphX。

    Spark MLlib还提供了Python、Java和Scala等多种编程语言的API,使得不同的开发人员可以使用他们最喜欢的编程语言来开发机器学习应用程序。

    总之,Spark MLlib是一个非常强大和灵活的机器学习框架,适用于处理大规模数据和需要分布式计算的场景。

    1.2 为什么选择使用Spark MLlib

    1. 处理大规模数据:Spark MLlib支持分布式计算和大规模数据处理,使得处理大规模数据集变得容易。
    2. 丰富的算法库:Spark MLlib包含了许多经典的机器学习算法和工具,如分类、回归、聚类、协同过滤、特征提取和数据预处理等,覆盖了大部分机器学习应用场景。
    3. 高性能:Spark MLlib基于Apache Spark,使用内存计算和RDD(弹性分布式数据集)等优化技术,可以在处理大规模数据时提供高性能和可扩展性。
    4. 易于使用:Spark MLlib提供了一个易于使用的高级API,使得用户可以快速构建、训练和调整机器学习模型,而无需担心底层分布式计算的复杂性。
    5. 多语言支持:Spark MLlib支持多种编程语言的API,包括Python、Java和Scala等,使得不同的开发人员可以使用他们最喜欢的编程语言来开发机器学习应用程序。

    二、Spark MLlib基础

    2.1 RDD和DataFrame的比较

    1. 数据类型:基础RDD可以包含任意类型的数据,包括对象、原始类型、数组和集合等;DataFrame则是一种表格化的数据结构,其数据类型必须是统一的,且可以使用SQL-like的语法进行查询。
    2. 内存计算:DataFrame利用内存计算技术,相比基础RDD更加高效。
    3. 可读性:DataFrame比基础RDD更加易于阅读和理解,可以使用SQL-like的语法进行查询,更加直观。
    4. 类型安全:DataFrame是类型安全的,可以在编译期间捕获类型错误,避免运行时错误;而基础RDD则是类型不安全的,需要在运行时进行类型检查。
    5. 执行计划:基础RDD提供了更加灵活的执行计划,用户可以控制计算的方式和顺序,但这也增加了开发复杂度;而DataFrame则有一个自动优化的执行计划,可以自动优化查询性能。

    总之,基础RDD更加灵活和可控,但需要开发人员自己掌握计算的方式和顺序;而DataFrame则更加易于使用和高效,适合快速开发和迭代。选择使用哪种数据结构,取决于具体的场景和需求。

    2.2 数据准备和预处理

    在使用Spark MLlib进行机器学习之前,需要对原始数据进行预处理和准备。以下是一些常见的数据准备和预处理步骤:

    1. 数据清洗:删除缺失值、处理异常值和重复值等。
    2. 特征选择:选择对模型有用的特征,去除冗余和无关的特征。
    3. 特征缩放:对特征进行缩放,以便它们具有相似的范围和重要性。
    4. 特征变换:将原始特征转换为更有意义的特征,如使用对数、指数、平方根等函数进行变换。
    5. 特征归一化:将特征值归一化为标准正态分布,使得模型更容易学习。
    6. 数据转换:将数据转换为适合模型训练的格式,如将分类变量转换为二进制变量、将文本转换为向量等。

    在Spark MLlib中,可以使用各种预处理和数据准备工具,如:

    1. Imputer:用于填充缺失值。
    2. StandardScaler:用于特征缩放和归一化。
    3. VectorAssembler:用于将多个特征列组合成一个向量列。
    4. OneHotEncoder:用于将分类变量转换为二进制变量。
    5. StringIndexer和IndexToString:用于将字符串类型的变量转换为数字类型的变量。
    6. Tokenizer和StopWordsRemover:用于将文本转换为向量。

    总之,在使用Spark MLlib进行机器学习之前,需要对原始数据进行预处理和准备。Spark MLlib提供了许多工具和功能,可以帮助我们轻松地完成这些任务。

    2.3 特征提取和转换

    在Spark MLlib中,有许多常用的特征提取和转换工具,包括:

    1. Tokenizer:用于将文本转换为单词或词条。
    2. StopWordsRemover:用于去除文本中的停用词,如“the”、“and”等。
    3. CountVectorizer:用于将文本转换为词频向量。
    4. HashingTF:用于将文本转换为哈希向量,可以减少维度并提高计算效率。
    5. IDF:用于计算逆文档频率,可以减少常见词语的权重,提高稀有词语的权重。
    6. Word2Vec:用于将文本转换为向量,可以捕捉词语之间的语义关系。
    7. PCA:用于将高维特征空间降维,可以提高计算效率并避免过拟合。
    8. StringIndexer:用于将分类变量转换为数字类型的变量。
    9. OneHotEncoder:用于将数字类型的变量转换为二进制变量。

    以上这些工具都可以用于特征提取和转换,帮助我们将原始数据转换为模型可以处理的格式。我们可以根据具体的任务和数据类型选择适当的工具,以获得更好的结果。值得注意的是,这些工具的使用通常需要进行适当的参数设置和调整,以达到最佳的效果。

    三、监督学习

    3.1 分类问题

    3.1.1 逻辑回归

    逻辑回归是一种二元分类模型,它的目标是根据已知数据对一个事物进行分类。逻辑回归的输出是一个概率值,代表该事物属于某个类别的概率。如果概率值大于阈值,则将其分类为正类,否则分类为负类。

    在 Spark MLlib 中,可以使用 LogisticRegression 类来实现逻辑回归。下面是一个 Java 版本的示例代码:

    pom引用:

    1. <dependencies>
    2. <!-- Spark core dependencies -->
    3. <dependency>
    4. <groupId>org.apache.spark</groupId>
    5. <artifactId>spark-core_2.12</artifactId>
    6. <version>3.2.0</version>
    7. </dependency>
    8. <dependency>
    9. <groupId>org.apache.spark</groupId>
    10. <artifactId>spark-sql_2.12</artifactId>
    11. <version>3.2.0</version>
    12. </dependency>
    13. <dependency>
    14. <groupId>org.apache.spark</groupId>
    15. <artifactId>spark-mllib_2.12</artifactId>
    16. <version>3.2.0</version>
    17. </dependency>
    18. <!-- Spark testing dependencies (optional) -->
    19. <dependency>
    20. <groupId>org.apache.spark</groupId>
    21. <artifactId>spark-streaming_2.12</artifactId>
    22. <version>3.2.0</version>
    23. <scope>test</scope>
    24. </dependency>
    25. <dependency>
    26. <groupId>org.apache.spark</groupId>
    27. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    28. <version>3.2.0</version>
    29. <scope>test</scope>
    30. </dependency>
    31. <dependency>
    32. <groupId>org.apache.spark</groupId>
    33. <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    34. <version>3.2.0</version>
    35. <scope>test</scope>
    36. </dependency>
    37. </dependencies>
    1. import org.apache.spark.ml.classification.LogisticRegression;
    2. import org.apache.spark.ml.classification.LogisticRegressionModel;
    3. import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
    4. import org.apache.spark.ml.feature.VectorAssembler;
    5. import org.apache.spark.ml.linalg.Vector;
    6. import org.apache.spark.sql.Dataset;
    7. import org.apache.spark.sql.Row;
    8. import org.apache.spark.sql.SparkSession;
    9. public class LogisticRegressionDemo {
    10. public static void main(String[] args) {
    11. SparkSession spark = SparkSession
    12. .builder()
    13. .appName("LogisticRegressionDemo")
    14. .master("local[*]")
    15. .getOrCreate();
    16. // 加载数据
    17. Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");
    18. // 将特征向量转换成一列
    19. VectorAssembler assembler = new VectorAssembler()
    20. .setInputCols(new String[]{"features"})
    21. .setOutputCol("feature");
    22. Dataset<Row> newData = assembler.transform(data).select("label", "feature");
    23. // 将数据集分为训练集和测试集
    24. Dataset<Row>[] splits = newData.randomSplit(new double[]{0.7, 0.3});
    25. Dataset<Row> trainData = splits[0];
    26. Dataset<Row> testData = splits[1];
    27. // 创建逻辑回归模型
    28. LogisticRegression lr = new LogisticRegression();
    29. // 训练模型
    30. LogisticRegressionModel lrModel = lr.fit(trainData);
    31. // 在测试集上进行预测
    32. Dataset<Row> predictions = lrModel.transform(testData);
    33. // 计算模型评估指标
    34. BinaryClassificationEvaluator evaluator = new BinaryClassificationEvaluator();
    35. double auc = evaluator.evaluate(predictions);
    36. System.out.println("Area under ROC curve = " + auc);
    37. spark.stop();
    38. }
    39. }

    这个示例代码首先加载了一个 libsvm 格式的数据集,然后将特征向量转换成一列,将数据集分为训练集和测试集,创建逻辑回归模型并训练模型,最后在测试集上进行预测并计算模型评估指标。在这个例子中,我们使用了 BinaryClassificationEvaluator 来计算模型的 AUC 指标,它是评估二元分类器性能的一种常用指标。

    需要注意的是,以上代码仅供参考,实际情况可能需要根据数据集的特点和任务的要求进行相应的修改。

    3.1.2 决策树

    Spark MLlib 分类决策树是一种基于树结构的分类算法,通过一系列特征对数据进行划分和分类。该算法在 Spark MLlib 中的实现采用 CART(Classification And Regression Tree)算法,使用信息熵或 Gini 系数等指标进行特征选择和划分。Spark MLlib 分类决策树可用于二分类、多分类和概率预测问题。

    1. import org.apache.spark.ml.Pipeline;
    2. import org.apache.spark.ml.PipelineModel;
    3. import org.apache.spark.ml.PipelineStage;
    4. import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
    5. import org.apache.spark.ml.classification.DecisionTreeClassifier;
    6. import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
    7. import org.apache.spark.ml.feature.IndexToString;
    8. import org.apache.spark.ml.feature.StringIndexer;
    9. import org.apache.spark.ml.feature.StringIndexerModel;
    10. import org.apache.spark.ml.feature.VectorAssembler;
    11. import org.apache.spark.sql.Dataset;
    12. import org.apache.spark.sql.Row;
    13. import org.apache.spark.sql.SparkSession;
    14. public class DecisionTreeClassificationExample {
    15. public static void main(String[] args) {
    16. SparkSession spark = SparkSession.builder()
    17. .appName("DecisionTreeClassificationExample")
    18. .master("local[*]")
    19. .getOrCreate();
    20. // 读取数据集
    21. Dataset<Row> data = spark.read().format("csv")
    22. .option("header", "true")
    23. .option("inferSchema", "true")
    24. .load("path/to/data.csv");
    25. // 将标签列转换为数值类型
    26. StringIndexerModel labelIndexer = new StringIndexer()
    27. .setInputCol("label")
    28. .setOutputCol("indexedLabel")
    29. .fit(data);
    30. data = labelIndexer.transform(data);
    31. // 将特征列转换为特征向量
    32. VectorAssembler featureAssembler = new VectorAssembler()
    33. .setInputCols(new String[]{"feature1", "feature2", "feature3"})
    34. .setOutputCol("features");
    35. data = featureAssembler.transform(data);
    36. // 将数据集分为训练集和测试集
    37. Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3}, 12345);
    38. Dataset<Row> trainData = splits[0];
    39. Dataset<Row> testData = splits[1];
    40. // 创建决策树分类器
    41. DecisionTreeClassifier dt = new DecisionTreeClassifier()
    42. .setLabelCol("indexedLabel")
    43. .setFeaturesCol("features");
    44. // 将标签数值转换回原始标签
    45. IndexToString labelConverter = new IndexToString()
    46. .setInputCol("prediction")
    47. .setOutputCol("predictedLabel")
    48. .setLabels(labelIndexer.labels());
    49. // 创建管道并拟合模型
    50. Pipeline pipeline = new Pipeline()
    51. .setStages(new PipelineStage[]{labelIndexer, featureAssembler, dt, labelConverter});
    52. PipelineModel model = pipeline.fit(trainData);
    53. // 在测试集上进行预测和评估
    54. Dataset<Row> predictions = model.transform(testData);
    55. MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
    56. .setLabelCol("indexedLabel")
    57. .setPredictionCol("prediction")
    58. .setMetricName("accuracy");
    59. double accuracy = evaluator.evaluate(predictions);
    60. System.out.println("Test Error = " + (1.0 - accuracy));
    61. // 输出决策树结构
    62. DecisionTreeClassificationModel treeModel =
    63. (DecisionTreeClassificationModel) (model.stages()[2]);
    64. System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
    65. spark.stop();
    66. }
    67. }

    以上示例中,我们首先使用 SparkSession 读取 CSV 格式的数据集。然后,使用 StringIndexer 将标签列转换为数值类型,并使用 VectorAssembler 将特征列转换为特征向量。接着,将数据集分为训练集和测试集,并创建 DecisionTreeClassifier 决策树分类器。最后,将管道中的各个阶段组合在一起,拟合模型并在测试集上进行预测和评估。

    3.1.3 随机森林

    随机森林是一种集成学习算法,它将多棵决策树组合起来,通过投票或平均来决定分类结果。该算法在 Spark MLlib 中的实现使用基于 CART(Classification And Regression Tree)算法的决策树作为基分类器,可以用于二分类、多分类和概率预测问题。

    以下是一个基于 Java 的 Spark MLlib 分类随机森林示例:

    1. import org.apache.spark.ml.Pipeline;
    2. import org.apache.spark.ml.PipelineModel;
    3. import org.apache.spark.ml.PipelineStage;
    4. import org.apache.spark.ml.classification.RandomForestClassificationModel;
    5. import org.apache.spark.ml.classification.RandomForestClassifier;
    6. import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
    7. import org.apache.spark.ml.feature.IndexToString;
    8. import org.apache.spark.ml.feature.StringIndexer;
    9. import org.apache.spark.ml.feature.StringIndexerModel;
    10. import org.apache.spark.ml.feature.VectorAssembler;
    11. import org.apache.spark.sql.Dataset;
    12. import org.apache.spark.sql.Row;
    13. import org.apache.spark.sql.SparkSession;
    14. public class RandomForestClassificationExample {
    15. public static void main(String[] args) {
    16. SparkSession spark = SparkSession.builder()
    17. .appName("RandomForestClassificationExample")
    18. .master("local[*]")
    19. .getOrCreate();
    20. // 读取数据集
    21. Dataset<Row> data = spark.read().format("csv")
    22. .option("header", "true")
    23. .option("inferSchema", "true")
    24. .load("path/to/data.csv");
    25. // 将标签列转换为数值类型
    26. StringIndexerModel labelIndexer = new StringIndexer()
    27. .setInputCol("label")
    28. .setOutputCol("indexedLabel")
    29. .fit(data);
    30. data = labelIndexer.transform(data);
    31. // 将特征列转换为特征向量
    32. VectorAssembler featureAssembler = new VectorAssembler()
    33. .setInputCols(new String[]{"feature1", "feature2", "feature3"})
    34. .setOutputCol("features");
    35. data = featureAssembler.transform(data);
    36. // 将数据集分为训练集和测试集
    37. Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3}, 12345);
    38. Dataset<Row> trainData = splits[0];
    39. Dataset<Row> testData = splits[1];
    40. // 创建随机森林分类器
    41. RandomForestClassifier rf = new RandomForestClassifier()
    42. .setLabelCol("indexedLabel")
    43. .setFeaturesCol("features")
    44. .setNumTrees(10);
    45. // 将标签数值转换回原始标签
    46. IndexToString labelConverter = new IndexToString()
    47. .setInputCol("prediction")
    48. .setOutputCol("predictedLabel")
    49. .setLabels(labelIndexer.labels());
    50. // 创建管道并拟合模型
    51. Pipeline pipeline = new Pipeline()
    52. .setStages(new PipelineStage[]{labelIndexer, featureAssembler, rf, labelConverter});
    53. PipelineModel model = pipeline.fit(trainData);
    54. // 在测试集上进行预测和评估
    55. Dataset<Row> predictions = model.transform(testData);
    56. MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
    57. .setLabelCol("indexedLabel")
    58. .setPredictionCol("prediction")
    59. .setMetricName("accuracy");
    60. double accuracy = evaluator.evaluate(predictions);
    61. System.out.println("Test Error = " + (1.0 - accuracy));
    62. // 获取训练好的随机森林模型并打印树的重要性
    63. RandomForestClassificationModel rfModel = (RandomForestClassificationModel) model.stages()[2];
    64. System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
    65. spark.stop();
    66. }
    67. }

    该示例代码首先使用 SparkSession 读取 CSV 格式的数据集。接下来,使用 StringIndexer 将标签列转换为数值类型,并使用 VectorAssembler 将特征列转换为特征向量。然后,将数据集分为训练集和测试集。创建 RandomForestClassifier,并将其作为管道的一部分进行拟合。拟合后,使用 MulticlassClassificationEvaluator 对测试集进行预测和评估。最后,获取训练好的随机森林模型并打印树的重要性。

    请注意,上面的示例中,数据集的路径应该被替换为实际数据集的路径,特征列的名称也应该被替换为实际特征列的名称。

    3.1.4 梯度提升树

    Spark MLlib 提供了一个强大的算法——分类梯度提升树(Gradient-Boosted Trees, GBT),它可以用于二元分类和多类分类。GBT 是一种集成学习算法,它通过在先前树的残差上逐步拟合一系列决策树来提高模型的准确性。

    在 Spark MLlib 中,可以使用 ​​GBTClassifier​​ 类来构建分类 GBT 模型。GBT 分类器使用一系列决策树来逐步提高模型的准确性,每个决策树都是在之前决策树的残差上训练得到的。通过这种方式,GBT 可以在更少的迭代次数下得到比随机森林更准确的模型。

    与其他 Spark MLlib 分类器类似,GBT 分类器也使用管道(Pipeline)来处理数据。管道通常包括以下几个步骤:

    1. 数据预处理:包括数据清洗、特征提取、特征转换等操作。
    2. 特征工程:根据特定的特征工程需求,对特征进行过滤、选择、转换等操作。
    3. 模型训练:使用训练集对模型进行拟合。
    4. 模型评估:使用测试集对模型进行评估。
    5. 模型应用:将模型应用到新的数据集上进行预测。

    在使用 GBT 分类器时,你需要指定以下参数:

    • ​featuresCol​​:特征列的名称。
    • ​labelCol​​:标签列的名称。
    • ​maxIter​​:训练迭代次数。
    • ​maxDepth​​:决策树的最大深度。
    • ​minInstancesPerNode​​:每个节点上的最小实例数。
    • ​stepSize​​:每个迭代步骤的步长。
    • ​subsamplingRate​​:用于训练每棵树的数据子样本的比例。

    1. import org.apache.spark.ml.Pipeline;
    2. import org.apache.spark.ml.PipelineModel;
    3. import org.apache.spark.ml.PipelineStage;
    4. import org.apache.spark.ml.classification.GBTClassificationModel;
    5. import org.apache.spark.ml.classification.GBTClassifier;
    6. import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
    7. import org.apache.spark.ml.feature.IndexToString;
    8. import org.apache.spark.ml.feature.StringIndexer;
    9. import org.apache.spark.ml.feature.StringIndexerModel;
    10. import org.apache.spark.ml.feature.VectorIndexer;
    11. import org.apache.spark.ml.feature.VectorIndexerModel;
    12. import org.apache.spark.sql.Dataset;
    13. import org.apache.spark.sql.Row;
    14. import org.apache.spark.sql.SparkSession;
    15. public class GBTExample {
    16. public static void main(String[] args) {
    17. // 创建一个 SparkSession
    18. SparkSession spark = SparkSession
    19. .builder()
    20. .appName("GBTExample")
    21. .getOrCreate();
    22. // 读取数据集
    23. Dataset<Row> data = spark.read()
    24. .format("libsvm")
    25. .load("data/mllib/sample_libsvm_data.txt");
    26. // 对标签列进行索引
    27. StringIndexerModel labelIndexer = new StringIndexer()
    28. .setInputCol("label")
    29. .setOutputCol("indexedLabel")
    30. .fit(data);
    31. // 对特征列进行索引
    32. VectorIndexerModel featureIndexer = new VectorIndexer()
    33. .setInputCol("features")
    34. .setOutputCol("indexedFeatures")
    35. .setMaxCategories(4) // 特征具有少于 4 个不同的值
    36. .fit(data);
    37. // 将数据集拆分为训练集和测试集
    38. Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
    39. Dataset<Row> trainingData = splits[0];
    40. Dataset<Row> testData = splits[1];
    41. // 定义 GBT 分类器
    42. GBTClassifier gbt = new GBTClassifier()
    43. .setLabelCol("indexedLabel")
    44. .setFeaturesCol("indexedFeatures")
    45. .setMaxIter(10)
    46. .setFeatureSubsetStrategy("auto");
    47. // 将索引的标签转换回原始标签
    48. IndexToString labelConverter = new IndexToString()
    49. .setInputCol("prediction")
    50. .setOutputCol("predictedLabel")
    51. .setLabels(labelIndexer.labels());
    52. // 创建管道
    53. Pipeline pipeline = new Pipeline()
    54. .setStages(new PipelineStage[]{
    55. labelIndexer,
    56. featureIndexer,
    57. gbt,
    58. labelConverter
    59. });
    60. // 训练模型
    61. PipelineModel model = pipeline.fit(trainingData);
    62. // 进行预测
    63. Dataset<Row> predictions = model.transform(testData);
    64. // 选择样例行显示
    65. predictions.select("predictedLabel", "label", "features").show(5);
    66. // 评估模型
    67. MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
    68. .setLabelCol("indexedLabel")
    69. .setPredictionCol("prediction")
    70. .setMetricName("accuracy");
    71. double accuracy = evaluator.evaluate(predictions);
    72. System.out.println("Test Error = " + (1.0 - accuracy));
    73. // 获取训练得到的 GBT 模型
    74. GBTClassificationModel gbtModel = (GBTClassificationModel) (model.stages()[2]);
    75. System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
    76. spark.stop();
    77. }
    78. }

    该示例使用了 Spark MLlib 内置的 ​​sample_libsvm_data.txt​​​ 数据集。首先,将数据集加载到 ​​DataFrame​​ 中。接下来,对标签列和特征列进行索引。然后,将数据集拆分为训练集和测试集。接下来,创建 GBT 分类器,并使用管道将标签转换回原始标签。最后,使用训练数据拟合管道并进行预测。最终评估模型并输出模型学习到的 GBT 分类模型的调试字符串。该字符串显示了树的结构和分裂标准,以及在每个节点处对特征的使用情况和分裂点。

    3.2 回归问题

    3.2.1 线性回归

    Spark MLlib 的线性回归算法是一种广泛使用的预测模型。它将输入特征映射到连续的输出值。这是通过训练模型来确定最佳拟合线性函数的系数完成的。

    以下是一个使用 Spark MLlib 实现线性回归的 Java 示例程序。

    1. import org.apache.spark.ml.feature.VectorAssembler;
    2. import org.apache.spark.ml.regression.LinearRegression;
    3. import org.apache.spark.ml.regression.LinearRegressionModel;
    4. import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
    5. import org.apache.spark.sql.Dataset;
    6. import org.apache.spark.sql.Row;
    7. import org.apache.spark.sql.SparkSession;
    8. public class LinearRegressionExample {
    9. public static void main(String[] args) {
    10. // 创建一个 SparkSession
    11. SparkSession spark = SparkSession
    12. .builder()
    13. .appName("LinearRegressionExample")
    14. .getOrCreate();
    15. // 读取数据集
    16. Dataset<Row> data = spark.read()
    17. .format("libsvm")
    18. .load("data/mllib/sample_linear_regression_data.txt");
    19. // 将数据集拆分为训练集和测试集
    20. Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
    21. Dataset<Row> trainingData = splits[0];
    22. // 将特征列合并到一个向量列中
    23. VectorAssembler assembler = new VectorAssembler()
    24. .setInputCols(new String[]{"features"})
    25. .setOutputCol("featuresVector");
    26. // 定义线性回归模型
    27. LinearRegression lr = new LinearRegression()
    28. .setMaxIter(10)
    29. .setRegParam(0.3)
    30. .setElasticNetParam(0.8);
    31. // // 将数据集拟合到线性回归模型中
    32. Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
    33. //训练模型
    34. LinearRegressionModel lrModel = lr.fit(trainingDataWithFeatures);
    35. //打印线性回归的系数和截距
    36. System.out.println("系数Coefficients: "+lrModel.coefficients() + "");
    37. System.out.println(" 截距Intercept: " + lrModel.intercept()+ "");
    38. //总结训练集上的模型并打印出一些指标。
    39. LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
    40. Dataset<Row> dataset = trainingSummary.predictions().select("prediction", "label", "featuresVector");
    41. dataset.show(5);
    42. spark.stop();
    43. }
    44. }
    3.2.2 决策树回归

    Spark MLlib 提供了决策树回归(Decision Tree Regression)算法来解决回归问题。决策树回归是一种基于树结构的非参数统计方法,能够处理多维输入和输出,并且具有良好的可解释性和鲁棒性。

    下面是一个简单的 Java 代码示例,演示如何使用 Spark MLlib 的决策树回归算法对数据进行训练和预测:

    1. import org.apache.spark.ml.Pipeline;
    2. import org.apache.spark.ml.PipelineModel;
    3. import org.apache.spark.ml.PipelineStage;
    4. import org.apache.spark.ml.feature.VectorAssembler;
    5. import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
    6. import org.apache.spark.ml.regression.DecisionTreeRegressor;
    7. import org.apache.spark.sql.Dataset;
    8. import org.apache.spark.sql.Row;
    9. import org.apache.spark.sql.SparkSession;
    10. public class DecisionTreeRegressionExample {
    11. public static void main(String[] args) {
    12. // 创建 SparkSession
    13. SparkSession spark = SparkSession.builder()
    14. .appName("DecisionTreeRegressionExample")
    15. .master("local[*]")
    16. .getOrCreate();
    17. // 读取数据集
    18. Dataset<Row> data = spark.read().format("csv")
    19. .option("header", "true")
    20. .option("inferSchema", "true")
    21. .load("path/to/your/data.csv");
    22. // 定义特征列和标签列
    23. String[] featureCols = data.columns();
    24. String labelCol = "label";
    25. // 将特征列转换为向量
    26. VectorAssembler assembler = new VectorAssembler()
    27. .setInputCols(featureCols)
    28. .setOutputCol("features");
    29. Dataset<Row> dataWithFeatures = assembler.transform(data).select("features", labelCol);
    30. // 将数据集拆分为训练集和测试集
    31. double[] weights = {0.7, 0.3};
    32. Dataset<Row>[] datasets = dataWithFeatures.randomSplit(weights);
    33. Dataset<Row> trainData = datasets[0];
    34. Dataset<Row> testData = datasets[1];
    35. // 创建决策树回归器
    36. DecisionTreeRegressor dt = new DecisionTreeRegressor()
    37. .setLabelCol(labelCol)
    38. .setFeaturesCol("features");
    39. // 创建 Pipeline
    40. Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });
    41. // 训练模型
    42. PipelineModel model = pipeline.fit(trainData);
    43. // 预测测试集
    44. Dataset<Row> predictions = model.transform(testData);
    45. // 显示预测结果
    46. predictions.show();
    47. // 获取训练好的决策树模型
    48. DecisionTreeRegressionModel dtModel = (DecisionTreeRegressionModel) model.stages()[0];
    49. System.out.println("Learned regression tree model:\n" + dtModel.toDebugString());
    50. // 停止 SparkSession
    51. spark.stop();
    52. }
    53. }

    这只是一个简单的示例,实际应用中可能需要更复杂的特征工程和模型调整。另外,如果数据集过大,可能需要在集群上运行以获得更好的性能。

    3.2.3 随机森林回归

    Spark MLlib提供了随机森林回归算法,可以用于预测连续的数值型数据。随机森林是一种集成学习算法,它基于决策树,通过随机选择样本和特征来减少过拟合的风险。随机森林回归使用多个决策树对数据进行拟合和预测,并取这些决策树的平均值作为最终预测结果。本文将介绍如何使用Spark MLlib中的随机森林回归算法,并提供一个完整可运行的Java示例。

    示例说明: 在这个示例中,我们将使用Spark MLlib中的随机森林回归算法,对一组汽车数据进行建模,然后使用模型来预测汽车的燃油效率(MPG)。我们将使用UCI Machine Learning Repository中的Auto MPG数据集。该数据集包含8个输入特征,如汽车的气缸数、排量、马力、重量等,以及一个输出特征MPG,表示汽车的燃油效率。我们将使用70%的数据来训练模型,30%的数据用于测试模型性能。

    1.准备数据 我们需要下载Auto MPG数据集,将其保存为CSV文件,并将其加载到Spark DataFrame中。

    数据集下载地址:​​https://archive.ics.uci.edu/ml/datasets/auto+mpg​

    CSV文件格式如下:

    1. mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin
    2. 18.0,8,307.0,130.0,3504.0,12.0,70,1
    3. 15.0,8,350.0,165.0,3693.0,11.5,70,1

    其中,第一列为输出特征MPG,后面的列为输入特征。

    2.构建随机森林回归模型

    1. // 创建随机森林回归模型
    2. RandomForestRegressor rf = new RandomForestRegressor()
    3. .setLabelCol("label")
    4. .setFeaturesCol("features")
    5. .setNumTrees(10);
    6. // 训练模型
    7. RandomForestRegressionModel model = rf.fit(trainingData);

    3.使用模型进行预测

    1. // 使用模型进行预测
    2. Dataset<Row> predictions = model.transform(testData);

    4.评估模型性能

    1. // 评估模型性能
    2. RegressionEvaluator evaluator = new RegressionEvaluator()
    3. .setLabelCol("label")
    4. .setPredictionCol("prediction")
    5. .setMetricName("rmse");
    6. double rmse = evaluator.evaluate(predictions);

    下面是完整 Java 代码示例:

    1. import org.apache.spark.SparkConf;
    2. import org.apache.spark.ml.Pipeline;
    3. import org.apache.spark.ml.evaluation.RegressionEvaluator;
    4. import org.apache.spark.ml.feature.VectorAssembler;
    5. import org.apache.spark.ml.regression.RandomForestRegressor;
    6. import org.apache.spark.ml.tuning.CrossValidator;
    7. import org.apache.spark.ml.tuning.CrossValidatorModel;
    8. import org.apache.spark.ml.tuning.ParamGridBuilder;
    9. import org.apache.spark.sql.Dataset;
    10. import org.apache.spark.sql.Row;
    11. import org.apache.spark.sql.SparkSession;
    12. public class RandomForestRegressionExample {
    13. public static void main(String[] args) {
    14. // Create a Spark session
    15. SparkConf conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]");
    16. SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    17. // Load data
    18. Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");
    19. // Split the data into training and test sets
    20. Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
    21. Dataset<Row> trainingData = splits[0];
    22. Dataset<Row> testData = splits[1];
    23. // Define the feature column names
    24. String[] featureCols = new String[data.schema().fieldNames().length - 1];
    25. for (int i = 0; i < featureCols.length; i++) {
    26. featureCols[i] = "feature" + (i + 1);
    27. }
    28. // Assemble features into a vector
    29. VectorAssembler assembler = new VectorAssembler()
    30. .setInputCols(featureCols)
    31. .setOutputCol("features");
    32. Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
    33. Dataset<Row> testDataWithFeatures = assembler.transform(testData);
    34. // Create a RandomForestRegressor model
    35. RandomForestRegressor rf = new RandomForestRegressor()
    36. .setLabelCol("label")
    37. .setFeaturesCol("features")
    38. .setMaxDepth(5)
    39. .setNumTrees(20);
    40. // Set up a pipeline
    41. Pipeline pipeline = new Pipeline().setStages(new RandomForestRegressor[]{rf});
    42. // Set up a grid of hyperparameters to search over using 3-fold cross validation
    43. ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
    44. .addGrid(rf.maxDepth(), new int[]{5, 10})
    45. .addGrid(rf.numTrees(), new int[]{20, 50});
    46. CrossValidator crossValidator = new CrossValidator()
    47. .setEstimator(pipeline)
    48. .setEvaluator(new RegressionEvaluator())
    49. .setEstimatorParamMaps(paramGridBuilder.build())
    50. .setNumFolds(3);
    51. // Train the model using cross-validation
    52. crossValidator.setSeed(12345);
    53. CrossValidatorModel crossValidatorModel = crossValidator.fit(trainingDataWithFeatures);
    54. // Evaluate the model on the test set
    55. Dataset<Row> predictions = crossValidatorModel.transform(testDataWithFeatures);
    56. RegressionEvaluator evaluator = new RegressionEvaluator()
    57. .setLabelCol("label")
    58. .setPredictionCol("prediction")
    59. .setMetricName("rmse");
    60. double rmse = evaluator.evaluate(predictions);
    61. System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
    62. // Stop the Spark session
    63. spark.stop();
    64. }
    65. }
    3.2.4 梯度提升回归树

    Spark MLlib提供了梯度提升回归树(Gradient-Boosted Trees,GBT)的算法,它是一种强大的回归模型,可以用于连续的数值预测。GBT在每一次迭代中,使用决策树模型去拟合残差值,然后将所有的模型的预测结果相加,得到最终的预测结果。

    以下是一个使用Spark MLlib进行梯度提升回归树的示例Java程序。这个程序将使用一个数据集,该数据集包含了关于自行车租赁量的信息。它将使用梯度提升回归树来预测一天中自行车的租赁量。

    1. import org.apache.spark.ml.evaluation.RegressionEvaluator;
    2. import org.apache.spark.ml.feature.VectorAssembler;
    3. import org.apache.spark.ml.regression.GBTRegressionModel;
    4. import org.apache.spark.ml.regression.GBTRegressor;
    5. import org.apache.spark.sql.*;
    6. public class GBTRegressionDemo {
    7. public static void main(String[] args) {
    8. // 创建 SparkSession
    9. SparkSession spark = SparkSession.builder()
    10. .appName("GradientBoostedTreeRegressionDemo")
    11. .master("local[*]")
    12. .getOrCreate();
    13. // 读取数据集
    14. Dataset<Row> data = spark.read().format("libsvm")
    15. .load("data/sample_libsvm_data.txt");
    16. // 将数据集划分为训练集和测试集
    17. Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
    18. Dataset<Row> trainingData = splits[0];
    19. Dataset<Row> testData = splits[1];
    20. // 将特征向量合并为一个向量
    21. VectorAssembler assembler = new VectorAssembler()
    22. .setInputCols(trainingData.columns())
    23. .setOutputCol("features");
    24. Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
    25. Dataset<Row> testDataWithFeatures = assembler.transform(testData);
    26. // 创建梯度提升回归树模型
    27. GBTRegressor gbt = new GBTRegressor()
    28. .setLabelCol("label")
    29. .setFeaturesCol("features")
    30. .setMaxIter(10);
    31. // 训练模型
    32. GBTRegressionModel model = gbt.fit(trainingDataWithFeatures);
    33. // 在测试集上进行预测
    34. Dataset<Row> predictions = model.transform(testDataWithFeatures);
    35. // 评估模型
    36. RegressionEvaluator evaluator = new RegressionEvaluator()
    37. .setLabelCol("label")
    38. .setPredictionCol("prediction")
    39. .setMetricName("rmse");
    40. double rmse = evaluator.evaluate(predictions);
    41. System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
    42. // 输出模型的节点信息
    43. System.out.println("Learned regression GBT model:\n" + model.toDebugString());
    44. // 关闭 SparkSession
    45. spark.close();
    46. }
    47. }

    该程序与之前的程序类似,只是将 ​​DecisionTreeRegressor​​​ 类替换为 ​​GBTRegressor​​​ 类。需要注意的是,​​GBTRegressor​​​ 类在设置参数时需要设置 ​​maxIter​​​ 参数,表示最大迭代次数。同样需要用 ​​setFeaturesCol​​​ 方法设置特征列,用 ​​setLabelCol​​​ 方法设置标签列。最后需要调用 ​​fit​​​ 方法训练模型,然后用 ​​transform​​​ 方法在测试集上进行预测。最后用 ​​RegressionEvaluator​​ 类评估模型,并输出模型的节点信息。

    关于无监督学习,模型评估和调优,模型部署的内容请看下篇《Java应用|使用Apache Spark MLlib构建机器学习模型【下】》。

    如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!

  • 相关阅读:
    WPF-UI布局
    计算机竞赛 深度学习+python+opencv实现动物识别 - 图像识别
    中国纺织服装市场深度全景调研及投资前景分析报告
    offer突击训练营,给你一个offer的保障,求职跳槽的看过来!
    oracle_申明与赋值
    kafka下载|安装
    Spring 6【方法参数校验、SpingAOP介绍、Schema-based方式实现AOP 】(十四)-全面详解(学习总结---从入门到深化)
    Centos7.6安装FTP
    centos7中python3.10找不到openssl解决方案
    在Linux服务器上部署Tornado项目
  • 原文地址:https://blog.csdn.net/bairo007/article/details/133066868