Python中篇:asyncio的事件循环+高阶API实战

向量数据库大模型NoSQL数据库
Python中篇
  1. asyncio事件循环与高阶API实战

1. 前言

讲解事件循环难免要涉及到API的操作,而API的调用与事件循环关联,所以将二者揉合在一起给大家讲明白高阶API如何与事件循环搭配使用。

2. 事件循环+同步调用API

  
import asyncio  
import time  
  
async def download(url):  
    print("start download url")  
    time.sleep(2)  
    print("end download url")  
  
if __name__ == "\_\_main\_\_":  
    start = time.time()  
    loop = asyncio.get_event_loop()  
    loop.run_until_complete(download("www.baidu.com"))  
    print(time.time()-start)  

结果:

  
start download url  
end download url  
2.0046780109405518  

注意: 首先asyncio已经帮我们实现了基于各个系统(linux/windows/mac)事件循环模块,比如我们这里通过asyncio.get_event_loop()获取事件循环对象,有了循环对象我们才能调度协称,获取协称的结果。

我们看到download下载函数先执行第一个print,再sleep 2s,然后再print,从结果来看就是这样的,没有问题。

但是当我们在并发的情况下,在看看会有什么问题:

  
#只需要修改loop.run\_until\_complete这个入参就行  
#其他都没变化  
loop.run_until_complete(asyncio.wait([download("www.baidu.com") for i in range(2)]))  

结果:

  
start download url  
end download url  
start download url  
end download url  
4.008697748184204  

看到了吧,结果令人非常不适,你期望的结果是2s左右返回,但是为啥4s左右才返回呢?结局真的让人很悲哀。

罪魁祸首是谁呢?就是因为异步代码中有同步调用,即download协称中有time.sleep同步调用。

事件循环是单线程运行的,所以当sleep之后阻塞单线程,没办法继续运行,所以只能变成同步调用,所以你会看到2个download任务同步返回,耗时4s。

那么如何解决这个问题呢?

3. 事件循环+异步调用API

用asyncio.sleep代替time.sleep

  
# 把download中time.sleep替换成await asyncio.sleep  
await asyncio.sleep(2)  

还是上面例子运行如下:

  
start download url  
start download url  
end download url  
end download url  
2.0069119930267334  

我们可以看到两个任务并发运行,结果也是我们想要的2s,几乎可以认为两个任务是同时开始同时结束的,这就达到了 异步编程的要求:要异步,所有的都要异步

4. 创建任务(Task)或者未来对象(Future)

上面我们讲解了事件循环如何调用协称来实现异步编程,其实事件循环不但可以调用协称,而且还可以调度Task或者Future对象,比如:

  
只需要修改loop.run_until_complete的入参即可 其他不用修改  
loop.run_until_complete(asyncio.ensure_future(download("www.baidu.com")))  

这里是通过asyncio.ensure_future创建Future对象,而run_until_complete的入参是多选的,协称/Future/Task都可以当作参数传递进去,比如创建Task对象:

  
# 解释同上  
loop.run_until_complete(loop.create_task(download("www.baidu.com")))  

通过loop.create_task创建Task对象,run_until_complete照样可以接受,为什么呢? 因为Task是Future的派生类,所以二者都可以当作入参传递 。还有asyncio.ensure_future的底层就是调用loop.create_task,所以看似ensure_future是和事件循环无关,但其实内部是有获取事件循环并且调用create_task的。

  
部分源码:  
def ensure_future(coro_or_future, *, loop=None):  
    if coroutines.iscoroutine(coro_or_future):  
        if loop is None: //没有事件循环  
            loop = events.get_event_loop() //获取loop, 这个和main中外面获取的loop是同一个,因为在单线程中并且是线程安全的。  
        task = loop.create_task(coro_or_future)  

以上两者运行结果都是一样的,如下:

  
start download url  
end download url  
2.0043089389801025  

5. asyncio.wait和asyncio.gather的高阶API用法

asyncio.wait我们上面示例都在用,大致意思就是运行所有协称/Task/Future,直到他们完成,而它本身就是一个协称,所以可以被事件循环run_until_complete调度。

我们重点说下asyncio.gather的用法,这个函数是high-level的,就是高度抽象的,它比wait更加灵活和方便。比如:

  
# 和wait用法一样  
loop.run_until_complete(asyncio.gather(download("www.baidu.com")))  

如果更加灵活呢?

  
# 示例1 和wait差不多,就是前面得加*  
    tasks = [download("www.baidu.com") for i in range(2)]  
    loop.run_until_complete(asyncio.gather(*tasks))  

  
# 示例2:分组运行  
    tasks1 = [download("www.baidu.com") for i in range(2)]  
    tasks2 = [download("www.baidu.com") for i in range(2)]  
    loop.run_until_complete(asyncio.gather(*tasks1, *tasks2))  

如果想要接受返回值呢?

  
import asyncio  
import time  
# download上面定义过了  
  
async def run():  
    tasks1 = [download("www.baidu.com") for i in range(2)]  
    tasks2 = [download("www.baidu.com") for i in range(2)]  
    ret = await asyncio.gather(*tasks1, *tasks2)  
    print(ret)  
  
  
if __name__ == "\_\_main\_\_":  
    start = time.time()  
    asyncio.run(run())  
    print(time.time()-start)  

结果:

  
start download url  
start download url  
start download url  
start download url  
end download url  
end download url  
end download url  
end download url  
['hello world', 'hello world', 'hello world', 'hello world']  
2.005855083465576  

我们看到各个协称执行的结果也都获取到了, 日常开发中建议多用gather

6. 协称/Task/Future执行完之后想要回调怎么办?

很简单,虽然这种情况我们不多见,但是万一在项目中需要回调,那么只需要几行代码就可以搞定回调。

  
import asyncio  
import time  
from functools import partial  
  
async def download(url):  
    print("start download url")  
    await asyncio.sleep(2)  
    print("end download url")  
    return "hello world" //协称返回的结果  
  
def callback(url, future): //协称在return之前的回调  
    print("回调了:", url)  
  
if __name__ == "\_\_main\_\_":  
    start = time.time()  
    loop = asyncio.get_event_loop()  
    future = asyncio.gather(download("www.baidu.com"))  
    future.add_done_callback(partial(callback, "www.baidu.com")) //利用partial函数包装callback,因为add_done_callback添加回调只接受一个参数,所以这里必须得用partial包装成一个函数,那相应的callback需要在增加一个参数url,而且这个url必须放在参数前面,这样的话我们就可以在回调的时候传递多个参数了。  
    loop.run_until_complete(future)  
    print("协称运行的结果:", future.result())  
    print(time.time()-start)  

运行结果:

  
start download url  
end download url  
回调了: www.baidu.com  
协称运行的结果: ['hello world']  
2.0060088634490967  

7. 小结

asyncio的高阶API和事件循环相结合起来,可以在python的异步编程中发挥巨大作用,你只需要像编写同步代码那样编写异步代码,只是异步多了几个语法规则而已,但这些都不是阻止你学习异步编程的脚步,你现在要做的就是立即掌握它们并且在项目中投入实战。

8. 关注公众号

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
vivo 容器化平台架构与核心能力建设实践
为了实现规模化降本提效的目标,vivo 确定了基于云原生理念构建容器化生态的目标。在容器化生态发展过程中,平台架构不断演进,并针对业务的痛点和诉求,持续完善容器化能力矩阵。本次演讲将会介绍 vivo 容器化平台及主要子系统的架构设计,并分享重点建设的容器化核心能力。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论