// linux
package com.laiwang.algo.antispam.event.job;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Created by gray on 14-8-3.
*/
public class GetVersion extends Configured implements Tool {
public static class GetVersionMap extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context)
throws IOException ,InterruptedException {
List<String> requests = new ArrayList<String>();
String[] parts = value.toString().split("\1",-1);
String url = parts[2] + parts[6];
if(!url.contains("/v2/"))
return;
String[] tmps = url.split("\\?",-1);
if(tmps.length == 1)
return;
int len = tmps[1].length();
String temp = "";
boolean falg = true;
for(int i = 0; i < len; i++) {
if(tmps[1].charAt(i) == ‘=‘) {
requests.add(temp);
temp = "";
falg = false;
}
if(falg)
temp += tmps[1].charAt(i);
if(tmps[1].charAt(i) == ‘&‘) {
falg = true;
}
}
int state = 0;
for(int i = 0; i < requests.size(); i++) {
if(requests.get(i).equals("_s_"))
state ^= 1;
if(requests.get(i).equals("_v_"))
state ^= 2;
if(requests.get(i).equals("_c_"))
state ^= 4;
if(requests.get(i).equals("_t_"))
state ^= 8;
}
if(state == 15) {
int index = parts[16].indexOf("(");
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, NullWritable> {
@Override
public void reduce(Text key,Iterable<Text> values, Context context)
throws IOException, InterruptedException {
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job();
job.setJobName("GetVersion");
job.setJarByClass(GetVersion.class);
FileInputFormat.addInputPath(job, new Path(conf.get("")));
FileOutputFormat.setOutputPath(job, new Path(conf.get("")));
job.setMapperClass(GetVersionMap.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new AggregateUidBySession(), args);
}
}
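// Disclaimer: where content on this site comes from the Internet or other media, its copyright belongs to the original media and the article's author. It is reproduced to pass on more information and for sharing on the web; this does not mean that this site agrees with its views or is responsible for its authenticity, nor does it constitute any other advice.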