python处理日志数据
需求是这样的,我们需要将日志记录里面关于日活与新增做个统计,每天一次统计记录:
源数据是从请求接口放到队列当中,然后再从队列取出,实现异步插入,以下是python把数据从日志表查询计算之后得到的数据再插入到目标表,
原始日志数据:
409351146956247408 2014-08-30 06:25:46 /money/apple/product/list/IPHONE/api_2 {"api_level":["2"],"app_version":["1.9.6"],"platform":["1"],"api_version":["api_2"],"_key":["87f29b03bb8addddddd0d458da2ff"]} 7366056 49.66.48.198 3 0 409351147959838928 2014-08-30 06:25:46 /money/apple/product/list/IPHONE/api_2 {"api_level":["2"],"app_version":["1.9.6"],"platform":["1"],"api_version":["api_2"],"_key":["2dddddddd0000820d458da2ff"]} 4973658 117.136.19.177 5 0 409351147960207591 2014-08-30 06:25:47 /money/apple/product/list/IPHONE/api_2 {"api_level":["2"],"app_version":["1.9.6"],"platform":["1"],"api_version":["api_2"],"_key":["95e6d2efb31dddddddddd20d458da2ff"]} 3897136 123.151.136.53 3 0
表结构是这样:
目标表结构:
处理程序如下:
# -*- coding: utf-8 -*- from datetime import * import MySQLdb import sys import time import datetime import json import logging logging.basicConfig(level=logging.INFO, format=‘%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘, datefmt=‘%a, %d %b %Y %H:%M:%S‘, filename=‘tb_app_profile2.log‘, filemode=‘w‘) console = logging.StreamHandler() console.setLevel(logging.INFO) formatter = logging.Formatter(‘%(name)-12s: %(levelname)-8s %(message)s‘) console.setFormatter(formatter) logging.getLogger(‘‘).addHandler(console) reload(sys) sys.setdefaultencoding(‘utf-8‘) s_connection = MySQLdb.connect(host="127.0.0.1",port=4277,user="xxx",passwd="xxx$meu78",db="db_access_log"); s_connection.set_character_set(‘utf8‘) s_cursor = s_connection.cursor(); s_cursor.execute(‘SET NAMES utf8;‘) s_cursor.execute(‘SET CHARACTER SET utf8;‘) s_cursor.execute(‘SET character_set_connection=utf8;‘) def getDataS(sql): s_cursor.execute(sql) return s_cursor.fetchall() t_connection = MySQLdb.connect(host="127.0.0.1",port=3361,user="xxx",passwd="xxxxx",db="db_statistics"); t_connection.set_character_set(‘utf8‘) t_cursor = t_connection.cursor(); t_cursor.execute(‘SET NAMES utf8;‘) t_cursor.execute(‘SET CHARACTER SET utf8;‘) t_cursor.execute(‘SET character_set_connection=utf8;‘) def getDataT(sql): t_cursor.execute(sql) return t_cursor.fetchall() def setModelT(sql): t_cursor.execute(sql) def process(dtstr): s_table_pre = "tb_access_log_" #用户新增 user_add_all_num = 0; user_add_android_num = 0; user_add_ios_num = 0; user_add_female_num = 0; user_add_male_num = 0; user_add_alldev_num = 0; user_add_alldev_set = set([]); #用户活跃 user_active_all_num = 0; user_active_android_num = 0; user_active_ios_num = 0; user_active_ios_num = 0; user_active_all_set = set([]); user_active_android_set = set([]); user_active_ios_set = set([]); maxId = 0; flag = 0; while 1: logging.debug("maxId:%d" % maxId) logging.debug("flag:%d" % flag) flag = flag + 1; sql = "SELECT ID,AccessTime,Uri,Params,UserID,RealIp,TimeSpent,Code from %s%s where id > %d order by id limit 10000" %(s_table_pre,dtstr,maxId); result = getDataS(sql); result_len = len(result); if result_len == 0: break elif result_len > 0: if flag > 3: break pass maxId = result[result_len-1][0] for i in result: user_active_all_set.add(i[4]) try: platform = int(json.loads(i[3])["platform"][0]) logging.debug("platform:%d" % platform) if platform == 1: user_active_android_set.add(i[4]) if platform == 2: user_active_ios_set.add(i[4]) except Exception,e : logging.error("Exception:%s" % e) if i[2] == "/account/register" and i[7] == 0: try: user_add_alldev_set.add(json.loads(i[3])["device_code"][0]) except Exception,e : logging.error("Exception:%s" % e) user_add_all_num = user_add_all_num + 1; try: platform = int(json.loads(i[3])["platform"][0]) logging.debug("platform:%d" % platform) if platform == 1: user_add_android_num = user_add_android_num + 1; elif platform == 2: user_add_ios_num = user_add_ios_num + 1; except Exception,e : logging.error("Exception:%s" % e) try: gender = int(json.loads(i[3])["gender"][0]) logging.debug("gender:%d" % gender) if gender == 1: user_add_female_num = user_add_female_num + 1; elif gender == 2: user_add_male_num = user_add_male_num + 1; except Exception,e : logging.error("Exception:%s" % e) logging.info("user_active_all_num:%d" % len(user_active_all_set) ) logging.info("user_active_android_num:%d" % len(user_active_android_set) ) logging.info("user_active_ios_num:%d" % len(user_active_ios_set) ) logging.info("user_add_alldev_num:%d" % len(user_add_alldev_set) ) logging.info("user_add_all_num:%d" % user_add_all_num) logging.info("user_add_android_num:%d" % user_add_android_num) logging.info("user_add_ios_num:%d" % user_add_ios_num) logging.info("user_add_female_num:%d" % user_add_female_num) logging.info("user_add_male_num:%d" % user_add_male_num) user_active_all_num = len(user_active_all_set) user_active_android_num = len(user_active_android_set) user_active_ios_num = len(user_active_ios_set) user_add_alldev_num = len(user_add_alldev_set) logging.debug("user_active_all_num:%d" % user_active_all_num) logging.debug("user_active_android_num:%d" % user_active_android_num) logging.debug("user_active_ios_num:%d" % user_active_ios_num) logging.debug("user_add_alldev_num:%d" % user_add_alldev_num) logging.debug("user_add_all_num:%d" % user_add_all_num) logging.debug("user_add_android_num:%d" % user_add_android_num) logging.debug("user_add_ios_num:%d" % user_add_ios_num) logging.debug("user_add_female_num:%d" % user_add_female_num) logging.debug("user_add_male_num:%d" % user_add_male_num) #查询目标表 t_table = "tb_app_profile" t_sql = "select ID,TDate,AppKey,AppValue from %s where TDate=‘%s‘" % (t_table,dtstr); t_result = getDataT(t_sql); t_len = len(t_result); if t_len == 0: logging.debug("insert") sql = ( "insert into %s (TDate,AppKey,AppValue) values " "(‘%s‘,‘user_active_all‘,‘%d‘)," "(‘%s‘,‘user_active_android‘,‘%d‘)," "(‘%s‘,‘user_active_ios‘,‘%d‘)," "(‘%s‘,‘user_add_alldev‘,‘%d‘)," "(‘%s‘,‘user_add_all‘,‘%d‘)," "(‘%s‘,‘user_add_android‘,‘%d‘)," "(‘%s‘,‘user_add_ios‘,‘%d‘)," "(‘%s‘,‘user_add_female‘,‘%d‘)," "(‘%s‘,‘user_add_male‘,‘%d‘);" % ( t_table ,dtstr,user_active_all_num ,dtstr,user_active_android_num ,dtstr,user_active_ios_num ,dtstr,user_add_alldev_num ,dtstr,user_add_all_num ,dtstr,user_add_android_num ,dtstr,user_add_ios_num ,dtstr,user_add_female_num ,dtstr,user_add_male_num ) ); setModelT(sql) elif t_len > 0: logging.debug("update") sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_active_all‘;" % (t_table,user_active_all_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_active_android‘;" % (t_table,user_active_android_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_active_ios‘;" % (t_table,user_active_ios_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_add_alldev‘;" % (t_table,user_add_alldev_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_add_all‘;" % (t_table,user_add_all_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_add_android‘;" % (t_table,user_add_android_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_add_ios‘;" % (t_table,user_add_ios_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_add_female‘;" % (t_table,user_add_female_num,dtstr); setModelT(sql) sql = "update %s set AppValue=%d where TDate=‘%s‘ and AppKey=‘user_add_male‘;" % (t_table,user_add_male_num,dtstr); setModelT(sql) if __name__ == "__main__": days = 6 while days >= 0: tdate = (datetime.datetime.now() - datetime.timedelta(days=days)) dtstr = tdate.strftime(‘%Y%m%d‘) process(dtstr) days = days - 1
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。