首页 » 机器学习实战 » 机器学习实战全文在线阅读

《机器学习实战》15.5 在Python中使用mrjob来自动化MapReduce

关灯直达底部

上面列举的算法大多是迭代的。也就是说,它们不能用一次MapReduce作业来完成,而通常需要多步。在15.3节中,Amazon的EMR上运行MapReduce作业只是一个简例。如果想在大数据集上运行AdaBoost算法该怎么办呢?如果想运行10个MapReduce作业呢?

有一些框架可以将MapReduce作业流自动化,例如Cascading和Oozie,但它们不支持在Amazon的EMR上执行。Pig可以在EMR上执行,也可以使用Python脚本,但需要额外学习一种脚本语言。(Pig是一个Apache项目,为文本处理提供高级编程语言,可以将文本处理命令转换成Hadoop的MapReduce作业。)还有一些工具可以在Python中运行MapReduce作业,如本书将要介绍的mrjob。

mrjob1 (http://packages.python.org/mrjob/)之前是Yelp(一个餐厅点评网站)的内部框架,它在2010年底实现了开源。读者可以参考附录A来学习如何安装和使用。本书将介绍如何使用mrjob重写之前的全局均值和方差计算的代码,相信读者能体会到mrjob的方便快捷。(需要指出的是,mrjob是一个很好的学习工具,但仍然使用Python语言编写,如果想获得更好的性能,应该使用Java。)

1.mrjob文档: http://packages.python.org/mrjob/index.html; 源代码: https://github.com/Yelp/mrjob.

15.5.1 mrjob与EMR的无缝集成

与15.3节介绍的一样,本节将使用mrjob在EMR上运行Hadoop流,区别在于mrjob不需要上传数据到S3,也不需要担心命令输入是否正确,所有这些都由mrjob后台完成。有了mrjob,读者还可以在自己的Hadoop集群上运行MapReduce作业,当然也可以在单机上进行测试。作业在单机执行和在EMR执行之间的切换十分方便。例如,将一个作业在单机执行,可以输入以下命令:

 % python mrMean.py < inputFile.txt > myOut.txt    

如果要在EMR上运行同样的任务,可以执行以下命令:

 % python mrMean.py -r emr < inputFile.txt > myOut.txt    

在15.3节中,所有的上传以及表单填写全由mrjob自动完成。读者还可以添加一条在本地的Hadoop集群上执行作业的命令1,也可以添加一些命令行参数来指定本作业在EMR上的服务器类型和数目。

1. 意指除了上面两条命令之外,再加一条。——译者注

另外,15.3节中的mapper和reducer分别存于两个不同的文件中,而mrjob中的mapper和reducer可以写在同一个脚本中。下节将展示该脚本的内容,并分析其工作原理。

15.5.2 mrjob的一个MapReduce脚本剖析

用mrjob可以做很多事情,本书仍从最典型的MapReduce作业开始介绍。为了方便阐述,继续沿用前面的例子,计算数据集的均值和方差。这样读者可以更专注于框架的实现细节,所以程序清单15-3的代码与程序清单15-1和15-2的功能一致。打开文本编辑器,创建一个新文件mrMean.py,并加入下面程序清单的代码。

程序清单15-3 分布式均值方差计算的mrjob实现

from mrjob.job import MRJobclass MRmean(MRJob):    def __init__(self, *args, **kwargs):        super(MRmean, self).__init__(*args, **kwargs)        self.inCount = 0        self.inSum = 0        self.inSqSum = 0# 接收输入数据流def map(self, key, val):    if False: yield    inVal = float(val)    self.inCount += 1    self.inSum += inVal    self.inSqSum += inVal*inVal #所有输入到达后开始处理def map_final(self):    mn = self.inSum/self.inCount    mnSq = self.inSqSum/self.inCount    yield (1, [self.inCount, mn, mnSq])def reduce(self, key, packedValues):    cumVal=0.0; cumSumSq=0.0; cumN=0.0    for valArr in packedValues:        nj = float(valArr[0])        cumN += nj        cumVal += nj*float(valArr[1])        cumSumSq += nj*float(valArr[2])    mean = cumVal/cumN    var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN    yield (mean, var)def steps(self):    return ([self.mr(mapper=self.map, reducer=self.reduce,mapper_final=self.map_final)])if __name__ == /'__main__/':    MRmean.run  

该代码分布式地计算了均值和方差。输入文本分发给很多mappers来计算中间值,这些中间值再通过reducer进行累加,从而计算出全局的均值和方差。

为了使用mrjob库,需要创建一个新的MRjob继承类,在本例中该类的类名为MRmean。代码中的mapper和reducer都是该类的方法,此外还有一个叫做steps的方法定义了执行的步骤。执行顺序不必完全遵从于map-reduce的模式,也可以是map-reduce-reduce-reduce,或者map-reduce-map-reduce-map-reduce(下节会给出相关例子)。在steps方法里,需要为mrjob指定mapper和reducer的名称。如果未给出,它将默认调用mapperreducer方法。

首先来看一下mapper的行为:它类似于for循环,在每行输入上执行同样的步骤。如果想在收到所有的输入之后进行某些处理,可以考虑放在mapper_final中实现。这乍看起来有些古怪,但非常实用。另外在mappermapper_final中还可以共享状态。所以在上述例子中,首先在mapper中对输入值进行积累,所有值收集完毕后计算出均值和平方均值,最后把这些值作为中间值通过yield语句传出去。1

1. 在一个标准的map-reduce流程中,作业的输入即mapper的输入,mapper的输出也称为中间值,中间值经过排序、组合等操作会转为reducer的输入,而reducer的输出即为作业的输出。——译者注

中间值以key/value对的形式传递。如果想传出去多个中间值,一个好的办法是将它们打包成一个列表。这些值在map阶段之后会按照key来排序。Hadoop提供了更改排序方法的选项,但默认的排序方法足以应付大多数的常见应用。拥有相同key的中间值将发送给同一个reducer。因此你需要考虑key的设计,使得在sort阶段后相似的值能够收集在一起。这里所有的mapper都使用“1”作为key,因为我希望所有的中间值都在同一个reducer里加和起来。2

2.只要所有mapper都使用相同的key就可以。当然,不必是“1”,也可以是其他值。——译者注

mrjob里的reducer与mapper有一些不同之处,reducer的输入存放在迭代器对象里。为了能读取所有的输入,需要使用类似for循环的迭代器。mappermapper_finalreducer之间不能共享状态,因为Python脚本在map和reduce阶段中间没有保持活动。如果需要在mapperreducer之间进行任何通信,那么只能通过key/value对。在reducer的最后有一条输出语句,该语句没有key,因为输出的key值已经固定。如果该reducer之后不是输出而是执行另一个mapper,那么key仍需要赋值。

无须多言,下面看一下实际效果,先运行一下mapper,在Linux/DOS的命令行输入下面的命令(注意不是在Python提示符下)。其中的文件inputFile.txt在第15章的代码里。

%python mrMean.py --mapper < inputFile.txt  

运行该命令后,将得到如下输出:

1 [100, 0.50956970000000001, 0.34443931307935999]  

要运行整个程序,移除--mapper选项。

%python mrMean.py < inputFile.txt    

你将在屏幕上看到很多中间步骤的描述文字,最终的输出如下:

                             .                             .                             .streaming final output from c:userspeterappdatalocaltempmrMean.Peter.20110228.172656.279000outputpart-000000.50956970000000001 0.34443931307935999removing tmp directory c:userspeterappdatalocaltempmrMean.Peter.20110228.172656.279000To stream the valid output into a file, enter the following command:%python mrMean.py < inputFile.txt > outFile.txt     

最后,要在Amazon的EMR上运行本程序,输入如下命令(确保你已经设定了环境变量AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY,这些变量的设定见附录A)。

完成了mrjob的使用练习,下面将用它来解决一些机器学习问题。上文提到,一些迭代算法仅使用EMR难以完成,因此下一节将介绍如何用mrjob完成这项任务。