« 上一篇下一篇 »

asyncio要点

asyncio是一个异步并发库, 注意是并发, 不是并行. 利用 async/await 组合

`可等待对象`有三种主要类型: coroutine,Task,Future, `可等待对象`实现了__await__方法 (Task派生至Future)

coroutine 协程,实现异步模型的基本单元

Future 主要用于coroutine之间`执行权`的切换, 内部本质用的是yield, await future -> __await__ -> yeild,
这个类很大程度上模仿concurrent.futures.Future

Task将(coroutine,Future)整合到一起, 成为一个task, 然后交给loop去执行. add_done_callback可以
为Task添加完成通知回调

loop是一个任务执行队列循环, 底层用类似select机制, linux上可以用epoll, windows上可以
用wsaeventselect或者`完成端口`来实现`单线程多任务`异步非阻塞IO操作

asyncio.run -> loop.run_until_complete -> loop.run_forever()

下面是并发示例代码:

#==========================================================================
#==========================================================================

async def test1():
    """
    asyncio.sleep内部创建了一个future对象和一个定时运行的回调函数,
    然后await等待这个future对象,这样会交出`执行权`,
    然后等到指定时间回调函数被调用,在回调函数里面
    调用future.set_result函数,这样asyncio.sleep将重新获取`执行权`,并返回.
    代码如下:
    async def sleep(delay, result=None, *, loop=None):
        if delay <= 0:
            await __sleep0()
            return result
        
        if loop is None:
            loop = events.get_event_loop()
        future = loop.create_future()  #创建一个future对象
        h = loop.call_later(delay,
                            futures._set_result_unless_cancelled, #这个回调函数里面会调用future.set_result
                            future, result)
        try:
            return await future #进入等待状态,直到回调函数futures._set_result_unless_cancelled被调用
        finally:
            h.cancel()
    """
    obj = asyncio.sleep(3) #生成`协程对象`,并不是真正执行`协程函数`
    await obj              #真正执行协程函数,并且等待完成
    print("test1 done")

async def test2():
    obj = asyncio.sleep(3)
    await obj
    print("test2 done")

async def main():
    """
    方式一:
    这里是串行执行, 3+3 一共要等待6秒才完成
    """
    obj1 = test1()   #生成`协程对象`,并不是真正执行`协程函数`
    obj2 = test2()   #生成`协程对象`,并不是真正执行`协程函数`
    #obj1.send(None) #协程对象可以这样运行
    await obj1 #真正执行协程函数,并且等待完成
    await obj2 #真正执行协程函数,并且等待完成
    
    """
    方式二:
    这里是并发执行, 一共只需要大约3秒就完成
    """
    obj1 = test1()  #生成`协程对象`,并不是真正执行`协程函数`
    obj2 = test2()  #生成`协程对象`,并不是真正执行`协程函数`
    task1 = asyncio.create_task(obj1)#创建任务对象,并且加入到执行列表里面,Task派生至Future,所以这里可以看成是future对象
    task2 = asyncio.create_task(obj2)#创建任务对象,并且加入到执行列表里面,Task派生至Future,所以这里可以看成是future对象
    """
    因为上面asyncio.create_task把两个协程都排入了执行队列,
    具有并发执行的性质,所以下面虽然从代码看上去是一个接一个在等待,
    但两个任务实际是并发完成的,所以一共只需要大约3秒就完成了
    """
    await task1#其实就是等待一个future对象,task内部执行完协程后,会在这个future对象上调用future.set_result,这样await就会返回,后面的代码继续运行
    await task2#其实就是等待一个future对象,task内部执行完协程后,会在这个future对象上调用future.set_result,这样await就会返回,后面的代码继续运行
    
    """
    方式三:
    这里也是并发执行,一共只需要大约3秒就完成,而且从代码上看更容易理解是并发执行,不会产生歧义.
    
    asyncio.wait内部主要流程是会利用add_done_callback为传入的每个任务添加同一个`任务完成回调通知`函数,
    然后await等待一个future对象,等任务完成后,`任务完成回调通知`函数被调用,这个函数里面有个计数器,
    当计数为0,代表所有任务都完成,然后就会调用future.set_result,这样前面说的await调用就会返回,然后asyncio.wait就会返回
   
    代码如下:
    async def _wait(fs, timeout, return_when, loop):
    
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future() #创建一个future对象,后面会用到
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
        #下面就是`任务完成回调通知`函数
        def _on_completion(f):
            nonlocal counter #计数器
            counter -= 1 #每完成一个任务计数器减一
            if (counter <= 0 or #计数器为0意味着任务都执行完毕了
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None) #任务都执行完毕,调用set_result,这样后面await waiter代码就会返回,流程得以继续
    
        for f in fs:
            f.add_done_callback(_on_completion) #为每个任务添加`任务完成回调通知`函数
    
        try:
            await waiter #等待之前创建的future对象
        finally:
            if timeout_handle is not None:
                timeout_handle.cancel()
    
        done, pending = set(), set()
        for f in fs:
            f.remove_done_callback(_on_completion)
            if f.done():
                done.add(f)
            else:
                pending.add(f)
        return done, pending
    """
    obj1 = test1()  #生成`协程对象`,并不是真正执行`协程函数`
    obj2 = test2()  #生成`协程对象`,并不是真正执行`协程函数`
    task1 = asyncio.create_task(obj1)#创建任务对象,并且加入到执行列表里面,Task派生至Future,所以这里可以看成是future对象
    task2 = asyncio.create_task(obj2)#创建任务对象,并且加入到执行列表里面,Task派生至Future,所以这里可以看成是future对象
    l = [task1, task2]
    obj = asyncio.wait(l)       #生成`协程对象`,并不是真正执行`协程函数`
    done, pending = await obj   #真正执行协程函数,并且等待完成

asyncio.run(main())

#还可以如下方式调用
#loop = asyncio.get_event_loop()
#loop.run_until_complete(main())

#还可以如下方式调用
#loop = asyncio.get_event_loop()
#t = loop.create_task(main())
#loop.run_until_complete(t)

#还可以如下方式调用,这种方式程序永远不会退出
#loop = asyncio.get_event_loop()
#loop.create_task(main())
#loop.run_forever()

exit()

#==========================================================================
#==========================================================================

若干参考资料:

asyncio源码: Anaconda3/Lib/asyncio/*.py 例如futures.py,tasks.py,coroutines.py,events.py,runners.py等

https://docs.python.org/zh-cn/3/library/asyncio-task.html  《协程与任务》
https://zhuanlan.zhihu.com/p/27258289     《Python Async/Await入门指南》
https://www.cnblogs.com/pigerhan/p/3474217.html 《两种IO模式:Proactor与Reactor模式》
https://www.cnblogs.com/harelion/p/8496360.html 《Python 3.5中async/await的工作机制》