版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/oFeiHongYe/article/details/86611266
工作中经常需要对端到端流程进行梳理或优化。使用 MongoDB 作为存储时,需要梳理出各个集合的大小以及 TTL(TTL 能不用就不用)。以下是一个快速的统计方案,可以将统计结果输出到 CSV 文件中,依赖 pymongo 和 pandas 两个库。收集的信息包括数据库统计信息和集合统计信息(文档数、平均文档大小、集合大小、存储大小、索引等),也可以在此基础上进行扩展。
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@time: 2019/1/14 9:43
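@desc: Collect MongoDB database and collection statistics and export the collection statistics to a CSV file.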
"""
import os
import time

import pandas
import pymongo
class CollectionsStat:
    """Collect MongoDB database and collection statistics.

    The statistics are obtained through ``Database.command``; the MongoDB
    commands it can run are listed at
    https://docs.mongodb.com/manual/reference/command/

    An instance owns one ``MongoClient``.  Release it with ``close()`` or by
    using the instance as a context manager; ``__del__`` closes it as a last
    resort.
    """

    # Default connection string (placeholder hosts; replace it or pass ``uri``).
    MONGODB_URI = "mongodb://ip:port,ip:port,ip:port"
    # Default directory that get_coll_stat() writes its CSV file into.
    RESULT_DIR = "e:/data/mongo/coll_stat"

    # Field names of the "dbstats" reply.  The *_ALL list is kept for
    # reference only; get_db_stat() reads DATABASE__STAT_INDEX.
    DATABASE__STAT_INDEX_ALL = ["raw", "objects", "avgObjSize", "dataSize", "storageSize", "numExtents", "indexes",
                                "indexSize", "fileSize", "extentFreeList"]
    DATABASE__STAT_INDEX = ["objects", "avgObjSize", "dataSize", "storageSize", "numExtents", "indexes", "indexSize",
                            "fileSize"]

    # Field names of the "collstats" reply.  The *_ALL list is kept for
    # reference only; get_coll_stat() reads COLLECTION__STAT_INDEX.
    COLLECTION__STAT_INDEX_ALL = ["ns", "sharded", "capped", "count", "size", "storageSize", "totalIndexSize",
                                  "indexSizes",
                                  "avgObjSize", "nindexes", "nchunks", "shards"]
    COLLECTION__STAT_INDEX = ["ns", "sharded", "capped", "count", "size", "storageSize", "totalIndexSize", "avgObjSize",
                              "nindexes", "nchunks"]

    def __init__(self, db_name, uri=None):
        """Connect to MongoDB and select the database to inspect.

        :param db_name: name of the database whose statistics are collected.
        :param uri: optional connection string; defaults to ``MONGODB_URI``.
        """
        # Assigned first so close()/__del__ stay safe if connecting raises.
        self.client = None
        self.database = None
        self.client = pymongo.MongoClient(uri or self.MONGODB_URI)
        self.database = self.client.get_database(db_name)
        print("连接数据库成功,并开始统计")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def get_db_stat(self):
        """Print and return the database-level statistics.

        Runs the ``dbstats`` command and keeps the fields listed in
        ``DATABASE__STAT_INDEX``.  A field missing from the reply is reported
        as 0 (the same fallback get_coll_stat() uses) instead of raising
        ``KeyError``, since not every server reports every listed field.

        :return: dict mapping each field in ``DATABASE__STAT_INDEX`` to its value.
        """
        db_stat = self.database.command("dbstats")  # type: dict
        db_data = {field: db_stat.get(field, 0) for field in self.DATABASE__STAT_INDEX}
        print(db_data)
        return db_data

    def get_coll_stat(self, result_dir=None):
        """Write one CSV row of ``collstats`` fields per collection.

        :param result_dir: directory for the CSV file; defaults to
            ``RESULT_DIR``.  It is created when it does not exist.
        :return: path of the CSV file that was written.
        """
        fields = self.COLLECTION__STAT_INDEX
        rows = []
        # list_collection_names() drains the whole listCollections cursor,
        # whereas reading only "firstBatch" from the raw command can miss
        # collections when the listing spans several batches.
        for coll_name in self.database.list_collection_names():
            coll_stat = self.database.command("collstats", coll_name)  # type: dict
            # Fields absent from the reply are recorded as 0.
            rows.append({field: coll_stat.get(field, 0) for field in fields})

        # ``columns`` pins the column order and keeps the header for an empty database.
        coll_df = pandas.DataFrame(rows, columns=fields)

        if result_dir is None:
            result_dir = self.RESULT_DIR
        if not os.path.isdir(result_dir):
            os.makedirs(result_dir)

        # The timestamp in the file name keeps successive runs from overwriting each other.
        current_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
        result_path = os.path.join(result_dir, "coll_stat_%s.csv" % current_time)
        coll_df.to_csv(result_path, index=False)
        return result_path

    def close(self):
        """Close the MongoDB connection; safe to call more than once."""
        # getattr: the attribute is missing on instances whose __init__ never ran.
        client = getattr(self, "client", None)
        if client is None:
            return
        self.client = None
        print("统计成功,并断开连接")
        client.close()

    def __del__(self):
        # Last-resort cleanup; does nothing if already closed or never connected.
        self.close()
if __name__ == "__main__":
    # Report on the "ion" database: database-level statistics first, then
    # the per-collection statistics.
    stat_runner = CollectionsStat("ion")
    for run_step in (stat_runner.get_db_stat, stat_runner.get_coll_stat):
        run_step()