Mapper 与 Reducer 解析

1 . 旧版 API 的 Mapper/Reducer 解析
Mapper/Reducer 中封装了应用程序的数据处理逻辑。为了简化接口,MapReduce 要求所有存储在底层分布式文件系统上的数据均要解释成 key/value 的形式,并交给Mapper/Reducer 中的 map/reduce 函数处理,产生另外一些 key/value。Mapper 与 Reducer 的类体系非常类似,我们以 Mapper 为例进行讲解。Mapper 的类图如图所示,包括初始化、Map操作和清理三部分。
技术分享
(1)初始化
Mapper 继承了 JobConfigurable 接口。该接口中的 configure 方法允许通过 JobConf 参数对 Mapper 进行初始化。

(2)Map 操作
MapReduce 框架会通过 InputFormat 中 RecordReader 从 InputSplit 获取一个个 key/value 对, 并交给下面的 map() 函数处理:

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;

该函数的参数除了 key 和 value 之外, 还包括 OutputCollector 和 Reporter 两个类型的参数, 分别用于输出结果和修改 Counter 值。

(3)清理
Mapper 通过继承 Closeable 接口(它又继承了 Java IO 中的 Closeable 接口)获得 close方法,用户可通过实现该方法对 Mapper 进行清理。
MapReduce 提供了很多 Mapper/Reducer 实现,但大部分功能比较简单,具体如图所示。它们对应的功能分别是:

?ChainMapper/ChainReducer:用于支持链式作业。

?IdentityMapper/IdentityReducer:对于输入 key/value 不进行任何处理, 直接输出。

?InvertMapper:交换 key/value 位置。

? RegexMapper:正则表达式字符串匹配。

?TokenMapper:将字符串分割成若干个 token(单词),可用作 WordCount 的 Mapper。

?LongSumReducer:以 key 为组,对 long 类型的 value 求累加和。

技术分享

对于一个 MapReduce 应用程序,不一定非要存在 Mapper。MapReduce 框架提供了比 Mapper 更通用的接口:MapRunnable,如图所示。用 户可以实现该接口以定制Mapper 的调用 方式或者自己实现 key/value 的处理逻辑,比如,Hadoop Pipes 自行实现了MapRunnable,直接将数据通过 Socket 发送给其他进程处理。提供该接口的另外一个好处是允许用户实现多线程 Mapper。
技术分享

如图所示, MapReduce 提供了两个 MapRunnable 实现,分别是 MapRunner 和MultithreadedMapRunner,其中 MapRunner 为默认实现。 MultithreadedMapRunner 实现了一种多线程的 MapRunnable。 默认情况下,每个 Mapper 启动 10 个线程,通常用于非 CPU类型的作业以提供吞吐率。
技术分享

2. 新版 API 的 Mapper/Reducer 解析

从图可知, 新 API 在旧 API 基础上发生了以下几个变化:
技术分享

?Mapper 由接口变为类,且不再继承 JobConfigurable 和 Closeable 两个接口,而是直接在类中添加了 setup 和 cleanup 两个方法进行初始化和清理工作。

?将参数封装到 Context 对象中,这使得接口具有良好的扩展性。

?去掉 MapRunnable 接口,在 Mapper 中添加 run 方法,以方便用户定制 map() 函数的调用方法,run 默认实现与旧版本中 MapRunner 的 run 实现一样。

?新 API 中 Reducer 遍历 value 的迭代器类型变为 java.lang.Iterable,使得用户可以采用“ foreach” 形式遍历所有 value,如下所示:

void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
    for(VALUEIN value: values)  { // 注意遍历方式
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

Mapper类的完整代码如下:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;

/** 
 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * <p>Maps are the individual tasks which transform input records into a 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * 
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)} 
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(Context)} is called.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * 
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * 
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *  
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper<Object, Text, Text, IntWritable>{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.collect(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert 
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 * 
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

 

 

 

 

 

参考资料

《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。