使用MapReduce统计HBase中的数据,并将结果存入MySQL
最近开始学习使用mapreduce统计hbase中的数据,并将结果集存入mysql中,供前台查询使用。
使用的Hadoop版本为2.5.1,HBase版本为0.98.6.1。
mapreduce程序分为三个部分:job、map函数、reduce函数
job类:
1 public class DayFaultStatisticsJob { 2 private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsJob.class); 3 4 public void runJob(String start){ 5 try { 6 logger.info("开始运行mapreduce,统计故障信息"); 7 Configuration config = HBaseConfiguration.create(); 8 // 传递统计条件给map 9 config.set("search.time.start",start); 10 11 DBConnector dbConnector = DBConnectorUtil.getDBConnector(); 12 DBConfiguration.configureDB(config, "com.mysql.jdbc.Driver", "jdbc:mysql://" + dbConnector.getHost() + ":" + dbConnector.getPort() + "/xcloud", dbConnector.getUser(), dbConnector.getPwd()); 13 14 Job job = Job.getInstance(config); 15 job.setJarByClass(DayFaultStatisticsJob.class); // class that contains mapper and reducer 16 Scan scan = new Scan(); 17 scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs 18 scan.setCacheBlocks(false); // don‘t set to true for MR jobs 19 scan.addFamily(GlobalConstants.DEVICE_FAULT_FAMILY_NAME); 20 21 // set other scan attrs 22 // TODO 增加遍历条件 23 /*scan.setStartRow(); 24 scan.setStopRow();*/ 25 //scan.setFilter(); 26 27 TableMapReduceUtil.initTableMapperJob( 28 GlobalConstants.TABLE_NAME_DEVICE_FAULT, // input table 29 scan, // Scan instance to control CF and attribute selection 30 DayFaultStatisticsMapper.class, // mapper class 31 Text.class, // mapper output key 32 IntWritable.class, // mapper output value 33 job); 34 job.setReducerClass(DayFaultStatisticsReducer.class); // reducer class 35 job.setNumReduceTasks(2); // at least one, adjust as required 36 //FileOutputFormat.setOutputPath(job, new Path("/usr/local/mapreduce")); // adjust directories as required 37 DBOutputFormat.setOutput(job, GlobalConstants.MYSQL_DEVICE_FAULT_DAY, GlobalConstants.MYSQL_DEVICE_FAULT_DAY_FIELDS); 38 boolean b = job.waitForCompletion(true); 39 if (b) { 40 logger.info("mapreduce任务正常结束"); 41 System.exit(0); 42 } else { 43 logger.info("mapreduce任务异常结束"); 44 System.exit(1); 45 } 46 } catch (IOException e) { 47 
logger.error(e.getMessage(),e); 48 } catch (InterruptedException e) { 49 logger.error(e.getMessage(),e); 50 } catch (ClassNotFoundException e) { 51 logger.error(e.getMessage(),e); 52 } 53 } 54 55 }
mapper类:
public class DayFaultStatisticsMapper extends TableMapper<Text, IntWritable> { private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsMapper.class); private Text text = new Text(); private IntWritable ONE = new IntWritable(1); HashMap<String,String> conditionMap = new HashMap<String,String>(); // 接收过滤条件 protected void setup(Context context) throws IOException, InterruptedException { conditionMap.put("start",context.getConfiguration().get("search.time.start").trim()); } public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String timestamp = null; String company_id = null; String product_id = null; String model_id = null; String fault_id = null; String province_id = null; String city_id = null; String district_id = null; for(java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){ String ikey = new String(val.getKey()); String ivalue = new String(val.getValue()); // 按条件进行map if (StringUtil.isEmpty(ikey) || StringUtil.isEmpty(ivalue)){ return; } else if (ikey.equals("company_id")){ company_id = ivalue; continue; } else if (ikey.equals("product_id")){ product_id = ivalue; continue; } else if (ikey.equals("model_id")){ model_id = ivalue; continue; } else if (ikey.equals("fault_id")){ fault_id = ivalue; continue; } else if (ikey.equals("province_id")){ province_id = ivalue; continue; } else if (ikey.equals("city_id")){ city_id = ivalue; continue; } else if (ikey.equals("district_id")){ district_id = ivalue; continue; } else if (ikey.equals("timestamp")){ String time = ivalue.substring(0,8);// 判断是否是当日 if (time.equals(conditionMap.get("start"))){ timestamp = time; // 统计日故障发生次数 } continue; } } if (company_id == null || product_id == null || model_id == null || fault_id == null || timestamp == null || fault_id.equals("-1")) { return; } // 故障码为 -1的不统计 String val = company_id + "-" +product_id + "-" +model_id + "-"+fault_id + "-"+timestamp + 
"-"+province_id + "-"+ city_id+ "-"+ district_id; // 分组key text.set(val); // we can only emit Writables... context.write(text, ONE); } }
reducer类:
1 public class DayFaultStatisticsReducer extends Reducer<Text, IntWritable, FaultDay, Text> { 2 private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsReducer.class); 3 4 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 5 int value = 0; 6 for (IntWritable val : values) { 7 value += val.get(); 8 } 9 //company_id + "-" +product_id + "-" +model_id + "-"+fault_id + "-"+timestamp + "-"+province_id + "-"+ city_id+ "-"+ district_id 10 String fields[] = key.toString().split("-"); 11 if (fields.length != 8){ 12 return; 13 } 14 // 处理day day格式:2015-02-01 15 String day = fields[4].substring(0,4) + "-" + fields[4].substring(4,6) + "-" + fields[4].substring(6,8); 16 // DateUtil.format(new Date(), new SimpleDateFormat("yyyy-MM-dd‘T‘HH:mm:ss")); 17 FaultDay faultDay = new FaultDay(); 18 faultDay.setId(UUIDGen.generate()); 19 faultDay.setDay(day); 20 faultDay.setCompany_id(fields[0]); 21 faultDay.setProduct_id(fields[1]); 22 faultDay.setModel_id(fields[2]); 23 faultDay.setFault_id(fields[3]); 24 if (!fields[5].equals("null")){ 25 faultDay.setProvince_id(fields[5]); 26 } 27 if (!fields[6].equals("null")){ 28 faultDay.setCity_id(fields[6]); 29 } 30 if (!fields[7].equals("null")){ 31 faultDay.setDistrict_id(fields[7]); 32 } 33 faultDay.setNum(value); 34 context.write(faultDay, null); 35 } 36 37 }
FaultDay中实现DBWritable接口
1 public class FaultDay implements Writable, DBWritable { 2 private String id; 3 4 private String day; // 2015-02-01 5 private String company_id; 6 private String product_id; 7 private String model_id; 8 private String fault_id; 9 private String province_id; 10 private String city_id; 11 private String district_id; 12 private int num; 13 14 @Override 15 public void write(PreparedStatement statement) throws SQLException { 16 int index = 1; 17 statement.setString(index++, this.getId()); 18 statement.setString(index++, this.getDay()); 19 statement.setString(index++, this.getCompany_id()); 20 statement.setString(index++, this.getProduct_id()); 21 statement.setString(index++, this.getModel_id()); 22 statement.setString(index++, this.getFault_id()); 23 statement.setString(index++, this.getProvince_id()); 24 statement.setString(index++, this.getCity_id()); 25 statement.setString(index++, this.getDistrict_id()); 26 statement.setInt(index++, this.getNum()); 27 } 28 29 @Override 30 public void readFields(ResultSet resultSet) throws SQLException { 31 this.id = resultSet.getString(1); 32 this.day = resultSet.getString(2); 33 this.company_id = resultSet.getString(3); 34 this.product_id = resultSet.getString(4); 35 this.model_id = resultSet.getString(5); 36 this.fault_id = resultSet.getString(6); 37 this.province_id = resultSet.getString(7); 38 this.city_id = resultSet.getString(8); 39 this.district_id = resultSet.getString(9); 40 this.num = resultSet.getInt(10); 41 } 42 43 @Override 44 public void write(DataOutput out) throws IOException { 45 //To change body of implemented methods use File | Settings | File Templates. 46 } 47 48 @Override 49 public void readFields(DataInput in) throws IOException { 50 //To change body of implemented methods use File | Settings | File Templates. 
51 } 52 53 54 public String getCity_id() { 55 return city_id; 56 } 57 58 public void setCity_id(String city_id) { 59 this.city_id = city_id; 60 } 61 62 public String getCompany_id() { 63 return company_id; 64 } 65 66 public void setCompany_id(String company_id) { 67 this.company_id = company_id; 68 } 69 70 public String getDay() { 71 return day; 72 } 73 74 public void setDay(String day) { 75 this.day = day; 76 } 77 78 public String getDistrict_id() { 79 return district_id; 80 } 81 82 public void setDistrict_id(String district_id) { 83 this.district_id = district_id; 84 } 85 86 public String getFault_id() { 87 return fault_id; 88 } 89 90 public void setFault_id(String fault_id) { 91 this.fault_id = fault_id; 92 } 93 94 public String getId() { 95 return id; 96 } 97 98 public void setId(String id) { 99 this.id = id; 100 } 101 102 public String getModel_id() { 103 return model_id; 104 } 105 106 public void setModel_id(String model_id) { 107 this.model_id = model_id; 108 } 109 110 public int getNum() { 111 return num; 112 } 113 114 public void setNum(int num) { 115 this.num = num; 116 } 117 118 public String getProduct_id() { 119 return product_id; 120 } 121 122 public void setProduct_id(String product_id) { 123 this.product_id = product_id; 124 } 125 126 public String getProvince_id() { 127 return province_id; 128 } 129 130 public void setProvince_id(String province_id) { 131 this.province_id = province_id; 132 } 133 }
主类:
public class FaultStatistics { private static final Logger logger = LoggerFactory.getLogger(FaultStatistics.class); public static void main(String[] args) { // 传递统计条件给map String start; if ( args.length != 0 && StringUtil.notEmpty(args[0])) { start = args[0].substring(0,8); } else { // 默认为当前时间的前一天 start = DateUtil.format(DateUtil.getPreDay(new Date()), new SimpleDateFormat("yyyyMMdd")); } DayFaultStatisticsJob dayFaultStatisticsJob = new DayFaultStatisticsJob(); dayFaultStatisticsJob.runJob(start); } }
将程序打成jar包后在Hadoop中运行,即可在MySQL中查询到统计结果(MySQL中需要事先建好对应的表)。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。