Python Asyncio 库之同步原语常用函数详解

2024-01-08 21:44

本文主要是介绍Python Asyncio 库之同步原语常用函数详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前记

Asyncio的同步原语可以简化我们编写资源竞争的代码和规避资源竞争导致的Bug的出现。 但是由于协程的特性,在大部分业务代码中并不需要去考虑资源竞争的出现,导致Asyncio同步原语被使用的频率比较低,但是如果想基于Asyncio编写框架则需要学习同步原语的使用。

0.基础

同步原语都是适用于某些条件下对某个资源的争夺,在代码中大部分的资源都是属于一个代码块,而Python对于代码块的管理的最佳实践是使用with语法,with语法实际上是调用了一个类中的__enter____exit__方法,比如下面的代码:

class Demo(object):def __enter__(self):return def __exit__(self, exc_type, exc_val, exc_tb):return with Demo():pass

代码中的Demo类实现了__enter____exit__方法后,就可以被with语法调用,其中__enter__方法是进入代码块执行的逻辑,__enxi__方法是用于退出代码块(包括异常退出)的逻辑。这两个方法符合同步原语中对资源的争夺和释放,但是__enter____exit__两个方法都是不支持await调用的,为了解决这个问题,Python引入了async with语法。

async with语法和with语法类似 ,我们只要编写一个拥有__aenter____aexit__方法的类,那么这个类就支持asyncio with语法了,如下:

import asyncioclass Demo(object):async def __aenter__(self):returnasync def __aexit__(self, exc_type, exc_val, exc_tb):returnasync def main():async with Demo():passasyncio.run(main())

其中,类中的__aenter__方法是进入代码块时执行的方法,__aexit__是退出代码块时执行的方法。

有了async with语法的加持,asyncio的同步原语使用起来会比较方便,所以asyncio中对资源争夺的同步原语都会继承于_ContextManagerMixin类:

class _ContextManagerMixin:async def __aenter__(self):await self.acquire()# We have no use for the "as ..."  clause in the with# statement for locks.return Noneasync def __aexit__(self, exc_type, exc, tb):self.release()

并实现了acquirerelease方法,供__aenter____aexit__方法调用,同时我们在使用同步原语的时候尽量用到async with语法防止忘记释放资源的占用。

1.Lock

由于协程的特性,在编写协程代码时基本上可以不考虑到锁的情况,但在一些情况下我们还是需要用到锁,并通过锁来维护并发时的数据安全性,如下例子:

import asyncioshare_data = {}async def sub(i):# 赋上相同的key和valueshare_data[i] = iawait asyncio.sleep(0)print(i, share_data[i] == i)async def sub_add(i):# 赋上的value值是原来的+1share_data[i] = i + 1await asyncio.sleep(0)print(i, share_data[i] == i + 1)async def main():# 创建并发任务task_list = []for i in range(10):task_list.append(sub(i))task_list.append(sub_add(i))# 并发执行await asyncio.gather(*task_list)if __name__ == "__main__":asyncio.run(main())

在这个例子中程序会并发的执行subsub_add函数,他们是由不同的asyncio.Task驱动的,这意味着会出现这样一个场景。 当负责执行sub(1)函数的asyncio.Task在执行完share_data[i]=i后就执行await asyncio.sleep(0)从而主动让出控制权并交还给事件循环,等待事件循环的下一次调度。 不过事件循环不会空下来,而是马上安排下一个asyncio.Task执行,此时会先执行到sub_add(1)函数的share_data[i] = i + 1,并同样的在执行到await asyncio.sleep(0)的时候把控制权交会给事件循环。 这时候控制权会由事件循环转移给原先执行sub(1)函数的asyncio.Task,获取到控制权l后sub(1)函数的逻辑会继续走,但由于share_data[i]的数据已经被share_data[i] = i + 1修改了,导致最后执行print时,share_data[i]的数据已经变为脏数据,而不是原本想要的数据了。

为了解决这个问题,我们可以使用asyncio.Lock来解决资源的冲突,如下:

import asyncioshare_data = {}
# 存放对应资源的锁
lock_dict = {}async def sub(i):async with lock_dict[i]:  # <-- 通过async with语句来控制锁的粒度share_data[i] = iawait asyncio.sleep(0)print(i, share_data[i] == i)async def sub_add(i):async with lock_dict[i]:share_data[i] = i + 1await asyncio.sleep(0)print(i, share_data[i] == i + 1)async def main():task_list = []for i in range(10):lock_dict[i] = asyncio.Lock()task_list.append(sub(i))task_list.append(sub_add(i))await asyncio.gather(*task_list)if __name__ == "__main__":asyncio.run(main())

从例子可以看到asyncio.Lock的使用方法跟多线程的Lock差不多,通过async with语法来获取和释放锁,它的原理也很简单,主要做了如下几件事:

  • 1.确保某一协程获取锁后的执行期间,别的协程在获取锁时需要一直等待,直到执行完成并释放锁。
  • 2.当有协程持有锁的时候,其他协程必须等待,直到持有锁的协程释放了锁。
  • 2.确保所有协程能够按照获取的顺序获取到锁。

这意味着需要有一个数据结构来维护当前持有锁的协程的和下一个获取锁协程的关系,同时也需要一个队列来维护多个获取锁的协程的唤醒顺序。

asyncio.Lock跟其它asyncio功能的用法一样,使用asyncio.Future来同步协程之间锁的状态,使用deque维护协程间的唤醒顺序,源码如下:

class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin):def __init__(self):self._waiters = Noneself._locked = Falsedef locked(self):return self._lockedasync def acquire(self):if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))):# 目前没有其他协程持有锁,当前协程可以运行self._locked = Truereturn Trueif self._waiters is None:self._waiters = collections.deque()# 创建属于自己的容器,并推送到`_waiters`这个双端队列中fut = self._get_loop().create_future()self._waiters.append(fut)try:try:await futfinally:# 如果执行完毕,需要把自己移除,防止被`wake_up_first`调用self._waiters.remove(fut)except exceptions.CancelledError:# 如果是等待的过程中被取消了,需要唤醒下一个调用`acquire`if not self._locked:self._wake_up_first()raise# 持有锁self._locked = Truereturn Truedef release(self):if self._locked:# 释放锁self._locked = Falseself._wake_up_first()else:raise RuntimeError('Lock is not acquired.')def _wake_up_first(self):if not self._waiters:return# 获取还处于锁状态协程对应的容器try:# 获取下一个等待获取锁的waiterfut = next(iter(self._waiters))except StopIteration:return# 设置容器为True,这样对应协程就可以继续运行了。if not fut.done():fut.set_result(True)

通过源码可以知道,锁主要提供了获取和释放的功能,对于获取锁需要区分两种情况:

  • 1:当有协程想要获取锁时会先判断锁是否被持有,如果当前锁没有被持有就直接返回,使协程能够正常运行。
  • 2:如果协程获取锁时,锁发现自己已经被其他协程持有则创建一个属于当前协程的asyncio.Future,用来同步状态,并添加到deque中。

而对于释放锁就比较简单,只要获取deque中的第一个asyncio.Future,并通过fut.set_result(True)进行标记,使asyncio.Futurepeding状态变为done状态,这样一来,持有该asyncio.Future的协程就能继续运行,从而持有锁。

不过需要注意源码中acquire方法中对CancelledError异常进行捕获,再唤醒下一个锁,这是为了解决acquire方法执行异常导致锁一直被卡住的场景,通常情况下这能解决大部分的问题,但是如果遇到错误的封装时,我们需要亲自处理异常,并执行锁的唤醒。比如在通过继承asyncio.Lock编写一个超时锁时,最简单的实现代码如下:

import asyncioclass TimeoutLock(asyncio.Lock):def __init__(self, timeout, *, loop=None):self.timeout = timeoutsuper().__init__(loop=loop)async def acquire(self) -> bool:return await asyncio.wait_for(super().acquire(), self.timeout)

这份代码非常简单,他只需要在__init__方法传入timeout参数,并在acuiqre方法中通过wait_for来实现锁超时即可,现在假设wait_for方法是一个无法传递协程cancel的方法,且编写的acquire没有进行捕获异常再释放锁的操作,当异常发生的时候会导致锁一直被卡住。 为了解决这个问题,只需要对TimeoutLockacquire方法添加异常捕获,并在捕获到异常时释放锁即可,代码如下:

class TimeoutLock(asyncio.Lock):def __init__(self, timeout, *, loop=None):self.timeout = timeoutsuper().__init__(loop=loop)async def acquire(self) -> bool:try:return await asyncio.wait_for(super().acquire(), self.timeout)except Exception:self._wake_up_first()raise

2.Event

asyncio.Event也是一个简单的同步原语,但它跟asyncio.Lock不一样,asyncio.Lock是确保每个资源只能被一个协程操作,而asyncio.Event是确保某个资源何时可以被协程操作,可以认为asyncio.Lock锁的是资源,asyncio.Event锁的是协程,所以asyncio.Event并不需要acquire来锁资源,release释放资源,所以也用不到async with语法。

asyncio.Event的简单使用示例如下:

import asyncioasync def sub(event: asyncio.Event) -> None:await event.wait()print("I'm Done")async def main() -> None:event = asyncio.Event()for _ in range(10):asyncio.create_task(sub(event))await asyncio.sleep(1)event.set()asyncio.run(main())

在这个例子中会先创建10个asyncio.Task来执行sub函数,但是所有sub函数都会在event.wait处等待,直到main函数中调用event.set后,所有的sub函数的event.wait会放行,使sub函数能继续执行。

可以看到asyncio.Event功能比较简单,它的源码实现也很简单,源码如下:

class Event(mixins._LoopBoundMixin):def __init__(self):self._waiters = collections.deque()self._value = Falsedef is_set(self):return self._valuedef set(self):if not self._value:# 确保每次只能set一次self._value = True# 设置每个协程存放的容器为True,这样对应的协程就可以运行了for fut in self._waiters:if not fut.done():fut.set_result(True)def clear(self):# 清理上一次的setself._value = Falseasync def wait(self):if self._value:# 如果设置了,就不需要等待了return True# 否则需要创建一个容器,并需要等待容器完成fut = self._get_loop().create_future()self._waiters.append(fut)try:await futreturn Truefinally:self._waiters.remove(fut)

通过源码可以看到wait方法主要是创建了一个asyncio.Future,并把它加入到deque队列后就一直等待着,而set方法被调用时会遍历整个deque队列,并把处于peding状态的asyncio.Future设置为done,这时其他在调用event.wait方法的协程就会得到放行。

通过源码也可以看出,asyncio.Event并没有继承于_ContextManagerMixin,这是因为它锁的是协程,而不是资源。

asyncio.Event的使用频率比asyncio.Lock多许多,不过通常都会让asyncio.Event和其他数据结构进行封装再使用,比如实现一个服务器的优雅关闭功能,这个功能会确保服务器在等待n秒后或者所有连接都关闭后才关闭服务器,这个功能就可以使用setasyncio.Event结合,如下:

import asyncioclass SetEvent(asyncio.Event):def __init__(self, *, loop=None):self._set = set()super().__init__(loop=loop)def add(self, value):self._set.add(value)self.clear()def remove(self, value):self._set.remove(value)if not self._set:self.set()

这个SetEvent结合了setSetEvent的功能,当set有数据的时候,会通过clear方法使SetEvent变为等待状态,而set没数据的时候,会通过set方法使SetEvent变为无需等待的状态,所有调用wait的协程都可以放行,通过这种结合,SetEvent拥有了等待资源为空的功能。 接下来就可以用于服务器的优雅退出功能:

async def mock_conn_io() -> None:await asyncio.sleep(1)def conn_handle(set_event: SetEvent):task: asyncio.Task = asyncio.create_task(mock_conn_io())set_event.add(task)task.add_done_callback(lambda t: set_event.remove(t))async def main():set_event: SetEvent = SetEvent()for _ in range(10):conn_handle(set_event)# 假设这里收到了退出信号await asyncio.wait(set_event.wait(), timeout=9)asyncio.run(main())

在这个演示功能中,mock_conn_io用于模拟服务器的连接正在处理中,而conn_handle用于创建服务器连接,main则是先创建10个连接,并模拟在收到退出信号后等待资源为空或者超时才退出服务。

这只是简单的演示,实际上的优雅关闭功能要考虑的东西不仅仅是这些。

4.Condition

condition只做简单介绍

asyncio.Condition是同步原语中使用最少的一种,因为他使用情况很奇怪,而且大部分场景可以被其他写法代替,比如下面这个例子:

import asyncioasync def task(condition, work_list):await asyncio.sleep(1)work_list.append(33)print('Task sending notification...')async with condition:condition.notify()async def main():condition = asyncio.Condition()work_list = list()print('Main waiting for data...')async with condition:_ = asyncio.create_task(task(condition, work_list))await condition.wait()print(f'Got data: {work_list}')asyncio.run(main())
# >>> Main waiting for data...
# >>> Task sending notification...
# >>> Got data: [33]

在这个例子中可以看到,notifywait方法只能在async with condition中可以使用,如果没有在async with condition中使用则会报错,同时这个示例代码有点复杂,没办法一看就知道执行逻辑是什么,其实这个逻辑可以转变成一个更简单的写法:

import asyncioasync def task(work_list):await asyncio.sleep(1)work_list.append(33)print('Task sending notification...')returnasync def main():work_list = list()print('Main waiting for data...')_task = asyncio.create_task(task(work_list))await _taskprint(f'Got data: {work_list}')asyncio.run(main())# >>> Main waiting for data...
# >>> Task sending notification...
# >>> Got data: [33]

通过这个代码可以看到这个写法更简单一点,而且更有逻辑性,而condition的写法却更有点Go协程写法/或者回调函数写法的感觉。 所以建议在认为自己的代码可能会用到asyncio.Conditon时需要先考虑到是否需要asyncio.Codition?是否有别的方案代替,如果没有才考虑去使用asyncio.Conditonk。

5.Semaphore

asyncio.Semaphore–信号量是同步原语中被使用最频繁的,大多数都是用在限流场景中,比如用在爬虫中和客户端网关中限制请求频率。

asyncio.Semaphore可以认为是一个延缓触发的asyncio.Lockasyncio.Semaphore内部会维护一个计数器,无论何时进行获取或释放,它都会递增或者递减(但不会超过边界值),当计数器归零时,就会进入到锁的逻辑,但是这个锁逻辑会在计数器大于0的时候释放j,它的用法如下:`

import asyncioasync def main():semaphore = asyncio.Semaphore(10):async with semaphore:passasyncio.run(main())

示例中代码通过async with来指明一个代码块(代码用pass代替),这个代码块是被asyncio.Semaphore管理的,每次协程在进入代码块时,asyncio.Semaphore的内部计数器就会递减一,而离开代码块则asyncio.Semaphore的内部计数器会递增一。 当有一个协程进入代码块时asyncio.Semaphore发现计数器已经为0了,则会使当前协程进入等待状态,直到某个协程离开这个代码块时,计数器会递增一,并唤醒等待的协程,使其能够进入代码块中继续执行。

asyncio.Semaphore的源码如下,需要注意的是由于asyncio.Semaphore是一个延缓的asyncio.Lock,所以当调用一次release后可能会导致被唤醒的协程和刚进入代码块的协程起冲突,所以在acquire方法中要通过一个while循环来解决这个问题:`

class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):def __init__(self, value=1):if value < 0:raise ValueError("Semaphore initial value must be >= 0")self._value = valueself._waiters = collections.deque()self._wakeup_scheduled = Falsedef _wake_up_next(self):while self._waiters:# 按照放置顺序依次弹出容器 waiter = self._waiters.popleft()if not waiter.done():# 设置容器状态,使对应的协程可以继续执行waiter.set_result(None)# 设置标记 self._wakeup_scheduled = Truereturndef locked(self):return self._value == 0async def acquire(self):# 如果`self._wakeup_scheduled`为True或者value小于0while self._wakeup_scheduled or self._value <= 0:# 创建容器并等待执行完成fut = self._get_loop().create_future()self._waiters.append(fut)try:await futself._wakeup_scheduled = Falseexcept exceptions.CancelledError:# 如果被取消了,也要唤醒下一个协程self._wake_up_next()raiseself._value -= 1return Truedef release(self):# 释放资源占用,唤醒下一个协程。self._value += 1self._wake_up_next()

针对asyncio.Semaphore进行修改可以实现很多功能,比如基于信号量可以实现一个简单的协程池,这个协程池可以限制创建协程的量,当协程池满的时候就无法继续创建协程,只有协程中的协程执行完毕后才能继续创建(当然无法控制在协程中创建新的协程),代码如下:

import asyncio
import time
from typing import Coroutineclass Pool(object):def __init__(self, max_concurrency: int):self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)async def create_task(self, coro: Coroutine) -> asyncio.Task:await  self._semaphore.acquire()task: asyncio.Task = asyncio.create_task(coro)task.add_done_callback(lambda t: self._semaphore.release())return taskasync def demo(cnt: int) -> None:print(f"{int(time.time())} create {cnt} task...")await  asyncio.sleep(cnt)async def main() -> None:pool: Pool = Pool(3)for i in range(10):await pool.create_task(demo(i))asyncio.run(main())
# >>> 1677517996 create 0 task...
# >>> 1677517996 create 1 task...
# >>> 1677517996 create 2 task...
# >>> 1677517996 create 3 task...
# >>> 1677517997 create 4 task...
# >>> 1677517998 create 5 task...
# >>> 1677517999 create 6 task...
# >>> 1677518001 create 7 task...
# >>> 1677518003 create 8 task...
# >>> 1677518005 create 9 task...

如果你对Python感兴趣,想要学习python,这里给大家分享一份Python全套学习资料,都是我自己学习时整理的,希望可以帮到你,一起加油!

😝有需要的小伙伴,可以V扫描下方二维码免费领取🆓

1️⃣零基础入门

① 学习路线

对于从来没有接触过Python的同学,我们帮你准备了详细的学习成长路线图。可以说是最科学最系统的学习路线,你可以按照上面的知识点去找对应的学习资源,保证自己学得较为全面。
在这里插入图片描述

② 路线对应学习视频

还有很多适合0基础入门的学习视频,有了这些视频,轻轻松松上手Python~
在这里插入图片描述

③练习题

每节视频课后,都有对应的练习题哦,可以检验学习成果哈哈!
在这里插入图片描述

2️⃣国内外Python书籍、文档

① 文档和书籍资料

在这里插入图片描述

3️⃣Python工具包+项目源码合集

①Python工具包

学习Python常用的开发软件都在这里了!每个都有详细的安装教程,保证你可以安装成功哦!
在这里插入图片描述

②Python实战案例

光学理论是没用的,要学会跟着一起敲代码,动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。100+实战案例源码等你来拿!
在这里插入图片描述

③Python小游戏源码

如果觉得上面的实战案例有点枯燥,可以试试自己用Python编写小游戏,让你的学习过程中增添一点趣味!
在这里插入图片描述

4️⃣Python面试题

我们学会了Python之后,有了技能就可以出去找工作啦!下面这些面试题是都来自阿里、腾讯、字节等一线互联网大厂,并且有阿里大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。
在这里插入图片描述
在这里插入图片描述

上述所有资料 ⚡️ ,朋友们如果有需要的,可以扫描下方👇👇👇二维码免费领取🆓

这篇关于Python Asyncio 库之同步原语常用函数详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/584922

相关文章

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

Python pandas库自学超详细教程

《Pythonpandas库自学超详细教程》文章介绍了Pandas库的基本功能、安装方法及核心操作,涵盖数据导入(CSV/Excel等)、数据结构(Series、DataFrame)、数据清洗、转换... 目录一、什么是Pandas库(1)、Pandas 应用(2)、Pandas 功能(3)、数据结构二、安

Python使用Tenacity一行代码实现自动重试详解

《Python使用Tenacity一行代码实现自动重试详解》tenacity是一个专为Python设计的通用重试库,它的核心理念就是用简单、清晰的方式,为任何可能失败的操作添加重试能力,下面我们就来看... 目录一切始于一个简单的 API 调用Tenacity 入门:一行代码实现优雅重试精细控制:让重试按我

Python安装Pandas库的两种方法

《Python安装Pandas库的两种方法》本文介绍了三种安装PythonPandas库的方法,通过cmd命令行安装并解决版本冲突,手动下载whl文件安装,更换国内镜像源加速下载,最后建议用pipli... 目录方法一:cmd命令行执行pip install pandas方法二:找到pandas下载库,然后

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

Python实现网格交易策略的过程

《Python实现网格交易策略的过程》本文讲解Python网格交易策略,利用ccxt获取加密货币数据及backtrader回测,通过设定网格节点,低买高卖获利,适合震荡行情,下面跟我一起看看我们的第一... 网格交易是一种经典的量化交易策略,其核心思想是在价格上下预设多个“网格”,当价格触发特定网格时执行买

Python标准库之数据压缩和存档的应用详解

《Python标准库之数据压缩和存档的应用详解》在数据处理与存储领域,压缩和存档是提升效率的关键技术,Python标准库提供了一套完整的工具链,下面小编就来和大家简单介绍一下吧... 目录一、核心模块架构与设计哲学二、关键模块深度解析1.tarfile:专业级归档工具2.zipfile:跨平台归档首选3.

使用Python构建智能BAT文件生成器的完美解决方案

《使用Python构建智能BAT文件生成器的完美解决方案》这篇文章主要为大家详细介绍了如何使用wxPython构建一个智能的BAT文件生成器,它不仅能够为Python脚本生成启动脚本,还提供了完整的文... 目录引言运行效果图项目背景与需求分析核心需求技术选型核心功能实现1. 数据库设计2. 界面布局设计3

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

idea的终端(Terminal)cmd的命令换成linux的命令详解

《idea的终端(Terminal)cmd的命令换成linux的命令详解》本文介绍IDEA配置Git的步骤:安装Git、修改终端设置并重启IDEA,强调顺序,作为个人经验分享,希望提供参考并支持脚本之... 目录一编程、设置前二、前置条件三、android设置四、设置后总结一、php设置前二、前置条件