Hadoop Streaming编程框架mrjob

众所周知,Hadoop有Java和Streaming两种方式来编写MapReduce任务。Java的优点是计算效率高,并且部署方便,直接打包成一个jar文件就行了;而Streaming的优点是开发灵活方便,尤其是Python在其中可谓如鱼得水,无需IDE任何编辑器都能书写,而且对于要求计算效率的地方,我们可以使用C/C++来获得更好的性能。所以,这里介绍一下mrjob这个Python框架。

mrjob是Yelp释出的一份用于辅助书写MapReduce任务的Python框架,当然实质上也就是在Hadoop Streaming的命令行上包了一层,有了统一的Python界面,而无需你再去直接调用Hadoop Streaming命令。此外,mrjob对AWS支持得非常好,或者说其实这个框架大半的代码都是在为AWS服务,只是我们也可以拿它来书写一般的Hadoop Streaming程序。由于mrjob只是wrap了一下Hadoop Streaming的命令行,由此带来的另外一个好处就是它只需要在你的客户机上部署即可使用,而无需部署到Hadoop集群的所有机器上去。

如果你有多个mapper或者你的任务有多个step,平常也许我们会把逻辑分割到多个python脚本里去,而利用mrjob可以直接在一个脚本里完成,比如,下面这个任务其实分了两步,第一步统计单词个数,第二步把所有单词个数x2:

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRDoubleWordFreqCount(MRJob):
    """Word frequency count job with an extra step to double all the
    values"""

    def get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def sum_words(self, word, counts):
        yield word, sum(counts)

    def double_counts(self, word, counts):
        yield word, counts * 2

    def steps(self):
        return [self.mr(mapper=self.get_words,
                        combiner=self.sum_words,
                        reducer=self.sum_words),
                self.mr(mapper=self.double_counts)]

if __name__ == '__main__':
    MRDoubleWordFreqCount.run()

此外,mrjob也自带了计数器功能,比如可以这样使用:

class MRCountingJob(MRJob):
    def mapper(self, _, value):
        self.increment_counter('group', 'counter_name', 1)
        yield _, value

这里的increment_counter()函数,当然功能类似于:

sys.stderr.write("reporter:counter:group,counter_name,1\n")

额外的好处是,我们利用mrjob本地调试的时候,可以正确的输出计数器。

mrjob默认从stdin输入,输出到stdout。假定上面统计单词个数的python文件名是 countword2.py,则可以在本地直接运行该python文件,比如:

$ ./countword2.py < input-file > output-file

你也可以指定输入文件,也可以用参数-o指定输出文件,比如:

$ ./countword2.py input-file1 input-file2 -o output-file

还可以用参数指定仅执行mapper,比如:

$ ./countword2.py --mapper < input-file

或者只执行第1个步骤的reducer,比如:

$ ./countword2.py --reducer --step-num=0

本地调试成功后,可以使用参数-r hadoop指定其在Hadoop集群上运行了,比如:

$ ./countword2.py -r hadoop --jobconf mapred.reduce.tasks=10 hdfs:///input-file -o hdfs:///output-file