• 使用PySpark计算AUC,KS与PSI


    当特征数量或者模型数量很多的时候,使用PySpark去计算相关风控指标会节省很多的时间。网上关于使用PySpark计算相关风控指标的资料较少,尤其是PSI计算不管是国内还是国外相关的代码都没有正确的,这里抛砖引玉,写了三个风控常用的指标AUC,KS和PSI相关的计算方法,供参考。

    AUC

    AUC的相关概念网上已经有很多的很好的文章,这里不在赘述,AUC使用的到的计算公式如下:

    \[AUC=\frac{\sum_{i\in positiveClass}rank_i-{\displaystyle\frac{M(1+M)}2}}{M\times N} \]

    其中M为负类样本的数目,N为正类样本的数目

    使用PySpark计算代码如下:

    Copy
    from pyspark.sql import functions as F from pyspark.sql.window import Window true_y_col = 'y' pred_y_col = 'pred_y' date_col = 'day' auc_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\ .select(true_y_col, pred_y_col, date_col, 'model_name')\ .withColumn('totalbad', F.sum(F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\ .withColumn('totalgood', F.sum(1-F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\ .withColumn('rnk2', F.row_number().over(Window.partitionBy(date_col, 'model_name').orderBy(F.col(pred_y_col).asc())))\ .filter(F.col(true_y_col)==1)\ .groupBy(date_col, 'model_name')\ .agg(((F.sum(F.col('rnk2'))-0.5*(F.max(F.col('totalbad')))*(1+F.max(F.col('totalbad'))))/(F.max(F.col('totalbad'))*F.max(F.col('totalgood')))).alias('AUC'))\ .orderBy('model_name', date_col)

    KS

    KS统计量是基于经验累积分布函数(Empirical Cumulative Distribution Function,ECDF)
    建立的,一般定义为:

    \[KS=\max\left\{\left|cum\left(bad\_rate\right)-cum\left(good\_rate\right)\right|\right\} \]

    即为TPRFPR差值绝对值的最大值。

    \[KS=max\left(\left|TPR-FPR\right|\right) \]

    KS计算方法有很多种,这里使用的是分箱法分别计算TPRFPR,然后得到KS。
    使用PySpark计算代码如下:

    Copy
    from pyspark.sql import functions as F from pyspark.sql.window import Window true_y_col = 'y' pred_y_col = 'pred_y' date_col = 'day' nBins = 10 ks_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\ .select(true_y_col, pred_y_col, date_col, 'model_name')\ .withColumn('Bin', F.ntile(nBins).over(Window.partitionBy(date_col, 'model_name').orderBy(pred_y_col)))\ .groupBy(date_col, 'model_name', 'Bin').agg(F.sum(true_y_col).alias('N_1'), F.sum(1-F.col(true_y_col)).alias('N-0'))\ .withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name')))\ .withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name')))\ .withColumn('SUM_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\ .withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\ .withColumn('KSn', F.expr('round(abs(SUM_1/ALL_1-SUM_0/ALL_0),6)'))\ .withColumn('KS', F.round(F.max('KSn').over(Window.partitionBy(date_col, 'model_name')),6)) ks_df = ks_df.select(date_col, 'model_name', 'KS').filter(col('KS').isNotNull()).dropDuplicates()

    PSI

    群体稳定性指标(Population Stability Index,PSI)是风控场景常用的验证样本在各分数段的分布与建模样本分布的稳定性。在建模中,常用来筛选特征变量、评估模型稳定性

    计算公式如下:

    \[psi=\sum_{i=1}^n\left(A_i-E_i\right)\ast\ln\left(A_i/E_i\right) \]

    其中\(A_i\)代表的是第i个分箱中实际分布(actual)样本占比,同理\(E_i\)代表的是第i个分箱中预期分布(excepted)样本占比

    使用PySpark计算代码如下:

    Copy
    from pyspark.sql import functions as F from pyspark.sql.window import Window from pyspark.sql.functions import when date_col = 'day' nBins = 10 feature_list = ['fea_1', 'fea_2', 'fea_3'] df = df.withColumn('flag', when(F.col(date_col) == 'actual_date', 0).when(F.col(date_col) == 'excepted_date', 1).otherwise(None) quantitles = df.filter(F.col('flag') == 0)\ .approxQuantile(feature_list, [i/nBins for i in range(1, nBins)], 0.001) # 基准样本分箱 quantitles_dict = {col: quantitles[idx] for idx, col in enumerate(feature_list)} f_quantitles_dict = F.create_map([F.lit(x) if isinstance(x, str) else F.array(*[F.lit(xx) for xx in x]) for i in quantitles_dict.items() for x in i]) unpivotExpr = "stack(3, 'fea_1', fea_1, 'fea_2', fea_2, 'fea_3', fea_3)" psi_df = df.filter(F.col('flag').isNotNull()).select('flag', F.expr(unpivotExpr))\ .withColumn('Bin', when(F.col('value').isNull(), 'Missing').otherwise( when(F.col('value') < f_quantitles_dict[F.col('varname')][0], 'bin_0') .when(F.col('value') < f_quantitles_dict[F.col('varname')][1], 'bin_1') .when(F.col('value') < f_quantitles_dict[F.col('varname')][2], 'bin_2') .when(F.col('value') < f_quantitles_dict[F.col('varname')][3], 'bin_3') .when(F.col('value') < f_quantitles_dict[F.col('varname')][4], 'bin_4') .when(F.col('value') < f_quantitles_dict[F.col('varname')][5], 'bin_5') .when(F.col('value') < f_quantitles_dict[F.col('varname')][6], 'bin_6') .when(F.col('value') < f_quantitles_dict[F.col('varname')][7], 'bin_7') .when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_8') .when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_9')))\ .groupBy('varname', 'Bin').agg(F.sum('flag').alias('N_1'), F.sum(1-F.col('flag')).alias('N_0'))\ .withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy('varname')))\ .withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy('varname')))\ .withColumn('actual', F.expr('round(N_0/ALL_0, 6)'))\ .withColumn('excepted', F.expr('round(N_1/ALL_1, 6)'))\ .withColumn('PSIn', F.expr('round((actual-excepted)*ln(actual/excepted), 6'))\ .withColumn('PSI', F.round(F.sum('PSIn').over(Window.partitionBy('varname')), 6))

    Reference

  • 相关阅读:
    基于c#上位机制作(WPF控件)
    (四)Spring Security Oauth2.0 源码分析--客户端端鉴权(token校验)
    吊打面试官系列之:掌握了这166个Linux常用命令,面试官果然被我征服了。。
    在QGIS中加载显示3DTiles数据
    进程,线程,并发相关入门
    【Linux常用命令】
    MAUI Blazor (Windows) App 动态设置窗口标题
    Mybatis参数传递方式
    navigation2 编写规划器(planner)插件
    如何使用 Terraform 和 Git 分支有效管理多环境?
  • 原文地址:https://www.cnblogs.com/harrylyx/p/17644162.html