[Python] 使用futures模块处理并发(超好用的并发库)

2024-01-25 03:08

本文主要是介绍[Python] 使用futures模块处理并发(超好用的并发库),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用futures模块处理并发

concurrent.futures模块的主要特色是ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。ThreadPoolExecutor和ProcessPoolExecutor的API接口一样,本文重点讲解ThreadPoolExecutor的用法。

一、用法示例

1. 使用 map() 处理多个任务

使用两行代码,就能帮我们调用多线程完成任务:

with futures.ThreadPoolExecutor(max_workers=5) as executor: 	# max_workers 最大线程数res = executor.map(fn, params)		# fn 是被调用函数,params 是 fn 的参数

例子如下:

from concurrent import futuresdef download_source(url):print(url)return url * 100urls = range(1, 20)with futures.ThreadPoolExecutor(max_workers=5) as executor:res = executor.map(download_source, urls)print(list(res))
1
2
3
4
...	# 省略了输出
18
19
[100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900]

注意事项: map() 返回结果的顺序和调用开始的顺序一直,如果第一个调用生成结果用时10秒,而其他调用只用1秒,代码会阻塞10秒以获取第一个结果。在此之后,获取后续结果不会阻塞,因为后续调用已经结束。更可取的方式是,不管提交顺序如何,只要有结果就获取。而使用 Executor.submit 方法和 futures.as_completed 函数可以帮我们完成这种方式。

2. 使用 submit() 提交单个任务

map() 可以帮我们快速完成一批任务,但是我们有时候需要的是单个处理,可以使用 submit():

import time
from concurrent import futuresdef sleep_and_print(number):time.sleep(1)print('I am number', number)return number * 100numbers = range(1, 100)with futures.ThreadPoolExecutor(max_workers=5) as executor:for num in numbers:future = executor.submit(sleep_and_print, num)	# 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。

此处 submit() 返回的 future 并不是结果,而是指待完成的操作。future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果后可以获取结果(或者异常)。

3. 使用 as_completed() 逐个获取已完成的任务

as_completed() 返回一个包含 fs 所指定的 Future 实例(可能由不同的 Executor 实例创建)的迭代器,这些实例会在完成时生成 future 对象(包括正常结束或被取消的 future 对象)。 任何由 fs 所指定的重复 future 对象将只被返回一次。 任何在 as_completed() 被调用之前完成的 future 对象将优先被生成。 如果 __next__() 被调用并且在对 as_completed() 的原始调用 timeout 秒之后结果仍不可用,则返回的迭代器将引发 concurrent.futures.TimeoutErrortimeout 可以为整数或浮点数。 如果 timeout 未指定或为 None,则不限制等待时间。

3.1 获取运行结果

我们尝试获取结果:

with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []		# 待完成的队列,用于保存所有 futurefor _i in numbers:future = executor.submit(sleep_and_print, _i)	# 加入执行to_do.append(future)results = []	for future in futures.as_completed(to_do):	# 等待完成res = future.result()	# 接收结果results.append(res)print("Already Finished", res)print(results)
3.2 获取异常
import time
from concurrent import futuresclass DemoException(Exception):		# 自定义异常passdef sleep_and_print(number):time.sleep(1)print('I am number', number)raise DemoException				# 产出异常numbers = range(1, 10)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  to_do.append(future)exceptions = []for future in futures.as_completed(to_do):res = future.exception()	# future.exception() 获取异常exceptions.append(res)print(exceptions)
I am numberI am number 1 3I am number 2
I am number 4
I am number 5
I am number 6
I am numberI am number I am number 7
8 9[DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException(), DemoException()]

4. 使用 Lock 解决(打印混乱)争用问题

上面的代码是有问题的,我们访问同一个打印控制台,打印会造成混乱。ThreadPoolExecutor 内部是使用的原生的threading去创建线程,所以我们使用 threading.Lock 就可以解决争用问题:

from concurrent import futures
from threading import Lock
import timeg_Lock = Lock()     # 多线程互斥访问 print 打印控制台,需要加锁def sleep_and_print(number):time.sleep(1)with g_Lock:	# 尝试获取锁print('I am number', number)return number * 100numbers = range(1, 1000)with futures.ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(sleep_and_print, numbers)

5. 使用 add_done_callback() 回调函数

import time
from concurrent import futures
from threading import Lockg_Lock = Lock()def sleep_and_print(number):time.sleep(1)with g_Lock:print('I am number', number)return number * 100def _done(f):		# f 是 future 对象,是唯一参数with g_Lock:print(f, 'Done')numbers = range(1, 1000)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  # 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。to_do.append(future)for future in to_do:future.add_done_callback(_done)		# 回调函数
I am number 1
<Future at 0x25154d1ae08 state=finished returned int> Done
I am number 2
<Future at 0x25154d6a508 state=finished returned int> Done
I am number 3
<Future at 0x25154d75108 state=finished returned int> Done
I am number 4
<Future at 0x25154d753c8 state=finished returned int> Done
I am number 5
I am number 6
<Future at 0x25154d75748 state=finished returned int> Done
<Future at 0x25154d75588 state=finished returned int> Done
...

6. 使用 wait() 获取所有 future 的状态

wait() 会返回一个命名2元组,包含两个 set(),一个 set 包含已完成的 future (finished or cancelled) ,另一个包含未完成的 future:

import time
from concurrent import futures
from threading import Lockg_Lock = Lock()def sleep_and_print(number):time.sleep(1)with g_Lock:print('I am number', number)return number * 100numbers = range(1, 10)with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for _i in numbers:future = executor.submit(sleep_and_print, _i)  # 当submit时,executor会创建线程,并将事件加入_work_queue(threading.Queue),多个线程从_work_queue获取Work并执行。to_do.append(future)time.sleep(2)with g_Lock:print(futures.wait(to_do, timeout=1))
I am number 2
I am number 1
I am number 3
I am number 5
DoneAndNotDoneFutures(done={<Future at 0x2724b629748 state=finished returned int>, <Future at 0x2724b635488 state=finished returned int>, <Future at 0x2724b5b1ac8 state=finished returned int>, <Future at 0x2724b62cfc8 state=finished returned int>}, not_done={<Future at 0x2724b635648 state=running>, <Future at 0x2724b6352c8 state=running>, <Future at 0x2724b635b08 state=pending>, <Future at 0x2724b635948 state=pending>, <Future at 0x2724b6357c8 state=running>})
I am number 6
I am number 4
I am number 7
I am number 9
I am number 8

二、核心内容

1. Executor 对象详解

concurrent.futures 模块中的 Executor 提供了异步执行调用方法,是 ThreadPoolExecutor 和 ProcessPoolExecutor 的父类。Executor 提供了如下方法:

  • submit(fn, *args, **kwargs):调度可调用对象 fn,以 fn(*args **kwargs) 方式执行并返回 Future 对象代表可调用对象的执行。
  • map(func, *iterables, timeout=None, chunksize=1) :类似于全局函数 map(func, *iterables), 将 iterables 和 func 组成任务立即丢入任务池,生成多线程或多进程从任务池中取任务。func 会被异步执行。
  • shutdown(wait=True):关闭 Executor,释放资源。不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。

2. future 对象详解

future 是 concurrent.futures 模块和 asyncio 包的重要组件。

future 封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)。Future 提供了如下方法:

  • future.result(timeout=None) :返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。result()会阻塞调用方所在的线程,直到有结果可返回,但是result()可以接收可选的timeout参数,如果在指定的时间内future没有运行完毕,会抛出TimeoutError异常

    try:print(to_do[9].result(timeout=1))
    except futures.TimeoutError:print('TimeoutError')
    
  • future.add_done_callback(fn) :当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。

  • future.cancel() :尝试取消调用。 如果调用正在执行或已结束运行不能被取消则该方法将返回 False,否则调用会被取消并且该方法将返回 True

  • future.done():返回布尔值,指明future链接的可调用对象是否已经执行,不会阻塞

  • future.cancelled():返回布尔值,指明future链接的可调用对象是否已经成功取消,不会阻塞

  • future.running():返回布尔值,指明future链接的可调用对象是否已经正在执行而且不能被取消,不会阻塞

3. map() 源码剖析

我们知道 map() 是这么调用的:

with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_source, urls)

前面说过,map() 返回结果的顺序和调用开始的顺序一直,如果第一个调用生成结果用时10秒,而其他调用只用1秒,代码会阻塞10秒以获取第一个结果。map() 封装很优秀,忍不住看了看源码,Executor.map() 定义在 Lib.concurrent.futures._base 中:

def map(self, fn, *iterables, timeout=None, chunksize=1):...fs = [self.submit(fn, *args) for args in zip(*iterables)]	# 将可迭代对象使用submit()构造生成futuresdef result_iterator():...while fs:if timeout is None:yield fs.pop().result()		# fs.pop()按顺序返回一个future; 而future.result()会阻塞else:yield fs.pop().result(end_time - time.monotonic()) ...return result_iterator()

可以看到,Executor.map() 内部也是使用了 submit() 和 result(),但是它返回一个迭代器,迭代器中包含的是各个 future 的结果,而非 future 本身。

4. ThreadPoolExecutor 核心工作源码剖析

[Python] future线程处理并发的核心源码详解

这篇关于[Python] 使用futures模块处理并发(超好用的并发库)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/641899

相关文章

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

利用Python打造一个Excel记账模板

《利用Python打造一个Excel记账模板》这篇文章主要为大家详细介绍了如何使用Python打造一个超实用的Excel记账模板,可以帮助大家高效管理财务,迈向财富自由之路,感兴趣的小伙伴快跟随小编一... 目录设置预算百分比超支标红预警记账模板功能介绍基础记账预算管理可视化分析摸鱼时间理财法碎片时间利用财

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1