Map-Reduce
Justabug
2018-01-11
http://www.justabug.net/mind-map-reduce/ (左右滚动条在最下方)
3.2
5.3
容错
java实现
例如,计算一个大的文档集合中每个单词出现的次数
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。
Reduce函数把Map函数产生的每一个特定的词的计数累加起来。
例子
Master
Worker故障
master失败
原理
执行过程
在失效方面的处理机制
1.切片
2.Master分配任务给work
3.work解析K-V,使用Map函数处理
4.将缓存的K-V写入磁盘
5.worker获取数据并排序
6.Reduce
7.完成并唤醒用户。
用户程序首先调用的MapReduce库将输入文件分成M个数据片度
然后用户程序在机群中创建大量的程序副本。
master为每个空闲的worker分配任务,一个Map任务或Reduce任务。
worker读取相关的输入数据,
解析出K-V传给Map函数,
由Map函数生成并输出的中间K-V,并缓存在内存中。
缓存的K-V 存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
对key进行排序后使得具有相同key值的数据聚合在一起。
遍历,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。
当所有的Map和Reduce任务都完成之后,master唤醒用户程序。
持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。
Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。
因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。
当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。
当用户提供的Map和Reduce操作是幂等的,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。
依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。
依赖【对Map和Reduce任务的输出是原子提交的】这个特性。每个工作中的任务把它的输出写到私有的临时文件中。
(Reduce任务与文件一一对应)
1.周期性检查与失效(master周期性的ping每个worker。)
2.故障处理(worker鼓掌时,map需要重新执行,而reduce不用)
3.重复执行(worker可以互相接盘)
4.MapReduce可以处理大规模worker失效的情况。
一个简单的解决办法是让master周期性的将上面描述的数据结构(alex注:指3.2节)的写入磁盘,即检查点(checkpoint)。
如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。
然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。
客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。
幂等使补偿机制容易执行
细节
存储相关
任务粒度
备用任务
技巧
分区函数
顺序保证
Combiner函数
输入和输出的类型
性能
(Google家的测试)
GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。
MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行
如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)
当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。
我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。
理想情况:M和R应当比集群中worker的机器数量要多得多。
实际情况:在我们的具体实现中对M和R的取值都有一定的客观限制,因为master必须执行O(M+R)次调度,并且在内存中保存O(M*R)个状态
更进一步:用户可以灵活处理R
分布式的Grep
计算URL访问频率
倒转网络链接图
倒排索引
分布式排序
Map函数输出匹配某个模式的一行,Reduce函数是一个恒等函数,即把中间数据复制到输出。
Map函数处理日志中web页面请求的记录,然后输出(URL,1)。Reduce函数把相同URL的value值都累加起来,产生(URL,记录总数)结果。
Map函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。Reduce函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。
每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的URL。
Reduce函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。
Map函数分析每个文档输出一个(词,文档号)的列表,Reduce函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。
所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
Map函数从每个记录提取key,输出(key,record)。Reduce函数不改变任何的值。这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)。
“落伍者”
减少“落伍者”
影响一个MapReduce的总执行时间最通常的因素是“落伍者”
在运算过程中,如果有一台机器花了很长的时间才完成最后几个Map或Reduce任务,导致MapReduce操作总的执行时间超过预期。出现“落伍者”的原因非常多。
比如:如果一个机器的硬盘出了问题,在读取的时候要经常的进行读取纠错操作,导致读取数据的速度从30M/s降低到1M/s。
如果cluster的调度系统在这台机器上又调度了其他的任务,由于CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行MapReduce代码的执行效率更加缓慢。
我们最近遇到的一个问题是由于机器的初始化代码有bug,导致关闭了的处理器的缓存:在这些机器上执行任务的性能和正常情况相差上百倍。
备用(backup)任务进程,用于处理剩下的1%
当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。
无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。
我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大MapReduce操作的总处理时间效果显著。
例如,在5.3节描述的排序任务,在关闭掉备用任务的情况下要多花44%的时间完成排序任务。
在中间key上使用分区函数来对数据进行分区
一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。
然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。
按KEY值排序
我们确保在给定的分区中,中间key/value pair数据的处理顺序是按照key值增量顺序处理的。
这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按key值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。
排序
业界
中间key值的重复数据多,允许用户指定combiner函数在本地先合并一次
模型
例子
类型
描述
Map函数
Reduce函数
input:k1,v1,
output:list(k2,v2)[中间]
input:k2,list(v2)[中间]
output:合并后产生list(v2)
MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。
Reduce函数合并这些value值,形成一个较小的value值的集合。
一般的,每次Reduce函数调用只产生0或1个输出value值。
通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。
Combiner函数与Reduce函数
部分的合并中间结果可以显著的提高一些MapReduce操作的速度。
附录A包含一个使用combiner函数的例子
输入格式
MapReduce库支持几种不同的格式的输入数据。
比如,文本模式,以key进行排序来存储的key/value pair的序列
使用者依然可以通过提供一个简单的Reader接口实现就能够支持一个新的输入类型。
读取方式
Reader并非一定要从文件中读取数据,比如,我们可以很容易的实现一个从数据库里读记录的Reader,或者从内存中的数据结构读取数据的Reader。
副作用
跳过损坏的记录
本地执行
操作过程中增加辅助的输出文件
忽略一些有问题的记录也是可以接受的
有时候,用户程序中的bug导致Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操作无法顺利完成。
惯常的做法是修复bug后再次执行MapReduce操作,但是,有时候找出这些bug并修复它们不是一件容易的事情;
这些bug也许是在第三方库里边,而我们手头没有这些库的源代码。
而且在很多时候,忽略一些有问题的记录也是可以接受的,比如在一个巨大的数据集上进行统计分析的时候。
我们提供了一种执行模式,在这种模式下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理。
每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。
在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。
如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。
当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。
状态信息
计数器
调试
MapReduce库的本地实现版本,通过使用本地版本的MapReduce库,MapReduce操作在本地计算机上顺序的执行。
master使用嵌入式的HTTP服务器,便于监控
MapReduce库使用计数器统计不同事件发生次数
比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。
worker传递给master,master统计
有些计数器的值是由MapReduce库自动维持的,比如已经处理的输入的key/value pair的数量、输出的key/value pair的数量等等。
集群配置
GREP
对比例子
一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式
另一类是从海量数据中抽取少部分的用户感兴趣的数据
(= =列的真细)
所有这些程序都运行在一个大约由1800台机器构成的集群上。
每台机器配置2个2G主频、支持超线程的Intel Xeon处理器,4GB的物理内存,两个160GB的IDE硬盘和一个千兆以太网卡。
这些机器部署在一个两层的树形交换网络中,在root节点大概有100-200GBPS的传输带宽。
所有这些机器都采用相同的部署(对等部署),因此任意两点之间的网络来回时间小于1毫秒。
在4GB内存里,大概有1-1.5G用于运行在集群上的其他任务。测试程序在周末下午开始执行,这时主机的CPU、磁盘和网络基本上处于空闲状态。
图2显示了这个运算随时间的处理过程。
其中Y轴表示输入数据的处理速度。
处理速度随着参与MapReduce计算的机器数量的增加而增加,
当1764台worker参与计算的时,处理速度达到了30GB/s。
排序程序处理10的10次方个100个字节组成的记录(大概1TB的数据)
参考:http://blog.csdn.net/linvo/article/details/6596468
排序程序由不到50行代码组成。只有三行的Map函数从文本行中解析出10个字节的key值作为排序的key,
并且把这个key和原始文本行作为中间的key/value pair值输出。
我们使用了一个内置的恒等函数作为Reduce操作函数。这个函数把中间的key/value pair值不作任何改变输出。
最终排序结果输出到两路复制的GFS文件系统(也就是说,程序输出2TB的数据)。
中间过程
输出
分区
过程
a
(正常执行)
【上】输入
排序程序输入数据读取速度小于分布式grep程序。
【中】中间数据从Map任务发送到Reduce任务
图示的第一个高峰是启动了第一批大概1700个Reduce任务
【下】输出
在第一个排序阶段结束和数据开始写入磁盘之间有小延时,
因为worker正在忙于排序中间数据
本地读入提高性能
b
(关闭备用)
(b)显示了关闭了备用任务后排序程序执行情况,部分失效
输出数据写磁盘的动作在时间上拖了一个很长的尾巴
在960秒后,只有5个Reduce任务没有完成。
这些拖后腿的任务又执行了300秒才完成。
c
(worker
部分失效)
(c)我们在程序开始后几分钟有意的kill了1746个worker中的200个。
(c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的Map任务丢失了
(由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务。
(只比正常执行多消耗了5%的时间)
如前所述,输入数据被分成64MB的Block(M=15000)。
我们把排序后的输出结果分区后存储到4000个文件(R=4000)。
分区函数使用key的原始字节来把数据分区到R个片段中。
在这个benchmark测试中,我们使用的分区函数知道key的分区情况。
通常对于排序程序来说,我们会增加一个预处理的MapReduce操作用于采样key值的分布情况,
通过采样的数据来计算对最终排序处理的分区点。
还有一些值得注意的现象:
输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少,
这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。
排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份
(我们使用了2路的GFS文件系统,写入复制节点的原因是为了保证数据可靠性和可用性)。
我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。
如果底层文件系统使用类似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性,
那么在输出数据写入磁盘的时候,就可以降低网络带宽的使用。
成功之处
并行处理
容错处理
数据本地化优化
负载均衡
Hadoop
Lucene
ElasticSearch
主体
Hadoop
入口
Map
Reduce
定义一个 Job, 将 Mapper, Reducer 等必要的值设置进去。
[将 (K1, V1) 的输入转化成 list(K2, V2) 的输出]
[将 (K2, list(V2)) 的输入转化成 list(K3, V3) 的输出]
http://blog.csdn.net/admin1973/article/details/62037603?locationNum=6&fps=1