受某咨询公司委托,拟根据泰坦尼克号乘客信息数据,来预测乘客在遇到海难时的幸存结果。经过分析,这是一个二元分类问题,因此决定使用逻辑回归算法进行预测分析。
本案例所使用的数据集,以CSV格式提供,有两个文件:train.csv和test.csv。其中train.csv文件包含label列。
数据集路径:/data/dataset/ml/titanic/
1、启动HDFS集群
在Linux终端窗口下,输入以下命令,启动HDFS集群:
1. $ start-dfs.sh
2、启动Spark集群
在Linux终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark
2. $ ./sbin/start-all.sh
3、启动zeppelin服务器
在Linux终端窗口下,输入以下命令,启动zeppelin服务器:
1. $ zeppelin-daemon.sh start
4、验证以上进程是否已启动
在Linux终端窗口下,输入以下命令,查看启动的服务进程:
1. $ jps
如果显示以下6个进程,则说明各项服务启动正常,可以继续下一阶段。
2288 NameNode
2402 DataNode
2603 SecondaryNameNode
2769 Master
2891 Worker
2984 ZeppelinServer
1、将本案例要用到的数据集上传到HDFS文件系统的/data/dataset/ml/目录下。在Linux终端窗口下,输入以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/ml
2. $ hdfs dfs -put /data/dataset/ml/titanic /data/dataset/ml/
2、在Linux终端窗口下,输入以下命令,查看HDFS上是否已经上传了该数据集:
1. $ hdfs dfs -ls /data/dataset/ml/
这时应该看到titanic文件夹及其中的数据集已经上传到了HDFS的/data/datset/ml/目录下。
1、新建一个zeppelin notebook文件,并命名为titanic_project。
2、读取数据源。在notebook单元格中,输入以下代码:
1. // 使用Logistic回归算法预测Titanic乘客的存活率
2. import org.apache.spark.ml.Pipeline
3. import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer}
4. import org.apache.spark.ml.classification.LogisticRegression
5. import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
6.
7. // 加载数据
8. val file = "hdfs://localhost:9000/data/dataset/ml/titanic/train.csv"
9. val titanic_data = spark.read.option("header", "true").option("inferSchema","true").csv(file)
10.
11. println("乘客共有" + titanic_data.count() + "位")
12. titanic_data.show(5,false)
同时按下”【Shift+Enter】”键,执行以上代码,输出内容如下:

3、探索模式schema。在notebook单元格中,输入以下代码:
1. titanic_data.printSchema
同时按下”【Shift+Enter】”键,执行以上代码,输出内容如下:

4、选择特征列。所提供的数据本身包含许多特征,不过我们只使用age、gender和ticket_class这三个属性作为特征。在notebook单元格中,输入以下代码:
1. val titanic_data1 = titanic_data.select('Survived.as("label"), 'Pclass.as("ticket_class"),
2. 'Sex.as("gender"), 'Age.as("age")).filter('age.isNotNull)
同时按下”【Shift+Enter】”,执行以上代码。
5、将数据集拆分成训练集(80%)和测试集(20%)。在notebook单元格中,输入以下代码:
1. // 将数据分成训练集(80%)和测试集(20%)
2. val Array(training, test) = titanic_data1.randomSplit(Array(0.8, 0.2))
3.
4. println(s"training count: ${training.count}, test count: ${test.count}")
同时按下”【Shift+Enter】”,执行以上代码,输出内容如下:

6、组建管道。在notebook单元格中,输入以下代码:
1. // estimator(数据转换算法的estimator): 将gender字符串转换为数值
2. val genderIndxr = new StringIndexer().setInputCol("gender").setOutputCol("genderIdx")
3.
4. // transfomer: 将这些特征组合成一个特征向量
5. val assembler = new VectorAssembler().setInputCols(Array("ticket_class","genderIdx", "age"))
6. .setOutputCol("features")
7.
8. // estimator(机器学习算法的estimator)family值:"auto"、"binomial"、"multinomial"
9. val logisticRegression = new LogisticRegression().setFamily("binomial")
10.
11. // 设置三个阶段的管道
12. val pipeline = new Pipeline().setStages(Array(genderIndxr, assembler, logisticRegression))
同时按下”【Shift+Enter】”,执行以上代码。
7、拟合数据,训练模型。在notebook单元格中,输入以下代码:
1. val model = pipeline.fit(training)
同时按下”【Shift+Enter】”,执行以上代码。
8、使用训练出来的模型,对测试数据集进行预测。在notebook单元格中,输入以下代码:
1. // 执行预测
2. val predictions = model.transform(test)
3. predictions.show(5,false)
同时按下【Shift+Enter】,执行以上代码,输出内容如下:

由以上输出内容可以看出,最后一列”prediction”即为预测结果列。
9、执行模型性能的评估,默认的度量标准是ROC下面的面积。在notebook单元格中,输入以下代码:
1. val evaluator = new BinaryClassificationEvaluator()
2. evaluator.evaluate(predictions)
同时按下【Shift+Enter】,执行以上代码,输出内容如下:

注:每个人每次运行的结果可能不同,因为数据集是随机拆分的。
BinaryClassificationEvaluator产生的度量值为0.86,对于只使用了三个特征,这算是一个很好的性能了。
— END —