一、需求
自定义输入格式 完成统计任务 输出多个文件
输入数据:5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数
输出数据:按网站类别 统计每个电视剧的每个指标的总量
任务目标:自定义输入格式 完成统计任务 输出多个文件
二、数据
部分数据
三、思路
第一步:定义一个电视剧热度数据的bean。
第二步:定义一个读取热度数据的InputFormat类。
第三步:写MapReduce统计程序
第四步:上传tvplay.txt数据集到HDFS,并运行程序
四、代码
1.利用WritableComparable接口,自定义一个TVWritable类,实现WritableComparable类,将各个参数封装起来,便于计算。
package com.pc.hadoop.pc.tv;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class TVWritable implements WritableComparable{//定义5个成员变量private int view;private int collection;private int comment;private int diss;private int up;//构造函数public TVWritable(){}//定义一个set方法,用this关键字对封装好的数据进行引用public void set(int view,int collection,int comment, int diss,int up){this.view = view;this.collection = collection;this.comment = comment;this.diss = diss;this.up = up;}//使用get和set对封装好的数据进行存取public int getView(){return view;}public void setView(int view){this.view = view;}public int getCollection(){return collection;}public void setCollection(int collection){this.collection = collection;}public int getComment(){return comment;}public void setComment(int comment){this.comment = comment;}public int getDiss(){return diss;}public void setDiss(int diss){this.diss = diss;}public int getUp(){return up;}public void setUp(int up){this.up = up;}//实现WritableComparaqble的redafields()方法,以便该数据能被序列化后完成网络传输或文件输入。@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubview = in.readInt();collection = in.readInt();comment = in.readInt();diss = in.readInt();up = in.readInt();}//实现WritableComparaqble的write()方法,以便该数据能被反序列化后完成网络传输或文件输入。@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeInt(view);out.writeInt(collection);out.writeInt(comment);out.writeInt(diss);out.writeInt(up);}//使用compareTo对其中的数据进行比较@Overridepublic int compareTo(Object o) {// TODO Auto-generated method stubreturn 0;}}
2.自定义一个TVInputFormat类取继承FileInputFormat文件输入格式这个父类,然后对createRecordReader()方法进行重写,其实质则是重写TVRecordReader()这个方法,得到其返回值,利用TVRecordReader()这个方法去继承RecordReader()这个方法。
package com.pc.hadoop.pc.tv;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;public class TVInputFormat extends FileInputFormat<Text,TVWritable>
{protected boolean isSplitable(){return false;}@Overridepublic RecordReader<Text, TVWritable> createRecordReader(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException {// TODO Auto-generated method stubreturn new TVRecordReader();}public static class TVRecordReader extends RecordReader<Text,TVWritable>{public LineReader in; //自定义行读取器public Text lineKey; //声明key类型public TVWritable lineValue; //自定义valuepublic Text line; //每行数据类型//@Overridepublic void close() throws IOException {// TODO Auto-generated method stubif(in != null){in.close();}}//获取当前key@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn lineKey;}//获取当前value@Overridepublic TVWritable getCurrentValue() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn lineValue;}//获取当前进程@Overridepublic float getProgress() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn 0;}//初始化@Overridepublic void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException {// TODO Auto-generated method stubFileSplit split = (FileSplit) inputsplit;//获取分片内容Configuration job = context.getConfiguration();//读取配置信息Path file = split.getPath();//获取路径FileSystem fs = file.getFileSystem(job);//获取文件系统FSDataInputStream filein = fs.open(file);//通过文件系统打开文件,对文件进行读取in = new LineReader(filein,job);lineKey = new Text();//新建一个Text实例作为自定义输入格式的keylineValue = new TVWritable();line = new Text();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {// TODO Auto-generated method stubint lineSize = in.readLine(line);if(lineSize == 0)return false;//读取每行数据解数组iString[] i = line.toString().split("\t");if(i.length != 7){throw new IOException("Invalid record received");}//自定义key和value的值lineKey.set(i[0]+"\t"+i[1]);//电视剧名称和所属视频网站lineValue.set(Integer.parseInt(i[2].trim()), Integer.parseInt(i[3].trim()), Integer.parseInt(i[4].trim()), Integer.parseInt(i[5].trim()), Integer.parseInt(i[6].trim() ));return true;}}}
3.使用MapperReducer对输入的数据进行进行相应的处理输出想要得到的结果。
在reduce在定义一个多输出的对象MultipleOutputs
/*** @input Params Text TvPlayData* @output Params Text TvPlayData* @author yangjun* @function 直接输出*/public static class TVPlayMapper extendsMapper<Text, TVWritable, Text, TVWritable> {@Overrideprotected void map(Text key, TVWritable value, Context context)throws IOException, InterruptedException {context.write(key, value);}}/*** @input Params Text TvPlayData* @output Params Text Text* @author yangjun* @fuction 统计每部电视剧的 点播数 收藏数等 按source输出到不同文件夹下*/public static class TVPlayReducer extendsReducer<Text, TVWritable, Text, Text> {private Text m_key = new Text();private Text m_value = new Text();private MultipleOutputs<Text, Text> mos;protected void setup(Context context) throws IOException,InterruptedException {mos = new MultipleOutputs<Text, Text>(context);}//将 MultipleOutputs 的初始化放在 setup() 中,因为在 setup() 只会被调用一次
//定义reduce() 方法里的 multipleOutputs.write(…)。你需要把以前的 context.write(…) 替换成现在的这个protected void reduce(Text Key, Iterable<TVWritable> Values,Context context) throws IOException, InterruptedException {int view = 0;int collection = 0;int comment = 0;int diss = 0;int up = 0;for (TVWritable a:Values) {view += a.getView();collection += a.getCollection();comment +=a.getComment();diss += a.getDiss();up += a.getUp();}//tvname sourceString[] records = Key.toString().split("\t");// 1优酷2搜狐3土豆4爱奇艺5迅雷看看String source = records[1];// 媒体类别m_key.set(records[0]);m_value.set(view+"\t"+collection+"\t"+comment+"\t"+diss+"\t"+up);if (source.equals("1")) {mos.write("youku", m_key, m_value);} else if (source.equals("2")) {mos.write("souhu", m_key, m_value);} else if (source.equals("3")) {mos.write("tudou", m_key, m_value);} else if (source.equals("4")) {mos.write("aiqiyi", m_key, m_value);} else if (source.equals("5")) {mos.write("xunlei", m_key, m_value);}}protected void cleanup(Context context) throws IOException,InterruptedException {mos.close(); //关闭 MultipleOutputs,也就是关闭 RecordWriter,并且是一堆 RecordWriter,因为这里会有很多 reduce 被调用。}}
4 运行run函数对作业进行运行,并自定义输出MultipleOutputs函数调用addNameoutput方法对其进行设置多路径的输出。
@Overridepublic int run(String[] args) throws Exception {Configuration conf = new Configuration();// 配置文件对象Path mypath = new Path(args[1]);FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径if (hdfs.isDirectory(mypath)) {hdfs.delete(mypath, true);}Job job = new Job(conf, "tvplay");// 构造任务job.setJarByClass(TVplay.class);// 设置主类job.setMapperClass(TVPlayMapper.class);// 设置Mapperjob.setMapOutputKeyClass(Text.class);// key输出类型job.setMapOutputValueClass(TVWritable.class);// value输出类型job.setInputFormatClass(TVInputFormat.class);//自定义输入格式job.setReducerClass(TVPlayReducer.class);// 设置Reducerjob.setOutputKeyClass(Text.class);// reduce key类型job.setOutputValueClass(Text.class);// reduce value类型// 自定义文件输出格式,通过路径名(pathname)来指定输出路径MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class,Text.class, Text.class);MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class,Text.class, Text.class);MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class,Text.class, Text.class);MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class,Text.class, Text.class);MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class,Text.class, Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径job.waitForCompletion(true);return 0;}public static void main(String[] args) throws Exception {String[] args0 = { "hdfs://pc1:9000/home/hadoop/tvplay/tvplay.txt","hdfs://pc1:9000/home/hadoop/tvplay/out/" };int ec = ToolRunner.run(new Configuration(), new TVplay(), args0);//public static int run(Configuration conf,Tool tool, String[] args),可以在job运行的时候指定配置文件或其他参数//这个方法调用tool的run(String[])方法,并使用conf中的参数,以及args中的参数,而args一般来源于命令行。System.exit(ec);}
五、运行
在myeclipse上运行:
1.创建目录、home/hadoop/tvplay,将数据文件上传至目录下
2.右键->run as->run on hadoop
控制台显示信息
右键refresh
部分结果
.在hdfs上运行:
1.修改args0的两个路径,或者删除运行结果的out文件夹
2.将三个java文件打包到本机,右键->export->JAR file
3.将jar包上传到hdfs的文件系统
4.运行程序
[hadoop@pc1 hadoop]$ bin/hadoop jar tvplay.jar com.pc.hadoop.pc.tv.TVplay /home/hadoop/tvplay/tvplay.txt /home/hadoop/tvplay/out
5.查看运行结果