• 总结一些 spark 处理小trick


    前言

    最近做了很多数据清洗以及摸底的工作,由于处理的数据很大,所以采用了spark进行辅助处理,期间遇到了很多问题,特此记录一下,供大家学习,。

    由于比较熟悉python, 所以笔者采用的是pyspark,所以下面给的demo都是基于pyspark,其实其他语言脚本一样,重在学习思想,具体实现改改对应的API即可。

    这里尽可能的把一些坑以及实现技巧以demo的形式直白的提供出来,顺序不分先后。有了这些demo,大家在实现自己各种各样需求尤其是一些有难度需求的时候,就可以参考了,当然了有时间笔者后续还会更新一些demo,感兴趣的同学可以关注下。

    trick

    首先说一个最基本思想:能map绝不reduce

    换句话说当在实现某一需求时,要尽可能得用map类的算子,这是相当快的。但是聚合类的算子通常来说是相对较慢,如果我们最后不得不用聚合类算子的时候,我们也要把这一步逻辑看看能不能尽可能的往后放,而把一些诸如过滤什么的逻辑往前放,这样最后的数据量就会越来越少,再进行聚合的时候就会快很多。如果反过来,那就得不偿失了,虽然最后实现的效果是一样的,但是时间差却是数量级的。

    • 常用API

    这里列一下我们最常用的算子

    rdd = rdd.filter(lambda x: fun(x))
    rdd = rdd.map(lambda x: fun(x))
    rdd = rdd.flatMap(lambda x: fun(x))
    rdd = rdd.reduceByKey(lambda a, b: a + b)
    • 1

    filter: 过滤,满足条件的返回True, 需要过滤的返回False。

    map: 每条样本做一些共同的操作。

    flatMap: 一条拆分成多条返回,具体的是list。

    reduceByKey: 根据key进行聚合。

    • 聚合

    一个最常见的场景就是需要对某一个字段进行聚合:假设现在我们有一份流水表,其每一行数据就是一个用户的一次点击行为,那现在我们想统计一下每个用户一共点击了多少次,更甚至我们想拿到每个用户点击过的所有item集合。伪代码如下:

    def get_key_value(x):
      user = x[0]
      item = x[1]
      return (user, [item])
    rdd = rdd.map(lambda x: get_key_value(x))
    rdd = rdd.reduceByKey(lambda a, b: a + b)
    • 1

    首先我们先通过get_key_value函数将每条数据转化成(key, value)的形式,然后通过reduceByKey聚合算子进行聚合,它就会把相同key的数据聚合在一起,说到这里,大家可能不觉得有什么?这算什么trick!其实笔者这里想展示的是get_key_value函数返回形式: [item]

    为了对比,这里笔者再列一下两者的区别:

    def get_key_value(x):
      user = x[0]
      item = x[1]
      return (user, [item])

    def get_key_value(x):
      user = x[0]
      item = x[1]
      return (user, item)
    • 1

    可以看到第一个的value是一个列表,而第二个就是单纯的item,我们看reduceByKey这里我们用的具体聚合形式是相加,列表相加就是得到一个更大的列表即:

    所以最后我们就拿到了:每个用户点击过的所有item集合,具体的是一个列表。

    • 抽样、分批

    在日常中我们需要抽样出一部分数据进行数据分析或者实验,甚至我们需要将数据等分成多少份,一份一份用(后面会说),这个时候怎么办呢?

    当然了spark也有类似sample这样的抽样算子

    那其实我们也可以实现,而且可以灵活控制等分等等且速度非常快,如下:

    def get_prefix(x, num):
        prefix = random.randint(1, num)
        return [x, num]

    def get_sample(x):
        prefix = x[1]
        if prefix == 1:
            return True
        else:
            return False
    rdd = rdd.map(lambda x: get_prefix(x, num))
    rdd = rdd.filter(lambda x: get_sample(x))
    • 1

    假设我们需要抽取1/10的数据出来,总的思路就是先给每个样本打上一个[1,10]的随机数,然后只过滤出打上1的数据即可。

    以此类推,我们还可以得到3/10的数据出来,那就是在过滤的时候,取出打上[1,2,3]的即可,当然了[4,5,6]也行,只要取三个就行。

    • 笛卡尔积

    有的时候需要在两个集合之间做笛卡尔积,假设这两个集合是A和B即两个rdd。

    首先spark已经提供了对应的API即cartesian,具体如下:

    rdd_cartesian = rdd_A.cartesian(rdd_B)
    • 1

    其更具体的用法和返回形式大家可以找找相关博客,很多,笔者这里不再累述。

    但是其速度非常慢

    尤其当rdd_A和rdd_B比较大的时候,这个时候怎么办呢?

    这个时候我们可以借助广播机制,其实已经有人也用了这个trick:

    http://xiaoyu.world/spark/spark-cartesian/

    首先说一下spark中的广播机制,假设一个变量被申请为了广播机制,那么其实是缓存了一个只读的变量在每台机器上,假设当前rdd_A比较小,rdd_B比较大,那么我可以把rdd_A转化为广播变量,然后用这个广播变量和每个rdd_B中的每个元素都去做一个操作,进而实现笛卡尔积的效果,好了,笔者给一下pyspark的实现:

    def ops(A, B):
        pass
        
    def fun(A_list, B):
        result = []
        for cur_A in A_list:
            result.append(cur_A + B)
        return result
            
    rdd_A = sc.broadcast(rdd_A.collect())
    rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x))
    • 1

    可以看到我们先把rdd_A转化为广播变量,然后通过flatMap,将rdd_A和所有rdd_B中的单个元素进行操作,具体是什么操作大家可以在ops函数中自己定义自己的逻辑。

    关于spark的广播机制更多讲解,大家也可以找找文档,很多的,比如:

    https://www.cnblogs.com/Lee-yl/p/9777857.html

    但目前为止,其实还没有真真结束,从上面我们可以看到,rdd_A被转化为了广播变量,但是其有一个重要的前提:那就是rdd_A比较小。但是当rdd_A比较大的时候,我们在转化的过程中,就会报内存错误,当然了可以通过增加配置:

    spark.driver.maxResultSize=10g
    • 1

    但是如果rdd_A还是极其大呢?换句话说rdd_A和rdd_B都是非常大的,哪一个做广播变量都是不合适的,怎么办呢?

    其实我们一部分一部分的做。假设我们把rdd_A拆分成10份,这样的话,每一份的量级就降下来了,然后把每一份转化为广播变量且都去和rdd_B做笛卡尔积,最后再汇总一下就可以啦。

    有了想法,那么怎么实现呢?

    分批大家都会了,如上。但是这里面会有另外一个问题,那就是这个广播变量名会被重复利用,在进行下一批广播变量的时候,需要先销毁,再创建,demo如下:

    def ops(A, B):
        pass
        
    def fun(A_list, B):
        result = []
        for cur_A in A_list:
            result.append(cur_A + B)
        return result
        
    def get_rdd_cartesian(rdd_A, rdd_B):   
        rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x))
        return rdd_cartesian
        
    for i in range(len(rdd_A_batch))
        qb_rdd_temp = rdd_A_batch[i]
        qb_rdd_temp = sc.broadcast(qb_rdd_temp.collect())
        rdd_cartesian_batch = get_rdd_cartesian(qb_rdd_temp, rdd_B)
        dw.saveToTable(rdd_cartesian_batch, tdw_table, "p_" + ds, overwrite=False)
        qb_rdd_temp.unpersist()
        
    • 1

    可以看到,最主要的就是unpersist()

    • 广播变量应用之向量索引

    说到广播机制,这里就再介绍一个稍微复杂的demo,乘热打铁。

    做算法的同学,可能经常会遇到向量索引这一场景:即每一个item被表征成一个embedding,然后两个item的相似度便可以基于embedding的余弦相似度进行量化。向量索引是指假设来了一个query,候选池子里面假设有几百万的doc,最终目的就是要从候选池子中挑选出与query最相似的n个topk个doc。

    关于做大规模数量级的索引已经有很多现成好的API可以用,最常见的包比如有faiss。如果还不熟悉faiss的同学,可以先简单搜一下其基本用法,看看demo,很简单。

    好啦,假设现在query的量级是10w,doc的量级是100w,面对这么大的量级,我们当然是想通过spark来并行处理,加快计算流程。那么该怎么做呢?

    这时我们便可以使用spark的广播机制进行处理啦,而且很显然doc应该是广播变量,因为每一个query都要和全部的doc做计算。

    废话不多说,直接看实现

    首先建立doc索引:

    # 获取index embedding,并collect,方便后续建立索引
    index_embedding_list = index_embedding_rdd.collect()
    all_ids = np.array([row[1] for row in index_embedding_list], np.str)
    all_vectors = np.array([str_to_vec(row[2]) for row in index_embedding_list], np.float32)
    del(index_embedding_list)
    #faiss.normalize_L2(all_vectors)
    print(all_ids[:2])
    print(all_vectors[:2])
    print("all id size: {}, all vec shape: {}".format(len(all_ids), all_vectors.shape))

    # 建立index索引,并转化为广播变量
    faiss_index = FaissIndex(all_ids, all_vectors, self.args.fast_mode, self.args.nlist, self.args.nprobe)
    del(all_vectors)
    del(all_ids)
    print("broadcast start")
    bc_faiss_index = self.sc.broadcast(faiss_index)
    print("broadcast done")
    • 1

    这里的index_embedding_rdd就是doc的embedding,可以看到先要collect,然后建立索引。

    建立完索引后,就可以开始计算了,但是这里会有一个问题就是query的量级也是比较大的,如果一起计算可能会OM,所以我们分批次进行即batch:

    # 开始检索
    # https://blog.csdn.net/wx1528159409/article/details/125879542
    query_embedding_rdd = query_embedding_rdd.repartition(300)
    top_n = 5
    batch_size = 1000
    query_sim_rdd = query_embedding_rdd.mapPartitions(
                  lambda iters: batch_get_nearest_ids(
                    iters, bc_faiss_index, top_n, batch_size
                    )
    )
    • 1

    假设query_embedding_rdd是全部query的embedding,为了实现batch,我们先将query_embedding_rdd进行分区repartition,然后每个batch进行,可以看到核心就是batch_get_nearest_ids这个函数:

    def batch_get_nearest_ids(iters, bc_faiss_index, top_n, batch_size):
        import mkl
        mkl.get_max_threads()
        res = list()
        rows = list()
        for it in iters:
            rows.append(it)
            if len(rows) >= batch_size:
                batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n)
                res.extend(batch_res)
                rows = list()
        if rows:
            batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n)
            res.extend(batch_res)
        return res
    • 1

    从这里可以清楚的看到就是组batch,组够一个batch后就可以给当前这个batch内的query进行计算最相似的候选啦即__batch_get_nearest_ids这个核心函数:

    def __batch_get_nearest_ids(rows, bc_faiss_index, top_n):
        import mkl
        mkl.get_max_threads()
        import faiss
        embs = [str_to_vec(row[3]) for row in rows]
        vec = np.array(embs, np.float32)
        #faiss.normalize_L2(vec)
        similarities, dst_ids = bc_faiss_index.value.batch_search(vec, top_n)
        batch_res = list()
        for i in range(len(rows)):
            batch_res.append([str("\\t".join([rows[i][1], rows[i][2]])), "$$$".join(["\\t".join(dst.split("\\t")+[str(round(sim, 2))]) for dst, sim in zip(dst_ids[i], similarities[i])])])
        return batch_res
    • 1

    这里就是真真的调用faiss的索引API进行召回啦,当然了batch_res这个就是结果,自己可以想怎么定义都行,笔者这里不仅返回了召回的item,还返回了query自身的一些信息。

    • 注意点

    在map的时候,不论是self的类成员还是类方法都要放到外面,不要放到类里面,不然会报错

    总结

    总之,在用spark做任何需求之前,一定要牢记能map就map,尽量不要聚合算子,实在不行就尽可能放到最后。

    关注

    欢迎关注,下期再见啦~

    知乎csdngithub微信公众号

    本文由 mdnice 多平台发布

  • 相关阅读:
    uniapp 拉起微信客服功能
    最小编辑距离-动态规划
    【Python】基础(学习笔记)
    如何在外网访问内网服务器数据库
    gerapy下载和安装以及部署全流程
    集线器和交换机
    MySQL开发规范小节
    深度学习理论(李宏毅
    为什么鸿蒙调用弹窗组件(CommonDialog )却不展示或闪退?
    动态内存管理(2)
  • 原文地址:https://blog.csdn.net/weixin_42001089/article/details/127933069