尽管Hadoop框架是用java写的,但是Hadoop程序不限于java,可以用python、C++、ruby等。本例子中直接用python写一个MapReduce实例,而不是用Jython把python代码转化成jar文件。
例子的目的是统计输入文件的单词的词频。
使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。
我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。
1.1 Map阶段:mapper.py
在这里,我们假设把文件保存到hadoop-0.20.2/test/code/mapper.py
- #!/usr/bin/env python
- import sys
- for line in sys.stdin:
- line = line.strip()
- words = line.split()
- for word in words:
- print "%s\t%s" % (word, 1)
文件从STDIN读取文件。把单词切开,并把单词和词频输出STDOUT。Map脚本不会计算单词的总数,而是输出
为了是脚本可执行,增加mapper.py的可执行权限
chmod +x hadoop-0.20.2/test/code/mapper.py
1.2 Reduce阶段:reducer.py
在这里,我们假设把文件保存到hadoop-0.20.2/test/code/reducer.py
- #!/usr/bin/env python
- from operator import itemgetter
- import sys
-
- current_word = None
- current_count = 0
- word = None
-
- for line in sys.stdin:
- line = line.strip()
- word, count = line.split('\t', 1)
- try:
- count = int(count)
- except ValueError: #count如果不是数字的话,直接忽略掉
- continue
- if current_word == word:
- current_count += count
- else:
- if current_word:
- print "%s\t%s" % (current_word, current_count)
- current_count = count
- current_word = word
-
- if word == current_word: #不要忘记最后的输出
- print "%s\t%s" % (current_word, current_count)
文件会读取mapper.py 的结果作为reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。
为了是脚本可执行,增加reducer.py的可执行权限
chmod +x hadoop-0.20.2/test/cod