Map-Reduce
2018-01-11

http://www.justabug.net/mind-map-reduce/ (左右滚动条在最下方)
3.2 容错 java实现 例如,计算一个大的文档集合中每个单词出现的次数 map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1"); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result)); Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。 Reduce函数把Map函数产生的每一个特定的词的计数累加起来。 例子 Master Worker故障 master失败 原理 执行过程 在失效方面的处理机制 1.切片 2.Master分配任务给work 3.work解析K-V,使用Map函数处理 4.将缓存的K-V写入磁盘 5.worker获取数据并排序 6.Reduce 7.完成并唤醒用户。 用户程序首先调用的MapReduce库将输入文件分成M个数据片度 然后用户程序在机群中创建大量的程序副本。 master为每个空闲的worker分配任务,一个Map任务或Reduce任务。 worker读取相关的输入数据, 解析出K-V传给Map函数, 由Map函数生成并输出的中间K-V,并缓存在内存中。 缓存的K-V 存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。 对key进行排序后使得具有相同key值的数据聚合在一起。 遍历,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。 当用户提供的Map和Reduce操作是幂等的,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。 依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。 依赖【对Map和Reduce任务的输出是原子提交的】这个特性。每个工作中的任务把它的输出写到私有的临时文件中。 (Reduce任务与文件一一对应) 1.周期性检查与失效(master周期性的ping每个worker。) 2.故障处理(worker鼓掌时,map需要重新执行,而reduce不用) 3.重复执行(worker可以互相接盘) 4.MapReduce可以处理大规模worker失效的情况。 一个简单的解决办法是让master周期性的将上面描述的数据结构(alex注:指3.2节)的写入磁盘,即检查点(checkpoint)。 如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。 然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。 客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。 幂等使补偿机制容易执行 细节 存储相关 任务粒度 备用任务 技巧 分区函数 顺序保证 Combiner函数 输入和输出的类型 性能 (Google家的测试) GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。 MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行 如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行) 当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。 我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。 更进一步:用户可以灵活处理R 分布式的Grep 计算URL访问频率 倒转网络链接图 倒排索引 分布式排序 “落伍者” 减少“落伍者” 影响一个MapReduce的总执行时间最通常的因素是“落伍者” 备用(backup)任务进程,用于处理剩下的1% 在中间key上使用分区函数来对数据进行分区 一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。 然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。 按KEY值排序 排序 业界 中间key值的重复数据多,允许用户指定combiner函数在本地先合并一次 模型 例子 类型 描述 Map函数 Reduce函数 input:k1,v1, output:list(k2,v2)[中间] input:k2,list(v2)[中间] output:合并后产生list(v2) MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。 Reduce函数合并这些value值,形成一个较小的value值的集合。 一般的,每次Reduce函数调用只产生0或1个输出value值。 通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。 Combiner函数与Reduce函数 部分的合并中间结果可以显著的提高一些MapReduce操作的速度。 附录A包含一个使用combiner函数的例子 输入格式 读取方式 副作用 跳过损坏的记录 本地执行 操作过程中增加辅助的输出文件 忽略一些有问题的记录也是可以接受的 状态信息 计数器 调试 MapReduce库的本地实现版本,通过使用本地版本的MapReduce库,MapReduce操作在本地计算机上顺序的执行。 master使用嵌入式的HTTP服务器,便于监控 MapReduce库使用计数器统计不同事件发生次数 worker传递给master,master统计 集群配置 GREP 对比例子 一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式 另一类是从海量数据中抽取少部分的用户感兴趣的数据 图2显示了这个运算随时间的处理过程。 其中Y轴表示输入数据的处理速度。 处理速度随着参与MapReduce计算的机器数量的增加而增加, 当1764台worker参与计算的时,处理速度达到了30GB/s。 参考:http://blog.csdn.net/linvo/article/details/6596468 中间过程 过程 a (正常执行) 【上】输入 排序程序输入数据读取速度小于分布式grep程序。 【中】中间数据从Map任务发送到Reduce任务 图示的第一个高峰是启动了第一批大概1700个Reduce任务 【下】输出 在第一个排序阶段结束和数据开始写入磁盘之间有小延时, 因为worker正在忙于排序中间数据 本地读入提高性能 b (关闭备用) (b)显示了关闭了备用任务后排序程序执行情况,部分失效 输出数据写磁盘的动作在时间上拖了一个很长的尾巴 在960秒后,只有5个Reduce任务没有完成。 这些拖后腿的任务又执行了300秒才完成。 c (worker 部分失效) (c)我们在程序开始后几分钟有意的kill了1746个worker中的200个。 (c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的Map任务丢失了 (由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务。 (只比正常执行多消耗了5%的时间) 成功之处 并行处理 容错处理 数据本地化优化 负载均衡 Hadoop Lucene ElasticSearch 主体 Hadoop 入口 Map Reduce 定义一个 Job, 将 Mapper, Reducer 等必要的值设置进去。 [将 (K1, V1) 的输入转化成 list(K2, V2) 的输出] [将 (K2, list(V2)) 的输入转化成 list(K3, V3) 的输出] http://blog.csdn.net/admin1973/article/details/62037603?locationNum=6&fps=1