mapreduce中map是怎么做的?参数又是怎么解析传递给map方法的
发布网友
发布时间:2022-03-23 00:27
我来回答
共1个回答
热心网友
时间:2022-03-23 01:57
1.首先介绍一下wordcount 早maprece框架中的 对应关系
大家都知道 maprece 分为 map 和rece 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 rece;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为rece的入参分发给rece,然后在rece中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 Text之类的 对象,并不是 file对象
二、rece中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在rece的时候才判断的
三、map过程到底做了什么,rece过程到底做了什么?为什么它能够做到多个map多个rece?
一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。
我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
List<InputSplit>getSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具体实现参考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:maprece.input.num.files。
二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:
RecordReader:
RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
可以看到接口中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
FileInputFormat<K,V>
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat
对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类
在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象
那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,
下面继续看看这些RecordReader是如何被MapRece框架使用的
终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }
我们写MapRece程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。
我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?
我们可以想象 这里 应该被框架调用的可能性比较大了,那么maprece 框架是怎么分别来调用map和rece呢?
还以为分析完map就完事了,才发现这里仅仅是做了maprece 框架调用前的一些准备工作,
还是继续分析 下 maprece 框架调用吧:
1.在 job提交 任务之后 首先由jobtrack 分发任务,
在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper
在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。
简述mapreduce工作原理
1. Map阶段:在Map阶段,程序会对每个数据分片进行并行处理。Map函数会对每个数据项进行解析、过滤和转换等操作,生成一系列的键值对。这些键值对作为中间结果,为后续Reduce阶段提供数据。2.Reduce阶段:在Reduce阶段,根据Map阶段输出的键值对进行汇总处理。系统会根据键的值将具有相同键的键值对集合分配给...
MapReduce源码解析之Mapper
setup():主要为map()方法做准备,例如加载配置文件、传递参数。cleanup():用于清理资源,如关闭文件、处理Key-Value。map():程序的逻辑核心,对输入的文本进行处理(如分割、过滤),以键值对的形式写入context。run():驱动Mapper执行的主方法,按照预设顺序执行setup()、map()、cleanup()。Context抽...
简述mapreduce工作原理
MapReduce的核心原理在于它的分解和聚合能力。这个高效的数据处理模型基于“分而治之”的策略,其工作流程分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,大规模数据集被分割成小块,分配给集群中的多个节点进行独立处理,每个节点执行map函数,将原始数据转换为一系列中间键值对。在Reduce阶段,这些中间...
mapreduce工作流程
1. 输入分片(Input Split):MapReduce在执行Map任务之前,会根据输入文件的大小将其划分为多个输入分片。每个输入分片对应一个Map任务。这些输入分片并不存储数据本身,而是作为数据处理的单元。例如,对于三个不同大小的文件(3MB、65MB和127MB),MapReduce会创建三个输入分片。如果在此阶段对输入分片...
mapreduce是什么
Reduce 阶段负责处理 Map 阶段输出的数据,它的主要任务是将各个部分的数据汇总起来,生成最终的输出结果。例如,在单词计数的情况下,Reduce 阶段会将所有关于同一个单词的计数汇总在一起,得到每个单词的总数。由于 MapReduce 模型允许并行处理,所以这个过程在处理大量数据时非常高效。此外,MapReduce 还...
mapreduce工作原理
mapreduce工作原理为:MapReduce是一种编程模型,用于大规模数据集的并行运算。mapreduce工作原理为:MapReduce是一种编程模型,用于大规模数据集的并行运算。MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到...
3、MapReduce详解与源码分析
1 Split阶段 在MapReduce的流程中,Split阶段是将输入文件根据指定大小(默认128MB)切割成多个部分,每个部分称为一个split。split的大小由minSize、maxSize、blocksize决定。以wordcount代码为例,split数量由FileInputFormat的getSplits方法确定,返回值即为mapper的数量。默认情况下,mapper的数量是文件大小...
7.3 MapReduce工作流程
(1) 首先从HDFS中读取数据,并对它做分片操作(split) (2) 每个小分片单独启动一个map任务来处理此分片的数据。map任务的输入和输出都是key-value (3) 把每个map输出的key-value都进行分区,然后做排序、归并、合并后,分发给所有reduce节点去处理——这个过程称为shuffle。因此map输出的分区...
通俗易懂理解MapReduce(一篇就够了)
首先,预处理阶段是对输入数据进行初步处理,为后续的Map任务做准备。Map任务的shuffle过程涉及到将Map任务的结果分区并传输到Reduce任务,确保数据按照键值对的形式进行合并。而Reduce任务的shuffle过程则是对Map任务的结果进行进一步聚合,得出最终的汇总结果。以经典的WorldCount案例为例,我们考虑一个文本文件...
mapreduce计算的主要流程有哪些
1、输入分片:在进行Map计算之前,MapReduce会根据输入文件计算输入分片,每个输入分片对应一个Map任务,输入分片存储的并非数据本身。如果输入文件较大,可以进行输入分片调整,例如合并小文件,以优化计算效率。2、Map阶段:程序员编写Map函数,对输入分片进行处理。Map函数是一个本地化操作,一般在数据存储...