1.安装blinker库
MongoEngine在进行数据操作时会发出一些信号,我们可以连接这些信号进行一些额外的操作。注意:要在MongoEngine中使用信号,需要安装 blinker 这个库。
$ pip install blinker
MongoEngine提供的信号如下:
- pre_init: 在创建一个新的 Document 或者 EmbeddedDocument 实例对象之后,并且对象初始化之前调用。
- post_init:在 Document 或者 EmbeddedDocument 实例对象初始化完成之后调用。
- pre_save:在 save 方法执行之前调用。
- pre_save_post_validation:在数据检验完成之后,数据保存之前调用。
- post_save:在数据保存完成之后调用。
- pre_delete:在 delete 方法执行之前调用。
- post_delete:在记录成功删除之后调用。
- pre_bulk_insert:在数据检验之后,数据插入之前调用。
- post_bulk_insert:在数据成功插入之后调用。
2.事件连接
使用 signals 将信号与回调函数进行连接。
from mongoengine import *
from mongoengine import signals
class Author(Document):
name = StringField()
@classmethod
def pre_save(cls, sender, document, **kwargs):
print("Pre Save: %s" % document.name)
@classmethod
def post_save(cls, sender, document, **kwargs):
print("Post Save: %s" % document.name)
if 'created' in kwargs:
if kwargs['created']:
print("Created")
else:
print("Updated")
signals.pre_save.connect(Author.pre_save, sender=Author)
signals.post_save.connect(Author.post_save, sender=Author)
注意:对于 RefereneField 的reverse_delete_rules参数不会触发信号。
3.工作实例
# coding=UTF-8
import datetime
from collections import defaultdict
from mongoengine import signals
from mongoengine.document import Document
from mongoengine.fields import IntField, DateTimeField, ListField, StringField
class UploadRecord(Document):
"""保存用户的操作明细"""
OPERATOR_CHOICES = ((0, '系统'), (1, '用户'), (2, '技术专家'))
FUNC_TYPE_CHOICES = (
(0, '基础功能'), (1, '新品测款'), (2, '宝贝复制'), (3, '自动托管'), (4, '实时优化'), (5, '一键优化'), (6, '手动抢位'),
(7, '自动抢位'), (8, '全店删词'), (9, '创意测试'), (10, '删垃圾词'), (11, '设置人群'), (12, '加大/减小投入'))
OP_TYPE_CHOICES = ((1, '计划'), (2, '宝贝'), (3, '创意'), (5, '人群'), (4, '关键词'))
DATA_TYPE_CHOICES = ((101, '修改推广状态'), (102, '修改计划名称'), (103, '设置日限额'),
(104, '设置投放平台'), (105, '修改投放时间'), (106, '修改投放地域'),
(107, '开启自动托管'), (108, '取消自动托管'), (109, '调整计划投入'),
(110, '设置关键词限价'), (111, '设置屏蔽词'), (112, '设置托管策略'),
(113, '系统优化检查'), (114, '设置实时优化'), (115, '设置人群优化'),
(116, '设置节日策略'), (117, '设置上新策略'),
(201, '添加宝贝'), (202, '删除宝贝'), (203, '修改推广状态'),
(204, '修改默认出价'), (206, '修改托管状态'),
(207, '调整宝贝投入'), (208, '设置关键词限价'), (209, '配置产品词'),
(210, '设置托管方式'), (211, '设置屏蔽词'), (212, '一键优化'),
(213, '修改宝贝标题'), (214, '宝贝复制'), (215, '设置人群'),
(216, '关注宝贝'), (217, '取消关注'),
(301, '添加创意'), (302, '删除创意'), (303, '修改创意'),
(401, '添加关键词'), (402, '删除关键词'), (403, '删除违规词'),
(404, '删除屏蔽词'), (405, '修改关键词'),
(501, '添加人群'), (502, '删除人群'), (503, '修改状态'), (504, '修改溢价'),
(505, '人群优化'),
)
shop_id = IntField(verbose_name="店铺ID", required=True)
campaign_id = IntField(verbose_name="计划ID")
adgroup_id = IntField(verbose_name="广告组ID")
item_name = StringField(verbose_name="宝贝名称")
op_type = IntField(verbose_name="操作父类别", choices=OP_TYPE_CHOICES, required=True) # 操作类型父编号
data_type = IntField(verbose_name="操作子类别", choices=DATA_TYPE_CHOICES, required=True) # 操作类型子编号
detail_list = ListField(verbose_name="操作明细", default=[])
opter = IntField(verbose_name="操作者类型", choices=OPERATOR_CHOICES, default=3)
opter_name = StringField(verbose_name="操作者名字", default='')
opt_time = DateTimeField(verbose_name="操作时间", default=datetime.datetime.now)
func_type = IntField(verbose_name="功能类别", choices=FUNC_TYPE_CHOICES, default=0)
meta = {'collection': 'subway_opt_history', 'indexes': ['campaign_id', 'adgroup_id', 'opt_time'],
"shard_key": ('shop_id',)}
RESERVED_DAYS = 90
# 根据id从CHOICES中获取text
@staticmethod
def get_choices_text(choices, cid):
text = ''
for cho in choices:
if cid in cho:
text = cho[1]
break
return text
@classmethod
def clean_outdated(cls):
"""清除过期数据"""
cls._get_collection().remove(
{'opt_time': {'$lte': datetime.datetime.now() - datetime.timedelta(days=cls.RESERVED_DAYS)}})
return True
@classmethod
def post_bulk_insert(cls, *args, **kwargs):
"""当批量插入到操作记录表后,执行处理,对应的方法eg:UploadRecord.objects.insert(rcd_list)"""
document_list = kwargs.get('documents', [])
opt_dict = defaultdict(int) # 优化字典,k:shop_id, v:api调用次数
for doc in document_list:
temp_num = len(doc.detail_list)
opt_dict[doc.shop_id] += temp_num if temp_num > 0 else 1
cls._update_account(opt_dict)
@classmethod
def post_save(cls, sender, document, **kwargs):
"""当保存数据时到操作记录表时,执行处理,对应的方法eg: UploadRecord.objects.create(**rcd_dict)"""
opt_dict = {} # 优化字典,k:shop_id, v:api调用次数
temp_num = len(document.detail_list)
opt_dict[document.shop_id] = temp_num if temp_num > 0 else 1
cls._update_account(opt_dict)
@classmethod
def _update_account(cls, opt_dict):
"""根据优化次数字典,更新Account表字段"""
# 优化字典,k:shop_id, v:api调用次数
pass
# 定义信号量,当mongodb的数据发生改变时执行方法
signals.post_bulk_insert.connect(UploadRecord.post_bulk_insert, sender=UploadRecord)
signals.post_save.connect(UploadRecord.post_save, sender=UploadRecord)
uprcd_coll = UploadRecord._get_collection()