• 极简的MapReduce实现


    目录

    1. MapReduce概述

    2. 极简MapReduce内存版

    3. 复杂MapReduce磁盘版

    4. MapReduce思想的总结


    1. MapReduce概述

           以前写过一篇 MapReduce思想 ,这次再深入一点,简单实现一把单机内存的。MapReduce就是把它理解成高阶函数,需要传入map和reduce所需的函数,即可对一个集合的键值对进行按需变换。

    我们按Python的map和reduce逻辑,接收这两个高阶函数所需的函数形态,而不是像Java那样写一个Mapper/Reducer,当然如果需要复杂一点map功能,可以通过闭包来实现。

    2. 极简MapReduce内存版

           如果是效仿Python,很容易想到实现MapReduce 就是map + 分组 + reduce。一头一尾的map和reduce都可以直接用py的内置函数,中间分组也容易用sort+groupby实现。因此首先约定map和reduce的自定义函数写法就是直接按内置函数要求来写,只不过map输出是键值对,要求用长度是2的tuple/list 来装key和value;reduce时候只对一组的values做自定义聚合,不能操作key。到这里很容易实现严格的MapReduce,你用起来大概就和Spark提供的mapreduce一样了。如果想直接做wordcount好像不太容易,因为一行文本得输出一个词表数组,因此可以宽松一点,实现类似flatMap的效果,也就是即使map只输出一个,也要放进列表里,由框架去展开。

    1. def wc_map(line):
    2. return [(x,1) for x in line.strip().split()]

    reduce就是严格的两个参数和一个返回值形式(其中y表示上一轮迭代的结果,x表示遍历数组每次拿到的值,这种严格的reduce函数完成特定功能并容易设计)

    1. def wc_reduce(y, x):
    2. return y+x

    剩下要做的事就是把map出来的数据做 拉平和排序分组,最后对每一组做reduce。

    1. from itertools import groupby
    2. from functools import reduce
    3. def map_reduce(f_map, f_reduce, seq):
    4. map_result = map(f_map, seq)
    5. flattened = sum(map_result, []) #拉平
    6. shuffled = sorted(flattened, key= lambda x:x[0]) #排序
    7. mr_result = [(key, reduce(f_reduce, (x[1] for x in data))) for key, data in groupby( shuffled, key=lambda x:x[0])] #groupby分组
    8. return mr_result

    3. 复杂MapReduce磁盘版

    如果内存不够,就必须不断把数据写到磁盘上,也就是过去大数据面试题最喜欢考察的点。那些题若是有单机版MapReduce,基本都可以直接做出来。大体结构如下图:

    其中partition只有一个,就不用单独设计了。过程也简化很多,也就是每当buffer满了,就排序写出到成一个数据块,最后将所有的数据块merge起来。merge过程写起来不是很容易,需要维持所有数据块的句柄数组有序:每次读取所有句柄中最小的那一条,随后数组需要重新按最小值排序,直到所有数据块读取完。

    另外就是因为中间数据需要落地,所以这一版的MapReduce框架直接从文件到文件。那么让写map和reduce函数直接对着文件输出,类似Java那样提供一个context的参数呢?我选择允许用yield来输出内容,这样相当于也宽松了输出多个键值对的要求。

    1. def wc_map(line):
    2. for t in line.strip().split():
    3. yield (t, 1)

    最后同样宽松reduce的写法,不用写那么复杂的两参数一返回,而是像Java那样,拿到一组key和可迭代的values,甚至可以这一组key输出多个结果。

    1. def reduce_func(key , kv_pairs):
    2. s = 0
    3. for k,v in kv_pairs :
    4. s += v
    5. yield (key, s)

    很显然我们要补充很多东西,要读取文件、要设计缓存、要做merge,因此设计了一个类,过程就不解释了

    1. import os
    2. import itertools
    3. import sys
    4. class MapReduce:
    5. class Buffer:
    6. def __init__(self, buffer_size = 80000):
    7. self.buffer_size = buffer_size
    8. self.buffer = []
    9. self.current_size = 0
    10. def add_data(self, kv) -> bool :
    11. self.current_size += len(str(kv))
    12. self.buffer.append(kv)
    13. return True if self.current_size > self.buffer_size else False
    14. def spill(self, file_path):
    15. if self.buffer:
    16. self.buffer.sort(key = lambda x:x[0])
    17. with open(file_path, mode='w') as f:
    18. for kv in self.buffer:
    19. f.write(str(kv) + '\n')
    20. self.buffer = []
    21. self.current_size = 0
    22. def __init__(self, buffer_size,temp_dir):
    23. self.buffer = self.Buffer(buffer_size)
    24. self.temp_dir = temp_dir
    25. self.block_id = 0
    26. def __fetch_kvs_from_map(self, map_f, infile):
    27. with open(infile, mode='r', encoding='utf-8') as f:
    28. for line in f:
    29. for k,v in map_f(line):
    30. yield (k,v)
    31. def __run_map(self, map_f, infile):
    32. block_files = [x for x in os.listdir(self.temp_dir) if x.startswith('block.') and x.split('.',1)[1].isdigit() ]
    33. for block in block_files:
    34. os.remove(f'{self.temp_dir}/{block}')
    35. for kv in self.__fetch_kvs_from_map(map_f, infile):
    36. if self.buffer.add_data(kv):
    37. self.buffer.spill(f'{self.temp_dir}/block.{self.block_id}')
    38. self.block_id += 1
    39. self.buffer.spill(f'{self.temp_dir}/block.{self.block_id}')
    40. def __merge_sort_from_files(self):
    41. block_files = [x for x in os.listdir(self.temp_dir) if x.startswith('block.') and x.split('.',1)[1].isdigit() ]
    42. block_files.sort(key = lambda filename: int(filename.split('.',1)[1]))
    43. ffs = [open(f'{self.temp_dir}/{block}', 'r') for block in block_files]
    44. first_kvs = [ eval(f.readline()) for f in ffs ]
    45. shuffled_files = [[fk, ff] for fk, ff in zip(first_kvs, ffs)]
    46. shuffled_files.sort(key = lambda x:x[0][0], reverse=True)
    47. with open(f'{self.temp_dir}/final_one.dat', mode='w') as fw:
    48. while len(shuffled_files ) > 1:
    49. first_keys = [x[0][0] for x in shuffled_files]
    50. min_key = first_keys[-1]
    51. min_idx_bound = first_keys.index(min_key)
    52. sffs = shuffled_files[min_idx_bound:]
    53. for sff in sffs:
    54. fw.write(str(sff[0]) + '\n')
    55. n = sff[1].readline()
    56. if n :
    57. sff[0] = eval(n)
    58. else:
    59. sff[0] = ''
    60. sff[1].close()
    61. shuffled_files = sorted(filter(lambda x:x[0], shuffled_files), key = lambda x:x[0][0], reverse=True)
    62. if shuffled_files:
    63. fw.write(str(shuffled_files[0][0]) + '\n') #already exist
    64. for line in shuffled_files[0][1]:
    65. fw.write(line)
    66. shuffled_files[0][1].close()
    67. def __run_reduce(self, reduce_f, outfile):
    68. def read_mapper_output(file):
    69. for line in file:
    70. yield eval(line.rstrip())
    71. with open(f'{self.temp_dir}/final_one.dat', encoding='utf-8') as f , \
    72. open(f'{self.temp_dir}/{outfile}', encoding='utf-8',mode='w') as fw:
    73. stdin_generator=read_mapper_output(f)
    74. for key, kv_pairs in itertools.groupby(stdin_generator, lambda x:x[0] ):
    75. for key,result in reduce_f(key, kv_pairs):
    76. fw.write(f"{key}\t{result}\n")
    77. def run_mrjob(self, map_f, reduce_f, infile, outfile):
    78. #====map
    79. self.__run_map(map_f, infile)
    80. #====shuffle===
    81. self.__merge_sort_from_files()
    82. #====reduce
    83. self.__run_reduce(reduce_f, outfile)
    84. def map_func(x):
    85. for t in x.strip().split():
    86. yield (t, 1)
    87. def reduce_func(key , kv_pairs):
    88. s = 0
    89. for k,v in kv_pairs :
    90. s += v
    91. yield (key, s)
    92. if __name__ == "__main__":
    93. mr = MapReduce(30, "./")
    94. mr.run_mrjob(map_func, reduce_func, "words.txt", "result.txt")

    代码明显复杂了很多,其中reduce读取有序文件依次分key的组,还借鉴了Hadoop streaming 中Python代码的处理方式。

    4. MapReduce思想的总结

           这个系列到这算完结了,为了讲解MapReduce我都是先讲解高阶函数map和reduce,深入思考以后发现这些个东西确实还有一些值得补充的地方。但应该不止于此,对于没有完全学过函数式编程的我来说就纯个人观点一下:

    写map有什么用?在你写了很多独立地循环处理逻辑以后, 你发现你可以把循环与处理分离了。反过来更多使用map会习惯这种分离的思维,甚至提升这种分离。 这种分离意味着一种共性的抽象。

    reduce被设计出来的意义何在? 不使用全局变量,而是部分变量传递,以完成全局功能的设计。全局功能线性拆分成N个独立的有部分依赖的功能总和。其本质也是一种分离或拆分思想。反过来使用reduce实现功能,习惯这种等价的整体拆分成部分的思维,强化这种拆分,意味着进行带依赖的共性的提炼

    最后概括一下MapReduce的学习意义:证明了对任务的工序拆分能有效实现并行加速。即对任务抽象拆分出同质(独立或依赖)的步骤,这些同质工作能并行/自动化。而工序拆分,靠的是前面高阶函数训练出来的思维。

  • 相关阅读:
    数据中台、BI业务访谈(四)—— 十个问题看本质
    AI Navigation导航系统_unity基础开发教程
    【C/C++】BMP格式32位转24位
    【Rust日报】2022-08-02 hypher 将30种自然语言塞到1.1M空间内
    JUC原子类: CAS, Unsafe、CAS缺点、ABA问题如何解决详解
    【机器学习基础】正则化
    动态链接函数(dlopen/dlsym/dlclose)使用总结
    微服务:高性能网关 ShenYu简介
    3. Visual Studio: Debug within k8s Cluster Using Bridge to Kubernetes
    Vue3 如何实现一个全局搜索框
  • 原文地址:https://blog.csdn.net/lgnlgn/article/details/133821116