A MapReduce Sorting Program

Two small Hadoop programs, written against the classic org.apache.hadoop.mapred API: one generates random signed 32-bit integers as input, and the other performs a total-order sort of them using a custom range partitioner.
1 Input Data

The INTs utility writes the requested number of pseudo-random integers to a file on HDFS, one integer per line:
import java.io.DataOutputStream;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Randomly generates a batch of signed 32-bit integers, one per line.
 * Usage: INTs <number of integers to generate> <output file path>
 */
public class INTs {
  public static void main(String[] args) throws Exception {
    long num = Long.parseLong(args[0]);
    // Fixed seed, so the same input can be regenerated for repeated runs.
    Random random = new Random(1234567890);
    FileSystem fileSystem = FileSystem.get(new Configuration());
    DataOutputStream out = fileSystem.create(new Path(args[1]));
    try {
      for (long i = 0; i < num; ++i) {
        Integer value = random.nextInt();
        out.writeBytes(value.toString());
        out.write('\n');
      }
    } finally {
      out.close();
    }
  }
}
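Assuming the class is packaged into a jar (the jar name sort-example.jar and the HDFS paths below are placeholders, not from the original article), the generator can be invoked like this:

hadoop jar sort-example.jar INTs 10000000 input/ints.txt

This produces a text file with ten million random integers, one per line, which serves as the input of the sort job below.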
2 Sort Program

The sort relies on two facts. First, the MapReduce framework sorts the keys delivered to each reducer. Second, the custom SortPartitioner below splits the full 32-bit value range [Integer.MIN_VALUE, Integer.MAX_VALUE] into equal-width contiguous ranges, one per reduce task, so every key in partition i is smaller than every key in partition i + 1. Concatenating the result files part-00000, part-00001, ... in order therefore yields a globally sorted sequence.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Sorts a batch of signed 32-bit integers.
 * Usage: Sort <input file path> <output directory path> <number of result files>
 */
public class Sort {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.setJobName("Sort INTs");
    conf.setJarByClass(Sort.class);
    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(NullWritable.class);
    conf.setMapperClass(SortMapper.class);
    conf.setPartitionerClass(SortPartitioner.class);
    conf.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    conf.setOutputFormat(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // One result file per reduce task.
    conf.setNumReduceTasks(Integer.parseInt(args[2]));
    JobClient.runJob(conf);
  }

  /** Parses each input line into an integer key; the framework sorts the keys. */
  public static class SortMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, IntWritable, NullWritable> {
    public void map(LongWritable key, Text value,
        OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
        throws IOException {
      int i = Integer.parseInt(value.toString());
      output.collect(new IntWritable(i), NullWritable.get());
    }
  }

  /**
   * Splits the 2^32 possible key values into numReduceTasks equal-width,
   * contiguous ranges, so that every key in partition i is smaller than
   * every key in partition i + 1.
   */
  public static class SortPartitioner
      implements Partitioner<IntWritable, NullWritable> {
    // Exclusive upper bound of every range except the last.
    private int[] rangeUpperLimits;

    public void configure(JobConf job) {
      int numRanges = job.getNumReduceTasks();
      rangeUpperLimits = new int[numRanges - 1];
      long perRangeSize = (1L << 32) / numRanges;
      long upperLimit = Integer.MIN_VALUE;
      for (int i = 0; i < numRanges - 1; ++i) {
        upperLimit += perRangeSize;
        rangeUpperLimits[i] = (int) upperLimit;
      }
    }

    public int getPartition(IntWritable key,
        NullWritable value, int numPartitions) {
      int n = rangeUpperLimits.length;
      if (n == 0) return 0; // single reduce task
      int x = key.get();
      if (x >= rangeUpperLimits[n - 1]) return n; // key falls in the last range
      // Binary search for the first range whose upper bound exceeds the key.
      int l = 0, h = n - 1;
      while (l < h) {
        int i = (l + h) / 2;
        int y = rangeUpperLimits[i];
        if (x >= y) l = i + 1;
        else h = i;
      }
      return l;
    }
  }
}
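To make the range layout concrete: with four reduce tasks, perRangeSize is 2^32 / 4 = 2^30, so configure() stores the three exclusive upper bounds -2^30, 0, and 2^30, and keys map to partitions 0 through 3. The following standalone sketch is a hypothetical local test harness, not part of the original article; it assumes only that the Sort class above is on the classpath, and prints the partition chosen for a few boundary keys:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;

public class PartitionerCheck {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setNumReduceTasks(4); // 4 ranges, each 2^30 wide
    Sort.SortPartitioner p = new Sort.SortPartitioner();
    p.configure(conf);
    // Upper bounds are -2^30, 0, 2^30; keys should land in partitions 0..3.
    int[] samples = { Integer.MIN_VALUE, -(1 << 30) - 1, -(1 << 30),
                      -1, 0, (1 << 30) - 1, (1 << 30), Integer.MAX_VALUE };
    for (int x : samples) {
      System.out.println(x + " -> partition "
          + p.getPartition(new IntWritable(x), NullWritable.get(), 4));
    }
  }
}

The expected output maps the first two samples to partition 0, the next two to partition 1, and so on. After the real job finishes, concatenating part-00000 through part-00003 in order gives the fully sorted result.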