问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

mapreduce中map是怎么做的?参数又是怎么解析传递给map方法的

发布网友 发布时间:2022-03-23 00:27

我来回答

1个回答

热心网友 时间:2022-03-23 01:57

1.首先介绍一下wordcount 早maprece框架中的 对应关系
大家都知道 maprece 分为 map 和rece 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 rece;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为rece的入参分发给rece,然后在rece中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 Text之类的 对象,并不是 file对象
二、rece中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在rece的时候才判断的
三、map过程到底做了什么,rece过程到底做了什么?为什么它能够做到多个map多个rece?

一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}

我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具体实现参考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:maprece.input.num.files。

二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:
RecordReader:
RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

可以看到接口中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;

FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类

在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象

那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,

下面继续看看这些RecordReader是如何被MapRece框架使用的

终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }

我们写MapRece程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}

RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?

我们可以想象 这里 应该被框架调用的可能性比较大了,那么maprece 框架是怎么分别来调用map和rece呢?
还以为分析完map就完事了,才发现这里仅仅是做了maprece 框架调用前的一些准备工作,

还是继续分析 下 maprece 框架调用吧:

1.在 job提交 任务之后 首先由jobtrack 分发任务,

在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper

在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。
简述mapreduce工作原理

1. Map阶段:在Map阶段,程序会对每个数据分片进行并行处理。Map函数会对每个数据项进行解析、过滤和转换等操作,生成一系列的键值对。这些键值对作为中间结果,为后续Reduce阶段提供数据。2.Reduce阶段:在Reduce阶段,根据Map阶段输出的键值对进行汇总处理。系统会根据键的值将具有相同键的键值对集合分配给...

MapReduce源码解析之Mapper

setup():主要为map()方法做准备,例如加载配置文件、传递参数。cleanup():用于清理资源,如关闭文件、处理Key-Value。map():程序的逻辑核心,对输入的文本进行处理(如分割、过滤),以键值对的形式写入context。run():驱动Mapper执行的主方法,按照预设顺序执行setup()、map()、cleanup()。Context抽...

简述mapreduce工作原理

MapReduce的核心原理在于它的分解和聚合能力。这个高效的数据处理模型基于“分而治之”的策略,其工作流程分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,大规模数据集被分割成小块,分配给集群中的多个节点进行独立处理,每个节点执行map函数,将原始数据转换为一系列中间键值对。在Reduce阶段,这些中间...

mapreduce工作流程

1. 输入分片(Input Split):MapReduce在执行Map任务之前,会根据输入文件的大小将其划分为多个输入分片。每个输入分片对应一个Map任务。这些输入分片并不存储数据本身,而是作为数据处理的单元。例如,对于三个不同大小的文件(3MB、65MB和127MB),MapReduce会创建三个输入分片。如果在此阶段对输入分片...

mapreduce是什么

Reduce 阶段负责处理 Map 阶段输出的数据,它的主要任务是将各个部分的数据汇总起来,生成最终的输出结果。例如,在单词计数的情况下,Reduce 阶段会将所有关于同一个单词的计数汇总在一起,得到每个单词的总数。由于 MapReduce 模型允许并行处理,所以这个过程在处理大量数据时非常高效。此外,MapReduce 还...

mapreduce工作原理

mapreduce工作原理为:MapReduce是一种编程模型,用于大规模数据集的并行运算。mapreduce工作原理为:MapReduce是一种编程模型,用于大规模数据集的并行运算。MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到...

3、MapReduce详解与源码分析

1 Split阶段 在MapReduce的流程中,Split阶段是将输入文件根据指定大小(默认128MB)切割成多个部分,每个部分称为一个split。split的大小由minSize、maxSize、blocksize决定。以wordcount代码为例,split数量由FileInputFormat的getSplits方法确定,返回值即为mapper的数量。默认情况下,mapper的数量是文件大小...

7.3 MapReduce工作流程

(1) 首先从HDFS中读取数据,并对它做分片操作(split) (2) 每个小分片单独启动一个map任务来处理此分片的数据。map任务的输入和输出都是key-value (3) 把每个map输出的key-value都进行分区,然后做排序、归并、合并后,分发给所有reduce节点去处理——这个过程称为shuffle。因此map输出的分区...

通俗易懂理解MapReduce(一篇就够了)

首先,预处理阶段是对输入数据进行初步处理,为后续的Map任务做准备。Map任务的shuffle过程涉及到将Map任务的结果分区并传输到Reduce任务,确保数据按照键值对的形式进行合并。而Reduce任务的shuffle过程则是对Map任务的结果进行进一步聚合,得出最终的汇总结果。以经典的WorldCount案例为例,我们考虑一个文本文件...

mapreduce计算的主要流程有哪些

1、输入分片:在进行Map计算之前,MapReduce会根据输入文件计算输入分片,每个输入分片对应一个Map任务,输入分片存储的并非数据本身。如果输入文件较大,可以进行输入分片调整,例如合并小文件,以优化计算效率。2、Map阶段:程序员编写Map函数,对输入分片进行处理。Map函数是一个本地化操作,一般在数据存储...

hadoop的mapreduce map类型怎么传参数 swagger传入参数为map spark mapreduce mybatis传入参数为map mapreduce是一个 mapreduce详解 mapreduce适用于什么 mapreduce为什么要排序
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
龚琳娜音乐作品 龚琳娜的歌(全部) ...她说做不了好朋友,还可以做朋友,是什么意思啊!以后还 ..._百度... ...Out )攻略_荒野行动卡在登陆界面怎么办 卡屏解决方法 “柳色和愁为重折”的出处是哪里 ...下身是蓝黑色的牛仔五分裤 配同样暖黄色的帆布鞋还是白色帆布鞋好看... 绝地求生全军出击G港怎么打 G港打法详解-高手进阶-安族网 下肢残疾人 不能动 需要有人照顾的人 她能学什么? ...共6角。如果三种 硬币的钱数相等,各有 多少枚? 什么品牌的睫毛膏值得推荐? 简述Hadoop的MapReduce与Googl的MapReducc 之间的关系 大数据培训一般都将些什么内容? 大数据需要掌握哪些技能 简述MapReduce基本思想,想想在生活中有没有相似的例子? 请简述mapreduce的技术思想 什么是Map/Reduce-Mapreduce-about云开发 Hadoop和MapReduce究竟分别是做什么用的 如何实现mapreduce计算框架以有效实现迭代 什么是MapReduce? mapreduce的工作流程 Hadoop的工作原理是什么 mapreduce处理海量数据的工作原理是什么? 请简要描述Hadoop计算框架MapReduce的工作原理 mapreduce实现原理是怎样的 圆通快递的电话 为什么总是打不通 一个电话打不通是啥原因? 什么情况下电话对方打不通? 为什么我打别人电话打得通,别人打我却打不通 打的电话怎么打不通? 打不通对方的电话是什么原因 mapreduce处理什么任务 大数据培训到底是培训什么 简述HDFS和MapReduce在Hadoop中的角色 如何快速地编写和运行一个属于自己的MapReduce例子程序 为什么手机没有欠费,但是打电话出去说我停机了? 为什么号码没有欠费却被停机了? 手机没欠费被暂停服务怎么回事 为什么我的手机没有停机但是别人打我的电话显示停机 手机没欠费为什么停机 我的手机号码没欠费,为什么会暂停服务?如何解决? 手机没有欠费,为什么是停机状态?只能打进不能打出 为什么号码没有欠费却被停机了 为什么我手机没欠费也停机了 为什么我的手机没有欠费打电话会提示手机暂停服务 为什么我的联通手机没欠费,被暂停服务了? 手机号码没有欠费被停机了? 为什么明明手机是正常的,会显示停机? 没欠费但手机一直停机怎么办? 为什么我的手机没有停机但是别人打我的电话显示停机? 手机没有欠费,为什么是停机状态?