前言
- 据说,自0.20.0版本开始,Hadoop同时提供了新旧两套MapReduce API,并在后续版本中也同时支持这两种API的使用。新版本MR API在旧的基础进行了扩展,也制定了新的split计算方式。新版本MR API在包org.apache.hadoop.mapreduce及其子包中,而旧版本MR API则在包org.apache.hadoop.mapred及其子包中。
- 本文主要从源码角度,简单谈谈新旧MR API中常用的FileInputFormat类(TextInputFormat的父类)中分片Split的计算方式,可以以此来确定每次MR的Mapper个数,默认情况下Mapper的个数即等于分片Split个数。
新旧MapReduce API的作业配置方式对比
旧API 作业配置实例:
JobConf job = new JobConf(new Configuration(), MyJob.class);
job.setJobName("myjob");
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
JobClient.runJob(job);
新API 作业配置实例:
Configuration conf = new Configuration();
Job job = new Job(conf, "myjob ");
job.setJarByClass(MyJob.class);
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
分片Split计算过程
在提交MR Job时,Job在setUseNewAPI
方法中根据设置的Mapper类来确定设置参数"mapred.mapper.new-api"
的值,如果是使用的是旧版API,则此参数为false,否则为true;
然后JobSubmitter会根据参数"mapred.mapper.new-api"
的值来判断,是使用旧版还是新版API中的getSplits
方法来确定分片Split。
分片Split计算公式
由上一节图可知,最终SplitSize的计算,在旧版本API中由org.apache.hadoop.mapred.InputFormat#getSplits
方法决定,在新版本API中由org.apache.hadoop.mapreduce.InputFormat#getSplits
方法决定。
旧版本FileInputFormat中分片大小splitsSize计算公式
通过查看org.apache.hadoop.mapred.FileInputFormat中的getSplits方法,可以得出对应的SplitSize计算公式:
Math.max(minSize,Math.min(goalSize,blockSize))
PS:
goalSize
等于所有输入文件的总大小除以参数"mapreduce.job.maps"
的值(此参数默认值为1,可以在 mapred-site.xml 文件中配置);
blockSize
指的是当前文件在HDFS中存储时的BLOCK大小(可以通过在 hdfs-site.xml 文件中设置"dfs.block.size"
或者"dfs.blocksize"
参数来调整之后新生成的HDFS文件BLOCK大小);
minSize
为参数"mapreduce.input.fileinputformat.split.minsize"
的值(此参数默认值为1,可以在 mapred-site.xml 文件中配置)。
新版本FileInputFormat中分片大小splitsSize计算公式
通过查看org.apache.hadoop.mapreduce.lib.input.FileInputFormat中的getSplits方法,可以得出对应的SplitSize计算公式:
Math.max(minSize,Math.min(maxSize,blockSize))
PS:
minSize
等于参数"mapreduce.input.fileinputformat.split.minsize"
的值(此参数默认值为1,可以在 mapred-site.xml 文件中配置);
maxSize
为参数"mapreduce.input.fileinputformat.split.maxsize"
的值(此参数默认值为Long.MAX_VALUE
,即0x7fffffffffffffffL,可以在 mapred-site.xml 文件中配置);
blockSize
指的是当前文件在DFS中存储时的BLOCK大小(可以通过在 hdfs-site.xml 文件中设置"dfs.block.size"
或者"dfs.blocksize"
参数来调整之后新生成文件的BLOCK大小)。
分片规则
当一个文件的剩余未分片大小除以splitSize大于1.1(即超过splitSize的10%)时,则认为文件大小“溢出”,需要切割分成多个分片,每切割一次,剩余未分片大小减少splitSize;否则则将整个剩余未分片的内容作为单个分片。
FileInputFormat分片数量计算
对于输入路径中的每个文件都应用split分片规则来对其进行分片,每个文件至少切成一个Split(如果文件内容为空,大小为0,则创建空分片Split)。
Mapper个数的确定
默认情况下,Mapper的个数即等于最终的分片个数
旧版本API:
可以通过修改"mapreduce.job.maps"
、"mapreduce.input.fileinputformat.split.minsize"
、"dfs.blocksize"
参数来调整分片大小splitSize来调整最终分片个数,进而调整Mapper数量。
根据旧版API中splitSize计算公式,当goalSize小于blockSize,大于minSize时,Mapper数量大致等于参数"mapreduce.job.maps"
的值。
新版本API:
可以通过修改"mapreduce.input.fileinputformat.split.minsize"
、"mapreduce.input.fileinputformat.split.maxsize"
、"dfs.blocksize"
参数来调整分片大小splitSize来调整最终分片个数,进而调整Mapper数量。
根据新版API中splitSize计算公式,一般情况下通过适当减小"mapreduce.input.fileinputformat.split.maxsize"
的值,并使其置于minSize与blockSize之间,则可以减小分片大小splitSize,来增加最终分片个数,进而增加Mapper数量。