使用flask从零构建自动化运维平台系列二

版权声明:转载请注明出处 https://blog.csdn.net/weixin_43420337/article/details/84139421


写代码也是一种艺术,结构层次感一定要好,这样做出来的才是一个好作品。

代码管理

git

目录结构

SmartOps
├── app
│   ├── factory.py
│   ├── __init__.py
│   ├── models.py
│   ├── server.py
│   ├── soapi
│   ├── static
│   └── templates
├── config.py
├── manage.py
├── migrations
├── requirements.txt
├── tests
└── venv

目录结构用途说明

目录 用途
app 存放flask应用代码
migrations 数据库迁移脚本
tests 单元测试
venv 虚拟环境

目录文件说明

文件 用途
manage.py 启动程序以及其他的程序任务。
config.py 存储配置
requirements.txt python依赖包

配置文件

配置文件决定程序做出什么的行为,比较常用的就是开发环境,测试环境,正式环境三种,下面是个例子

config.py

import logging
import os
from datetime import timedelta

CONFIG = {
    "development": "config.DevelopmentConfig",
    "testing": "config.TestingConfig",
    "production": "config.ProductionConfig",
    "default": "config.ProductionConfig"
}
USER_SECRET_KEY = 'asdfoasdjgio'
VERIFY_DEBUG = True

class BaseConfig(object):
    """Base class for default set of configs."""

    DEBUG = False
    TESTING = False
    SECURITY_PASSWORD_HASH = 'pbkdf2_sha512'
    SECURITY_TRACKABLE = True
    LOGGING_FORMAT = "[%(asctime)s] [%(funcName)-30s] +\
                                    [%(levelname)-6s] %(message)s"
    LOGGING_LOCATION = 'web.log'
    LOGGING_LEVEL = logging.DEBUG
    SECURITY_TOKEN_MAX_AGE = 60 * 30
    SECURITY_CONFIRMABLE = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    CACHE_TYPE = 'simple'
    SECURITY_PASSWORD_SALT = 'super-secret-stuff-here'
    COMPRESS_MIMETYPES = ['text/html', 'text/css', 'text/xml',
                          'application/json', 'application/javascript']

    WTF_CSRF_ENABLED = False
    COMPRESS_LEVEL = 6
    COMPRESS_MIN_SIZE = 500

    # Change it based on your admin user, should ideally read from DB.
    ADMIN_USER = 'admin'
    ADMIN_PASSWORD = 'admin'
    JWT_EXPIRES = timedelta(minutes=10)


class DevelopmentConfig(BaseConfig):
    """Default set of configurations for development mode."""

    DEBUG = True
    TESTING = False
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SECRET_KEY = 'not-so-super-secret'
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'
    SQLALCHEMY_TRACK_MODIFICATIONS = True


class ProductionConfig(BaseConfig):
    """Default set of configurations for prod mode."""

    DEBUG = False
    TESTING = False
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SECRET_KEY = 'Super-awesome-secret-stuff'
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'


class TestingConfig(BaseConfig):
    """Default set of configurations for test mode."""

    DEBUG = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite://'
    SECRET_KEY = '792842bc-c4df-4de1-9177-d5207bd9faa6'
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'

使用工厂来创建app

app/__init__.py

import os

from flask import Flask
from config import CONFIG
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


def create_app(config_name):
    """Configure the app w.r.t Flask-security, databases, loggers."""
    app = Flask(__name__)
    app.config.from_object(CONFIG[config_name])
    db.init_app(app)

    return app

使用manage来管理

manage.py

from app import create_app, db
from app.models import User
from flask_script import Manager, Shell
from flask_migrate import Migrate, MigrateCommand
from flask_jsonrpc import JSONRPC
import os

app = create_app(os.getenv('FLASK_CONFIG') or 'development')
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True, auth_backend=User.authenticate)
manager = Manager(app)
migrate = Migrate(app, db)


def make_shell_context():
    return dict(app=app, db=db, User=User)


manager.add_command("shell", Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)

import app.soapi.user

if __name__ == '__main__':
    manager.run()

使用manage创建数据库

初始化

python manage.py init

创建历史版本

python manage.py migrate -m ‘first’

创建数据库

python manage.py upgrade

jsonrpc模块化

app/soapi/user.py

from flask import Blueprint

mod = Blueprint('user', __name__)
from manage import jsonrpc
jsonrpc.register_blueprint(mod)
from app.models import User
from app import db


@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    if not User.query.filter_by(username=username).first():
        user = User(username=username)
        user.password(password)
        db.session.add(user)
        db.session.commit()
        return {'status': 0, 'message': u'注册成功'}
    else:
        return {'status': 1, 'message': u'用户已存在'}


@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    user = User.query.filter_by(username=username).first()
    if not user:
        return {'status': 1, 'message': u'用户名不存在'}
    if user.verify_password(password):
        token = user.generate_auth_token()
        return {'status': 0, 'message': u'欢迎%s' % username, 'token': token}
    return {'status': 1, 'message': u'密码错误'}

然后在manage中import过来

import app.soapi.user

数据模型拆分

app/models.py

from itsdangerous import BadSignature, SignatureExpired, TimedJSONWebSignatureSerializer as Serializer
import werkzeug
from functools import wraps
from flask_jsonrpc import InvalidCredentialsError, InvalidParamsError
from app import db
from config import USER_SECRET_KEY, VERIFY_DEBUG


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def password(self, password):
        """
        设置密码hash值
        """
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """
        将用户输入的密码明文与数据库比对
        """

        if self.password_hash:
            return werkzeug.security.check_password_hash(self.password_hash, password)
        return None

    def generate_auth_token(self, expiration=600):
        s = Serializer(USER_SECRET_KEY, expires_in=expiration)
        return bytes.decode(s.dumps({'id': self.id}))

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(USER_SECRET_KEY)
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user

    @staticmethod
    def authenticate(f, f_check_auth):
        @wraps(f)
        def _f(*args, **kwargs):
            is_auth = False
            try:
                creds = args[:2]
                is_auth = f_check_auth(creds[0], creds[1])
                if is_auth:
                    args = args[2:]
            except IndexError:
                if 'token' in kwargs:
                    is_auth = f_check_auth(kwargs['token'])
                    if is_auth:
                        kwargs.pop('token')
                else:
                    raise InvalidParamsError('Authenticated methods require at least '
                                             '[token] or {token: } arguments')
            if not is_auth:
                raise InvalidCredentialsError()
            return f(*args, **kwargs)

        return _f

    @staticmethod
    def check_auth(token):
        # 启用debug模式不需要进行token认证
        if VERIFY_DEBUG:
            return True
        user = User.verify_auth_token(token)
        if user:
            return True
        return False

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username


猜你喜欢

转载自blog.csdn.net/weixin_43420337/article/details/84139421
今日推荐