• 分布式机器学习:逻辑回归的并行化实现(PySpark)


    1. 梯度计算式导出

    我们在博客《统计学习:逻辑回归与交叉熵损失(Pytorch实现)》中提到,设ww为权值(最后一维为偏置),样本总数为NN{(xi,yi)}Ni=1{(xi,yi)}Ni=1为训练样本集。样本维度为DDxiRD+1xiRD+1(最后一维扩充),yi{0,1}yi{0,1}。则逻辑回归的损失函数为:

    l(w)=Ni=1[yilogπw(xi)+(1yi)log(1πw(xi))]

    这里

    πw(x)=p(y=1x;w)=11+exp(wTx)

    写成这个形式就已经可以用诸如Pytorch这类工具来进行自动求导然后采用梯度下降法求解了。不过若需要用表达式直接计算出梯度,我们还需要将损失函数继续化简为:

    l(w)=Ni=1(yiwTxilog(1+exp(wTxi)))

    可将梯度表示如下:

    wl(w)=Ni=1(yi1exp(wTx)+1)xi

    2. 基于Spark的并行化实现

    逻辑回归的目标函数常采用梯度下降法求解,该算法的并行化可以采用如下的Map-Reduce架构:

    先将第t轮迭代的权重广播到各worker,各worker计算一个局部梯度(map过程),然后再将每个节点的梯度聚合(reduce过程),最终对参数进行更新。

    在Spark中每个task对应一个分区,决定了计算的并行度(分区的概念详间我们上一篇博客《Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)》 )。在Spark的实现过程如下:

    • map阶段: 各task运行map()函数对每个样本(xi,yi)计算梯度gi, 然后对每个样本对应的梯度运行进行本地聚合,以减少后面的数据传输量。如第1个task执行reduce()操作得到˜g1=3i=1gi

    • reduce阶段:使用reduce()将所有task的计算结果收集到Driver端进行聚合,然后进行参数更新。

    在上图中,训练数据用points:PrallelCollectionRDD来表示,参数向量用w来表示,注意参数向量不是RDD,只是一个单独的参与运算的变量。

    此外需要注意一点,虽然每个task在本地进行了局部聚合,但如果task过多且每个task本地聚合后的结果(单个gradient)过大那么统一传递到Driver端仍然会造成单点的网络平均等问题。为了解决这个问题,Spark设计了性能更好的treeAggregate()操作,使用树形聚合方法来减少网络和计算延迟。

    3. PySpark实现代码

    PySpark的完整实现代码如下:

    from sklearn.datasets import load_breast_cancer
    import numpy as np
    from pyspark.sql import SparkSession
    from operator import add
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    
    n_slices = 3  # Number of Slices
    n_iterations = 300  # Number of iterations
    alpha = 0.01  # iteration step_size
    
    
    def logistic_f(x, w):
        return 1 / (np.exp(-x.dot(w)) + 1)
    
    
    def gradient(point: np.ndarray, w: np.ndarray) -> np.ndarray:
        """ Compute linear regression gradient for a matrix of data points
        """
        y = point[-1]    # point label
        x = point[:-1]   # point coordinate
        # For each point (x, y), compute gradient function, then sum these up
        return - (y - logistic_f(x, w)) * x
    
    
    if __name__ == "__main__":
    
        X, y = load_breast_cancer(return_X_y=True)
    
        D = X.shape[1]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.3, random_state=0)
        n_train, n_test = X_train.shape[0], X_test.shape[0]
    
        spark = SparkSession\
            .builder\
            .appName("Logistic Regression")\
            .getOrCreate()
    
        matrix = np.concatenate(
            [X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)
    
        points = spark.sparkContext.parallelize(matrix, n_slices).cache()
    
        # Initialize w to a random value
        w = 2 * np.random.ranf(size=D + 1) - 1
        print("Initial w: " + str(w))
    
        for t in range(n_iterations):
            print("On iteration %d" % (t + 1))
            g = points.map(lambda point: gradient(point, w)).reduce(add)
            w -= alpha * g
    
            y_pred = logistic_f(np.concatenate(
                [X_test, np.ones((n_test, 1))], axis=1), w)
            pred_label = np.where(y_pred < 0.5, 0, 1)
            acc = accuracy_score(y_test, pred_label)
            print("iterations: %d, accuracy: %f" % (t, acc))
    
        print("Final w: %s " % w)
        print("Final acc: %f" % acc)
    
        spark.stop()
    

    注意spark.sparkContext.parallelize(matrix, n_slices)中的n_slices就是Spark中的分区数。我们在代码中采用breast cancer数据集进行训练和测试,该数据集是个二分类数据集。模型初始权重采用随机初始化。

    最后,我们来看一下算法的输出结果。

    初始权重如下:

    Initial w: [-0.0575882   0.79680833  0.96928013  0.98983501 -0.59487909 -0.23279241
     -0.34157571  0.93084048 -0.10126002  0.19124314  0.7163746  -0.49597826
     -0.50197367  0.81784642  0.96319482  0.06248513 -0.46138666  0.76500396
      0.30422518 -0.21588114 -0.90260279 -0.07102884 -0.98577817 -0.09454256
      0.07157487  0.9879555   0.36608845 -0.9740067   0.69620032 -0.97704433
     -0.30932467]
    

    最终的模型权重与在测试集上的准确率结果如下:

    Final w: [ 8.22414803e+02  1.48384087e+03  4.97062125e+03  4.47845441e+03
      7.71390166e+00  1.21510016e+00 -7.67338147e+00 -2.54147183e+00
      1.55496346e+01  6.52930570e+00  2.02480712e+00  1.09860082e+02
     -8.82480263e+00 -2.32991671e+03  1.61742379e+00  8.57741145e-01
      1.30270454e-01  1.16399854e+00  2.09101988e+00  5.30845885e-02
      8.28547658e+02  1.90597805e+03  4.93391021e+03 -4.69112527e+03
      1.10030574e+01  1.49957834e+00 -1.02290791e+01 -3.11020744e+00
      2.37012097e+01  5.97116694e+00  1.03680530e+02] 
    Final acc: 0.923977
    

    可见我们的算法收敛良好。

    参考


    __EOF__

  • 本文作者: 猎户座
  • 本文链接: https://www.cnblogs.com/orion-orion/p/16318810.html
  • 关于博主: 本科CS系蒟蒻,机器学习半吊子,并行计算混子。
  • 版权声明: 欢迎您对我的文章进行转载,但请务必保留原始出处哦(*^▽^*)。
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    Python运维学习Day01-文件基本操作
    一个方法解决nSum 问题
    傅里叶系列 P1 的定价选项
    项目人力资源管理
    【手写数据库toadb】代码又更新了,增加了解析树,查询树,执行计划,向更多复杂SQL迈进了一步
    SpringMVC-HttpMessageConverter(请求体/响应体)/文件下载与上传)
    [附源码]计算机毕业设计JAVA人口老龄化社区服务与管理平台
    NLP入门——数据预处理:子词切分及应用
    十五. 实战——mysql建库建表 字符集 和 排序规则
    Verilog HDL经典电路设计
  • 原文地址:https://www.cnblogs.com/orion-orion/p/16318810.html