首先业务需求是mysql数据要提取到redis,同时在redis建立索引做分词(别问我为什么这么做,要求就这样),使用flask框架
celery是一个异步框架,但是需要一个队列存储信息,此处使用redis
一共四个py文件
# __init__.py
import pymysql
# 与mysql交互
pymysql.install_as_MySQLdb()
接下来是配置和model创建
# config.py
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import redis
import logging
import logging.config
app = Flask(__name__)
app.config['SECRET_KEY'] = 'hard to guess'
# 这里登陆的是root用户,要填上自己的密码,MySQL的默认端口是3306,填上之前创建的数据库名day1
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://fenglei:123456@localhost:3306/day1'
# 设置这一项是每次请求结束后都会自动提交数据库中的变动
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 实例化mysql db
db = SQLAlchemy(app)
# 实例化redis db
redis_store = redis.StrictRedis(host='127.0.0.1', port=6379, db=1)
# redis用来作为消息队列
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/7'
app.config['result_backend'] = 'redis://localhost:6379/7'
LOG_PATH = 'logs'
LOG_FILE = 'log.txt'
# 自定义日志
def get_logger(name):
i_logger = logging.getLogger(name)
if os.path.exists(LOG_PATH):
pass
else:
os.mkdir(LOG_PATH)
# 指定logger输出格式
formatter = logging.Formatter('%(asctime)s %(levelname)-8s: %(message)s')
# 文件日志
file_handler = logging.FileHandler("%s/%s" % (LOG_PATH, LOG_FILE))
file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
# 为logger添加的日志处理器,可以自定义日志处理器让其输出到其他地方
i_logger.addHandler(file_handler)
# 指定日志的最低输出级别,默认为WARN级别
i_logger.setLevel(logging.INFO)
return i_logger
# 前提是mysql已经存在invoice表,不用迁徙同步数据库了,直接连接就好
class invoice(db.Model):
__tablename__ = 'invoice'
id = db.Column(db.Integer, primary_key=True)
xfsbh = db.Column(db.String(128))
xfmc = db.Column(db.String(128))
gfmc = db.Column(db.String(128))
gfsbh = db.Column(db.String(128))
def __repr__(self):
return '<invoice %r %s>' % (self.id, self.xfsbh)
接下来是视图函数py文件(包含celery,mysql数据库提取存储到redis)
import logging
from config import invoice, redis_store
import jieba
from celery_redis import celery
n = 0
# 使用装饰器,增加异步celery功能
@celery.task
def get_list():
"""获取mysql中所有id>n的对象列表"""
global n
id_list = invoice.query.filter(invoice.id > n)
save_data(id_list)
def save_data(id_list):
"""相关数据存储到redis,首先是存量数据"""
global n
m = 0
for i in id_list:
# 遍历列表取出id
i_id = i.id
if i_id > n:
u = invoice.query.get(i_id) # 从orm中提取i_id对象
if u:
try:
data_list = jieba.cut(u.xfmc) # 使用jieba分词,可是不太智能,将就用 [北京 瑞 佳 讯 贸易 有限 公司]
for data in data_list:
# 存储到redis set格式, 可以不断追加,而且去重
redis_store.sadd(data, u.xfmc)
except Exception as e:
logging.exception(e) # 输出错误日志
else:
continue
m = u.id
# 当存储增量数据时,id从n开始
n = m
# celery -A tasks:celery worker -l info -B 使用此命令在终端开启celery
# get_list()
接下里初始化celery
# celery_redis.py
from celery import Celery
from config import app
from datetime import timedelta
# 初始化celery
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
# celery 定时调度
celery.conf.beat_schedule = {
'task': {
'task': 'tasks.get_list',
'schedule': timedelta(seconds=5), # 设置自动时间调度
'args': ''
},
}
celery.conf.timezone = 'UTC' # 如果不设置时区,时间调度就容易报错
最后就是接口,返回搜索的数据
import json
from config import redis_store, get_logger, app
logger = get_logger(__name__)
# 输入‘北京’,返回redis中相对应的列表,但是redis存储方式是字节,遍历列表针对每个对象才能编码
@app.route('/search', methods=['GET', 'POST'])
def search_data():
"""搜索结果"""
# data = request.args.get('query')
l = list(redis_store.smembers('北京'))
l2 = []
for i in l:
logger.info('the message is %s', i.decode())
l2.append(i.decode())
print(json.dumps(l2, ensure_ascii=False))
return json.dumps(l2, ensure_ascii=False)
search_data()
异步队列已经开始工作了
日志信息