通过 LOAD TABLE 命令将数据文件加载到 Sybase IQ 数据库中的 Python 脚本

-- Per-feed load configuration read by SybaseIQ_LoadData.py (one row per ftp_id).
CREATE TABLE poc_app.sys_ftp_cfg
(
    ftp_id              varchar(100) NOT NULL,  -- CDR file name tag; key the loader looks rows up by
    ftp_cycle_id        varchar(1)   NOT NULL,  -- file cycle: H, D, W or M (case-insensitive)
    ftp_stage_filepath  varchar(255) NOT NULL,  -- directory holding the processed CDR files
    ftp_stage_filereg   varchar(100) NOT NULL,  -- processed file-name pattern, e.g. name_[YYYYMMDD].dat
    stage_schema        varchar(100) NOT NULL,  -- schema of the target table
    table_name          varchar(100) NOT NULL,  -- target table name
    delimiter_type_id   varchar(10)  NOT NULL,  -- field delimiter used in LOAD TABLE ... DELIMITED BY
    -- The loader fetches a single row per ftp_id, so the key must be unique.
    CONSTRAINT sys_ftp_cfg_pk PRIMARY KEY (ftp_id)
);
 
-- Sample configuration row: daily feed jiang_test_YYYYMMDD.dat, '|'-delimited.
INSERT INTO poc_app.sys_ftp_cfg
    (ftp_id, ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg,
     stage_schema, table_name, delimiter_type_id)
VALUES
    ('jiang_test_d', 'D', '/home/sybase/day', 'jiang_test_[YYYYMMDD].dat',
     'poc_app', 'jiang_test', '|');


#!/usr/bin/python
# -*- coding: utf-8 -*-
####################################################################################
# name:     SybaseIQ_LoadData.py
# describe: load data files into a Sybase IQ database with the LOAD TABLE command
####################################################################################
# PEP 263: a coding declaration is only honoured on line 1 or 2, so it must sit
# directly under the shebang for any non-ASCII text in this file to be accepted.
import os
import pyodbc
import string
import sys
from subprocess import Popen,PIPE
import ConfigParser
# Python 2 only: make implicit str/unicode conversions use UTF-8 instead of
# ASCII.  reload() is needed because site.py removes sys.setdefaultencoding.
reload(sys)
sys.setdefaultencoding('utf8')

class SybaseIQLoad(object):
    """Load data files into a Sybase IQ database with LOAD TABLE.

    Per-feed settings (cycle, staging path, file-name pattern, target
    schema/table, delimiter) are read from the table named by ``cfg_table``.
    The column lookup and the load itself are executed through the
    ``dbisql`` command-line client.
    """

    # When 1, loaddata() prints the generated LOAD TABLE statement.
    debug = 0

    # Configuration table with one row per ftp_id; must match the table
    # created by the accompanying DDL.
    cfg_table = 'poc_app.sys_ftp_cfg'

    # unixODBC configuration file the server host/port are read from.
    odbc_ini_file = '/etc/unixODBC/odbc.ini'

    # odbc.ini section used when the connection's DSN has no section there.
    default_dsn_section = 'SybaseIQDSN'

    # Lower-cased cycle id -> (placeholder in ftp_stage_filereg, number of
    # leading characters of cur_static_time that replace it).
    _STAGE_PATTERNS = {
        'h': ('[YYYYMMDDHH]', 10),
        'd': ('[YYYYMMDD]', 8),
        'w': ('[YYYY_WW]', 7),
        'm': ('[YYYYMM]', 6),
    }

    def __init__(self, dbinfo):
        """Open an autocommit ODBC connection.

        :param dbinfo: sequence ``(dsn, uid, pwd)``.
        """
        # Pre-set so __del__ is safe even if connect() raises.
        self.cnxn = None
        self.cursor = None
        self.DSN = dbinfo[0]
        self.UID = dbinfo[1]
        self.PWD = dbinfo[2]
        odbcinfo = 'DSN=%s;UID=%s;PWD=%s' % (dbinfo[0], dbinfo[1], dbinfo[2])
        self.cnxn = pyodbc.connect(odbcinfo, autocommit=True, ansi=True)
        self.cursor = self.cnxn.cursor()

    def __del__(self):
        # getattr: the instance may be only partially initialised.
        cursor = getattr(self, 'cursor', None)
        if cursor is not None:
            cursor.close()
        cnxn = getattr(self, 'cnxn', None)
        if cnxn is not None:
            cnxn.close()

    def _printinfo(self, msg):
        """Print msg followed by a blank separator line."""
        print("%s" % (msg,))
        print("\n")

    def _GetStageName(self, ftp_stage_filereg, ftp_cycle_id, cur_static_time):
        """Expand the date placeholder in a staging file-name pattern.

        :param ftp_stage_filereg: pattern such as ``name_[YYYYMMDD].dat``.
        :param ftp_cycle_id: H, D, W or M (case-insensitive).
        :param cur_static_time: period string whose leading characters
            replace the placeholder.
        :raises ValueError: unknown cycle id, or cur_static_time too short.
        """
        try:
            placeholder, width = self._STAGE_PATTERNS[ftp_cycle_id.lower()]
        except KeyError:
            raise ValueError("unsupported ftp_cycle_id %r (expected H, D, W or M)"
                             % (ftp_cycle_id,))
        if len(cur_static_time) < width:
            raise ValueError("cur_static_time %r is too short for cycle %r (need %d characters)"
                             % (cur_static_time, ftp_cycle_id, width))
        return ftp_stage_filereg.replace(placeholder, cur_static_time[:width])

    def _getLoadInfo(self, ftp_id):
        """Return the configuration row for ftp_id, or None if there is none.

        Row columns: ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg,
        stage_schema, delimiter_type_id, table_name.
        """
        sql = '''
                select
                  ftp_cycle_id
                 ,ftp_stage_filepath
                 ,ftp_stage_filereg
                 ,stage_schema
                 ,delimiter_type_id
                 ,table_name
                from  %s
                where  ftp_id = ?
              ''' % (self.cfg_table,)
        # ftp_id comes from the command line: bind it instead of splicing it in.
        self.cursor.execute(sql.strip(), ftp_id)
        return self.cursor.fetchone()

    def _getSybIQServInfo(self, dsn=None):
        """Return ``[host, port]`` of the Sybase IQ server from odbc.ini.

        Reads the ``Server`` and ``Port`` keys of section dsn in
        ``odbc_ini_file``. Falls back to ``default_dsn_section`` when dsn is
        not given or has no section there.
        """
        config = ConfigParser.ConfigParser()
        config.read(self.odbc_ini_file)
        section = self.default_dsn_section
        if dsn and config.has_section(dsn):
            section = dsn
        return [config.get(section, "Server"), config.get(section, "Port")]

    @staticmethod
    def _quoteLiteral(value):
        """Escape value for use inside a single-quoted SQL string literal."""
        return value.replace("'", "''")

    def _runDbisql(self, sql, host, port):
        """Run sql through the dbisql client and return its exit status.

        The command is passed as an argument vector (no shell), so quotes,
        ``$`` or backticks in the password or SQL are not re-interpreted.
        NOTE(review): the password is still visible in the process list.
        """
        args = ['dbisql', '-c', 'uid=%s;pwd=%s' % (self.UID, self.PWD),
                '-Host', host, '-port', port, '-nogui', sql]
        return Popen(args).wait()

    def _readColumnList(self, path):
        """Turn a dbisql column dump into ``'col1,col2,...'``.

        Strips a leading UTF-8 BOM, removes single quotes, and drops blank
        lines.
        """
        with open(path, 'rb') as f:
            data = f.read()
        if data.startswith(b'\xef\xbb\xbf'):
            data = data[3:]
        if not isinstance(data, str):  # Python 3: bytes -> text
            data = data.decode('utf-8')
        names = []
        for line in data.splitlines():
            name = line.replace("'", "").strip()
            if name:
                names.append(name)
        return ','.join(names)

    def _getTableColumns(self, table_name, host, port):
        """Return table_name's columns as ``'c1,c2,...'`` ('' if unavailable).

        dbisql writes the query result to the file named after ``>#``.
        That file is then parsed and removed.
        """
        # Per-process file name so concurrent runs do not clobber each other.
        column_file = '/tmp/%s_columns_%d.log' % (table_name, os.getpid())
        # A leftover file must not be mistaken for this run's output.
        if os.path.exists(column_file):
            os.remove(column_file)
        column_sql = '''
                        select column_name
                          from syscolumn a
                          join systable b
                            on a.table_id = b.table_id
                         where b.table_name = '%s' ># %s
                     ''' % (self._quoteLiteral(table_name), column_file)
        try:
            rc = self._runDbisql(column_sql, host, port)
            if rc != 0 or not os.path.exists(column_file):
                return ''
            return self._readColumnList(column_file)
        finally:
            if os.path.exists(column_file):
                os.remove(column_file)

    def loaddata(self, ftp_id, cur_static_time):
        """Load the staging file for ftp_id / cur_static_time.

        :param ftp_id: key of the row in ``cfg_table`` describing the feed.
        :param cur_static_time: period string whose leading characters
            replace the placeholder in the configured file-name pattern.
        :returns: 0 on success, 1 on failure (used as the exit status).
        """
        # Load settings for this feed.
        row = self._getLoadInfo(ftp_id)
        if row is None:
            print("No row for ftp_id '%s' in %s" % (ftp_id, self.cfg_table))
            return 1
        ftp_cycle_id = row[0]
        ftp_stage_filepath = row[1]
        ftp_stage_filereg = row[2]
        stage_schema = row[3]
        delimiter_type_id = row[4]
        table_name = row[5]

        try:
            # Absolute path of the processed file for the requested period.
            ftp_stage_filename = self._GetStageName(ftp_stage_filereg, ftp_cycle_id, cur_static_time)
            ftp_stage_absolute_filename = os.path.join(ftp_stage_filepath, ftp_stage_filename)

            host, port = self._getSybIQServInfo(getattr(self, 'DSN', None))

            columns = self._getTableColumns(table_name, host, port)
            if not columns:
                print("Could not read the column list of table %s" % (table_name,))
                return 1

            loadsql = '''
                    load table %s.%s
                    (
                        %s
                    )
                    USING FILE '%s'
                    FORMAT ASCII
                    ESCAPES OFF
                    QUOTES OFF
                    NOTIFY 1000000
                    DELIMITED BY '%s'
                    WITH CHECKPOINT ON;
                    COMMIT;
                  ''' % (stage_schema, table_name, columns,
                         self._quoteLiteral(ftp_stage_absolute_filename),
                         self._quoteLiteral(delimiter_type_id))

            print("*************Begin to execute load table command...*************\n")
            if self.debug == 1:
                self._printinfo(loadsql.strip())
            rc = self._runDbisql(loadsql, host, port)
            print("\n*************End to execute load table command...*************")
        except Exception as err:
            print("Return value %s,Error %s" % (1, err))
            return 1

        # NOTE(review): assumes dbisql exits non-zero when a statement fails.
        if rc != 0:
            print("Return value %s,Error dbisql exited with status %s" % (1, rc))
            return 1
        print("**************************Successful**************************")
        return 0
# Main
def main():
    """Command-line entry point.

    usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time

    Exits with status 1 when arguments are missing. Otherwise returns
    loaddata()'s result (0 on success, 1 on failure), which becomes the
    process exit status.
    """
    if len(sys.argv) < 6:
        print('usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time\n')
        sys.exit(1)

    # (DSN, user name, password) for the Sybase IQ connection.
    dbinfo = sys.argv[1:4]
    ftp_id = sys.argv[4]
    cur_static_time = sys.argv[5]

    siq = SybaseIQLoad(dbinfo)
    return siq.loaddata(ftp_id, cur_static_time)


if __name__ == '__main__':
    sys.exit(main())
   
  

通过Load table命令将数据文件加载到Sybase IQ数据库里面的Python脚本,古老的榕树,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。