• nDCG笔记及在spark中的实现(更新中)


    0. 前言

    之前在排序项目中用到了nDCG,其离线指标对模型训练及上线具有一定的参考意义,在此做一个总结。因为计算是在集群的hive表进行的,所以加上了spark上的计算代码。

    1. 原理

    • 高度相关的文档在搜索引擎结果列表的前面显示时更有用。
    • 高度相关的文档比不相关的文档更有用,勉强相关的文件比不相关的文件更有用

    2. 步骤

    有以下例子:

    位置( i i i)预测值排序真实的相关性分( r e l i rel_i reli)折损值( l o g 2 ( i + 1 ) log_2(i+1) log2(i+1))折损后的相关性( r e l i / l o g 2 ( i + 1 ) rel_i/log_2(i+1) reli/log2(i+1))
    1D1313
    2D221.5851.262
    3D3321.5
    4D402.3220
    5D512.5850.387
    6D622.8070.712

    2.1 计算CG

    CGCumulative Gain)是结果列表中所有 items 的相关性得分的总和。 CG k \text{CG}_k CGk是前 k k k 个 items 的相关性得分的总和。

    CG k = ∑ i = 1 k r e l i \text{CG}_k=\sum_{i=1}^{k}rel_i CGk=i=1kreli

    比如上面表格中的 CG 1 = 3 \text{CG}_1=3 CG1=3, CG 2 = 5 \text{CG}_2=5 CG2=5, CG 3 = 8 \text{CG}_3=8 CG3=8

    缺点:CG 忽略了位次的重要性,比如 CG 3 = 8 \text{CG}_3=8 CG3=8的序列有{(3,3,2), (3,2,3), (2,3,3)},但最优的序列是将最大值都排在前面的序列,如(3,3,2)。

    2.2 计算DCG

    DCG(Discounted CG),折损累计收益。因为CG的缺点是不能区分位次,所以将位次作为折损,位次越靠后,折损越大,所以DCG的计算为:

    DCG k = ∑ i = 1 k r e l i l o g 2 ( i + 1 ) \text{DCG}_k=\sum_{i=1}^{k}{\frac {rel_i} {log_2(i+1)}} DCGk=i=1klog2(i+1)reli

    2.3 计算nDCG

    通常在排序中,nDCG(normalized DCG)是使用最多的。即对DCG进行归一化,归一化就是理想的排序结果,即相关性最大的排在前面,其DCG成为IDCG(Ideal DCG)。

    nDCG k = D C G I D C G \text{nDCG}_k={\frac {DCG} {IDCG}} nDCGk=IDCGDCG

    3. 本地代码实现

    3.1 自己编写代码

    def get_dcg(y_true, y_scores, k):
        '''
        @pred_arr: 预测顺序的相关度,ndarray,shape=(n,1)
        @gt_arr: 实际顺序相关度,ndarray,shape=(n,1)
        @k: 要计算的最大位置,int
        '''
        arr = [x[1] for x in sorted(zip(y_scores, y_true), reverse=True)[:k]]
        weights = np.power(np.log2(range(2,len(arr)+2)), -1)
        dcg = np.sum(arr*weights)
        return dcg
    
    def get_ndcg(y_true, y_scores, k):
        dcg = get_dcg(y_true, y_scores, k)
        idcg = get_dcg(sorted(y_true), np.arange(len(y_true)), k)
        return dcg/idcg
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.2 使用sklearn.metrics.ndcg_score

    使用方式可参考我的博文sklearn.metrics模块重要API的原理与应用总结 的“DGC和nDCG”一节。

    3.3 两种代码的速度比较

    随机生成不同数量的样本,查看两个函数的运行时长。

    m = 100  # 样本量
    y_true = np.random.randint(0,1000000,m).reshape(1,-1)
    y_scores = np.random.randint(0,1000000,m).reshape(1,-1)
    
    k = 10  # 前k个最高排名的ndcg
    ndcg1 = get_ndcg(y_true[0], y_scores[0], k=k)
    ndcg2 = ndcg_score(y_true, y_scores, k=k)
    
    # 在jupyter中执行
    """
    %%timeit
    get_ndcg(y_true[0], y_scores[0], k=k)
    
    %%timeit
    ndcg_score(y_true, y_scores, k=k)
    """
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    样本量(m)自己的代码sklearn
    1000022.7 ms5.61 ms
    500010.5 ms3.06 ms
    30005.99 ms1.93 ms
    20003.79 ms1.44 ms
    10001.81 ms0.95 ms
    500896 µs711 µs
    400717 µs639 µs
    350631 µs618 µs
    300547 µs588 µs
    250462 µs558 µs
    100212 µs458 µs

    可以看到,当样本数量较大时,使用sklearn提供的函数速度较。值得注意的是,自己写的代码还没有优化,与复杂度与数据量大致成线性增长,可以使用前k大个数算法进行优化,进一步缩短时长。

    3.4 两种代码之间的误差

    两种代码之间的误差主要来自于对相同y_scores的值不同的排序位次导致的,比如:

    y_scores = [0.9, 0.5, 0.6, 0.9, 0.9]
    y_true = [7, 4, 1, 0, 0]
    
    • 1
    • 2

    根据y_scores,y_true的排序方式及相应的ndcg为:

    相关性排序Value
    [7,0,0,1,4]0.896
    [0,7,0,1,4]0.638
    [0,0,7,1,4]0.547

    如果y_score保留小数点后6位的话,两个方法的误差将在 1 0 − 6 10^{-6} 106数量级。

    4. spark代码实现

    (1) import 所需要的库

    import os
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6.5"
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    spark=SparkSession.builder.appName("appname_202211301352").enableHiveSupport().getOrCreate()
    from pyspark.sql.window import Window
    import numpy as np
    import pandas as pd
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (2) 创建spark DataFrame

    y_true = np.random.random(10000).reshape(1,-1)
    y_score = np.random.random(10000).reshape(1,-1)
    
    d1 = {"query":["三亚", "成都", "西安", "舟山"]*2500, 'target':y_true[0], 'score':y_score[0]}
    dfA = spark.createDataFrame(pd.DataFrame(d1))
    dfA.show(8)
    
    """
    输出:
    +-----+-------------------+-------------------+
    |query|             target|              score|
    +-----+-------------------+-------------------+
    |   三亚| 0.5639595701241265| 0.5871782502891383|
    |   成都| 0.3831924220224111|0.21391983705716255|
    |   西安| 0.8023402614068417| 0.2101881628533363|
    |   舟山| 0.8276250390471849| 0.1300697976081132|
    |   三亚| 0.8631646566534542|0.09625928182309162|
    |   成都| 0.6041016696702783|0.19275281212842987|
    |   西安| 0.8804418925939791| 0.6438387067088728|
    |   舟山|0.43794448416652887| 0.6674715678386118|
    +-----+-------------------+-------------------+
    """
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    (3) 计算nDCG

    def get_ndcg(dfA, k=10):
        """
        dfA: spark DataFrame, 需要包含target和score两个字段
        """
        # 分别求出每个query按target和score进行排序的位次
        window1 = Window.partitionBy("query").orderBy([F.col('target').desc()])
        window2 = Window.partitionBy("query").orderBy([F.col('score').desc()])
        dfB = dfA.withColumn("target_rank", F.row_number().over(window1)) \
                 .withColumn("score_rank", F.row_number().over(window2))
        # 每个query下的dcg和ndcg
        df_dcg = dfB.filter(F.col("score_rank")<=k).groupby("query").agg(F.sum(F.col("target")/F.log2(F.col("score_rank")+1)).alias("dcg"))
        df_idcg= dfB.filter(F.col("target_rank")<=k).groupby("query").agg(F.sum(F.col("target")/F.log2(F.col("target_rank")+1)).alias("idcg"))
        df_dcg = df_dcg.withColumnRenamed("query", "query1")
        cond=[df_dcg.query1==df_idcg.query]
        df_ndcg = df_dcg.join(df_idcg, on=cond, how='inner') \
                        .select("query", (F.col("dcg")/F.col("idcg")).alias("ndcg"))
        return df_ndcg
    
    df_ndcg = get_ndcg(dfA, k=10)
    df_ndcg.show()
    
    """
    输出:
    +-----+------------------+
    |query|              ndcg|
    +-----+------------------+
    |   三亚|0.6060123512248438|
    |   成都|0.4464340922149389|
    |   舟山|0.5963748392641535|
    |   西安|0.6608559450094559|
    +-----+------------------+
    
    """
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    完。
  • 相关阅读:
    docker容器中创建非root用户
    【更换yarn的位置】解决yarn和nodejs不在同一盘下产生的某些命令应用失败问题
    基于特征选择的二元蜻蜓算法(Matlab代码实现)
    为何每个开发者都在谈论Go?
    SpringSecurity入门
    std::packaged_task 源码分析
    docker-compose部署etcd集群
    软考网络工程师 第五章 第一节 移动通信与5G
    创建Django项目_first_Django
    Vue返回组件(返回不同页面及保留滚动位置)
  • 原文地址:https://blog.csdn.net/u012762410/article/details/127809339