昨天学了协程,恰巧今天有个小东西要搞,就是文件从一个文件夹移动到另外一个文件夹中,传统界面粘贴复制肯定不是我的作风,那就写个代码移动吧,说搞就搞吧。
文件选择100个图片
正常版
import os
import shutil
import time
# 正常移动文件
def move(all_img):
for each_img in all_img:
try:
if not os.path.exists(os.path.join(dest_path, each_img)):
print('正在移动 %s .....................' % each_img)
shutil.move(os.path.join(img_path, each_img), os.path.join(dest_path, each_img))
else:
continue
except BaseException as e:
print('图片已经被移动了..........................')
if __name__ == '__main__':
start_time = time.time()
img_path = r'D:\test2'
dest_path = r'D:\test1'
all_img = os.listdir(img_path)
print('需要移动%s张图片' % len(all_img))
move(all_img)
end_time = time.time()
print('花费 %s time' % str(end_time - start_time))
结果耗时:
花费 4.666001796722412 time
虽然没有什么界面化,但是速度我不满意呀,不是昨天刚学的协程吗?那就搞一波。
协程版
import os
import shutil
import time
import asyncio
sema = asyncio.Semaphore(20)
# 使用协程移动文件
async def move(each_img):
print('需要移动%s张图片' % len(each_img))
try:
if not os.path.exists(os.path.join(dest_path, each_img)):
print('正在移动 %s .....................' % each_img)
with(await sema):
await shutil_move(each_img)
else:
print('已经存在')
except BaseException as e:
print('图片已经被移动了..........................')
async def shutil_move(each_img):
shutil.move(os.path.join(img_path, each_img), os.path.join(dest_path, each_img))
if __name__ == '__main__':
start_time = time.time()
img_path = r'D:\test1'
dest_path = r'D:\test2'
all_img = os.listdir(img_path)
tasks = [move(each_img) for each_img in all_img]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
print('花费 %s time' % str(end_time - start_time))
结果耗时:
花费 4.599006175994873 time
然而从结果来看并不好呀,不禁怀疑是不是文件量比较少,而导致效果不好。果然还是要结合aiothhp呀。
线程版
import os
import shutil
import time
import threadpool
# 使用线程移动文件
def move(each_img):
try:
if not os.path.exists(os.path.join(dest_path, each_img)):
print('正在移动 %s .....................' % each_img)
shutil.move(os.path.join(img_path, each_img), os.path.join(dest_path, each_img))
else:
print('已经存在咯..........')
except BaseException as e:
print('图片已经被移动了..........................')
if __name__ == '__main__':
start_time = time.time()
# img_path = r'D:\test2'
img_path = r'D:\test2'
dest_path = r'D:\test1'
all_img = os.listdir(img_path)
print('需要移动%s张图片' % len(all_img))
task_pool = threadpool.ThreadPool(50)
tasks = threadpool.makeRequests(move, all_img)
for task in tasks:
task_pool.putRequest(task)
task_pool.wait()
end_time = time.time()
print('花费 %s time' % str(end_time - start_time))
结果耗时:
花费 3.795001983642578 time
从效果来看 还是节约了一点时间,但是在文件量比较大的情况下,线程提高的速度真的是厉害。你可以体会一下。
进程版
import os
import shutil
import time
import multiprocessing
img_path = r'D:\test2'
dest_path = r'D:\test1'
# 使用多进程移动文件
def move(each_img):
try:
if not os.path.exists(os.path.join(dest_path, each_img)):
print('正在移动 %s .....................' % each_img)
shutil.move(os.path.join(img_path, each_img), os.path.join(dest_path, each_img))
else:
print('已经存在咯..........')
except BaseException as e:
print(e)
if __name__ == '__main__':
start_time = time.time()
all_img = os.listdir(img_path)
print('需要移动%s张图片' % len(all_img))
task_pool = multiprocessing.Pool(8)
task_pool.map(move, all_img)
task_pool.close()
task_pool.join()
end_time = time.time()
print('花费 %s time' % str(end_time - start_time))
结果耗时:
花费 4.57800030708313 time
从效果来看,进程似乎是不太高效,但这仅仅是在我本机上跑的,我本机那么烂,可能在服务器上跑,效果会很好。
分享了文件移动的N中姿势,当然爬虫也有N中姿势,今天这里就不分享了,以后有时间在搞一波。