Pyton 实现SQLHelper

看了廖老师的教程实现了这个模块,按照自己的思路实现了一个,代码附下。

技术分享

 

 

 

 

 

 

 

 

 

 

 

 

 

需要说名的几点:

1. dbcontext继承自threading.local,确保每个线程中都有独立的一个dbcontext对象,保证个用户数据独立。

2. connection对象是对dbcontext对象的一个封装,实现了getcursor方法,和关闭方法。

3. 现在的代码不够DRY,select方法和update方法调用了connection对象后都要手动关闭,可以实现一个装饰器,把要调用connection对象的代码块儿包裹起来,初始化和关闭connection对象的代码提取到装饰器中。下次补上。

代码附下:

 

技术分享
  1 import MySQLdb,logging,threading,time,uuid
  2 logging.basicConfig(level=logging.INFO,format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s,datefmt=%a, %d %b %Y %H:%M:%S)
  3 
  4 def next_id(t=None):
  5     ‘‘‘
  6     Return next id as 50-char string.
  7 
  8     Args:
  9         t: unix timestamp, default to None and using time.time().
 10     ‘‘‘
 11     if t is None:
 12         t = time.time()
 13     return %015d%s000 % (int(t * 1000), uuid.uuid4().hex)
 14 
 15 class Dict(dict):
 16     def __init__(self,names=(),values=(),**args):
 17         super(Dict,self).__init__(**args)
 18         for k,v in zip(names,values):
 19             self[k]=v
 20     def __getattr__(self, item):
 21         try:
 22             return self[item]
 23         except KeyError:
 24             raise AttributeError(it has no attribute named %s%(str(item),))
 25     def __setattr__(self, key, value):
 26         self[key]=value
 27 
 28 class dbcontext(threading.local):
 29     def __init__(self):
 30         self.db= MySQLdb.connect(host="localhost",port=3306,passwd=toor,user=root,db=xdyweb)
 31     def getdb(self):
 32         return self.db
 33 
 34 class conntection(object):
 35     def __init__(self):
 36         self.dbctx=dbcontext()
 37         self.db=self.dbctx.getdb()
 38         self.cursor=self.db.cursor()
 39     def getcursor(self):
 40         return self.cursor
 41     def close(self):
 42         self.db.close()
 43         self.cursor=None
 44 def _select(sql,first,*args):
 45     conn=conntection()
 46     csr=conn.getcursor()
 47     sql=sql.replace(?,%s)
 48     values=csr.execute(sql,*args)
 49     try:
 50         if csr.description:
 51             names=[x[0] for x in csr.description]
 52         if first:
 53             values=csr.fetchone()
 54             if values is None:
 55                 return None
 56             return Dict(names,values)
 57         return  [Dict(names,x) for x in csr.fetchall()]
 58     finally:
 59         conn.close()
 60 
 61 def select(sql,first,*args):
 62     return _select(sql,first,*args)
 63 def select_one(sql,pk):
 64     return _select(sql,True,pk)
 65 
 66 def update(sql,*args):
 67     r‘‘‘
 68     Execute update SQL.
 69 
 70     >>> u1 = dict(id=1000, name=‘Michael‘, email=‘[email protected]‘, passwd=‘123456‘, last_modified=time.time())
 71     >>> insert(‘user‘, **u1)
 72     1
 73     >>> u2 = select_one(‘select * from user where id=?‘, 1000)
 74     >>> u2.email
 75     u‘[email protected] 76     >>> u2.passwd
 77     u‘123456‘
 78     >>> update(‘update user set email=?, passwd=? where id=?‘, ‘[email protected]‘, ‘654321‘, 1000)
 79     1
 80     >>> u3 = select_one(‘select * from user where id=?‘, 1000)
 81     >>> u3.email
 82     u‘[email protected] 83     >>> u3.passwd
 84     u‘654321‘
 85     >>> update(‘update user set passwd=? where id=?‘, ‘***‘, ‘123\‘ or id=\‘456‘)
 86     0
 87     ‘‘‘
 88     conn=conntection()
 89     csr=conn.getcursor()
 90     sql=sql.replace(?,%s)
 91 
 92     try:
 93         csr.execute(sql,args)
 94         return csr.rowcount
 95     finally:
 96         conn.close()
 97 def insert(table,**kw):
 98     ‘‘‘
 99     Execute insert SQL.
100 
101     >>> u1 = dict(id=2000, name=‘Bob‘, email=‘[email protected]‘, passwd=‘bobobob‘, last_modified=time.time())
102     >>> insert(‘user‘, **u1)
103     1
104     >>> u2 = select_one(‘select * from user where id=?‘, 2000)
105     >>> u2.name
106     u‘Bob‘
107     >>> insert(‘user‘, **u2)
108     Traceback (most recent call last):
109       ...
110     IntegrityError: 1062 (23000): Duplicate entry ‘2000‘ for key ‘PRIMARY‘
111     ‘‘‘
112     cols, args = zip(*kw.iteritems())
113     sql = insert into `%s` (%s) values (%s) % (table, ,.join([`%s` % col for col in cols]), ,.join([? for i in range(len(cols))]))
114     logging.info(sql)
115     return update(sql,*args)
116 
117 def main():
118     # db=MySQLdb.connect(host="localhost",port=3306,passwd=‘toor‘,user=‘root‘,db=‘xdyweb‘)
119 
120     # conn=conntection()
121     # c=conn.getcursor()
122     # r=c.execute("select * from users where email=%s",(‘[email protected]‘,))
123     # logging.warning(r)
124 
125     # f=_select(‘select * from users where email =?‘,False,(‘[email protected]‘,))
126     # logging.info(f)
127     # pass
128 
129     u1 = dict(id=2000, name=Bob, email=[email protected], passwd=bobobob, last_modified=time.time())
130     r=insert(user, **u1)
131     logging.info(r)
132 if __name__=="__main__":
133     main()
View Code

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。