2023年11月26日发(作者:)

HadoopMapReduce详细⼯作流程

Hadoop MapReduce全流程分析总结

InputFormat阶段

根据输⼊格式:InputFormat的实现类

①切⽚, getSplit()

②使⽤输⼊格式的RR读取数据, createRecordReader()

具体实现类有:

1. 默认的TextInputFormat

场景: 普通的⽂本格式数据来源

切⽚: 采⽤默认的切⽚策略,以⽂件为单位,先判断⽂件是否可切,如果可切,循环以⽚⼤⼩为单位切⽚!

不可切,整个⽂件作为1⽚!

RR : LineRecordReader(将⼀⾏封装为⼀个key-value)

LongWritable key: ⾏的偏移量

Text value: ⾏的内容

2. NLineInputFormat

场景: 适合⼀⾏的内容特别多,在Map阶段map()处理的逻辑⾮常复杂!

根据⾏数⾃定义切⽚的⼤⼩!

切⽚: 可以设置以⽂件为单位,每N⾏作为⼀个切⽚!

RR LineRecordReader(将⼀⾏封装为⼀个key-value

LongWritable key: ⾏的偏移量

Text value: ⾏的内容

3. KeyValueTextInputFormat

场景: ⼀⾏的内容的格式 为 key-value,⽅便地将key,value拆分封装

切⽚: 采⽤默认的切⽚策略,以⽂件为单位,先判断⽂件是否可切,如果可切,循环以⽚⼤⼩为单位切⽚!

不可切,整个⽂件作为1⽚!

RR KeyValueRecordReader(将⼀⾏封装为⼀个key-value

Text key: ⾏的分隔符之前的部分内容

Text value: ⾏的分隔符之后的部分内容

4. CombineTextInputFormat

场景: 输⼊⽬录中⼩⽂件过多,可以将多个⼩⽂件设置到⼀个切⽚中!

切⽚: 根据maxSize对每个⽂件进⾏逻辑切⽚,切分为若⼲part

将多个part组合,知道超过maxSize,这些part作为⼀个切⽚

RR LineRecordReader(将⼀⾏封装为⼀个key-value

LongWritable key: ⾏的偏移量

Text value: ⾏的内容

这⾥以FileInputFormat源码来看怎么计算切⽚⼤⼩的:

long splitSize = (minSize, (maxSize, blockSize));//切⽚⼤⼩计算规则

maxSize值为: e默认为Long的最⼤值;

minSize值为: e默认为1L

blockSize默认为块⼤⼩,hadoop2默认为128M

根据默认的策略策略,可以调整切⽚的⼤⼩:

调整切⽚⼤⼩⼤于块⼤⼩:调整minSize ⼤于块⼤⼩

调整切⽚⼤⼩⼩于块⼤⼩:调整maxSize ⼩于块⼤⼩

默认切⽚⼤⼩等于块⼤⼩,主要为了减少在运⾏MR时,⼤量的跨机器读取切⽚内容带来额外的⽹络IO

⾃定义输⼊格式,根据⾃⼰业务重写createRecordReader及isSplitable⽅法:

/*

* 1. 改变切⽚策略,⼀个⽂件固定切1⽚,通过指定⽂件不可切

* 2. 提供RR ,这个RR读取切⽚的⽂件名作为key,读取切⽚的内容封装到bytes作为value

*/

public class MyInputFormat extends FileInputFormat {

@Override

public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)

throws IOException, InterruptedException {

return new MyRecordReader();

}

// 重写isSplitable

@Override

protected boolean isSplitable(JobContext context, Path filename) {

return false;

}

}

Map阶段

编织规范:⾃⼰map类继承Mapper类,重写map⽅法:

/**

* 单词统计map

*/

public class wcMap extends Mapper {

private Text out_key=new Text();

private IntWritable out_value=new IntWritable(1);

map阶段流程图:

1Read 阶段:MapTask 通过⽤户编写的 RecordReader

从输⼊ InputSplit 中解析出 ⼀个个 key/value

2Map 阶段:该节点主要是将解析出的 key/value 交给⽤

户编写 map()函数处理,并 产⽣⼀系列新的 key/value

3Collect 收集阶段:在⽤户编写 map()函数中,当数据处

理完成后,⼀般会调⽤ t()输出结果。在

该函数内部,它会将⽣成的 key/value 分区(调⽤ Partitioner),

并写⼊⼀个环形内存缓冲区中。

4Spill 阶段:即溢写,当环形缓冲区满后,MapReduce

将数据写到本地磁盘上, ⽣成⼀个临时⽂件。需要注意的是,将

数据写⼊本地磁盘之前,先要对数据进⾏⼀次本地排 序,并在必

要时对数据进⾏合并、压缩等操作。 溢写阶段详情:

步骤 1:利⽤快速排序算法对缓存区内的数据进⾏排序,排序

⽅式是,先按照分区编号 Partition 进⾏排序,然后按照 key

进⾏排序。这样,经过排序后,数据以分区为单位聚集在 ⼀起,

且同⼀分区内所有数据按照 key 有序。

步骤 2:按照分区编号由⼩到⼤依次将每个分区中的数据写⼊

任务⼯作⽬录下的临时⽂ output/N 表⽰当前溢写次数)中。

如果⽤户设置了 Combiner,则写⼊⽂件之 前,对每个分区中的数据进

⾏⼀次聚集操作。

步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其

中每个分区的元信息包括在临时⽂件中的偏移量、压缩前数据⼤⼩和压缩

后数据⼤⼩。如果当前内存索引⼤ ⼩超过 1MB,则将内存索引写到⽂件

output/ 中。

5Combine 阶段:当所有数据处理完成后,MapTask 对所有临时⽂件进⾏

⼀次合并, 以确保最终只会⽣成⼀个数据⽂件。 当所有数据处理完后,MapTask

会将所有临时⽂件合并成⼀个⼤⽂件,并保存到⽂件 output/ 中,同时⽣成

相应的索引⽂件 output/ 在进⾏⽂件合并过程中,MapTask 以分区

为单位进⾏合并。对于某个分区,它将采⽤多 轮递归合并的⽅式。每轮合并

(默认 10)个⽂件,并将产⽣的⽂件重新加⼊待 合并列表中,

对⽂件排序后,重复以上过程,直到最终得到⼀个⼤⽂件。 让每个 MapTask

最终只⽣成⼀个数据⽂件,可避免同时打开⼤量⽂件和同时读取⼤量 ⼩⽂件

产⽣的随机读取带来的开销。

Shuffle阶段

发⽣阶段在mapTask跟reduceTask之间,具体流程如图:

1. 阶段定义

MapTask: map--------sort

map: ()中将输出的key-value写出之前

sort: ()中将输出的key-value写出之后

2. sort

当在map()将输出的key-value写出后,记录是会被Partitioner计算⼀个分区号

计算后,记录被收集到⼀个缓冲区(MapOutPutBuffer)

收集线程负责向缓冲区收集数据,缓冲区初始值为100M,当使⽤到80%阈值,

唤醒溢写线程,溢写线程会将缓冲区已经收集的数据溢写到磁盘

在溢写前,会对缓冲区中的数据进⾏排序(快速排序),在排序时,只通过⽐较key进⾏排序

排序后,按照分区,依次将数据写⼊到磁盘的临时⽂件的若⼲分区中

每次溢写都会⽣成⼀个临时⽂件,当所有的数据都溢写完成之后,会将所有的临时⽂件⽚段合并为⼀个总的最终的⽂件

在合并时,将所有的临时⽂件的相同分区的数据,进⾏合并,合并后再对所有的数据进⾏排序(归并排序)

最终⽣成⼀个结果⽂件,这个⽂件分为若⼲分区,每个分区的数据已经按照key进⾏了排序,等待reduceTaskshuffle线程来拷贝数据

这⾥有⼏个组件值得了解,我们可以根据业务需要重写:

⼀、分区

1. 分区是在MapTask中通过Partitioner来计算分区号

2. Partitioner的初始化

计算总的分区数partitions,取决于⽤户设置的reduceTask的数量

partitions>1,默认尝试获取⽤户设置Partitioner,如果⽤户没有定义,那么会使⽤HashPartitioner

HashPartitioner根据keyhashcode进⾏计算,相同的key以及hash值相同的key会分到⼀个区

partitions<=1,默认初始化⼀个Partitioner,这个Partitioner计算的所有的区号都为0

3. 注意

通常在Job的设置中,希望将数据分为⼏个区,就设置reduceTask的数量为对应的数量!

partitions=设置的reduceTask的数量,0<=分区器计算的区号 < partitions

⼆、排序

1. 排序是MR框架在shuffle阶段⾃动进⾏

2. MapTask端发⽣两次排序,在排序时,⽤户唯⼀可以控制的是提供⼀个key的⽐较器

3. 设置key的⽐较器

⽤户可以⾃定义key的⽐较器,⾃定义的⽐较器必须是⼀个RawComparator类型的类,

重点是实现compareTo()⽅法

⽤户通过key,让key实现WritableComparable接⼝,系统⾃动提供⼀个⽐较器,

重点是实现compareTo()⽅法

4. 排序的分类

全排序: 对所有的数据进⾏排序,指⽣成⼀个结果⽂件,这个结果⽂件整体有序

部分排序: 最终⽣成N个结果⽂件,每个⽂件内部整体有序

⼆次排序: 在对key进⾏⽐较时,⽐较的条件为多个

辅助排序: 在进⼊reduce阶段时,通过⽐较key是否相同,将相同的key分为1

三、分组

1. 分组通过分组⽐较器,对进⼊reducekey进⾏对⽐,key相同的分为⼀组,⼀次性进⼊Reducer,被调⽤reduce⽅法

2. 分组⽐较器的设置

⽤户可以⾃定义key的分组⽐较器,⾃定义的⽐较器必须是⼀个RawComparator类型的类

重点是实现compareTo()⽅法

如果没有设置key的分组⽐较器,默认采取在Map阶段排序时,key的⽐较器

3. Reduce的细节

在进⼊reduce()Reducer会⾃动实例化⼀个key,value,这个key-valueRedcuer⼯作期间,⼀直是⼀个不变的对象

每次迭代,reducer会把读到的新的key-value的属性值赋值给key-value!

四、Combiner

1. Combiner的本质是⼀个Reducer,对key-value进⾏合并

2. Combiner Reducer的区别

Combinershuffle阶段运⾏

Reducerreduce阶段运⾏

3. Combiner适⽤于 +-操作,不适合 */操作

4. Combiner的运⾏时机

MapTask端: 每次从缓冲区将数据溢写到磁盘之前,如果设置了Combiner,数据会被Combine之后

再溢写到磁盘

MapTask最后的merge阶段,如果溢写的⽚段数据>=3,,如果设置了Combiner,在⽣成

最终的数据时,也会先执⾏Combine之后再溢写到磁盘

ReduceTask端: shuffle线程从多个MapTask读取同⼀个分区的数据,之后进⾏合并,在合并时,如果

shuffle所使⽤的内存不够,也会将部分数据临时溢写到磁盘,此时如果设置了Combiner,数据会被Combine之后

再溢写到磁盘

5. Combiner的本质⽬的是为了减少MR在运⾏期间的磁盘IO和⽹络IO

Reduce阶段

reduce业务流程:

1Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝⼀⽚数据,

并针对某⼀⽚数 据,如果其⼤⼩超过⼀定阈值,则写到磁盘上,

否则直接放到内存中。

2Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个

后台线程对内存和 磁盘上的⽂件进⾏合并,以防⽌内存使⽤过多或

磁盘上⽂件过多。

3Sort 阶段:按照 MapReduce 语义,⽤户编写 reduce()函数输⼊

数据是按 key 进⾏ 聚集的⼀组数据。为了将 key 相同的数据聚在⼀起,

Hadoop 采⽤了基于排序的策略。由于 各个 MapTask 已经实现对⾃⼰

的处理结果进⾏了局部排序,因此,ReduceTask 只需对所有 数据进⾏⼀次归并排序即可。

4Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。

设置 ReduceTask 并⾏度(个数) ReduceTask 的并⾏度同样影响整个 job

的执⾏并发度和执⾏效率,但与 MapTask 的并发数由切⽚数决定不同,

Reducetask 数量的决定是可以直接⼿动设置: //默认值是 1,⼿动设置为 4

ReduceTasks(4);

注意事项:

测试 ReduceTask 多少合适。 (1)实验环境:1 个 Master 节点,16 个 Slave 节点:

OutputFormat阶段

使⽤,在job阶段设置:

putFormatClass();

这⾥如果没有reduce阶段,会在map阶段根据⾃定义输出格式输出;

如果设置了reduce阶段,会在reduce阶段输出根据⾃定义输出格式输出,

map阶段输出是按照MapTask的NewOutputCollector来输出;