2023年11月26日发(作者:)
Hadoop之MapReduce详细⼯作流程
Hadoop MapReduce全流程分析总结
InputFormat阶段
根据输⼊格式:InputFormat的实现类
①切⽚, getSplit()
②使⽤输⼊格式的RR读取数据, createRecordReader()
具体实现类有:
1. 默认的TextInputFormat
场景: 普通的⽂本格式数据来源
切⽚: 采⽤默认的切⽚策略,以⽂件为单位,先判断⽂件是否可切,如果可切,循环以⽚⼤⼩为单位切⽚!
不可切,整个⽂件作为1⽚!
RR : LineRecordReader(将⼀⾏封装为⼀个key-value)
LongWritable key: ⾏的偏移量
Text value: ⾏的内容
2. NLineInputFormat
场景: 适合⼀⾏的内容特别多,在Map阶段map()处理的逻辑⾮常复杂!
根据⾏数⾃定义切⽚的⼤⼩!
切⽚: 可以设置以⽂件为单位,每N⾏作为⼀个切⽚!
RR : LineRecordReader(将⼀⾏封装为⼀个key-value)
LongWritable key: ⾏的偏移量
Text value: ⾏的内容
3. KeyValueTextInputFormat
场景: ⼀⾏的内容的格式 为 key-value,⽅便地将key,value拆分封装
切⽚: 采⽤默认的切⽚策略,以⽂件为单位,先判断⽂件是否可切,如果可切,循环以⽚⼤⼩为单位切⽚!
不可切,整个⽂件作为1⽚!
RR : KeyValueRecordReader(将⼀⾏封装为⼀个key-value)
Text key: ⾏的分隔符之前的部分内容
Text value: ⾏的分隔符之后的部分内容
4. CombineTextInputFormat
场景: 输⼊⽬录中⼩⽂件过多,可以将多个⼩⽂件设置到⼀个切⽚中!
切⽚: ①根据maxSize对每个⽂件进⾏逻辑切⽚,切分为若⼲part
②将多个part组合,知道超过maxSize,这些part作为⼀个切⽚
RR : LineRecordReader(将⼀⾏封装为⼀个key-value)
LongWritable key: ⾏的偏移量
Text value: ⾏的内容
这⾥以FileInputFormat源码来看怎么计算切⽚⼤⼩的:
long splitSize = (minSize, (maxSize, blockSize));//切⽚⼤⼩计算规则
maxSize值为: e默认为Long的最⼤值;
minSize值为: e默认为1L;
blockSize默认为块⼤⼩,hadoop2默认为128M;
根据默认的策略策略,可以调整切⽚的⼤⼩:
调整切⽚⼤⼩⼤于块⼤⼩:调整minSize ⼤于块⼤⼩
调整切⽚⼤⼩⼩于块⼤⼩:调整maxSize ⼩于块⼤⼩
默认切⽚⼤⼩等于块⼤⼩,主要为了减少在运⾏MR时,⼤量的跨机器读取切⽚内容带来额外的⽹络IO
⾃定义输⼊格式,根据⾃⼰业务重写createRecordReader及isSplitable⽅法:
/*
* 1. 改变切⽚策略,⼀个⽂件固定切1⽚,通过指定⽂件不可切
* 2. 提供RR ,这个RR读取切⽚的⽂件名作为key,读取切⽚的内容封装到bytes作为value
*/
public class MyInputFormat extends FileInputFormat {
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new MyRecordReader();
}
// 重写isSplitable
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
Map阶段
编织规范:⾃⼰map类继承Mapper类,重写map⽅法:
/**
* 单词统计map
*/
public class wcMap extends Mapper
private Text out_key=new Text();
private IntWritable out_value=new IntWritable(1);
map阶段流程图:
(1)Read 阶段:MapTask 通过⽤户编写的 RecordReader,
从输⼊ InputSplit 中解析出 ⼀个个 key/value。
(2)Map 阶段:该节点主要是将解析出的 key/value 交给⽤
户编写 map()函数处理,并 产⽣⼀系列新的 key/value。
(3)Collect 收集阶段:在⽤户编写 map()函数中,当数据处
理完成后,⼀般会调⽤ t()输出结果。在
该函数内部,它会将⽣成的 key/value 分区(调⽤ Partitioner),
并写⼊⼀个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会
将数据写到本地磁盘上, ⽣成⼀个临时⽂件。需要注意的是,将
数据写⼊本地磁盘之前,先要对数据进⾏⼀次本地排 序,并在必
要时对数据进⾏合并、压缩等操作。 溢写阶段详情:
步骤 1:利⽤快速排序算法对缓存区内的数据进⾏排序,排序
⽅式是,先按照分区编号 Partition 进⾏排序,然后按照 key
进⾏排序。这样,经过排序后,数据以分区为单位聚集在 ⼀起,
且同⼀分区内所有数据按照 key 有序。
步骤 2:按照分区编号由⼩到⼤依次将每个分区中的数据写⼊
任务⼯作⽬录下的临时⽂ 件 output/(N 表⽰当前溢写次数)中。
如果⽤户设置了 Combiner,则写⼊⽂件之 前,对每个分区中的数据进
⾏⼀次聚集操作。
步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其
中每个分区的元信息包括在临时⽂件中的偏移量、压缩前数据⼤⼩和压缩
后数据⼤⼩。如果当前内存索引⼤ ⼩超过 1MB,则将内存索引写到⽂件
output/ 中。
(5)Combine 阶段:当所有数据处理完成后,MapTask 对所有临时⽂件进⾏
⼀次合并, 以确保最终只会⽣成⼀个数据⽂件。 当所有数据处理完后,MapTask
会将所有临时⽂件合并成⼀个⼤⽂件,并保存到⽂件 output/ 中,同时⽣成
相应的索引⽂件 output/。 在进⾏⽂件合并过程中,MapTask 以分区
为单位进⾏合并。对于某个分区,它将采⽤多 轮递归合并的⽅式。每轮合并
(默认 10)个⽂件,并将产⽣的⽂件重新加⼊待 合并列表中,
对⽂件排序后,重复以上过程,直到最终得到⼀个⼤⽂件。 让每个 MapTask
最终只⽣成⼀个数据⽂件,可避免同时打开⼤量⽂件和同时读取⼤量 ⼩⽂件
产⽣的随机读取带来的开销。
Shuffle阶段
发⽣阶段在mapTask跟reduceTask之间,具体流程如图:
1. 阶段定义
MapTask: map--------sort
map: ()中将输出的key-value写出之前
sort: ()中将输出的key-value写出之后
2. sort
①当在map()将输出的key-value写出后,记录是会被Partitioner计算⼀个分区号
②计算后,记录被收集到⼀个缓冲区(MapOutPutBuffer)
③收集线程负责向缓冲区收集数据,缓冲区初始值为100M,当使⽤到80%阈值,
唤醒溢写线程,溢写线程会将缓冲区已经收集的数据溢写到磁盘
④在溢写前,会对缓冲区中的数据进⾏排序(快速排序),在排序时,只通过⽐较key进⾏排序
⑤排序后,按照分区,依次将数据写⼊到磁盘的临时⽂件的若⼲分区中
⑥每次溢写都会⽣成⼀个临时⽂件,当所有的数据都溢写完成之后,会将所有的临时⽂件⽚段合并为⼀个总的最终的⽂件
⑦在合并时,将所有的临时⽂件的相同分区的数据,进⾏合并,合并后再对所有的数据进⾏排序(归并排序)
⑧最终⽣成⼀个结果⽂件,这个⽂件分为若⼲分区,每个分区的数据已经按照key进⾏了排序,等待reduceTask的shuffle线程来拷贝数据
这⾥有⼏个组件值得了解,我们可以根据业务需要重写:
⼀、分区
1. 分区是在MapTask中通过Partitioner来计算分区号
2. Partitioner的初始化
①计算总的分区数partitions,取决于⽤户设置的reduceTask的数量
②partitions>1,默认尝试获取⽤户设置Partitioner,如果⽤户没有定义,那么会使⽤HashPartitioner
HashPartitioner根据key的hashcode进⾏计算,相同的key以及hash值相同的key会分到⼀个区
②partitions<=1,默认初始化⼀个Partitioner,这个Partitioner计算的所有的区号都为0
3. 注意
通常在Job的设置中,希望将数据分为⼏个区,就设置reduceTask的数量为对应的数量!
partitions=设置的reduceTask的数量,0<=分区器计算的区号 < partitions
⼆、排序
1. 排序是MR框架在shuffle阶段⾃动进⾏
2. 在MapTask端发⽣两次排序,在排序时,⽤户唯⼀可以控制的是提供⼀个key的⽐较器
3. 设置key的⽐较器
①⽤户可以⾃定义key的⽐较器,⾃定义的⽐较器必须是⼀个RawComparator类型的类,
重点是实现compareTo()⽅法
②⽤户通过key,让key实现WritableComparable接⼝,系统⾃动提供⼀个⽐较器,
重点是实现compareTo()⽅法
4. 排序的分类
全排序: 对所有的数据进⾏排序,指⽣成⼀个结果⽂件,这个结果⽂件整体有序
部分排序: 最终⽣成N个结果⽂件,每个⽂件内部整体有序
⼆次排序: 在对key进⾏⽐较时,⽐较的条件为多个
辅助排序: 在进⼊reduce阶段时,通过⽐较key是否相同,将相同的key分为1组
三、分组
1. 分组通过分组⽐较器,对进⼊reduce的key进⾏对⽐,key相同的分为⼀组,⼀次性进⼊Reducer,被调⽤reduce⽅法
2. 分组⽐较器的设置
①⽤户可以⾃定义key的分组⽐较器,⾃定义的⽐较器必须是⼀个RawComparator类型的类
重点是实现compareTo()⽅法
②如果没有设置key的分组⽐较器,默认采取在Map阶段排序时,key的⽐较器
3. Reduce的细节
在进⼊reduce(),Reducer会⾃动实例化⼀个key,value,这个key-value在Redcuer⼯作期间,⼀直是⼀个不变的对象
每次迭代,reducer会把读到的新的key-value的属性值赋值给key-value!
四、Combiner
1. Combiner的本质是⼀个Reducer,对key-value进⾏合并
2. Combiner 和 Reducer的区别
Combiner在shuffle阶段运⾏
Reducer在reduce阶段运⾏
3. Combiner适⽤于 +,-操作,不适合 *,/操作
4. Combiner的运⾏时机
在MapTask端: ①每次从缓冲区将数据溢写到磁盘之前,如果设置了Combiner,数据会被Combine之后
再溢写到磁盘
②在MapTask最后的merge阶段,如果溢写的⽚段数据>=3,,如果设置了Combiner,在⽣成
最终的数据时,也会先执⾏Combine之后再溢写到磁盘
在ReduceTask端: ③shuffle线程从多个MapTask读取同⼀个分区的数据,之后进⾏合并,在合并时,如果
shuffle所使⽤的内存不够,也会将部分数据临时溢写到磁盘,此时如果设置了Combiner,数据会被Combine之后
再溢写到磁盘
5. Combiner的本质⽬的是为了减少MR在运⾏期间的磁盘IO和⽹络IO
Reduce阶段
reduce业务流程:
(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝⼀⽚数据,
并针对某⼀⽚数 据,如果其⼤⼩超过⼀定阈值,则写到磁盘上,
否则直接放到内存中。
(2)Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个
后台线程对内存和 磁盘上的⽂件进⾏合并,以防⽌内存使⽤过多或
磁盘上⽂件过多。
(3)Sort 阶段:按照 MapReduce 语义,⽤户编写 reduce()函数输⼊
数据是按 key 进⾏ 聚集的⼀组数据。为了将 key 相同的数据聚在⼀起,
Hadoop 采⽤了基于排序的策略。由于 各个 MapTask 已经实现对⾃⼰
的处理结果进⾏了局部排序,因此,ReduceTask 只需对所有 数据进⾏⼀次归并排序即可。
(4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
设置 ReduceTask 并⾏度(个数) ReduceTask 的并⾏度同样影响整个 job
的执⾏并发度和执⾏效率,但与 MapTask 的并发数由切⽚数决定不同,
Reducetask 数量的决定是可以直接⼿动设置: //默认值是 1,⼿动设置为 4
ReduceTasks(4);
注意事项:
测试 ReduceTask 多少合适。 (1)实验环境:1 个 Master 节点,16 个 Slave 节点:
OutputFormat阶段
使⽤,在job阶段设置:
putFormatClass();
这⾥如果没有reduce阶段,会在map阶段根据⾃定义输出格式输出;
如果设置了reduce阶段,会在reduce阶段输出根据⾃定义输出格式输出,
map阶段输出是按照MapTask的NewOutputCollector来输出;


发布评论