Django操作mongoDB Mongoengine之——信号

1.安装blinker库

MongoEngine在进行数据操作时会发出一些信号,我们可以连接这些信号进行一些额外的操作。注意:要在MongoEngine中使用信号,需要安装 blinker 这个库。

$ pip install blinker

MongoEngine提供的信号如下:

  • pre_init: 在创建一个新的 Document 或者 EmbeddedDocument 实例对象之后,并且对象初始化之前调用。
  • post_init:在 Document 或者 EmbeddedDocument 实例对象初始化完成之后调用。
  • pre_save:在 save 方法执行之前调用。
  • pre_save_post_validation:在数据检验完成之后,数据保存之前调用。
  • post_save:在数据保存完成之后调用。
  • pre_delete:在 delete 方法执行之前调用。
  • post_delete:在记录成功删除之后调用。
  • pre_bulk_insert:在数据检验之后,数据插入之前调用。
  • post_bulk_insert:在数据成功插入之后调用。

2.事件连接

使用 signals 将信号与回调函数进行连接。

from mongoengine import *
from mongoengine import signals
 
class Author(Document):
    name = StringField()
 
    @classmethod
    def pre_save(cls, sender, document, **kwargs):
        print("Pre Save: %s" % document.name)
 
    @classmethod
    def post_save(cls, sender, document, **kwargs):
        print("Post Save: %s" % document.name)
        if 'created' in kwargs:
            if kwargs['created']:
                print("Created")
            else:
                print("Updated")
 
signals.pre_save.connect(Author.pre_save, sender=Author)
signals.post_save.connect(Author.post_save, sender=Author)

注意:对于 RefereneField 的reverse_delete_rules参数不会触发信号。

3.工作实例

# coding=UTF-8

import datetime

from collections import defaultdict
from mongoengine import signals
from mongoengine.document import Document
from mongoengine.fields import IntField, DateTimeField, ListField, StringField


class UploadRecord(Document):
    """保存用户的操作明细"""
    OPERATOR_CHOICES = ((0, '系统'), (1, '用户'), (2, '技术专家'))

    FUNC_TYPE_CHOICES = (
        (0, '基础功能'), (1, '新品测款'), (2, '宝贝复制'), (3, '自动托管'), (4, '实时优化'), (5, '一键优化'), (6, '手动抢位'),
        (7, '自动抢位'), (8, '全店删词'), (9, '创意测试'), (10, '删垃圾词'), (11, '设置人群'), (12, '加大/减小投入'))

    OP_TYPE_CHOICES = ((1, '计划'), (2, '宝贝'), (3, '创意'), (5, '人群'), (4, '关键词'))

    DATA_TYPE_CHOICES = ((101, '修改推广状态'), (102, '修改计划名称'), (103, '设置日限额'),
                         (104, '设置投放平台'), (105, '修改投放时间'), (106, '修改投放地域'),
                         (107, '开启自动托管'), (108, '取消自动托管'), (109, '调整计划投入'),
                         (110, '设置关键词限价'), (111, '设置屏蔽词'), (112, '设置托管策略'),
                         (113, '系统优化检查'), (114, '设置实时优化'), (115, '设置人群优化'),
                         (116, '设置节日策略'), (117, '设置上新策略'),

                         (201, '添加宝贝'), (202, '删除宝贝'), (203, '修改推广状态'),
                         (204, '修改默认出价'), (206, '修改托管状态'),
                         (207, '调整宝贝投入'), (208, '设置关键词限价'), (209, '配置产品词'),
                         (210, '设置托管方式'), (211, '设置屏蔽词'), (212, '一键优化'),
                         (213, '修改宝贝标题'), (214, '宝贝复制'), (215, '设置人群'),
                         (216, '关注宝贝'), (217, '取消关注'),

                         (301, '添加创意'), (302, '删除创意'), (303, '修改创意'),

                         (401, '添加关键词'), (402, '删除关键词'), (403, '删除违规词'),
                         (404, '删除屏蔽词'), (405, '修改关键词'),

                         (501, '添加人群'), (502, '删除人群'), (503, '修改状态'), (504, '修改溢价'),
                         (505, '人群优化'),
                         )
    shop_id = IntField(verbose_name="店铺ID", required=True)
    campaign_id = IntField(verbose_name="计划ID")
    adgroup_id = IntField(verbose_name="广告组ID")
    item_name = StringField(verbose_name="宝贝名称")
    op_type = IntField(verbose_name="操作父类别", choices=OP_TYPE_CHOICES, required=True)  # 操作类型父编号
    data_type = IntField(verbose_name="操作子类别", choices=DATA_TYPE_CHOICES, required=True)  # 操作类型子编号
    detail_list = ListField(verbose_name="操作明细", default=[])
    opter = IntField(verbose_name="操作者类型", choices=OPERATOR_CHOICES, default=3)
    opter_name = StringField(verbose_name="操作者名字", default='')
    opt_time = DateTimeField(verbose_name="操作时间", default=datetime.datetime.now)
    func_type = IntField(verbose_name="功能类别", choices=FUNC_TYPE_CHOICES, default=0)


    meta = {'collection': 'subway_opt_history', 'indexes': ['campaign_id', 'adgroup_id', 'opt_time'],
            "shard_key": ('shop_id',)}

    RESERVED_DAYS = 90

    # 根据id从CHOICES中获取text
    @staticmethod
    def get_choices_text(choices, cid):
        text = ''
        for cho in choices:
            if cid in cho:
                text = cho[1]
                break
        return text

    @classmethod
    def clean_outdated(cls):
        """清除过期数据"""
        cls._get_collection().remove(
            {'opt_time': {'$lte': datetime.datetime.now() - datetime.timedelta(days=cls.RESERVED_DAYS)}})
        return True

    @classmethod
    def post_bulk_insert(cls, *args, **kwargs):
        """当批量插入到操作记录表后,执行处理,对应的方法eg:UploadRecord.objects.insert(rcd_list)"""

        document_list = kwargs.get('documents', [])
        opt_dict = defaultdict(int)  # 优化字典,k:shop_id, v:api调用次数
        for doc in document_list:
            temp_num = len(doc.detail_list)
            opt_dict[doc.shop_id] += temp_num if temp_num > 0 else 1

        cls._update_account(opt_dict)

    @classmethod
    def post_save(cls, sender, document, **kwargs):
        """当保存数据时到操作记录表时,执行处理,对应的方法eg: UploadRecord.objects.create(**rcd_dict)"""
        opt_dict = {}  # 优化字典,k:shop_id, v:api调用次数
        temp_num = len(document.detail_list)
        opt_dict[document.shop_id] = temp_num if temp_num > 0 else 1
        cls._update_account(opt_dict)

    @classmethod
    def _update_account(cls, opt_dict):
        """根据优化次数字典,更新Account表字段"""

        # 优化字典,k:shop_id, v:api调用次数
        pass


# 定义信号量,当mongodb的数据发生改变时执行方法
signals.post_bulk_insert.connect(UploadRecord.post_bulk_insert, sender=UploadRecord)
signals.post_save.connect(UploadRecord.post_save, sender=UploadRecord)

uprcd_coll = UploadRecord._get_collection()
发布了146 篇原创文章 · 获赞 66 · 访问量 5万+

猜你喜欢

转载自blog.csdn.net/qq_38923792/article/details/103789484