数据库数据缓存类设计
对于需要从数据库中获取的数据,如果频繁的进行数据库操作,在高并发的情况下会对数据库造成较大的压力, 此时可以对每次数据的查询结果进行缓存,并进行定期的更新,可以有效的解决这个问题。
在游戏中,关于服务端对于非玩家本身数据的加载获取与更新, 多设计到数据库的操作, 其中关于好友和战队成员等信息往往需要从数据库中获取,但如果每次都从数据库获取,那么大量玩家操作的情况下就会对数据库造成一定的压力, 此外好友和战队队员的信息在游戏中不断的变化,玩家想要获取这种变化就不得不去更新这些数据,从数据库操作会造成数据库的压力, 直接由变化的玩家向外辐射又会造成大量的夸进程rpc通信,那么一个折中的策略就是缓存。缓存的概念比较古老,技术上最鲜为人知的应用就是电脑磁盘与内存,早期内存空间有限, 访问速度较快, 磁盘空间大但速度很慢, 这使得缓存技术有了较大的发展。 鉴于此,我们可以在每个游戏进程上对需要从数据库中获得的信息进行缓存, 这样可以避免夸进程的通信。既然是缓存就会存在数据过期的情况, 更新数据的时机和策略就是显得十分重要。
class DataCacheManager(object):
"""
数据库内数据缓存管理器, 每个进程一份, 提供从数据库读取数据并进行缓存,
缓存空间有限, 当前采用FIFO的策略, 之后可改进LRU
为分布式数据提供数据访问点, 避免夸进程通信
"""
def __init__(self):
"""
初始化数据缓存管理器
"""
super(DataCacheManager, self).__init__()
self.logger = LogManager.get_logger("server.DataCacheManager")
# 数据缓存字典,以数据表名称进行分类缓存, 方便进行动态添加
self._data_cache_dict = {}
# 数据获取所需的必要信息
self._data_fetch_info = {}
def register_data_type_info(self, db_name, key_str, fields, cache_time, opacity):
"""
注册想要缓存的数据表数据的相关信息, 可重复注册, 但是会删除之前注册与缓存的所有信息哦
@ db_name: 数据表的名称
@ key_str: 数据表中查询的键值名称, 必须是创建了索引的键值!!!!!!
@ fields: 需要缓存的字段名称列表
@ cache_time: 数据缓存的时间
@ opacity: 缓存容量的大小
"""
# 检查参数类型
if not isinstance(fields, list):
return
if key_str not in fields:
fields.append(key_str)
# 添加数据查询信息
self._data_fetch_info[db_name] = {
'key_str': key_str,
'fields': fields,
}
# 创建数据缓冲对象
self._data_cache_dict[db_name] = LRUData(cache_time, opacity)
def unregister_data_type_info(self, db_name):
"""
数据表信息注册取消
@ db_name: 数据表的名称
"""
self._data_fetch_info.pop(db_name, 0)
self._data_cache_dict.pop(db_name, 0)
def fetch_data(self, db_name, key_id_list, done_cb=None, refresh=False):
"""
获取缓存的数据, 单个或者多个都可以
@ db_name: 数据表的名称
@ key_id_list: 需要查询的数据索引Key列表
@ done_cb: 获取数据后的回调函数
@ refresh: 是否直接绕过缓存从数据库中获取与更新
"""
# 检查想要获取的数据类型是否已经注册必要信息
data_fetch_info = self._data_fetch_info.get(db_name, {})
if not data_fetch_info:
return
# 包装key_id
if not isinstance(key_id_list, list):
key_id_list = [key_id_list]
# 计算需要查询的数据
no_cached_key_list = []
if refresh:
no_cached_key_list = key_id_list
else:
data_cache_obj = self._data_cache_dict[db_name]
for key_id in key_id_list:
if data_cache_obj.get_data(key_id) is None and key_id not in no_cached_key_list:
no_cached_key_list.append(key_id)
# 如果不需要查询就直接返回结果
if not no_cached_key_list:
self.get_data_from_cache(db_name, key_id_list, done_cb=done_cb)
return
# 从数据库中查询数据需要请求的数据
query = {data_fetch_info.get('key_str'): {'$in': no_cached_key_list, }}
fields = data_fetch_info.get('fields')
db_mgr.find(db_name, query, fields, limit=len(no_cached_key_list),
callback=lambda docs: self.fetch_data_cb(db_name, docs, key_id_list, done_cb))
def fetch_data_cb(self, db_name, docs, key_id_list, done_cb):
"""
数据库信息加载回调
"""
# 规范查询结果
if not docs:
self.get_data_from_cache(db_name, key_id_list, done_cb=done_cb)
return
if isinstance(docs, dict):
docs = [docs]
# 获取键值
data_fetch_info = self._data_fetch_info.get(db_name, {})
if not data_fetch_info:
return
key_str = data_fetch_info.get('key_str')
# 缓存数据
data_cache_obj = self._data_cache_dict[db_name]
for info in docs:
uid = info.get(key_str)
data_cache_obj.push_data(uid, info)
# 查询缓存中的数据
self.get_data_from_cache(db_name, key_id_list, done_cb=done_cb)
def get_data_from_cache(self, db_name, key_id_list, done_cb=None):
"""
从缓存中获取数据, 并执行获取数据后的回调
"""
# 从缓存中获取数据
fetch_result_info = {}
data_cache_obj = self._data_cache_dict.get(db_name)
if data_cache_obj is None:
return fetch_result_info
for key_id in key_id_list:
data = data_cache_obj.get_data(key_id)
if data:
fetch_result_info[key_id] = data
# 执行数据获取后的回调
done_cb and done_cb(fetch_result_info)
return fetch_result_info
class LRUData(object):
"""
数据缓存类, 当前使用FIFO策略, 设置过期时间戳
"""
def __init__(self, cache_time, opacity):
"""
数据缓存类初始化
"""
# 数据缓存的时间长短, 秒为单位
self.cache_time = cache_time
# 数据池的大小
self.opacity = opacity
# 当前数据的索引
self._index = 0
# 缓存的字典key的列表
self._key_list = [None for i in xrange(self.opacity)]
# 缓存数据的查询字典
self._data_cache = {}
def push_data(self, key, data):
"""
将数据缓存
"""
# 数据进来的时候的时间戳
data['_time_stamps'] = int(time.time())
# 索引循环
if self._index >= self.opacity:
self._index = 0
# 删除老的数据
old_key = self._key_list[self._index]
if old_key is not None:
self._data_cache.pop(old_key, 0)
# 加入新的数据
self._key_list[self._index] = key
self._data_cache[key] = data
# 索引递增
self._index += 1
def get_data(self, key):
"""
取出缓存的数据
"""
# 检查下出来时候时间戳
data = self._data_cache.get(key, {})
time_stamp = data.get('_time_stamps', 0)
if time.time() - time_stamp < self.cache_time:
return data
else:
return None
数据库缓存类DataCacheManager通过 register_data_type_info和 unregister_data_type_info 接口对数据库操作进行注册, 注册_id 和要查询的字段, 注册必要的查询信息后就能从数据表中查询获取必要的信息。
fetch_data 接口则是从缓存中取出数据,其包含从数据库中获取和从缓存中获取, 具体看数据是否在缓存中和数据是否过期,而数据对象LRUData 提供了数据的基本操作, 是真正缓存数据的对象。 同时获取数据有回调函数, 在完成数据的获取后就会执行回调。此外接口还提供了强行更新的参数,以便需要最新数据时直接从数据库进行查询,而非缓存。
LRUData 类是数据缓存类, 提供了两个接口, 数据的获取和插入, 数据在插入时会有一个时间戳, 以便判定数据在取出时是否过期, 缓存空间的大小必须有限,因此空间不足时需要对数据进行剔除, 这是需要数据移除策略, 常用的策略有FIFO、LRU、频率最低等,主要是看应用的场合进行选择,好友战队信息, 对于好友较多的成员,自然是使用频率最低的先剔除较为有效。但频率最低需要维护数据频率堆,并且对于存在好友上限以及好友登录时间随机等情况优势并不十分明显,因此可以这种考虑FIFO和LRU的策略,但后期还是希望实现使用频率最低的最先覆盖的策略。