json数据源文件解析

文章: http://blog.csdn.net/lili72


背景: 分布式文件通过rsync同步到生产机。 文件数 1440=24*60  也就是一分钟生成一个文件 文件命名 0000 0001 0002 ... 2358 2359 。由于文件传过来是JSON格式,需要对文件进行解析,导入HDFS中。

 

  过程

   1  rsync同步文件到当天的日期目录,每天实时把文件同步到生产机制定目录,每天生成新文件夹,由于是每分钟生成文件,每个文件夹都有1440个文件。

1.1  修改配置文件,增加一个目录的同步权限。

  Vi  /etc/rsyncd.conf

   [orders]   

  path = /etldata/order    

  list=yes 

  ignore errors 

  auth users = hadoop

  secrets file = /etc/rsyncd.secrets  

  comment = This is test  data  

  

1.2  一条命令即可同步。

 rsync -az --port=8730 /data1/queue/ex_user_lastlogin/20141210/  [email protected]::userOrder/

 

 

   2  对当天日期文件进行解析成以|分隔,检查文件数量是否达到1440个,对当天的日期文件夹中的文件进行解析转换。文件命名用一定的规则。 处理好该天的数据生成日期.ok文件

 

 

   3  导入HDFShive)中,每天定时检查日期.ok文件是否生成,生成则load 前一天日期的数据。

 

 

2.1  Json解析工具选择,Jackson效率比较高

Json数据样例:{"data":{"serverid":"1001","appid":1005,"client_ip":"118.180.156.249","time":"2014-12-11 23:59:59","userid":361443577},"ordertype":1}

  新建java model 

 

 
public class UserLog {
private String   serverid="";
private String  servertime="";
private  String  userid="";
private  String  appid="";
private  String  client_ip="";
    //空的构造函数一定要
public UserLog(){
}
public UserLog(String serverid, String servertime, String userid, String appid,
String client_ip) {
this.serverid = serverid;
this.servertime = servertime;
this.userid = userid;
this.appid = appid;
this.client_ip = client_ip;
}
 
public String getServerid() {
return serverid;
}
 
public void setServerid(String serverid) {
this.serverid = serverid;
}
 
 
public String getUserid() {
return userid;
}
 
public void setUserid(String userid) {
this.userid = userid;
}
 
public String getAppid() {
return appid;
}
 
public void setAppid(String appid) {
this.appid = appid;
}
 
public String getClient_ip() {
return client_ip;
}
 
public void setClient_ip(String client_ip) {
this.client_ip = client_ip;
}
 
public String getServertime() {
return servertime;
}
 
public void setServertime(String servertime) {
this.servertime = servertime;
}
 
@Override
public String toString() {
return  serverid+"|" + userid+"|" +appid+"|"+servertime+"|"+client_ip;
}
 

 

 

另外一个model

 

public class UserModel {
private  UserLog  data;
private String  type="" ;
public String getType() {
return type;
}
 
public void setType(String type) {
this.type = type;
}
 
public  UserModel(){
}
 
public UserLog getData() {
return data;
}
 
public void setData(UserLog data) {
this.data = data;
}
@Override
public String toString() {
return data.toString()+"|"+type;
}
 
}
 

 

解析程序:

 

 

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
import org.codehaus.jackson.map.ObjectMapper;
 
public class UserLoginLog {
 private static final ObjectMapper mapper = new ObjectMapper();
 
 public  static  final  String   descPath="/etldata/usertest/data";
 
 public static  long  total_count=0;
 
 
public static  void JsonToBean(String  strline) throws Exception {        
UserLog bean = mapper.readValue(strline, UserLog.class);
        System.out.println(bean.toString());
    }
public static void main(String[] args) throws Exception {
if(args.length<1){
System.err.println("Usage Main : [java   -jar  sourcePathFileName  [descPath]  ");
System.exit(0);
}
 // 传入基础路径参数     
//  /etldata/userlogin/85
String  basePath =args[0];
String  desFileName =null ;
String  sourcFileName="F:/0000.log";
//获取  24 * 60 分钟的文件名称
List<String>  paths=getFileName();
//记录开始时间
long   start_time= System.currentTimeMillis();
SimpleDateFormat  sf = new SimpleDateFormat("yyyyMMdd");
String currentDate= sf.format(new Date());
//输出路径    默认是   /etldata/userlogin/data/yyyymmdd/0000.txt
desFileName =descPath+"/"+currentDate ;
//如果传入的目标参数 不为空   则生成文件到指定的目录
if(args.length==2){
desFileName=args[1];
}
//循环读取源文件夹下的文件   循环解析到目标目录
for(String fileNum: paths){
sourcFileName =basePath+"/"+fileNum+".log";
System.out.println("sourcFileName-----" +sourcFileName);
System.out.println("desFileName------" + desFileName);
readFile(sourcFileName,desFileName+"/" +fileNum+".txt");
}
long  end_time =System.currentTimeMillis();
System.out.println("do  finsh -----!  花费时间    " +(end_time-start_time)  +"  处理 "+ total_count +"  行数据");
}
/**
 * @return
 * 构造 24 * 60  每分钟生成一个文件的  文件名称  0000  0001  0002 .... 2358  2359  
 */
public static List<String> getFileName() {
String hour = "";
String minu = "";
List<String> fileNameList = new ArrayList<String>(1441);
for (int i = 0; i < 24; i++) {
hour = i + "";
if (hour.length() == 1) {
hour = "0" + hour;
}
for (int j = 0; j < 60; j++) {
minu = j + "";
if (minu.length() == 1) {
minu = "0" + minu;
}
fileNameList.add(hour + minu);
}
}
return fileNameList;
}
 /**
 * @param sourcFileName
 * @param desFileName
 * @throws Exception
 *  读取数据出来 然后解析    同时记录条数
 *  {"data":{"serverid":"1001","appid":1001,"client_ip":"120.217.97.205","time":"2014-12-11 23:59:59","userid":19617632},"ordertype":1}
 */
public  static  void   readFile(String  sourcFileName,String desFileName) throws Exception{
 
 FileReader fr=new FileReader(sourcFileName);
         LineNumberReader lr=new LineNumberReader(fr,512);
         while(lr.readLine()!=null){
             String str=lr.readLine();
             UserModel bean =null;
             if(str!=null){
             bean = mapper.readValue(str, UserModel.class);
             writeFile(bean.toString()+"\n",desFileName);
             total_count = total_count +1 ;
             }
         }
         lr.close();
 }
 /**
 * @param content
 * @param pathName
 * @return
 * @throws IOException
 * 解析之后的文件   写到另外一个目录
 */
public static String writeFile(String content,String pathName) throws IOException {
File file = new File(pathName);
if(!file.exists()){
file.getParentFile().mkdirs();
file.createNewFile(); 
}
appendFileStr(pathName, content );
return  pathName ;
}
/**
 * @param fileName
 * @param content
 * 追加文件内容
 */
public static void appendFileStr(String fileName, String content){  
try {  
FileWriter writer = new FileWriter(fileName, true);  
writer.write(content);  
writer.close();  
} catch (IOException e) {  
e.printStackTrace();  
}  
}  
 
}

 

依赖的jar    下载地址: http://download.csdn.net/detail/lili72/8279053

 

3  用脚本定时起调。

 

#!/usr/bin/env bash
# ************************************************************************
# yyyymmdd       version      author               modified
# --------       ---------    -------------      ----------------------
# 20141208       V14.00.001   lisc               
#
# ************************************************************************


if [ $# -gt 1 ];then # 参数个数,需视具体参数修改 
   echo "Params error."
   echo "Useage: load_user_login_log.sh [data_date]"
   exit 1
fi

logfile=$BIPROG_ROOT/logs/`basename $0`.log #定义写日志文件名

###############################引入公共函数库#########################



vDay=${1:-"`lastday YYYY-MM-DD`"} #如果没有传日期的参数,默认取昨天
vDay2=${1:-"`lastday YYYYMMDD`"} #如果没有传日期的参数,默认取昨天
dtstr=`date -d "0 day ago " +%Y-%m-%d" "%H:%M:%S`

echo $vDay2
############################ 功能执行部分 ############################
writelog "Program($VERSION) start..."

# 先解析文件
java -jar   /etldata/userorder/pare_json.jar   /etldata/userorder/85  /etldata/userorder/data/${vDay2} 
touch   /etldata/userorder/data/${vDay2}/${vDay2}.ok

# load  到hive 
SQL="
  load    data   local  inpath '/etldata/userorder/data/${vDay2}/*.txt' overwrite into table userorder.st_userorder_log partition(dt='${vDay}');"

echo $SQL | $HIVE_HOME/bin/hive


 

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。