python multiprocessing 下载图片示例 多进程通信方式 删除父进程复制的内存

Linux多进程通信–管道、消息队列、共享内存
Python multiprocessing 使用手记[2] – 跨进程对象共享
限制子进程内存
python的引用计数分析(一)

# After fork() two copies of this code keep running, and every variable
# created before the fork exists in both processes.  The copies are NOT
# shared: each process owns its own memory, so a change made in one
# process is never visible in the other.
import os
var = "unchanged"
pid = os.fork()
if pid:
    # Parent branch: fork() returned the child's pid (non-zero).
    print('parent:', os.getpid(), var)
    os.waitpid(pid, 0)
else:
    # Child branch: fork() returned 0.  Mutating var here only changes
    # the child's private copy.
    print('child:', os.getpid(), var)
    var = "changed"

# Runs in BOTH processes: the parent still prints "unchanged", while the
# child prints "changed" — demonstrating the independent address spaces.
print(os.getpid(), var)

因为linux上的fork是完全复制父进程的内存空间。当我们父进程使用内存很大时子进程也会占用大量内存,而且是非必须的。
#可以使用spawn方式启用进程:
# Note that this is Python 3.4+ only
import time
import multiprocessing 
def foo(x, iterations=2**28):
    """Busy-wait for *iterations* loop turns, then print ``x ** 2``.

    Bug fix: the original reused ``x`` as the loop variable, so the
    printed value was always ``(2**28 - 1) ** 2`` no matter what argument
    the caller passed.  ``iterations`` is exposed (default unchanged) so
    the artificial delay can be shortened, e.g. in tests.
    """
    for _ in range(iterations):  # pure delay; result must not clobber x
        pass
    print(x ** 2)
if __name__ == "__main__":
    # Big list that lives only in the parent.  With the "spawn" start
    # method the children re-import the module instead of forking, so
    # they never inherit a copy of this allocation.
    completely_unrelated_array = list(range(2**23))
    ctx = multiprocessing.get_context("spawn")  # spawn instead of fork
    # Fix: the original created an unused ctx.Pool() here, silently
    # spawning (and leaking) a full set of worker processes.  The demo
    # only needs the explicit Process objects below.
    workers = [ctx.Process(target=foo, args=(x,)) for x in range(8)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()  # reap the children instead of abandoning them

#启动进程后迅速删除不用变量
import time
import multiprocessing 
import gc
def foo(x, iterations=2**28):
    """Free the inherited parent globals, busy-wait, then print ``x ** 2``.

    Bug fix: the original reused ``x`` as the loop variable, so the
    printed value was always ``(2**28 - 1) ** 2`` regardless of the
    argument.  ``iterations`` (default unchanged) lets callers shorten
    the artificial delay.
    """
    init()  # drop the fork-copied big array before doing any work
    for _ in range(iterations):  # pure delay; must not clobber x
        pass
    print(x ** 2)
def init():
    """Remove the module-global ``completely_unrelated_array`` (if any)
    and run a garbage-collection pass so the freed memory is reclaimed
    promptly in this process.  Safe to call even when the global was
    never defined.
    """
    # pop() unbinds the name from the module namespace in one step,
    # dropping the last reference this process holds to the big list.
    globals().pop("completely_unrelated_array", None)
    gc.collect()
if __name__ == "__main__":
    # Large list that exists only in the parent before any worker starts.
    completely_unrelated_array = list(range(2**23))
    # Every pool worker runs init() once at startup, deleting its forked
    # copy of the big list so the children do not keep that memory alive.
    P = multiprocessing.Pool(initializer=init)
    for x in range(8):
        # NOTE(review): these extra Process children do NOT get the pool
        # initializer; foo() is expected to call init() itself on entry.
        multiprocessing.Process(target=foo, args=(x,)).start()
    time.sleep(100)  # keep the parent alive while the children run
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division

import multiprocessing
import glob
import json
import sys
import os
import time
from urllib.request import urlretrieve


# Directory holding the tab-separated metadata files to read.
data_dir = "/Users/downloadimg/"
# Directory downloaded images are written to.
# NOTE(review): presumably this must already exist — urlretrieve does not
# create parent directories; confirm before deploying.
img_dir = "/Users/downloadimg/imgs/"


def parallel_download(queue, flag, num, count):
    """Downloader worker: pull image URLs off *queue* and fetch them.

    Runs until the producer has set *flag* AND the queue is drained.
    *num* is a worker id used only for logging; *count* is a shared
    counter of URLs taken off the queue (incremented even when a
    download fails, matching the original accounting).

    Bug fixes vs. the original:
    - ``queue.get()`` had no timeout, so a worker could block forever if
      the queue emptied between the loop test and the get().  A timed
      get with an Empty retry lets the loop condition be re-checked.
    - the bare ``except:`` also swallowed KeyboardInterrupt/SystemExit;
      narrowed to ``except Exception``.
    """
    from queue import Empty  # local: the parameter shadows the queue module

    print(f"thread {num} starting...")

    while not flag.value or not queue.empty():
        try:
            imgurl = queue.get(timeout=1)
        except Empty:
            continue  # re-check the termination condition
        with count.get_lock():
            count.value += 1

        # Derive a filename from the URL tail, capped at 100 chars.
        write_file = img_dir+imgurl.split("/")[-1][:100]
        if not os.path.exists(write_file):
            try:
                urlretrieve(imgurl, write_file)
            except Exception:
                # Best-effort: log the failing URL and keep draining.
                print("error url: ", imgurl)
                continue

    print(f"thread {num} finished.")

def single_read(queue, files_list, flag, total):
    for infile in files_list:
        with open(infile, "r") as inp:
            for line in inp.readlines():
                line_json = json.loads(line.split("\t")[3])
                queue.put(line_json["img_url"])
                total.value += 1

    queue.close()
    print(f"single_read finished.")
    with flag.get_lock():
        flag.value = True


if __name__ == "__main__":

    # Usage: script.py <file-glob> [num-download-processes]
    # Fix: the original indexed sys.argv[2] unconditionally, so omitting
    # the optional worker count crashed with IndexError instead of
    # falling back to the default of 10.
    parallel_num = int(sys.argv[2]) if len(sys.argv) > 2 else 10
    files_list = sorted(glob.glob(data_dir + sys.argv[1]))
    print(files_list)

    queue = multiprocessing.Queue()
    flag = multiprocessing.Value('b', False)        # producer-done signal
    progress_total = multiprocessing.Value('i', 0)  # URLs enqueued
    progress_count = multiprocessing.Value('i', 0)  # URLs dequeued by workers
    process_pool = []

    # One producer reading metadata files, parallel_num downloader workers.
    process = multiprocessing.Process(target=single_read, args=(queue, files_list, flag, progress_total))
    process_pool.append(process)
    for i in range(parallel_num):
        process = multiprocessing.Process(target=parallel_download, args=(queue, flag, i, progress_count))
        process_pool.append(process)

    for p in process_pool:
        p.start()

    # Progress display: loop until the producer is done AND the queue is
    # drained, refreshing a single status line every 2 seconds.
    while not flag.value or not queue.empty():
        time.sleep(2)
        print("num:", progress_count.value, progress_total.value, end="\r", flush=True)

    for p in process_pool:
        p.join()

    print(progress_count.value, progress_total.value)
    print("finish all downloading.")
发布了557 篇原创文章 · 获赞 500 · 访问量 153万+

猜你喜欢

转载自blog.csdn.net/qq_16234613/article/details/96021616