Flask---flask_sqlalchemy源码分析

这里写图片描述

基本使用

安装 pip3 install flask_sqlalchemy
我们在使用时候,会执行如下的代码

db = SQLAlchemy()
app = Flask(__name__)
db.init_app(app)

然后models

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, UniqueConstraint, Index,DateTime,ForeignKey
from s8day130_pro import db

class Users(db.Model):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32),nullable=False,unique=True)

然后创建表

app = create_app()
#从栈拿app
with app.app_context():
    # db.drop_all()
    db.create_all()
    # data = db.session.query(models.Users).all()
    # print(data)

另外需要配置如下:

class BaseConfig(object):
    # SESSION_TYPE = 'redis'  # session类型为redis
    # SESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀
    # SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。
    # SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密

    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:[email protected]:3306/test?charset=utf8"
    SQLALCHEMY_POOL_SIZE = 5
    SQLALCHEMY_POOL_TIMEOUT = 30
    SQLALCHEMY_POOL_RECYCLE = -1

    # 追踪对象的修改并且发送信号
    SQLALCHEMY_TRACK_MODIFICATIONS = False

源码分析

我们就来简单的看下源码流程
首先从db = SQLAlchemy()入手吧

 def __init__(self, app=None, use_native_unicode=True, session_options=None,
                 metadata=None, query_class=BaseQuery, model_class=Model):

        self.use_native_unicode = use_native_unicode
        self.Query = query_class
        self.session = self.create_scoped_session(session_options)
        self.Model = self.make_declarative_base(model_class, metadata)
        self._engine_lock = Lock()
        self.app = app
        _include_sqlalchemy(self, query_class)

        if app is not None:
            self.init_app(app)

初始化类中有几个属性Query、session 、Model 、app 还有一个方法init_app(app)

首先看下Query属性:

class BaseQuery(orm.Query):

该类有get_or_404、first_or_404、paginate3个方法,是做了查询异常处理,和分页数据
看看它的父类orm.Query,它是sqlalchemy\orm\Query类,定义了一些操作数据库的系列方法

然后看下session属性

self.session = self.create_scoped_session(session_options)
    def create_scoped_session(self, options=None):
        if options is None:
            options = {}

        scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        return orm.scoped_session(
            self.create_session(options), scopefunc=scopefunc
        )

上述方法中scopefunc是从_app_ctx_stack = LocalStack()中取如下方法

    def _get__ident_func__(self):
        return self._local.__ident_func__

options初始化字典,然后进行options.setdefault('query_cls', self.Query)添加元素
然后通过self.create_session(options)取构造sessionmaker实例对象,然后在执行构造出来的实例对象的__call__方法
在self.create_session(options)中,我们先看self.create_session部分,是返回一个sessionmaker实例对象

return orm.sessionmaker(class_=SignallingSession, db=self, **options)

然后看下self.create_session(options),也就是sessionmaker实例对象的__call__方法

每次执行数据库操作时,都需要创建一个session,这个session就是sessionmaker的实例对象调用__call__方法,以下是代码

 def __call__(self, **local_kw):
        """Produce a new :class:`.Session` object using the configuration
        established in this :class:`.sessionmaker`.

        In Python, the ``__call__`` method is invoked on an object when
        it is "called" in the same way as a function::

            Session = sessionmaker()
            session = Session()  # invokes sessionmaker.__call__()

        """
        for k, v in self.kw.items():
            if k == 'info' and 'info' in local_kw:
                d = v.copy()
                d.update(local_kw['info'])
                local_kw['info'] = d
            else:
                local_kw.setdefault(k, v)
        return self.class_(**local_kw)

最后在create_scoped_session方法中返回如下:

return orm.scoped_session(
            self.create_session(options), scopefunc=scopefunc
        )

接下来我们看下self.Model属性

self.Model = self.make_declarative_base(model_class, metadata)
 def make_declarative_base(self, model, metadata=None):
        """Creates the declarative base that all models will inherit from.

        :param model: base model class (or a tuple of base classes) to pass
            to :func:`~sqlalchemy.ext.declarative.declarative_base`. Or a class
            returned from ``declarative_base``, in which case a new base class
            is not created.
        :param: metadata: :class:`~sqlalchemy.MetaData` instance to use, or
            none to use SQLAlchemy's default.

        .. versionchanged 2.3.0::
            ``model`` can be an existing declarative base in order to support
            complex customization such as changing the metaclass.
        """
        if not isinstance(model, DeclarativeMeta):
            model = declarative_base(
                cls=model,
                name='Model',
                metadata=metadata,
                metaclass=DefaultMeta
            )

        # if user passed in a declarative base and a metaclass for some reason,
        # make sure the base uses the metaclass
        if metadata is not None and model.metadata is not metadata:
            model.metadata = metadata

        if not getattr(model, 'query_class', None):
            model.query_class = self.Query

        model.query = _QueryProperty(self)
        return model

我们在代码中会使用如下,是对象属性跟数据库字段的映射

 class User(db.Model):
            username = db.Column(db.String(80), unique=True)
            pw_hash = db.Column(db.String(80))

然后看下SQLAlchemy初始化方法中的如下代码

_include_sqlalchemy(self, query_class)
def _include_sqlalchemy(obj, cls):
    for module in sqlalchemy, sqlalchemy.orm:
        for key in module.__all__:
            if not hasattr(obj, key):
                setattr(obj, key, getattr(module, key))
    # Note: obj.Table does not attempt to be a SQLAlchemy Table class.
    obj.Table = _make_table(obj)
    obj.relationship = _wrap_with_default_query_class(obj.relationship, cls)
    obj.relation = _wrap_with_default_query_class(obj.relation, cls)
    obj.dynamic_loader = _wrap_with_default_query_class(obj.dynamic_loader, cls)
    obj.event = event

意思就是获取relationship、event、dynamic_loader等进行关联查询、事件等操作的

最后看SQLAlchemy类中的__init__方法

        if app is not None:
            self.init_app(app)

就是为flask_app进行一些配置信息设置,

   def init_app(self, app):
        """This callback can be used to initialize an application for the
        use with this database setup.  Never use a database in the context
        of an application not initialized that way or connections will
        leak.
        """
        if (
            'SQLALCHEMY_DATABASE_URI' not in app.config and
            'SQLALCHEMY_BINDS' not in app.config
        ):
            warnings.warn(
                'Neither SQLALCHEMY_DATABASE_URI nor SQLALCHEMY_BINDS is set. '
                'Defaulting SQLALCHEMY_DATABASE_URI to "sqlite:///:memory:".'
            )

        app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
        app.config.setdefault('SQLALCHEMY_BINDS', None)
        app.config.setdefault('SQLALCHEMY_NATIVE_UNICODE', None)
        app.config.setdefault('SQLALCHEMY_ECHO', False)
        app.config.setdefault('SQLALCHEMY_RECORD_QUERIES', None)
        app.config.setdefault('SQLALCHEMY_POOL_SIZE', None)
        app.config.setdefault('SQLALCHEMY_POOL_TIMEOUT', None)
        app.config.setdefault('SQLALCHEMY_POOL_RECYCLE', None)
        app.config.setdefault('SQLALCHEMY_MAX_OVERFLOW', None)
        app.config.setdefault('SQLALCHEMY_COMMIT_ON_TEARDOWN', False)
        track_modifications = app.config.setdefault(
            'SQLALCHEMY_TRACK_MODIFICATIONS', None
        )

        if track_modifications is None:
            warnings.warn(FSADeprecationWarning(
                'SQLALCHEMY_TRACK_MODIFICATIONS adds significant overhead and '
                'will be disabled by default in the future.  Set it to True '
                'or False to suppress this warning.'
            ))

        app.extensions['sqlalchemy'] = _SQLAlchemyState(self)

        @app.teardown_appcontext
        def shutdown_session(response_or_exc):
            if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                if response_or_exc is None:
                    self.session.commit()

            self.session.remove()
            return response_or_exc

我们在使用flask-sqlalchemy时,需要在项目为其进行设置,比如如下

class BaseConfig(object):
    # SESSION_TYPE = 'redis'  # session类型为redis
    # SESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀
    # SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。
    # SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密

    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:[email protected]:3306/test?charset=utf8"
    SQLALCHEMY_POOL_SIZE = 5
    SQLALCHEMY_POOL_TIMEOUT = 30
    SQLALCHEMY_POOL_RECYCLE = -1

    # 追踪对象的修改并且发送信号
    SQLALCHEMY_TRACK_MODIFICATIONS = False

以上就是db = SQLAlchemy()进行的初始化操作,然后我们通过如下的代码生成数据库表

app = create_app()
#从栈拿app
with app.app_context():
    # db.drop_all()
    db.create_all()
    # data = db.session.query(models.Users).all()
    # print(data)

我们在初始化就提到过scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)

所以我们需要通过with app.app_context():获取atx,以下是with函数需要调用的方法

 def __enter__(self):
        self.push()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.pop(exc_value)

        if BROKEN_PYPY_CTXMGR_EXIT and exc_type is not None:
            reraise(exc_type, exc_value, tb)

然后进行创建数据库表

   def create_all(self, bind='__all__', app=None):
        self._execute_for_all_tables(app, bind, 'create_all')
 def _execute_for_all_tables(self, app, bind, operation, skip_tables=False):
        app = self.get_app(app)

        if bind == '__all__':
            binds = [None] + list(app.config.get('SQLALCHEMY_BINDS') or ())
        elif isinstance(bind, string_types) or bind is None:
            binds = [bind]
        else:
            binds = bind

        for bind in binds:
            extra = {}
            if not skip_tables:
                tables = self.get_tables_for_bind(bind)
                extra['tables'] = tables
            op = getattr(self.Model.metadata, operation)
            op(bind=self.get_engine(app, bind), **extra)

以上就是flask-sqlalchemy的大概执行流程

猜你喜欢

转载自blog.csdn.net/u013210620/article/details/80215727