1、添加依赖jar包
在Maven仓库中找到符合你spark版本的jar包,
Xgboost4j , 这里我选择了1.6.0以上的版本
xgboost4j-spark , 同上面选定的版本
如果是scala,可以在pml文件中指定依赖,通过MAVEN库自动下载。这里我是手动下载jar包后使用的。
2、从github上找到能用的sparkxgb脚本
(1)推荐下载的sparkxgb代码
在spark 2.12版本之后,推荐下载使用的sparkxgb是 spark-xgboost 或者 xgboost-on-pyspark,
这两个的sparkxgb代码是一样的,都是2020年左右更新的。
(2)在开发代码中引用sparkxgb的模块
这里有两种方式,第一种是直接把zip包加载到hdfs上,添加到分布式的环境中去使用。
- os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-1.6.0.jar,xgboost4j-1.6.0.jar pyspark-shell'
-
- spark.sparkContext.addPyFile("sparkxgb.zip")
第二种把sparkxgb.zip文件解压后,修改下引用路径,添加到你自己的脚本中去用。
我使用的是这种方式,因为我需要写的工程脚本比较多,放在一起打包更方便。
3、代码使用示例
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time: 2022/8/14 02:30 上午
- # @Author: gaoToby
-
- "训练Xgboost模型"
- import sys,os
- from sys import argv
- import datetime
- from pyspark.sql.functions import countDistinct
- from pyspark.ml.feature import RFormula
- from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
- from pyspark.ml.evaluation import BinaryClassificationEvaluator
- from src.cvr_model.util_cvr import *
-
- def spark_xgb_model(train_data, test_data, save_model_path):
- from src.sparkxgb.xgboost import XGBoostClassifier
-
- rf = RFormula(formula="label ~ .")
- rf_model = rf.fit(train_data)
- dataset = rf_model.transform(train_data)
-
- # Define and train model
- xgboost = XGBoostClassifier(featuresCol="features",
- labelCol="label",
- predictionCol="prediction",
- objective="binary:logistic",
- missing=0.0)
- xgboost_model = xgboost.fit(dataset)
- # Write model/classifier
- xgboost_model.write().overwrite().save(save_model_path)
-
- df1 = xgboost_model.transform(dataset)
- df1.cache()
- df1.groupBy("prediction","label").agg(
- countDistinct("user").alias("user_cnt")).show(200)
-
- evaluator = BinaryClassificationEvaluator(metricName ="areaUnderROC")\
- .setLabelCol("label").setRawPredictionCol("rawPrediction")
- print("XGB model train data AUC:", evaluator.evaluate(df1))
- print("XGB model test data AUC:", evaluator.evaluate(xgboost_model.transform(rf_model.transform(test_data))))
-
-
-
- if __name__ == '__main__':
- init_py = '__init__.py'
- script, calc_time = argv
-
- # os.environ[
- # 'PYSPARK_SUBMIT_ARGS'] = '--jars ../xgboost4j-spark_2.12-1.6.0.jar,../xgboost4j_2.12-1.6.0.jar pyspark_v1-shell'
-
- from pyspark.conf import SparkConf
- from pyspark.sql import SparkSession
-
- sparkConf = SparkConf()
- spark = SparkSession.builder.config(
- conf=sparkConf).enableHiveSupport().getOrCreate()
-
- # 获取样本
- featurePath = getFeaturesPath("train_data")
- trainData = spark.read.format("parquet").load(featurePath)
-
- #拆分训练/测试样本
- train_data, test_data = trainData.randomSplit([0.7, 0.3])
- train_data.cache()
- train_data.show()
-
- # 3-搭建训练模型、模型训练和保存
- xgb_model_path = getModelPath(model_version="2022-08-14",model_type="xgb")
- spark_xgb_model(train_data, test_data, xgb_model_path)
-
- trainData.unpersist()
4、Xgboost参数理解和调参
官方文档 :XGBoost Parameters — xgboost 2.0.0-dev documentation