🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎
📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃
🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝
📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】 深度学习【DL】
🖍foreword
✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。
如果你对这个系列感兴趣的话,可以关注订阅哟👋
文章目录
在上一章中,我们介绍了如何使用 MLlib 构建机器学习管道。本章将重点介绍如何管理和部署您训练的模型。在本章结束时,您将能够使用 MLflow 来跟踪、重现和部署您的 MLlib 模型,讨论各种模型部署场景的困难和权衡,并构建可扩展的机器学习解决方案。但在我们讨论部署模型之前,让我们先讨论一些模型管理的最佳实践,以使您的模型为部署做好准备。
在部署机器学习模型之前,您应该确保可以重现和跟踪模型的性能。对我们来说,机器学习解决方案的端到端可重现性意味着我们需要能够重现生成模型的代码、训练中使用的环境、训练数据以及模型本身。每个数据科学家都喜欢提醒您设置种子,以便您可以重现您的实验(例如,对于训练/测试拆分,当使用具有固有随机性的模型(如随机森林)时)。然而,除了设置种子之外,还有更多方面有助于重现性,其中一些方面要微妙得多。这里有一些例子:
库版本控制
当数据科学家把他们的代码交给你时,他们可能会也可能不会提到依赖库。虽然您可以通过查看错误消息来确定需要哪些库,但您无法确定它们使用了哪些库版本,因此您可能会安装最新版本。但是,如果他们的代码是基于以前版本的库构建的,这可能会利用与您安装的版本不同的某些默认行为,那么使用最新版本可能会导致代码中断或结果不同(例如,考虑XGBoost如何改变它在 v0.90 中处理缺失值的方式)。
数据演变
假设您在 2020 年 6 月 1 日构建了一个模型,并跟踪了所有超参数、库等。然后您尝试在 2020 年 7 月 1 日重现相同的模型,但由于基础数据已中断,因此管道中断或结果不同如果有人在初始构建后添加了额外的列或增加了一个数量级的数据,则可能会发生这种情况。
执行顺序
如果数据科学家将他们的代码交给你,你应该能够自上而下地运行它而不会出错。然而,数据科学家因无序运行或多次运行同一个有状态单元而臭名昭著,这使得他们的结果很难重现。(他们还可能签入具有与用于训练最终模型的超参数不同的超参数的代码副本!)
并行操作
为了最大化吞吐量,GPU 将并行运行许多操作。但是,执行顺序并不总是得到保证,这可能导致不确定的输出。tf.reduce_sum()这是在聚合浮点数(精度有限)等函数时的一个已知问题:添加它们的顺序可能会产生稍微不同的结果,这可能会在多次迭代中加剧。
无法重现您的实验通常会阻碍业务部门采用您的模型或将其投入生产。虽然您可以构建自己的内部工具来跟踪您的模型、数据、依赖版本等,但它们可能会变得过时、脆弱,并且需要大量的开发工作来维护。同样重要的是拥有用于管理模型的行业标准,以便可以轻松地与合作伙伴共享它们。有开源和专有工具可以帮助我们通过抽象出许多这些常见的困难来重现我们的机器学习实验。本节将重点介绍 MLflow,因为它与当前可用的开源模型管理工具中的 MLlib 具有最紧密的集成。
MLflow是一个开源平台,可帮助开发人员复制和共享实验、管理模型等等。它提供 Python、R 和 Java/Scala 接口,以及 REST API. 如图 11-1所示,MLflow 有四个主要组件:
Tracking
提供 API 来记录参数、指标、代码版本、模型和工件,例如绘图和文本。
Projects
一种标准化格式,用于打包您的数据科学项目及其依赖项以在其他平台上运行。它可以帮助您管理模型训练过程。
Models
一种标准化格式,用于打包模型以部署到不同的执行环境。它为加载和应用模型提供了一致的 API,无论用于构建模型的算法或库如何。
Registry
用于跟踪模型沿袭、模型版本、阶段转换和注释的存储库。
图 11-1。MLflow 组件
让我们跟踪我们在第 10 章中运行的 MLlib 模型实验的可重复性。然后,当我们讨论模型部署时,我们将看到 MLflow 的其他组件如何发挥作用。要开始使用 MLflow,只需pip install mlflow
在本地主机上运行。
MLflow Tracking 是一个日志 API,它与实际进行训练的库和环境无关。它是围绕运行的概念组织的,运行是数据科学代码的执行。运行被聚合到实验中,因此许多运行可以成为给定实验的一部分。
MLflow 跟踪服务器可以托管许多实验。您可以使用笔记本、本地应用程序或云作业登录到跟踪服务器,如图 11-2所示。
图 11-2。MLflow 跟踪服务器
Parameters
代码的键/值输入——例如,像随机森林num_trees
或max_depth
在随机森林中的超参数
Metrics
数值(可以随时间更新)——例如,RMSE 或精度值
Artifacts
文件、数据和模型——例如matplotlib
图像或 Parquet 文件
Metadata
有关运行的信息,例如执行运行的源代码或代码的版本(例如,代码版本的 Git 提交哈希字符串)
Models
您训练的模型
默认情况下,跟踪服务器将所有内容记录到文件系统中,但您可以指定一个数据库以加快查询速度,例如参数和指标。让我们将 MLflow 跟踪添加到第 10 章中的随机森林代码中:
- # In Python
- from pyspark.ml import Pipeline
- from pyspark.ml.feature import StringIndexer, VectorAssembler
- from pyspark.ml.regression import RandomForestRegressor
- from pyspark.ml.evaluation import RegressionEvaluator
-
- filePath = """/databricks-datasets/learning-spark-v2/sf-airbnb/
- sf-airbnb-clean.parquet"""
- airbnbDF = spark.read.parquet(filePath)
- (trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
-
- categoricalCols = [field for (field, dataType) in trainDF.dtypes
- if dataType == "string"]
- indexOutputCols = [x + "Index" for x in categoricalCols]
- stringIndexer = StringIndexer(inputCols=categoricalCols,
- outputCols=indexOutputCols,
- handleInvalid="skip")
-
- numericCols = [field for (field, dataType) in trainDF.dtypes
- if ((dataType == "double") & (field != "price"))]
- assemblerInputs = indexOutputCols + numericCols
- vecAssembler = VectorAssembler(inputCols=assemblerInputs,
- outputCol="features")
-
- rf = RandomForestRegressor(labelCol="price", maxBins=40, maxDepth=5,
- numTrees=100, seed=42)
-
- pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])
要开始使用 MLflow 进行日志记录,您需要使用mlflow.start_run()
. mlflow.end_run()
本章中的示例将使用with
子句在块的末尾自动结束运行,而不是显式调用with
:
- # In Python
- import mlflow
- import mlflow.spark
- import pandas as pd
-
- with mlflow.start_run(run_name="random-forest") as run:
- # Log params: num_trees and max_depth
- mlflow.log_param("num_trees", rf.getNumTrees())
- mlflow.log_param("max_depth", rf.getMaxDepth())
-
- # Log model
- pipelineModel = pipeline.fit(trainDF)
- mlflow.spark.log_model(pipelineModel, "model")
-
- # Log metrics: RMSE and R2
- predDF = pipelineModel.transform(testDF)
- regressionEvaluator = RegressionEvaluator(predictionCol="prediction",
- labelCol="price")
- rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
- r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
- mlflow.log_metrics({"rmse": rmse, "r2": r2})
-
- # Log artifact: feature importance scores
- rfModel = pipelineModel.stages[-1]
- pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(),
- rfModel.featureImportances)),
- columns=["feature", "importance"])
- .sort_values(by="importance", ascending=False))
-
- # First write to local filesystem, then tell MLflow where to find that file
- pandasDF.to_csv("feature-importance.csv", index=False)
- mlflow.log_artifact("feature-importance.csv")
让我们检查一下 MLflow UI,您可以通过mlflow ui
在终端中运行并导航到http://localhost:5000/来访问它。图 11-3显示了 UI 的屏幕截图。
图 11-3。MLflow 用户界面
UI 存储给定实验的所有运行。您可以搜索所有运行,筛选符合特定条件的运行,并排比较运行等。如果您愿意,还可以将内容导出为 CSV 文件以在本地进行分析。在名为"random-forest"
. 您应该会看到如图 11-4 所示的屏幕。
图 11-4。随机森林运行
您会注意到它会跟踪用于此 MLflow 运行的源代码,并存储所有相应的参数、指标等。您可以在自由文本中添加有关此运行的注释以及标签。运行完成后,您无法修改参数或指标。
您还可以使用MlflowClient
REST API 或 REST API 查询跟踪服务器:
- # In Python
- from mlflow.tracking import MlflowClient
-
- client = MlflowClient()
- runs = client.search_runs(run.info.experiment_id,
- order_by=["attributes.start_time desc"],
- max_results=1)
-
- run_id = runs[0].info.run_id
- runs[0].data.metrics
这会产生以下输出:
{'r2':0.22794251914574226,'rmse':211.5096898777315}
我们将此代码作为MLflow 项目托管在本书的GitHubmax_depth
存储库中,因此您可以尝试使用和的不同超参数值运行它num_trees
。MLflow 项目中的 YAML 文件指定了库依赖项,因此此代码可以在其他环境中运行:
- # In Python
- mlflow.run(
- "https://github.com/databricks/LearningSparkV2/#mlflow-project-example",
- parameters={"max_depth": 5, "num_trees": 100})
-
- # Or on the command line
- mlflow run https://github.com/databricks/LearningSparkV2/#mlflow-project-example
- -P max_depth=5 -P num_trees=100
部署机器学习模型对于每个组织和用例来说都意味着不同的东西。业务限制将对延迟、吞吐量、成本等提出不同的要求,这决定了哪种模型部署模式适合手头的任务——批处理、流式传输、实时或移动/嵌入式。在移动/嵌入式系统上部署模型超出了本书的范围,因此我们将主要关注其他选项。表 11-1显示了用于生成预测的这三个部署选项的吞吐量和延迟权衡。我们关心并发请求的数量和这些请求的大小,最终的解决方案看起来会大不相同。
Throughput | Latency | 示例应用程序 | |
---|---|---|---|
Batch | 高的 | 高(几小时到几天) | 客户流失预测 |
Streaming | 中等的 | 中(秒到分钟) | 动态定价 |
Real-time | 低的 | 低(毫秒) | 在线广告竞价 |
批处理会定期生成预测并将结果写入持久存储以供其他地方使用。它通常是最便宜和最简单的部署选项,因为您只需要在计划运行期间支付计算费用。每个数据点的批处理效率要高得多,因为在所有预测中摊销时累积的开销更少。Spark 的情况尤其如此,因为驱动程序和执行程序之间来回通信的开销——你不会希望一次预测一个数据点!然而,它的主要缺点是延迟,因为它通常安排在几个小时或几天的时间内来生成下一批预测。
流式传输在吞吐量和延迟之间提供了很好的权衡。您将不断地对微批量数据进行预测,并在几秒钟到几分钟内得到您的预测。如果您使用的是结构化流式处理,几乎所有代码看起来都与批处理用例相同,因此可以轻松地在这两个选项之间来回切换。使用流式传输时,您必须为用于持续保持正常运行和运行的虚拟机或计算资源付费,并确保您已正确配置流以容错,并在传入数据出现峰值时提供缓冲。
实时部署优先考虑延迟而不是吞吐量,并在几毫秒内生成预测。您的基础架构将需要支持负载平衡,并且能够在需求激增的情况下扩展到许多并发请求(例如,对于假期前后的在线零售商)。有时当人们说“实时部署”时,他们的意思是实时提取预先计算的预测,但这里我们指的是生成实时模型预测。实时部署是 Spark 无法满足延迟要求的唯一选项,因此要使用它,您需要将模型导出到 Spark 之外。例如,如果您打算使用 REST 端点进行实时模型推理(例如,在 50 毫秒内计算预测),则 MLlib 不满足此应用程序所需的延迟要求,如图 11-5所示。您将需要从 Spark 中进行功能准备和建模,这可能既费时又困难。
图 11-5。MLlib 的部署选项
在开始建模过程之前,您需要定义模型部署要求。MLlib 和 Spark 只是您工具箱中的几个工具,您需要了解应该在何时何地应用它们。本节的其余部分将更深入地讨论 MLlib 的部署选项,然后我们将考虑 Spark 用于非 MLlib 模型的部署选项。
批量部署代表了部署机器学习模型的大多数用例,这可以说是最容易实现的选项。您将运行常规作业来生成预测,并将结果保存到表、数据库、数据湖等以供下游使用。事实上,你已经在第 10 章看到了如何使用 MLlib 生成批量预测。MLlibmodel.transform()
会将模型并行应用于 DataFrame 的所有分区:
- # In Python
- # Load saved model with MLflow
- import mlflow.spark
- pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")
-
- # Generate predictions
- inputDF = spark.read.parquet("/databricks-datasets/learning-spark-v2/
- sf-airbnb/sf-airbnb-clean.parquet")
-
- predDF = pipelineModel.transform(inputDF)
您将多久生成一次预测?
延迟和吞吐量之间存在权衡。您将获得更高的吞吐量,将许多预测批处理在一起,但是接收任何单个预测所需的时间会更长,从而延迟您对这些预测采取行动的能力。
您多久重新训练一次模型?
与 sklearn 或 TensorFlow 等库不同,MLlib 不支持在线更新或热启动。如果您想重新训练模型以包含最新数据,则必须从头开始重新训练整个模型,而不是利用现有参数。在重新训练的频率方面,有些人会设置一个固定的工作来重新训练模型(例如,每月一次),而另一些人会主动监控模型漂移以确定何时需要重新训练。
您将如何对模型进行版本化?
您可以使用MLflow 模型注册表来跟踪您正在使用的模型,并控制它们如何转换到/从暂存、生产和存档。您可以在图 11-6中看到模型注册表的屏幕截图。您也可以将模型注册表与其他部署选项一起使用。
图 11-6。MLflow 模型注册表
除了使用 MLflow UI 来管理您的模型之外,您还可以通过编程方式管理它们。例如,一旦您注册了生产模型,它就有一个一致的 URI,您可以使用它来检索最新版本:
- # Retrieve latest production model
- model_production_uri = f"models:/{model_name}/production"
- model_production = mlflow.spark.load_model(model_production_uri)
结构化流式处理无需等待每小时或每晚的工作来处理数据并生成预测,而是可以持续对传入数据执行推理。虽然这种方法比批处理解决方案成本更高,因为您必须不断地为计算时间付费(并获得较低的吞吐量),但您可以获得更频繁地生成预测的额外好处,以便您可以更快地对它们采取行动。流解决方案通常比批处理解决方案更难维护和监控,但它们提供更低的延迟。
使用 Spark,将批量预测转换为流预测非常容易,而且几乎所有代码都是相同的。唯一不同的是,当你读入数据时,你需要使用spark.readStream()
而不是spark.read()
改变数据的来源。在下面的示例中,我们将通过在 Parquet 文件目录中流式传输来模拟读取流式数据。你会注意到我们正在指定一个schema
即使我们正在使用 Parquet 文件。这是因为我们在处理流数据时需要先验地定义模式。在这个例子中,我们将使用在前一章的 Airbnb 数据集上训练的随机森林模型来执行这些流预测。我们将使用 MLflow 加载保存的模型。我们已将源文件划分为 100 个小型 Parquet 文件,因此您可以看到输出在每个触发时间间隔发生变化:
- # In Python
- # Load saved model with MLflow
- pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")
-
- # Set up simulated streaming data
- repartitionedPath = "/databricks-datasets/learning-spark-v2/sf-airbnb/
- sf-airbnb-clean-100p.parquet"
- schema = spark.read.parquet(repartitionedPath).schema
-
- streamingData = (spark
- .readStream
- .schema(schema) # Can set the schema this way
- .option("maxFilesPerTrigger", 1)
- .parquet(repartitionedPath))
-
- # Generate predictions
- streamPred = pipelineModel.transform(streamingData)
生成这些预测后,您可以将它们写出到任何目标位置以供以后检索(有关结构化流式处理技巧,请参阅第 8 章)。如您所见,批处理和流处理场景之间的代码几乎没有变化,这使得 MLlib 成为两者的绝佳解决方案。但是,根据您的任务的延迟需求,MLlib 可能不是最佳选择。使用 Spark,在生成查询计划以及在驱动程序和工作程序之间传达任务和结果时会产生大量开销。因此,如果您需要真正的低延迟预测,则需要将模型导出到 Spark 之外。
近实时
如果您的用例需要数百毫秒到几秒的预测,您可以构建一个使用 MLlib 生成预测的预测服务器。虽然这不是 Spark 的理想用例,因为您正在处理非常少量的数据,但与流式处理或批处理解决方案相比,您将获得更低的延迟。
有些领域需要实时推理,包括欺诈检测、广告推荐等。虽然使用少量记录进行预测可能会实现实时推理所需的低延迟,但您将需要应对负载平衡(处理许多并发请求)以及延迟关键任务中的地理位置。有一些流行的托管解决方案,例如AWS SageMaker和Azure ML,它们提供了低延迟的模型服务解决方案。在本节中,我们将向您展示如何导出您的 MLlib 模型,以便将它们部署到这些服务中。
将模型导出到 Spark 的一种方法是在 Python、C 等中本地重新实现模型。虽然提取模型的系数似乎很简单,但导出所有特征工程和预处理步骤以及它们(OneHotEncoder
、、VectorAssembler
等) .) 很快就会变得很麻烦并且很容易出错。有一些开源库,例如 MLeap 和ONNX,可以帮助您自动导出受支持的 MLlib 模型子集,以消除它们对 Spark 的依赖。然而,在撰写本文时,开发 MLeap 的公司不再支持它。MLeap 也不支持 Scala 2.12/Spark 3.0。
另一方面,ONNX(开放神经网络交换)已成为机器学习互操作性的事实上的开放标准。你们中的一些人可能还记得其他 ML 互操作性格式,例如 PMML(预测模型标记语言),但这些格式从未像现在的 ONNX 那样获得相同的牵引力。ONNX 作为一种允许开发人员在库和语言之间轻松切换的工具在深度学习社区中非常流行,并且在撰写本文时它对 MLlib 具有实验性支持。
除了导出 MLlib 模型外,还有其他与 Spark 集成的第三方库,便于在实时场景中部署,例如XGBoost和 H2O.ai 的Sparkling Water(其名称来源于 H2O 和 Spark 的组合) .
XGBoost 是Kaggle 结构化数据问题竞赛中最成功的算法之一,它是数据科学家中非常受欢迎的库。虽然 XGBoost 在技术上不是 MLlib 的一部分,但XGBoost4J-Spark 库允许您将分布式 XGBoost 集成到您的 MLlib 管道中。XGBoost 的一个好处是易于部署:训练 MLlib 管道后,您可以提取 XGBoost 模型并将其保存为非 Spark 模型以便在 Python 中提供服务,如下所示:
- // In Scala
- val xgboostModel =
- xgboostPipelineModel.stages.last.asInstanceOf[XGBoostRegressionModel]
- xgboostModel.nativeBooster.saveModel(nativeModelPath)
- # In Python
- import xgboost as xgb
- bst = xgb.Booster({'nthread': 4})
- bst.load_model("xgboost_native_model")
笔记
在撰写本文时,分布式 XGBoost API 仅在 Java/Scala 中可用。本书的GitHub 存储库中包含了一个完整的示例。
现在您已经了解了导出 MLlib 模型以用于实时服务环境的不同方法,让我们讨论如何将 Spark 用于非 MLlib 模型.
如前所述,MLlib 并不总是满足您的机器学习需求的最佳解决方案。它可能无法满足超低延迟推理要求或内置支持您想要使用的算法。对于这些情况,您仍然可以使用 Spark,但不能使用 MLlib。在本节中,我们将讨论如何使用 Spark 执行使用 Pandas UDF 的单节点模型的分布式推理、执行超参数调整和缩放特征工程。
虽然 MLlib 非常适合模型的分布式训练,但您不仅限于使用 MLlib 通过 Spark 进行批量或流式预测 - 您可以创建自定义函数来大规模应用预训练模型,称为用户定义函数(UDF,涵盖在第 5 章中)。一个常见的用例是在单台机器上构建 scikit-learn 或 TensorFlow 模型,可能是在数据的子集上,但使用 Spark 对整个数据集执行分布式推理。
如果您在 Python 中定义自己的 UDF 以将模型应用于 DataFrame 的每条记录,请选择pandas UDF以优化序列化和反序列化,如第 5 章所述。但是,如果您的模型非常大,那么 Pandas UDF 在同一 Python 工作进程中为每个批次重复加载相同模型的开销会很高。在 Spark 3.0 中,Pandas UDF 可以接受pandas.Series
or的迭代器,pandas.DataFrame
因此您可以只加载一次模型,而不是为迭代器中的每个系列加载它。有关带有 Pandas UDF 的 Apache Spark 3.0 中的新功能的更多详细信息,请参阅第 12 章。
笔记
如果工作人员在第一次加载模型权重后对其进行缓存,则后续调用相同 UDF 并加载相同模型的速度将显着加快。
在以下示例中,我们将使用mapInPandas()
Spark 3.0 中引入的 将scikit-learn
模型应用于我们的 Airbnb 数据集。mapInPandas()
将 的迭代器pandas.DataFrame
作为输入,并输出 的另一个迭代器pandas.DataFrame
。如果您的模型需要所有列作为输入,那么它很灵活且易于使用,但它需要对整个 DataFrame 进行序列化/反序列化(因为它被传递到其输入)。您可以pandas.DataFrame
使用spark.sql.execution.arrow.maxRecordsPerBatch
配置控制每个的大小。生成模型的完整代码副本可在本书的GitHub存储库中找到,但在这里我们将只专注于scikit-learn
从 MLflow 加载保存的模型并将其应用到我们的 Spark DataFrame:
- # In Python
- import mlflow.sklearn
- import pandas as pd
-
- def predict(iterator):
- model_path = f"runs:/{run_id}/random-forest-model"
- model = mlflow.sklearn.load_model(model_path) # Load model
- for features in iterator:
- yield pd.DataFrame(model.predict(features))
-
- df.mapInPandas(predict, "prediction double").show(3)
-
- +-----------------+
- | prediction|
- +-----------------+
- | 90.4355866254844|
- |255.3459534312323|
- | 499.625544914651|
- +-----------------+
除了使用 Pandas UDF 大规模应用模型外,您还可以使用它们来并行化构建多个模型的过程。例如,您可能希望为每种 IoT 设备类型构建一个模型来预测故障时间。您可以使用pyspark.sql.GroupedData.applyInPandas()
(在 Spark 3.0 中引入)来完成此任务。该函数接受一个pandas.DataFrame
并返回另一个pandas.DataFrame
。本书的 GitHub 存储库包含完整的代码示例,用于为每种 IoT 设备类型构建模型并使用 MLflow 跟踪单个模型;为简洁起见,此处仅包含一个片段:
- # In Python
- df.groupBy("device_id").applyInPandas(build_model, schema=trainReturnSchema)
这groupBy()
将导致您的数据集完全洗牌,您需要确保您的模型和每个组的数据可以放在一台机器上。你们中的一些人可能熟悉(pyspark.sql.GroupedData.apply()
例如,df.groupBy("device_id").apply(build_model
)),但该 API 将在未来的 Spark 版本中被弃用,而支持pyspark.sql.GroupedData.applyInPandas()
.
现在您已经了解了如何应用 UDF 来执行分布式推理和并行化模型构建,让我们看看如何使用 Spark 进行分布式超参数调优。
即使您不打算进行分布式推理或不需要 MLlib 的分布式训练功能,您仍然可以利用 Spark 进行分布式超参数调优。本节将特别介绍两个开源库:Joblib 和 Hyperopt。
根据其文档,Joblib是“一组在 Python 中提供轻量级流水线的工具”。它有一个 Spark 后端,用于在 Spark 集群上分发任务。Joblib 可用于超参数调整,因为它会自动将您的数据副本广播给所有工作人员,然后这些工作人员在其数据副本上创建具有不同超参数的自己的模型。这使您可以并行训练和评估多个模型。您仍然有一个基本限制,即单个模型和所有数据必须适合单个机器,但是您可以简单地并行化超参数搜索,如图 11-7所示。
图 11-7。分布式超参数搜索
要使用 Joblib,请通过pip install joblibspark
. 确保您使用的是scikit-learn
0.21 或更高版本以及pyspark
2.4.4 或更高版本。这里展示了如何进行分布式交叉验证的示例,同样的方法也适用于分布式超参数调整:
- # In Python
- from sklearn.utils import parallel_backend
- from sklearn.ensemble import RandomForestRegressor
- from sklearn.model_selection import train_test_split
- from sklearn.model_selection import GridSearchCV
- import pandas as pd
- from joblibspark import register_spark
-
- register_spark() # Register Spark backend
-
- df = pd.read_csv("/dbfs/databricks-datasets/learning-spark-v2/sf-airbnb/
- sf-airbnb-numeric.csv")
- X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1),
- df[["price"]].values.ravel(), random_state=42)
-
- rf = RandomForestRegressor(random_state=42)
- param_grid = {"max_depth": [2, 5, 10], "n_estimators": [20, 50, 100]}
- gscv = GridSearchCV(rf, param_grid, cv=3)
-
- with parallel_backend("spark", n_jobs=3):
- gscv.fit(X_train, y_train)
-
- print(gscv.cv_results_)
有关从交叉验证器返回的参数的说明,请参阅scikit-learn
GridSearchCV 文档。
Hyperopt是一个 Python 库,用于“对尴尬的搜索空间进行串行和并行优化,其中可能包括实值、离散和条件维度”。您可以通过pip install hyperopt
. 使用 Apache Spark 扩展 Hyperopt有两种主要方法:
将单机 Hyperopt 与分布式训练算法(例如 MLlib)一起使用
SparkTrials
将分布式Hyperopt与单机训练算法一起使用
对于前一种情况,与任何其他库相比,您无需配置任何特殊配置即可将 MLlib 与 Hyperopt 结合使用。那么,让我们看看后一种情况:具有单节点模型的分布式 Hyperopt。不幸的是,在撰写本文时,您无法将分布式超参数评估与分布式训练模型结合起来。用于并行化Keras模型的超参数搜索的完整代码示例可以在本书的GitHub 存储库中找到;这里只包含一个片段来说明 Hyperopt 的关键组件:
- # In Python
- import hyperopt
-
- best_hyperparameters = hyperopt.fmin(
- fn = training_function,
- space = search_space,
- algo = hyperopt.tpe.suggest,
- max_evals = 64,
- trials = hyperopt.SparkTrials(parallelism=4))
fmin()
生成新的超参数配置以供您使用training_function
并将它们传递给SparkTrials
. SparkTrials
在每个 Spark 执行器上并行运行这些训练任务的批次作为单任务 Spark 作业。当 Spark 任务完成后,它会将结果和相应的损失返回给驱动程序。Hyperopt 使用这些新结果为未来的任务计算更好的超参数配置。这允许大规模扩展超参数调整。MLflow 还与 Hyperopt集成,因此您可以跟踪作为超参数调整的一部分训练的所有模型的结果。
的一个重要参数SparkTrials
是parallelism
。这决定了同时评估的最大试验次数。如果parallelism=1
,那么您正在按顺序训练每个模型,但是通过充分利用自适应算法,您可能会得到更好的模型。如果您设置parallelism=max_evals
(要训练的模型总数),那么您只是在进行随机搜索。1
和之间的任何数字max_evals
都允许您在可扩展性和适应性之间进行权衡。默认情况下,parallelism
设置为 Spark 执行器的数量。您还可以指定 atimeout
来限制允许的最大秒数fmin()
。
即使 MLlib 不适合您的问题,希望您能看到在任何机器学习任务中使用 Spark 的价值。
考拉
Pandas是 Python 中非常流行的数据分析和操作库,但仅限于在单机上运行。Koalas是一个开源库,它在 Apache Spark 之上实现了 Pandas DataFrame API,简化了从 Pandas 到 Spark 的过渡。您可以使用 安装它pip install koalas
,然后只需将pd
代码中的任何 (Pandas) 逻辑替换为ks
(Koalas)。这样,您可以使用 Pandas 扩展您的分析,而无需在 PySpark 中完全重写您的代码库。以下是如何将 Pandas 代码更改为 Koalas 的示例(您需要已经安装 PySpark):
- # In pandas
- import pandas as pd
- pdf = pd.read_csv(csv_path, header=0, sep=";", quotechar='"')
- pdf["duration_new"] = pdf["duration"] + 100
- # In koalas
- import databricks.koalas as ks
- kdf = ks.read_csv(file_path, header=0, sep=";", quotechar='"')
- kdf["duration_new"] = kdf["duration"] + 100
虽然 Koalas 的目标是最终实现所有 Pandas 功能,但并非所有功能都已实现。如果您需要 Koalas 未提供的功能,您可以随时通过调用kdf.to_spark()
. 或者,您可以通过调用和使用 Pandas API 将数据带到驱动程序kdf.to_pandas()
(注意数据集不要太大,否则会导致驱动程序崩溃!).