web.py中实现类似Django中的ORM的查询效果

Django中的对象查询

Django框架自带了ORM,实现了一些比较强大而且方便的查询功能,这些功能和表无关。比如下面这个例子:

class Question(models.Model):
  question_text = models.CharField(max_length=200)
  pub_date = models.DateTimeField(‘date published‘)
>>> Question.objects.all()
>>> Question.objects.get(pk=1)

从例子可以看出, objects.all 和 objects.get 这些功能都不是在 class Question 中定义的,可能在其父类 models.Model 中定义,也可能不是。那么我们在 web.py 中如何实现这样的功能呢?(如果你选择使用 SQLAlchemy 就不需要自己实现了)。

实现

思路

我们注意到 Question.objects.all() 这样的调用是直接访问了类属性 objects ,并调用了 objects 属性的方法 all() 。这里 objects 可能是一个实例,也可能是一个类。我个人认为(我没看过Django的实现)这应该是一个实例,因为实例化的过程可以传递一些表的信息,使得类似 all() 这样的函数可以工作。经过分析之后,我们可以列出我们需要解决的问题:

  1. 需要实现一个模型的父类 Model ,实际的表可以从这个父类继承以获得自己没有定义的功能。
  2. 实际的模型类(比如 Question 类)定义后,不实例话的情况下就要具备 objects.all() 这样的查询效果。

从上面的需求可以看出,我们需要在类定义的时候就实现这些功能,而不是等到类实例化的时候再实现这些功能。 类定义的时候实现功能 ?这不就是 metaclass (元类)做的事情嘛。因此实现过程大概是下面这样的:

  1. 实现一个 Model 类,其绑定方法和表的增、删、改有关。
  2. 修改 Model 类的元类为 ModelMetaClass ,该元类定义的过程中为类增加一个 objects 对象,该对象是一个 ModelDefaultManager 类的实例,实现了表的查询功能。

代码

都说不给代码就是耍流氓,我还是给吧。说明下:使用的数据库操作都是web.py的db库中的接口。

Python	# -*- coding: utf-8 -*-
  import web
  import config  # 自定义的配置类,可以忽略
  def _connect_to_db():
    return web.database(dbn="sqlite", db=config.dbname)
  def init_db():
    db = _connect_to_db()
    for statement in config.sql_statements:
      db.query(statement)
  class ModelError(Exception):
    """Exception raised by all models.
    Attributes:
      msg: Error message.
    """
    def __init__(self, msg=""):
      self.msg = msg
    def __str__(self):
      return "ModelError: %s" % self.msg
  class ModelDefaultManager(object):
    """ModelManager implements query functions against a model.
    Attributes:
      cls: The class to be managed.
    """
    def __init__(self, cls):
      self.cls = cls
      self._table_name = cls.__name__.lower()
    def all(self):
      db = _connect_to_db()
      results = db.select(self._table_name)
      return [self.cls(x) for x in results]
    def get(self, query_vars, where):
      results = self.filter(query_vars, where, limit=1)
      if len(results) > 0:
        return results[0]
      else:
        return None
    def filter(self, query_vars, where, limit=None):
      db = _connect_to_db()
      try:
        results = db.select(self._table_name, vars=query_vars, where=where,
                  limit=limit)
      except (Exception) as e:
        raise ModelError(str(e))
      return [self.cls(x) for x in results]
  class ModelMetaClass(type):
    def __new__(cls, classname, bases, attrs):
      new_class = super(ModelMetaClass, cls).__new__(cls, classname,
                               bases, attrs)
      objects = ModelDefaultManager(new_class)
      setattr(new_class, "objects", objects)
      return new_class
  class Model(object):
    """Parent class of all models.
    """
    __metaclass__ = ModelMetaClass
    def __init__(self):
      pass
    def _table_name(self):
      return self.__class__.__name__.lower()
    def insert(self, **kargs):
      db = _connect_to_db()
      try:
        with db.transaction():
          db.insert(self._table_name(), **kargs)
      except (Exception) as e:
        raise ModelError(str(e))
    def delete(self, where, using=None, vars=None):
      db = _connect_to_db()
      try:
        with db.transaction():
          db.delete(self._table_name(), where, vars=vars)
      except (Exception) as e:
        raise ModelError(str(e))
    def save(self, where, vars=None, **kargs):
      db = _connect_to_db()
      try:
        with db.transaction():
          db.update(self._table_name(), where, vars, **kargs)
      except (Exception) as e:
        raise ModelError(str(e))

使用

首先定义表对应的类:

class Users(Model):
    ...

使用就和Django的方式一样:

>>> user_list = Users.objects.all()

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。