Map-Reduce
Justabug
2018-01-11
http://www.justabug.net/mind-map-reduce/ (左右滚动条在最下方)
3.2
5.3
容错
java实现
例如,计算一个大的文档集合中每个单词出现的次数
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。
Reduce函数把Map函数产生的每一个特定的词的计数累加起来。
例子
Master
Worker故障
master失败
原理
执行过程
在失效方面的处理机制
1.切片
2.Master分配任务给work
3.work解析K-V,使用Map函数处理
4.将缓存的K-V写入磁盘
5.worker获取数据并排序
6.Reduce
7.完成并唤醒用户。
用户程序首先调用的MapReduce库将输入文件分成M个数据片度
每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
这些程序副本中的有一个特殊的程序--master。副本中其它的程序都是worker程序,由master分配任务。
有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。
被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出K-V,
然后把K-V传递给用户自定义的Map函数,由Map函数生成并输出的中间K-V,并缓存在内存中。
缓存的K-V通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。
缓存的K-V在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。
当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。
由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
Reduce worker程序遍历排序后的中间数据,
对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。
Reduce函数的输出被追加到所属分区的输出文件。
当所有的Map和Reduce任务都完成之后,master唤醒用户程序。
在这个时候,在用户程序里的对MapReduce调用才返回。
持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。
Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。
因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。
当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。
当用户提供的Map和Reduce操作是幂等的,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。
当一个Map任务完成时,worker发送一个包含R个临时文件名的完成消息给master。
如果master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;否则,master将这R个文件的名字记录在数据结构里。
当Reduce任务完成时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。
如果同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。
我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。
我们依赖【对Map和Reduce任务的输出是原子提交的】这个特性。每个工作中的任务把它的输出写到私有的临时文件中。
每个Reduce任务生成一个这样的文件,而每个Map任务则生成R个这样的文件(Reduce任务与文件一一对应)
1.周期性检查与失效
master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。
所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker。
同样的,worker失效时正在运行的Map或Reduce任务也将被重新置为空闲状态,等待重新调度。
2.故障处理
当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。
而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。
3.重复执行(接盘)
当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。
任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。
4.MapReduce可以处理大规模worker失效的情况。
MapReduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务,直到最终完成这个MapReduce操作
一个简单的解决办法是让master周期性的将上面描述的数据结构(alex注:指3.2节)的写入磁盘,即检查点(checkpoint)。
如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。
然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。
客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。
使用MapReduce模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的Map和Reduce操作是确定性的(幂等),
而且存在这样的一个事实:我们的[失效处理机制]等价于一个[顺序的执行的操作]。
当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。
当使用非确定操作的时候,一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出。
但是,另一个Reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的R2的输出。
细节
存储相关
任务粒度
备用任务
技巧
分区函数
顺序保证
Combiner函数
输入和输出的类型
性能
(Google家的测试)
GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。
MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行
如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)
当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。
我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。
理想情况:M和R应当比集群中worker的机器数量要多得多。
在每台worker机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:
失效机器上执行的大量Map任务都可以分布到所有其他的worker机器上去执行。
实际情况:在我们的具体实现中对M和R的取值都有一定的客观限制,因为master必须执行O(M+R)次调度,并且在内存中保存O(M*R)个状态
(对影响内存使用的因素还是比较小的:O(M*R)块状态,大概每对Map任务/Reduce任务1个字节就可以了)。
更进一步:R值通常是由用户指定的,因为每个Reduce任务最终都会生成一个独立的输出文件。
实际使用时我们也倾向于选择合适的M值,以使得每一个独立任务都是处理大约16M到64M的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效)。
另外,我们把R值设置为我们想使用的worker机器数量的小的倍数。我们通常会用这样的比例来执行MapReduce:M=200000,R=5000,使用2000台worker机器。
分布式的Grep
计算URL访问频率
倒转网络链接图
倒排索引
分布式排序
Map函数输出匹配某个模式的一行,Reduce函数是一个恒等函数,即把中间数据复制到输出。
Map函数处理日志中web页面请求的记录,然后输出(URL,1)。Reduce函数把相同URL的value值都累加起来,产生(URL,记录总数)结果。
Map函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。Reduce函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。
每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的URL。
Reduce函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。
Map函数分析每个文档输出一个(词,文档号)的列表,Reduce函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。
所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
Map函数从每个记录提取key,输出(key,record)。Reduce函数不改变任何的值。这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)。
“落伍者”
减少“落伍者”
影响一个MapReduce的总执行时间最通常的因素是“落伍者”
在运算过程中,如果有一台机器花了很长的时间才完成最后几个Map或Reduce任务,导致MapReduce操作总的执行时间超过预期。出现“落伍者”的原因非常多。
比如:如果一个机器的硬盘出了问题,在读取的时候要经常的进行读取纠错操作,导致读取数据的速度从30M/s降低到1M/s。
如果cluster的调度系统在这台机器上又调度了其他的任务,由于CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行MapReduce代码的执行效率更加缓慢。
我们最近遇到的一个问题是由于机器的初始化代码有bug,导致关闭了的处理器的缓存:在这些机器上执行任务的性能和正常情况相差上百倍。
备用(backup)任务进程,用于处理剩下的1%
当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。
无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。
我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大MapReduce操作的总处理时间效果显著。
例如,在5.3节描述的排序任务,在关闭掉备用任务的情况下要多花44%的时间完成排序任务。
在中间key上使用分区函数来对数据进行分区
MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。
我们在中间key上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。
一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。
然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。
比如,输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中。
为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。
例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。
按KEY值排序
我们确保在给定的分区中,中间key/value pair数据的处理顺序是按照key值增量顺序处理的。
这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按key值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。
排序
业界
在某些情况下,Map函数产生的中间key值的重复数据会占很大的比重,并且,用户自定义的Reduce函数满足结合律和交换律。
在2.1节的词数统计程序是个很好的例子。由于词频率倾向于一个zipf分布(齐夫分布),每个Map任务将产生成千上万个这样的记录<the,1>。
所有的这些记录将通过网络被发送到一个单独的Reduce任务,然后由这个Reduce任务把所有这些记录累加起来产生一个数字。
我们允许用户指定一个可选的combiner函数,combiner函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。
中间key值的重复数据多,允许用户指定combiner函数在本地先合并一次
模型
例子
类型
描述
Map函数
Reduce函数
input:k1,v1,
output:list(k2,v2)[中间]
input:k2,list(v2)[中间]
output:合并后产生list(v2)
MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。
Reduce函数合并这些value值,形成一个较小的value值的集合。
一般的,每次Reduce函数调用只产生0或1个输出value值。
通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。
Combiner函数与Reduce函数
Combiner函数在每台执行Map任务的机器上都会被执行一次。
一般情况下,Combiner和Reduce函数是一样的。
Combiner函数和Reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出。
Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件里,然后被发送给Reduce任务。
部分的合并中间结果可以显著的提高一些MapReduce操作的速度。
附录A包含一个使用combiner函数的例子
附录A、单词频率统计
本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率。
[cpp] view plain copy
#include “mapreduce/mapreduce.h”
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),”1″);
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into “spec”
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format(“text”);
input->set_filepattern(argv[i]);
input->set_mapper_class(“WordCounter”);
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// …
MapReduceOutput* out = spec.output();
out->set_filebase(“/gfs/test/freq”);
out->set_num_tasks(100);
out->set_format(“text”);
out->set_reducer_class(“Adder”);
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class(“Adder”);
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: ‘result’ structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}
输入格式
MapReduce库支持几种不同的格式的输入数据。
比如,文本模式的输入数据的每一行被视为是一个key/value pair。key是文件的偏移量,value是那一行的内容。
另外一种常见的格式是以key进行排序来存储的key/value pair的序列。
每种输入类型的实现都必须能够把输入数据分割成数据片段,该数据片段能够由单独的Map任务来进行后续处理
(例如,文本模式的范围分割必须确保仅仅在每行的边界进行范围分割)。
虽然大多数MapReduce的使用者仅仅使用很少的预定义输入类型就满足要求了,但是使用者依然可以通过提供一个简单的Reader接口实现就能够支持一个新的输入类型。
读取方式
Reader并非一定要从文件中读取数据,比如,我们可以很容易的实现一个从数据库里读记录的Reader,或者从内存中的数据结构读取数据的Reader。
副作用
在某些情况下,MapReduce的使用者发现,如果在Map和/或Reduce操作过程中增加辅助的输出文件会比较省事。
我们依靠程序writer把这种“副作用”变成原子的和幂等的。
通常应用程序首先把输出结果写到一个临时文件中,在输出全部数据之后,在使用系统级的原子操作rename重新命名这个临时文件。
如果一个任务产生了多个输出文件,我们没有提供类似两阶段提交的原子操作支持这种情况。
因此,对于会产生多个输出文件、并且对于跨文件有一致性要求的任务,都必须是确定性的任务。
但是在实际应用过程中,这个限制还没有给我们带来过麻烦。
跳过损坏的记录
本地执行
操作过程中增加辅助的输出文件
忽略一些有问题的记录也是可以接受的
有时候,用户程序中的bug导致Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操作无法顺利完成。
惯常的做法是修复bug后再次执行MapReduce操作,但是,有时候找出这些bug并修复它们不是一件容易的事情;
这些bug也许是在第三方库里边,而我们手头没有这些库的源代码。
而且在很多时候,忽略一些有问题的记录也是可以接受的,比如在一个巨大的数据集上进行统计分析的时候。
我们提供了一种执行模式,在这种模式下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理。
每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。
在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。
如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。
当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。
状态信息
计数器
调试
调试Map和Reduce函数的bug是非常困难的,因为实际执行操作时不但是分布在系统中执行的,而且通常是在好几千台计算机上执行,
具体的执行位置是由master进行动态调度的,这又大大增加了调试的难度。
为了简化调试、profile和小规模测试,我们开发了一套MapReduce库的本地实现版本,通过使用本地版本的MapReduce库,MapReduce操作在本地计算机上顺序的执行。
用户可以控制MapReduce操作的执行,可以把操作限制到特定的Map任务上。
用户通过设定特别的标志来在本地执行他们的程序,之后就可以很容易的使用本地调试和测试工具(比如gdb)。
master使用嵌入式的HTTP服务器,便于监控
状态信息页面显示了包括计算执行的进度,
比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。
页面还包含了指向每个任务的stderr和stdout文件的链接。用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。
这些页面也可以用来分析什么时候计算执行的比预期的要慢。
另外,处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。
这些信息对于调试用户代码中的bug很有帮助。
MapReduce库使用计数器统计不同事件发生次数
比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。
Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, “1″);
worker传递给master,master统计
这些计数器的值周期性的从各个单独的worker机器上传递给master(附加在ping的应答包中传递)。
master把执行成功的Map和Reduce任务的计数器值进行累计,当MapReduce操作完成之后,返回给用户代码
计数器当前的值也会显示在master的状态页面上,这样用户就可以看到当前计算的进度。
当累加计数器的值的时候,master要检查重复运行的Map或者Reduce任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)。
有些计数器的值是由MapReduce库自动维持的,比如已经处理的输入的key/value pair的数量、输出的key/value pair的数量等等。
集群配置
GREP
高效的backup任务
失效的机器
对比例子
一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式
另一类是从海量数据中抽取少部分的用户感兴趣的数据
(= =列的真细)
所有这些程序都运行在一个大约由1800台机器构成的集群上。
每台机器配置2个2G主频、支持超线程的Intel Xeon处理器,4GB的物理内存,两个160GB的IDE硬盘和一个千兆以太网卡。
这些机器部署在一个两层的树形交换网络中,在root节点大概有100-200GBPS的传输带宽。
所有这些机器都采用相同的部署(对等部署),因此任意两点之间的网络来回时间小于1毫秒。
在4GB内存里,大概有1-1.5G用于运行在集群上的其他任务。测试程序在周末下午开始执行,这时主机的CPU、磁盘和网络基本上处于空闲状态。
这个分布式的grep程序需要扫描大概10的10次方个由100个字节组成的记录,查找出现概率较小的3个字符的模式
(这个模式在92337个记录中出现)。
输入数据被拆分成大约64M的Block(M=15000),整个输出数据存放在一个文件中
图2显示了这个运算随时间的处理过程。
其中Y轴表示输入数据的处理速度。
处理速度随着参与MapReduce计算的机器数量的增加而增加,
当1764台worker参与计算的时,处理速度达到了30GB/s。
(装逼部分,可以的)
当Map任务结束的时候,即在计算开始后80秒,输入的处理速度降到0。
整个计算过程从开始到结束一共花了大概150秒。这包括了大约一分钟的初始启动阶段。
初始启动阶段消耗的时间包括了是把这个程序传送到各个worker机器上的时间、
等待GFS文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间。
排序程序处理10的10次方个100个字节组成的记录(大概1TB的数据)
参考:http://blog.csdn.net/linvo/article/details/6596468
排序程序由不到50行代码组成。只有三行的Map函数从文本行中解析出10个字节的key值作为排序的key,
并且把这个key和原始文本行作为中间的key/value pair值输出。
我们使用了一个内置的恒等函数作为Reduce操作函数。这个函数把中间的key/value pair值不作任何改变输出。
最终排序结果输出到两路复制的GFS文件系统(也就是说,程序输出2TB的数据)。
中间过程
输出
分区
过程
a
【上】输入
峰值会达到13GB/s,大约200秒之后迅速滑落到了0。
值得注意的是,排序程序输入数据读取速度小于分布式grep程序。
这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写到本地硬盘。
相应的分布式grep程序的中间结果输出几乎可以忽略不计。
【中】中间数据从Map任务发送到Reduce任务
这个过程从第一个Map任务完成之后就开始缓慢启动了。
图示的第一个高峰是启动了第一批大概1700个Reduce任务
(整个MapReduce分布到大概1700台机器上,每台机器1次最多执行1个Reduce任务)。
排序程序运行大约300秒后,第一批启动的Reduce任务有些完成了,开始执行剩下的Reduce任务
所有的处理在大约600秒后结束。
【下】输出
在第一个排序阶段结束和数据开始写入磁盘之间有小延时,因为worker正在忙于排序中间数据
磁盘写入速度在2-4GB/s持续一段时间。输出数据写入磁盘大约持续850秒。
计入初始启动部分的时间,整个运算消耗了891秒。
这个速度和TeraSort benchmark[18]的最高纪录1057秒相差不多。
本地读入提高性能
b
显示了关闭了备用任务后排序程序执行情况
(b)执行的过程和(a)很相似,
除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。
在960秒后,只有5个Reduce任务没有完成。这些拖后腿的任务又执行了300秒才完成。
整个计算消耗了1283秒,多了44%的执行时间。
c
(c)的排序程序执行过程中,我们在程序开始后几分钟有意的kill了1746个worker中的200个。
集群底层的调度立刻在这些机器上重新开始新的worker处理进程
(因为只是worker机器上的处理进程被kill了,机器本身还在工作)。
(c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的Map任务丢失了
(由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务。
相关Map任务很快就被重新执行了。整个运算在933秒内完成,包括了初始启动时间
(只比正常执行多消耗了5%的时间)。
如前所述,输入数据被分成64MB的Block(M=15000)。
我们把排序后的输出结果分区后存储到4000个文件(R=4000)。
分区函数使用key的原始字节来把数据分区到R个片段中。
在这个benchmark测试中,我们使用的分区函数知道key的分区情况。
通常对于排序程序来说,我们会增加一个预处理的MapReduce操作用于采样key值的分布情况,
通过采样的数据来计算对最终排序处理的分区点。
还有一些值得注意的现象:
输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少,
这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。
排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份
(我们使用了2路的GFS文件系统,写入复制节点的原因是为了保证数据可靠性和可用性)。
我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。
如果底层文件系统使用类似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性,
那么在输出数据写入磁盘的时候,就可以降低网络带宽的使用。
成功之处
并行处理
容错处理
数据本地化优化
负载均衡
Hadoop
Lucene
ElasticSearch
主体
Hadoop
入口
Map
Reduce
定义一个 Job, 将 Mapper, Reducer 等必要的值设置进去。
[将 (K1, V1) 的输入转化成 list(K2, V2) 的输出]
[将 (K2, list(V2)) 的输入转化成 list(K3, V3) 的输出]
http://blog.csdn.net/admin1973/article/details/62037603?locationNum=6&fps=1
Nutch
JAVA8
lambda
http://blog.csdn.net/dm_vincent/article/details/40856569
// 串行执行的调用方式
findHighPriced(Tickers.symbols.stream());
// 并行执行的调用方式
findHighPriced(Tickers.symbols.parallelStream());
Phoenix
final StockInfo max = symbols
.map(StockUtil::getPriceInfo)
.filter(StockUtil.isLessThan(150))
.reduce(StockUtil::pickMax)
.get();
分析
Map-Reduce
Created With
MindMaster