使用 Python 将 csv 文件中的一百万条网址写入 MongoDB 数据库,这里使用多线程进行操作。直接贴上代码,如下:
import os
import threading #导入线程模块(threading 提供的是线程,不是进程)
import csv
import time
from Mongo_cache import MongoCache
import win32com.client
import winsound
NUM_THREAD = 5
COUNT = 0
lock = threading.Lock()
cache = MongoCache() #数据库连接初始化
def worker():
"""
func: 从csv文件中读取数据,并将数据返回
"""
for path in os.listdir(os.getcwd()):
#print("当前工作目录", path)
file_name = path.split('.')
#print(file_name)
if file_name[-1] == 'csv':
#print("地址是:",path)
file = open(path)
data = csv.reader(file)
return data
else:
pass
def save_info(data,i, num_retries=2):
"""
func: 将数据保存
"""
global COUNT
global lock
global cache
for _, website in data:
try:
lock.acquire()
#print("线程{}》》》{}正在运行".format(threading.current_thread().name, i))
item = {'website':website}
cache(item)
COUNT += 1
except:
if num_retries > 0:
save_info(data, i, num_retries-1)
finally:
lock.release()
def main():
"""
启动线程
"""
print("start working")
print("working...")
data = worker()
threads = [] #设置主线程
for i in range(NUM_THREAD):
t = threading.Thread(target=save_info, args=(data, i))
threads.append(t)
for i in range(NUM_THREAD):
threads[i].start()
for i in range(NUM_THREAD):
threads[i].join()
print("all was done!")
if __name__ == '__main__':
s_time = time.time()
main()
e_time = time.time()
print("总的信息条数:", COUNT)
print("总耗时:", e_time-s_time)
speak = win32com.client.Dispatch('SAPI.SPVOICE')
speak.Speak("早上好,eric,程序结束!")
数据存储模块:
import pickle
import zlib
from bson.binary import Binary
from datetime import datetime, timedelta
from pymongo import MongoClient
import time
class MongoCache(object):
def __init__(self, client=None, expires=timedelta(days=30)):
self.client = MongoClient('localhost', 27017) if client is None else client
self.db = self.client.cache
#self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds()) #设置自动删除时间
def __call__(self,url):
self.db.webpage.insert(url)
#print("保存成功")
def __contains__(self,url):
try:
self[url]
except KeyError:
return False
else:
return True
def __getitem__(self, url):
record = self.db.webpage.find_one({'_id':url})
if record:
return pickle.loads(zlib.decompress(record['result']))
else:
raise KeyError(url + 'dose not exist')
def __setitem__(self, url, result):
record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()}
self.db.webpage.update({'_id':url},{'$set':record},upsert=True)
def clear(self):
self.db.webpage.drop()
将一百万条网址从csv文件保存到数据库所花费的时间为:
start working
working...
all was done!
总的信息条数: 1000000
总耗时: 427.4034459590912