使用 MapReduce 统计 HBase 中的数据，并将结果存入 MySQL

最近开始学习使用 MapReduce 统计 HBase 中的数据，并将结果集存入 MySQL，供前台查询使用。

使用的 Hadoop 版本为 2.5.1，HBase 版本为 0.98.6.1。

MapReduce 程序分为三个部分：Job、Map 函数和 Reduce 函数；此外还需要一个实现 DBWritable 接口的输出实体类 FaultDay，以及一个启动任务的主类。

Job 类：

 1 public class DayFaultStatisticsJob {
 2     private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsJob.class);
 3 
 4     public void runJob(String start){
 5         try {
 6             logger.info("开始运行mapreduce,统计故障信息");
 7             Configuration config = HBaseConfiguration.create();
 8             // 传递统计条件给map
 9             config.set("search.time.start",start);
10 
11             DBConnector dbConnector = DBConnectorUtil.getDBConnector();
12             DBConfiguration.configureDB(config, "com.mysql.jdbc.Driver", "jdbc:mysql://" + dbConnector.getHost() + ":" + dbConnector.getPort() + "/xcloud", dbConnector.getUser(), dbConnector.getPwd());
13 
14             Job job = Job.getInstance(config);
15             job.setJarByClass(DayFaultStatisticsJob.class); // class that contains mapper and reducer
16             Scan scan = new Scan();
17             scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
18             scan.setCacheBlocks(false);  // don‘t set to true for MR jobs
19             scan.addFamily(GlobalConstants.DEVICE_FAULT_FAMILY_NAME);
20 
21             // set other scan attrs
22             // TODO 增加遍历条件
23             /*scan.setStartRow();
24               scan.setStopRow();*/
25             //scan.setFilter();
26 
27             TableMapReduceUtil.initTableMapperJob(
28                     GlobalConstants.TABLE_NAME_DEVICE_FAULT,          // input table
29                     scan,                // Scan instance to control CF and attribute selection
30                     DayFaultStatisticsMapper.class,     // mapper class
31                     Text.class,         // mapper output key
32                     IntWritable.class,  // mapper output value
33                     job);
34             job.setReducerClass(DayFaultStatisticsReducer.class);    // reducer class
35             job.setNumReduceTasks(2);    // at least one, adjust as required
36             //FileOutputFormat.setOutputPath(job, new Path("/usr/local/mapreduce"));  // adjust directories as required
37             DBOutputFormat.setOutput(job, GlobalConstants.MYSQL_DEVICE_FAULT_DAY, GlobalConstants.MYSQL_DEVICE_FAULT_DAY_FIELDS);
38             boolean b = job.waitForCompletion(true);
39             if (b) {
40                 logger.info("mapreduce任务正常结束");
41                 System.exit(0);
42             } else {
43                 logger.info("mapreduce任务异常结束");
44                 System.exit(1);
45             }
46         } catch (IOException e) {
47             logger.error(e.getMessage(),e);
48         } catch (InterruptedException e) {
49             logger.error(e.getMessage(),e);
50         } catch (ClassNotFoundException e) {
51             logger.error(e.getMessage(),e);
52         }
53     }
54 
55 }

Mapper 类：

public class DayFaultStatisticsMapper extends TableMapper<Text, IntWritable> {
    private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsMapper.class);

    private Text text = new Text();
    private IntWritable ONE = new IntWritable(1);
    HashMap<String,String> conditionMap = new HashMap<String,String>();

    // 接收过滤条件
    protected void setup(Context context) throws IOException,
            InterruptedException {
        conditionMap.put("start",context.getConfiguration().get("search.time.start").trim());
    }

    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String timestamp = null;
        String company_id = null;
        String product_id = null;
        String model_id = null;
        String fault_id = null;
        String province_id = null;
        String city_id = null;
        String district_id = null;
        for(java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){
            String ikey = new String(val.getKey());
            String ivalue = new String(val.getValue());
            // 按条件进行map
            if (StringUtil.isEmpty(ikey) || StringUtil.isEmpty(ivalue)){
                 return;
            } else if (ikey.equals("company_id")){
                company_id = ivalue;
                continue;
            } else if (ikey.equals("product_id")){
                product_id = ivalue;
                continue;
            } else if (ikey.equals("model_id")){
                model_id = ivalue;
                continue;
            } else if (ikey.equals("fault_id")){
                fault_id = ivalue;
                continue;
            } else if (ikey.equals("province_id")){
                province_id = ivalue;
                continue;
            } else if (ikey.equals("city_id")){
                city_id = ivalue;
                continue;
            } else if (ikey.equals("district_id")){
                district_id = ivalue;
                continue;
            } else if (ikey.equals("timestamp")){
                String time = ivalue.substring(0,8);// 判断是否是当日
                if (time.equals(conditionMap.get("start"))){
                    timestamp = time; // 统计日故障发生次数
                }
                continue;
            }
        }
        if (company_id == null || product_id ==  null || model_id == null || fault_id == null || timestamp == null || fault_id.equals("-1")) {
            return;
        }
        // 故障码为 -1的不统计
        String val = company_id + "-" +product_id + "-" +model_id + "-"+fault_id + "-"+timestamp + "-"+province_id + "-"+ city_id+ "-"+ district_id; // 分组key
        text.set(val);     // we can only emit Writables...
        context.write(text, ONE);
    }
}

Reducer 类：

 1 public class DayFaultStatisticsReducer extends Reducer<Text, IntWritable, FaultDay, Text> {
 2     private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsReducer.class);
 3 
 4     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 5         int value = 0;
 6         for (IntWritable val : values) {
 7             value += val.get();
 8         }
 9         //company_id + "-" +product_id + "-" +model_id + "-"+fault_id + "-"+timestamp + "-"+province_id + "-"+ city_id+ "-"+ district_id
10         String fields[] = key.toString().split("-");
11         if (fields.length != 8){
12             return;
13         }
14         // 处理day day格式:2015-02-01
15         String day = fields[4].substring(0,4) + "-"  + fields[4].substring(4,6) + "-" + fields[4].substring(6,8);
16         // DateUtil.format(new Date(), new SimpleDateFormat("yyyy-MM-dd‘T‘HH:mm:ss"));
17         FaultDay faultDay = new FaultDay();
18         faultDay.setId(UUIDGen.generate());
19         faultDay.setDay(day);
20         faultDay.setCompany_id(fields[0]);
21         faultDay.setProduct_id(fields[1]);
22         faultDay.setModel_id(fields[2]);
23         faultDay.setFault_id(fields[3]);
24         if (!fields[5].equals("null")){
25             faultDay.setProvince_id(fields[5]);
26         }
27         if (!fields[6].equals("null")){
28             faultDay.setCity_id(fields[6]);
29         }
30         if (!fields[7].equals("null")){
31             faultDay.setDistrict_id(fields[7]);
32         }
33         faultDay.setNum(value);
34         context.write(faultDay, null);
35     }
36 
37 }


FaultDay 类实现了 Writable 和 DBWritable 接口，DBOutputFormat 借助其 write(PreparedStatement) 方法将每条统计结果写入 MySQL：

  1 public class FaultDay implements Writable, DBWritable {
  2     private String id;
  3 
  4     private String day; // 2015-02-01
  5     private String company_id;
  6     private String product_id;
  7     private String model_id;
  8     private String fault_id;
  9     private String province_id;
 10     private String city_id;
 11     private String district_id;
 12     private int num;
 13 
 14     @Override
 15     public void write(PreparedStatement statement) throws SQLException {
 16         int index = 1;
 17         statement.setString(index++, this.getId());
 18         statement.setString(index++, this.getDay());
 19         statement.setString(index++, this.getCompany_id());
 20         statement.setString(index++, this.getProduct_id());
 21         statement.setString(index++, this.getModel_id());
 22         statement.setString(index++, this.getFault_id());
 23         statement.setString(index++, this.getProvince_id());
 24         statement.setString(index++, this.getCity_id());
 25         statement.setString(index++, this.getDistrict_id());
 26         statement.setInt(index++, this.getNum());
 27     }
 28 
 29     @Override
 30     public void readFields(ResultSet resultSet) throws SQLException {
 31         this.id = resultSet.getString(1);
 32         this.day = resultSet.getString(2);
 33         this.company_id = resultSet.getString(3);
 34         this.product_id = resultSet.getString(4);
 35         this.model_id = resultSet.getString(5);
 36         this.fault_id = resultSet.getString(6);
 37         this.province_id = resultSet.getString(7);
 38         this.city_id = resultSet.getString(8);
 39         this.district_id = resultSet.getString(9);
 40         this.num = resultSet.getInt(10);
 41     }
 42 
 43     @Override
 44     public void write(DataOutput out) throws IOException {
 45         //To change body of implemented methods use File | Settings | File Templates.
 46     }
 47 
 48     @Override
 49     public void readFields(DataInput in) throws IOException {
 50         //To change body of implemented methods use File | Settings | File Templates.
 51     }
 52 
 53 
 54     public String getCity_id() {
 55         return city_id;
 56     }
 57 
 58     public void setCity_id(String city_id) {
 59         this.city_id = city_id;
 60     }
 61 
 62     public String getCompany_id() {
 63         return company_id;
 64     }
 65 
 66     public void setCompany_id(String company_id) {
 67         this.company_id = company_id;
 68     }
 69 
 70     public String getDay() {
 71         return day;
 72     }
 73 
 74     public void setDay(String day) {
 75         this.day = day;
 76     }
 77 
 78     public String getDistrict_id() {
 79         return district_id;
 80     }
 81 
 82     public void setDistrict_id(String district_id) {
 83         this.district_id = district_id;
 84     }
 85 
 86     public String getFault_id() {
 87         return fault_id;
 88     }
 89 
 90     public void setFault_id(String fault_id) {
 91         this.fault_id = fault_id;
 92     }
 93 
 94     public String getId() {
 95         return id;
 96     }
 97 
 98     public void setId(String id) {
 99         this.id = id;
100     }
101 
102     public String getModel_id() {
103         return model_id;
104     }
105 
106     public void setModel_id(String model_id) {
107         this.model_id = model_id;
108     }
109 
110     public int getNum() {
111         return num;
112     }
113 
114     public void setNum(int num) {
115         this.num = num;
116     }
117 
118     public String getProduct_id() {
119         return product_id;
120     }
121 
122     public void setProduct_id(String product_id) {
123         this.product_id = product_id;
124     }
125 
126     public String getProvince_id() {
127         return province_id;
128     }
129 
130     public void setProvince_id(String province_id) {
131         this.province_id = province_id;
132     }
133 }


主类：

/**
 * Command-line entry point for the daily fault statistics job.
 */
public class FaultStatistics {
    private static final Logger logger = LoggerFactory.getLogger(FaultStatistics.class);

    /** Length of the yyyyMMdd day string passed to the job. */
    private static final int DAY_LENGTH = 8;

    /**
     * Runs the statistics job for one day.
     *
     * @param args optional; {@code args[0]} is a date string starting with {@code yyyyMMdd}.
     *             Only its first 8 characters are used. When absent or empty, the previous
     *             day is used. An argument shorter than 8 characters causes an exit with status 1.
     */
    public static void main(String[] args) {
        // The day to count; passed to the mappers as the filter condition.
        String start;
        if (args.length != 0 && StringUtil.notEmpty(args[0])) {
            String arg = args[0].trim();
            if (arg.length() < DAY_LENGTH) {
                logger.error("Invalid start day '{}': expected a value starting with yyyyMMdd", arg);
                System.exit(1);
                return; // unreachable at runtime; keeps 'start' definitely assigned for the compiler
            }
            start = arg.substring(0, DAY_LENGTH);
        } else {
            // Default: the day before the current date.
            start = DateUtil.format(DateUtil.getPreDay(new Date()), new SimpleDateFormat("yyyyMMdd"));
        }

        DayFaultStatisticsJob dayFaultStatisticsJob = new DayFaultStatisticsJob();
        dayFaultStatisticsJob.runJob(start);
    }
}


将程序打成 jar 包并在 Hadoop 中运行后，即可在 MySQL 中查询到运行结果（MySQL 中需预先建好对应的表）。

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。