python多线程取值问题

        1.最近工作中遇到一个问题,需要多线程去解决,Casandra数据库表中数据量太大的话使用count(*)统计数量会导致超时问题,可是有需求必须统计数据量,所以想了个解决方案,把一天的查询时间切分为一小时的数组,使用多线程统计每一小时的数据量,每个线程将结果写入到队列中,最后将队列中的值聚合运算就拿到了总的数据量,这其中遇到一个问题就是多线程无法直接拿到返回值,多线程写入到队列中的是一个多线程对象,解决方案就是写一个获取返回i值的类,每次多线程初始化一个取值类,线程的结果通过类方法取值之后保存到队列中。

class Value:
    def __init__(self):
        self.value = None

    def get_value(self):
        return self.value

    def run(self,cql2,starttime,endtime,*args):
        res = CassandraDb().select_data(cql2,starttime,endtime,*args)
        try:
            for i in res:
                mes_num = i.count
        except:
            mes_num = 0
        self.value = mes_num
        return self.value



def message_num(start_time,end_time,cql2,*args):
    time_list = []
    for i in range(start_time,end_time+1,3600000):
        time_list.append(i)
    time_list.append(end_time)
    result = []
    for ti in time_list:
        va = Value()
        if ti <= time_list[-1]:
            if ti +3600000>time_list[-1]:
                starttime = ti
                endtime = time_list[-1]
                t = Thread(target=va.run,args=(cql2,str(starttime),str(endtime),*args))
                t.start()
                t.join()
                result.append(va.get_value())
                break
            else:
                starttime = ti
                endtime = ti + 3600000
                t = Thread(target=va.run,args= (cql2,str(starttime),str(endtime),*args))
                t.start()
                t.join()
                result.append(va.get_value())
    return sum(result)

        2.还有一种方法是使用ThreadPoolExecutor线程池,线程池中有一个result()方法可以直接拿到线程返回方法。线程池用起来还是简单方便的
"""

desc:
"""

import time
import random
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed


def func1():
    rs = []
    for i in range(100):
        rs.append(uuid.uuid1())
    time.sleep(5)
    return rs


def func2():
    rs = []
    for i in range(100):
        rs.append(i)
    time.sleep(5)
    return rs


def func3(a):
    rs = []
    for i in range(100):
        rs.append(random.random())
    time.sleep(5)
    return rs


def main():
    # 顺序执行
    rs1 = func1()
    rs2 = func2()
    rs3 = func3(2)
    print(rs1, rs2, rs3)


def main1():
    func_list = [(func1, None), (func2, None), (func3, 3)]
    with ThreadPoolExecutor(max_workers=len(func_list)) as t:
        obj_list = []
        for func_tuple in func_list:
            func, args = func_tuple
            obj = t.submit(func, args)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
        # 等所有线程执行完毕后可以做一些别的事情..


if __name__ == "__main__":
    start = time.time()
    # main()
    main1()
    print("耗时: ", time.time() - start)

猜你喜欢

转载自blog.csdn.net/weixin_43697214/article/details/109765206