Hadoop mapreduce自定义排序WritableComparable

本文发表于本人博客

    今天继续写练习题,上次对分区稍微理解了一下,那根据那个步骤分区、排序、分组、规约来的话,今天应该是要写个排序有关的例子了,那好现在就开始!

     说到排序我们可以查看下hadoop源码里面的WordCount例子中对LongWritable类型定义,它实现抽象接口WritableComparable,代码如下:

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

其中Writable抽象接口定义了write以及readFields方法,分别是写入数据流以及读取数据流。而Comparable中又有compareTo方法定义比较。竟然hadoop的内置类型有比较大小功能,那么它使用这个内置类型作为map端输出的话是怎么样去排序的,这个问题我们先来查看下map任务类MapTask源代码,内部有内置MapOutputBuffer类,在spill accounting注释下面有个排序字段:

private final IndexedSorter sorter;

这个字段是由:

sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);

可以看出,这个排序算法可以在配置文件中指定,不过默认是快速排序QuickSort。这个QuickSort内部有几个重要的方法:

public void sort(final IndexedSortable s, int p, int r,final Progressable rep);
private static void sortInternal(final IndexedSortable s, int p, int r,final Progressable rep, int depth);

其中在传递参数IndexSortable的时候是用MapOutputBuffer当前来传递,因为这个MapOutputBuffer也继承IndexedSortable.这样在QuickSort排序sort中就会使用MapOutputBuffer类中的compare方法进行比较,可以看下面源代码:

    public int compare(int i, int j) {
      final int ii = kvoffsets[i % kvoffsets.length];
      final int ij = kvoffsets[j % kvoffsets.length];
      // sort by partition
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer,
          kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }

然而这个方法中comparator默认是由节点“mapred.output.key.comparator.class”决定,也可以看源码:

  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",
            null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
  }

就是这样把排序以及比较方法关联起来了!那现在我们可以按照LongWritable的思路实现自己的自定义类型并且读取、写入、比较。下面写写代码加深下记忆,既然是排序那我们准备下数据,如下有2列数据要求按照第一列升序,第二列降序排序:

1    2
1    1
3    0
3    2
2    2
1    2

先自定义类型SortAPI:

public class SortAPI implements WritableComparable<SortAPI> {
    /**
     * 第一列数据
     */
    public Long first;
    /**
     * 第二列数据
     */
    public Long second;
    
    public SortAPI(){}
    public SortAPI(long first,long second){
        this.first = first;
        this.second = second;
    }
    /**
     * 排序就在这里当:this.first - o.first > 0 升序,小于0倒序
     */
    @Override
    public int compareTo(SortAPI o) {
        long mis = (this.first - o.first);
        if(mis != 0 ){
            return (int)mis;
        }
        else{
            return (int)(this.second - o.second);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
        
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if(obj instanceof SortAPI){
            SortAPI o = (SortAPI)obj;
            return this.first == o.first && this.second == o.second;
        }
        return false;
    }
    @Override
    public String toString() {
        return "first:" + this.first + "second:" + this.second;
    }
}

这类型重写compareTo(SortAPI o)、write(DataOutput out)、readFields(DataInput in),既然是有比较那么以前说的就一定要重写hashCode()、equals(Object obj)方法了,这点不要忘记!还需要主要在write方法以及readFields方法中读写是有顺序:先write什么字段就先read什么字段。其次这个compareTo(SortAPI o)方法中返回是整型大于0、0、以及小于0代表大于、等于、小于。至于怎么判断2行数据是不是相等,不相等怎么比较着逻辑可以慢慢看下。

下面写个自定义Mapper、Reducer类以及main函数:

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {
        
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
        String[] splied = value.toString().split("\t");
        try {
            long first = Long.parseLong(splied[0]);
            long second = Long.parseLong(splied[1]);
            context.write(new SortAPI(first,second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
    
}
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/sort/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/sort/input/test.txt";
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());        
        deleteOutputFile(OUTPUT_DIR);
        
        //1设置输入目录
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        //2设置输入格式化类
        job.setInputFormatClass(TextInputFormat.class);
        //3设置自定义Mapper以及键值类型
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        //4分区
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        
        //5排序分组
        //6设置在一定Reduce以及键值类型
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        //7设置输出目录
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        //8提交job
        job.waitForCompletion(true);
    }
    
    static void deleteOutputFile(String path) throws Exception{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);
        if(fs.exists(new Path(path))){
            fs.delete(new Path(path));
        }
    }

这样在eclipse下就可以直接运行查看结果:

1       1
1       2
2       2
3       0
3       2

这结果正确,那如果要求第一列倒叙第二列升序呢,怎么办,这只需要修改下compareTo(SortAPI o):

    @Override
    public int compareTo(SortAPI o) {
        long mis = (this.first - o.first) * -1 ;
        if(mis != 0 ){
            return (int)mis;
        }
        else{
            return (int)(this.second - o.second);
        }
    }

这样保存在运行,结果:

3       0
3       2
2       2
1       1
1       2

也正确吧符合自己的这个要求。

留个小问题:这个compareTo(SortAPI o)方法在什么时候调用了,总共调用了几次?

这次先到这里。坚持记录点点滴滴!


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。