Python多线程并发时通过线程池限流

时间:2021-07-15 19:01:30   收藏:0   阅读:0

Python支持多线程,但是由于GIL的限制并不能无限制的开启子线程。

通过semaphore我们可以控制子线程对于共享资源的访问,即可以阻塞一些子线程直到有空余的semaphore资源,但是并不能实际限制子线程数。

当我们需要开启成千上万个子线程时,很多时候并不希望这些子线程同时执行(可能受限于系统资源or后端数据库),而是更希望一次性执行一批子线程,然后有空余资源时补充一批继续执行。

在Python2中,一种变通的方法是自己设置一个简易的线程池,如下所示:

if __name__ == ‘__main__‘:
    max_workers = 50
    all_resouces = get_all_resouces()
    thread_pool = {}
    i = 0
    while i < len(all_resouces):
        if len(thread_pool) < max_workers:
            thd = Thread(target=handle_resource, args=(all_resouces[i],), name=all_resouces[i])
            thd.start()
            thread_pool[i] = thd
            i += 1
        else:
            sleep(3)
            for thd_index,thd in thread_pool.items():
                if not thd.is_alive():
                    thread_pool.pop(thd_index)
    for thd in thread_pool.values():
        thd.join()
# 这里把线程池定义为dict而非list是因为遍历list时对list本身做remove会导致线程池满后清除死线程的效率降低一半。
# 这个BUG是因为for遍历list其实是根据下标索引来遍历的,每当删除一个元素就会导致后边的下标整体-1,这会导致下一次遍历时跳过被删除位置的元素
# 这个BUG有多种处理方式,例如list copy,queue等等,在stackoverflow上也有诸多讨论,这里不过多描述。

上述Demo中,我们将线程池大小限制为50。

当线程池未满,直接创建新的子线程并启动然后加入thread_pool。

当线程池已满,便等待数秒(也可以不等待),之后检查线程池看看能不能空出坑位,进入下一次循环,直到所有子线程创建完毕。

最后join(),等待所有子线程执行完毕后结束主进程。

在Python3中直接执行上述代码会遇到:

RuntimeError: dictionary changed size during iteration

这个错误比较熟悉,但并不会影响程序实际执行。说他熟悉其实是因为这个报错换个单词就可以描述上边代码中为什么使用了dict而不是list:

RuntimeError: list changed size during iteration
问题:是否有现成的标准库或第三方库实现上述功能?
使用python3的标准库concurrent.futures会很好: concurrent.futures — Launching parallel tasks — Python 3.9.6 documentation
其实这个库就是实现了上述限流的目的,其底层依然是Threading和multiprocessing模块,一个future对象其实就是一个子线程,通过其线程池功能,我们可以像使用threading模块那样使用concurrent.futures,只是方法名和使用方式有些许差异。并且threading模块里的Lock和信号量等同步原语也可以直接在concurrent.futures的代码中照常使用,这些同步原语是与资源处理函数绑定的与并发库倒是无关。
另外python2中好像也有threadpool这个第三方库,但是现在已经很少用了,官网已经404了没找到什么有效信息。
那么使用concurrent.futures实现上述代码一样的功能就可以写为:
from concurrent.futures import ThreadPoolExecutor
......
if __name__ == ‘__main__‘:
    all_resouces = get_all_resouces()
    with ThreadPoolExecutor(max_workers=50) as pool:
		for r in all_resouces:
            pool.submmit(handle_resource, *args)
concurrent.futures通过封装Threading与multiprocessing模块实现了线程池限流的功能,并且写法更加简洁。
此模块中的wait和as_completed两个module function实现了与Threading的join()方法相同的功能,但是返回的future对象功能更加强大,可以通过result存储子线程的执行结果。
相比于Threading模块,concurrent.futures启动的子线程会默认阻塞主进程(直到所有子线程执行完毕),这应该不能算作缺点,总而言之,使用concurrent.futures启动多线程是推荐的做法。
评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!