MapReduce

news/2024/5/16 15:29:42/文章来源:https://blog.csdn.net/shield911/article/details/128045266

4.1 MapReduce概述

2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统和MapReduce的论文,公布了Google的GFS和MapReduce的基本原理和主要设计思想。

4.1.1 MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
MapReduce采用"分而治之"思想,把对大规模数据集的操作,分发给一个主节点管理下的各个子节点共同完成,然后整合各个子结点的中间结果,得到最终的计算结果。简而言之,MapReduce就是"分散任务,汇总结果"

4.1.2 MapReduce优缺点

MapReduce擅长的场景:

  1. 易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

  1. 良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

  1. 高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

  1. 能对PB级以上的海量数据离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

MapReduce不擅长的场景:

  1. 实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

  1. 流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

  1. DAG(有向图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。 在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,降低使用性能。

4.2 MapReduce编程模型

从MapReduce自身的命名特点可以看出,MapReduce至少由两部分组成:Map和Reduce。Map理解为"映射",Reduce理解为"化简"。用户只需要编写map()和reduce()两个函数即可完成简单的分布式程序的设计。

  1. 输入(input)一个大文件,通过拆分(Split)之后,将其分为多个分片。
  2. 每个文件分片由单独的主机处理,这就是Map方法。
  3. 将各个主机计算的结果进行汇总并得到最终结果,这就是Reduce方法。

MapReduce编程模型如下的几个要点:

  1. 任务Job = Map + Reduce
  2. Map的输出是Reduce的输入。
  3. 所有的输入和输出都是<Key,Value>的形式。

<k1,v1>是Map的输入,<k2,v2>是Map的输出。
<k3,v3>是Reduce的输入,<k4,v4>是Reduce的输出。

  1. k2=k3, v3是一个集合,v3的元素就是v2,表示为v3=list(v2)。
  2. 所有的输入和输出的数据类型必须是Hadoop的数据类型,数据类型如下表:
    在这里插入图片描述

MapReduce要求<key, value>的key和value都要实现Writable接口,从而支持Hadoop的序列化和反序列化。上面的Hadoop的内置类型都实现了Writable接口,用户也必须对自定义的类实现Writable接口。

  1. MapReduce处理的数据都是HDFS的数据(或HBase)。

4.2.1 MapReduce编程模型概述

MapReduce大致工作流程:

主要分为八个步骤:

(1)对输入文件进行切片规划。

(2)启动相应数量的Map Task进程。

(3)调用InputFormat中的RecordReader,读一行数据并封装为<k1,v1>,并将<k1,v1>传给map函数。

(4)调用自定义的map函数计算,并输出<k2,v2>。

(5)收集map的输出,进行分区和排序(shuffle)。

(6)Reduce Task任务启动,并从map端拉取数据。Reduce Task获取到输入数据<k3,v3>。

(7)Reduce Task调用自定义的reduce函数进行计算,得到输出<k4,v4>。

(8)调用Outputformat的RecordWriter将结果数据输出。

4.2.2 MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责整个程序的过程调度及状态协调。

(2)MapTask:负责Map阶段的整个数据处理流程。

(3)ReduceTask:负责Reduce阶段的整个数据处理流程。

4.2.3 MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

  1. Mapper阶段
    1. 用户自定义的Mapper要继承自己的父类;
    2. Mapper的输入数据是k,v对的形式(k,v的类型可自定义);
    3. Mapper中的业务逻辑写在map()方法中;
    4. Mapper的输入数据是k,v对的形式(k,v的类型可自定义);
    5. map()方法(Map Task进程)对每一个<k, v>调用一次;
  2. Reducer阶段
    1. 用户自定义的Reducer要继承自己的父类;
    2. Reducer的输入数据类型对应Mapper的输出数据类型,也是k, v;
    3. Reducer的业务逻辑卸载reduce()方法中;
    4. Reduce Task进程对每一组相同k的<k, v>组调用一次reduce()方法。
  3. Driver阶段
    相当于YARN集群的客户端,用于提交整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

4.2.4 MapReduce编程实例

案例一:WordCount

WordCount案例是大数据并行计算的经典案例,它的主要功能是统计文本中每个单词出现的次数。

  1. 使用Hadoop自带的WordCount代码处理数据

    /input/data.txt是保存在HDSF中的根目录下的,手动创建这个文件,并输入如下内容:

    a goog beginning is half the battle
    where there is a will there is a way
    

    运行Hadoop自带的MapReduce程序,命令为:

    hadoop jar hadoop-mapreduce-examples-3.1.3.jar wordcount /input/data.txt /output/wordcount
    

map阶段
这个内容分片,送到Map阶段。输入是<k1,v1>,k1是一个偏移量,是一个长整数类型。偏移量即这一行内容首字母在整个文档中偏移的字节。

举个例子,文件内容(\n表示换行符):
12\n34
两行,那么第一行的偏移量是0,第二行的偏移量是3

而v1则是这一行的文件内容。
比如第一行的:a good beginning is half the battle。v1的类型则是字符串,对应Hadoop的类型是Text。
<k1,v1>的类型和含义即:
k1:偏移量 LongWritable
v1:数据 Text
v1要计算单词次数的,拆出来后则分别是:
a、good、 beginning、 is、 half、 the、 battle
对每个单词标记次数1,输出<k2,v2>。<k2,v2>的含义和类型基本也清楚了,即:
k2:单词 Text
v2:1 IntWritable
在这里插入图片描述

按MapReduce编程模型要点,k2=k3,v3是一个集合,v3的元素是v2。那<k3,v3>其实与<k2,v2>的类似和含义是相似的。
<k3,v3>是:
k3:单词 Text
v3:1 IntWritable

在Reduce阶段,<k3,v3>中,k3已经是某个单词,v3是这个单词出现的次数组成的集合。那对这个集合进行遍历,把次数相加,则得到这个单词出现的总数输出<k4,v4>。
<k4,v4>是:
k4:单词 Text
v4:频率 IntWritable
再将<k4,v4>再输出到HDFS。
这样,整个MapReduce的数据处理过程就结束了。

  1. 开发自己的WordCount程序

自动动手实现WordCount程序,分为以下几个步骤:
① 准备好开发环境
采用IDEA和Maven开发。
②添加依赖
在pom.xml文件中添加如下依赖:

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency>
</dependencies>

③在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,填入如下内容:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

④创建包名:com.li.mapreduce.wordcount
⑤编写程序
继承Mapper类实现自己的Mapper类,并重写map()方法

package com.li.mapreduce.wordcount;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 输出for (String word : words) {k.set(word);context.write(k, v);}}
}

继承Reducer类,实现自己的Reducer类,并重写reduce()方法:

package com.li.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{int sum;IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1 累加求和sum = 0;for (IntWritable count : values) {sum += count.get();}// 2 输出v.set(sum);context.write(key,v);}
}

程序主入口类:


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取配置信息以及获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

⑥构建
通过执行mvn clean package对工程进行构建
a. 用maven打jar包,需要添加的打包插件依赖,添加在POM.xml文件中

<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

注意:如果工程上显示红叉。在项目上右键->maven->Reimport刷新即可。
b. 将程序打成jar包
在这里插入图片描述

c. 修改不带依赖的jar包名称为wc.jar,并使用远程工具将该jar包到Hadoop集群的/opt/module/hadoop-3.1.3路径。
⑦运行
启动集群

[li@hadoop102 bin]$ myhadoop.sh start

使用vim在wcinput目录中创建words.txt文件,文件内容为:

a good beginning is half the battle
where there is a will there is a way

将words.txt文件上传到hdfs的/input目录下

[li@hadoop102 wcinput]$ hadoop fs -put words.txt /input

执行WordCount程序

[li@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar com.li.mapreduce.wordcount.WordCountDriver /input/words.txt /output1

运行完毕后通过浏览器查看输出结果:
在这里插入图片描述
output1目录中存放了计算结果:
在这里插入图片描述
查看结果:
在这里插入图片描述
从结果可以看出不仅统计了单词出现的次数,还按照字母先后顺序进行了排序。

案例二:

统计各部门员工薪水总和
员工工资表,通过MapReduce来统计每个部门的工资总额
数据为三个表:emp.csv、dept.csv、sales.csv文件,存放地址:https://mp.csdn.net/mp_download/manage/download/UpDetailed
emp.csv表结构:

字段名类型说明
EMPNOINT员工ID
ENAMEVARCHAR(10)员工名称
JOBVARCHAR(9)员工职位
MGRINT直接领导的员工ID
HIREDATEDATE雇佣时间
SALINT工资
COMMINT奖金
DEPTNOCHAR(5)部门号

(1)分析:求每个部门的工资总额数据的处理过程
① Map阶段
输入数据<k1, v1>,k1是偏移量,类型固定为LongWritable, v1是每一行的文本,类型为Text。为了计算每个部门的工资总额,SAL(薪金)、DEPTNO(部门号)是主要字段。
可以把DEPTNO(部门号)作为k2,类型是IntWritable;SAL(薪金)作为v2,类型是IntWritable。
map()函数把输入按照“,”拆分,并取到SAL、DEPTNO的值。以DEPTNO为k2,SAL为v2,输出<k2,v2>
② Reduce阶段
输入<k3,v3>,k3和k2一样,是DEPTNO,类型是IntWritable,v3则是v2的集合,集合中每个元素的类型是IntWritable。对于reduce()函数,它已经取得输入的值<k3,v3>。因为v3即list(v2),对于v3这个集合进行遍历,把每个元素相加,即得到该部门的总薪金,作为v4。k4则还是DEPTNO,输出<k4, v4>。
(2)开发自己的MapReduce
Mapper类:
Reducer类:
程序主入口:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_36932.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【无百度智能云与实体经济“双向奔赴”: 一场1+1>2的双赢 标题】

实体经济&#xff0c;已经成为检验科技企业潜力的试金石。 在最近的财报季中&#xff0c;各家大厂的财报里“实体经济”都是关键字眼&#xff0c;已经成为各家心照不宣的共同目的地。 当然&#xff0c;条条大路通罗马。每一家的战略思路和打法都不一样。11月22日&#xff0c;…

DOTA-PEG-麦芽糖 maltose-DOTA 麦芽糖-四氮杂环十二烷四乙酸

DOTA-PEG-麦芽糖 maltose-DOTA 麦芽糖-四氮杂环十二烷四乙酸 PEG接枝修饰麦芽糖&#xff0c;麦芽糖-聚乙二醇-四氮杂环十二烷四乙酸&#xff0c;DOTA-PEG-麦芽糖 中文名称&#xff1a;麦芽糖-四氮杂环十二烷四乙酸 英文名称&#xff1a;maltose-DOTA 别称&#xff1a;DOTA修…

HCIA 访问控制列表ACL

一、前言 ACL又称访问控制列表&#xff0c;其实这个东西在很多地方都有用&#xff0c;可能名字不太一样但原理和功能都差不太多&#xff0c;比如服务器、防火墙&#xff0c;都有类似的东西&#xff0c;功能其实也就是“过滤”掉不想收到的数据包。为什么不想收到一些数据包呢&…

The Seven Tools of Causal Inference with Reflections on Machine Learning 文章解读

目录 THE THREE LAYER CAUSAL HIERARCHY. 4 THE SEVEN TOOLS OF CAUSAL INFERENCE (OR WHAT YOU CAN DO WITH A CAUSAL MODEL THAT YOU COULD NOT DO WITHOUT?) 7 Tool 1: Encoding Causal Assumptions – Transparency and Testability. 10 Tool 2: Do-calculus and the …

深入浅出基于HLS流媒体协议视频加密的解决方案

一套简单的基于HLS流媒体协议&#xff0c;使用video.js NodeJS FFmpeg等相关技术实现的m3u8tsaes128视频加密及播放的解决方案示例。 项目简介 起初是为了将工作中已有的基于Flash的视频播放器替换为不依赖Flash的HTML5视频播放器&#xff0c;主要使用了现有的video.js开源播…

【学习笔记25】JavaScript字符串的基本认识

JavaScript字符串的基本认识一、严格模式二、字符串1、字面量2、构造函数3、包装类型三、字符集&#xff08;了解&#xff09;1、ASCII&#xff1a;128个2、GBK国标码&#xff1a;前128位ASCII&#xff0c;从129开始为汉字3、unicode(万国码)四、字符串的length与下标一、严格模…

学弟:功能测试转测试开发容易吗?

最近看到后台留言问&#xff1a;功能测试转测试开发容易吗&#xff1f; 从这个问题&#xff0c;我能读出一些信息如下&#xff1a; 不知道你从事测试工作多久了&#xff0c;可以看出您特别羡慕测试开发工程师&#xff1b;你可能一直从事功能测试工作&#xff0c;工作模式或大…

排名预测系统

排名预测系统 题目链接 题目背景&#xff1a; 本题大模拟来自真实的需求&#xff0c;即&#xff1a;综合三场网络赛的名次&#xff0c;来预计一个正式队伍在所有正式参赛队伍中的名次以此来估计自己能不能拿牌。本来只有一道题&#xff0c;即为你们看到的T5&#xff0c;经过…

HTTP响应详解

目录 一.状态码 小结&#xff08;记住&#xff09; 二.认识响应正文&#xff08;body&#xff09; 三.如何构造http请求 一.状态码 是一个数字&#xff0c;这个数字描述了当前这次请求的状态&#xff08;成功&#xff0c;失败&#xff0c;失败的原因&#xff09; http的状态…

logcat日志文件分析

3:显示时间戳日志 adb logcat -v time > d:\文件\log.txt 日志文件分析 输出的日志格式由5部分组成 1:写下日志的时间 2:优先级&#xff0c;日志优先级从低到高分以下几种 v -verbose 最低级别&#xff0c;开发调试中的一些详细信息&#xff0c;仅在开发中使用&#…

以分割栅格为例实现FME模板的方案优化

一、利用FME分割栅格 &#xff08;一&#xff09;问题的产生 对于FME使用者来说&#xff0c;利用FME完成栅格的批量分割是一件极为平常且容易的事情。只需要输入栅格和确定分割方案就可以实现利用FME对栅格数据的分割&#xff0c;再配合FME的“扇出”功能&#xff0c;就能够实…

【colab安装mmcv-full和mmclassification】

colab安装mmcv-full和mmclassification改变cuda和pytorch版本查看torch版本安装mmcv-full安装mmclassification克隆并安装mmcls切换到目录源码安装检查mmcls版本改变cuda和pytorch版本 !pip --default-timeout1000 install torch1.9.0cu111 -f https://download.pytorch.org/w…

常用辅助类

CountDownLatch 应用场景&#xff1a;1.多线程任务汇总。2.多线程任务阻塞住&#xff0c;等待发令枪响&#xff0c;一起执行。 减法计数器 每次有线程调用&#xff0c;数量-1&#xff0c;当计数器归零&#xff0c;countDownLatch.await()就会被唤醒向下执行。 import java.uti…

c语言tips-带参main函数

0.写在最前 最近因为工作需要开始重新学c语言&#xff0c;越学越发现c语言深不可测&#xff0c;当初用python轻轻松松处理的一些数据&#xff0c;但是c语言写起来却异常的复杂&#xff0c;这个板块就记录一下我的c语言复习之路 1. main函数的两种表现形式 main函数是c/cpp语言的…

Docker学习(5)—— 在Docker上安装软件

一. 安装Tomcat 1. 下载最新版 (1) 拉取Tomcat镜像 docker pull tomcat (2) 查看是否拉取到Tomcat镜像 docker images tomcat (3) 创建Tomcat容器并启动 docker run -d -p 8080:8080 tomcat 这时访问tomcat首页报404错误&#xff0c;有以下两个原因&#xff1a;①防火…

XSS绕过安全狗WAF

今天继续给大家介绍渗透测试相关知识&#xff0c;本文主要内容是XSS绕过安全狗WAF。 一、测试环境搭建 我们使用Vmware虚拟机搭建靶场环境。在Vmware虚拟机上&#xff0c;安装有PHPStudy&#xff0c;如下所示&#xff1a; 然后安装安全狗WAF&#xff0c;安全狗WAF有一系列的…

【kafka】一、kafka介绍

kafka概述 定义 kafka是一个分布式的基于发布/订阅模式的消息队列&#xff0c;主要应用于大数据实时处理领域。 消息队列 1&#xff09;解耦 允许你独立的扩展或修改两边的处理过程&#xff0c;只要确保它们遵守同样的接口约束。 2&#xff09;可恢复性 系统的一部分组件…

安装Python

搭建 Python 环境 要想能够进行 Python 开发, 就需要搭建好 Python 的环境. 需要安装的环境主要是两个部分: 1、运行环境: Python 2、开发环境: PyCharm 一、安装Python 1.1、官网下载Python 搜索引擎输入 Python 进入Python官网 1.2、找到下载页面 因为是在Windows环境…

npm配置taobao镜像及nrm快速换源工具介绍

文章目录npm配置淘宝镜像1 为什么默认源下载很慢&#xff1f;2 淘宝npm镜像服务器3 切换npm的下包镜像源4 nrmnpm配置淘宝镜像 1 为什么默认源下载很慢&#xff1f; 在使用npm下包的时候&#xff0c;默认从国外的https://registry.npmjs.orgl服务器进行下载&#xff0c;此时&…

前后端分页插件

PageHelper 是一个 MyBatis 的分页插件,支持多种数据库,可查看官网&#xff0c;负责将已经写好的 SQL 语句&#xff0c;进行SQL分页加工。无需你自己去封装以及关心 SQL 分页等问题&#xff0c;支持多种分页方式,如从第0或第一页开始, 使用很方便。 添加依赖 <dependency&…