异步:通过一个线程利用其 IO 等待事件去做一些其他事情。(用更少的资源做更多的事情)

协程(微线程)

协程不是计算机提供的,而是程序员人为创造。(让一个线程在代码中游走的运行

实现:

  • greenlet (早期:每一次都要人为的去指向下一个该执行的协程)
  • gevent (基于 greenlet :每次遇到 io 操作,需要耗时等待时,会自动跳到下一个协程继续执行)
  • yield ( 在一个函数或者对象运行中,通过生成器暂停其运行 插入其他对象)
  • asyncio (py3.4 装饰器实现, 3.8 已不支持装饰器实现)
  • async, await (py3.5 推荐,本篇重点介绍)

示例代码

基于协程的异步编程 async, await

事件循环

管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放在队列中, 空闲时调用相应的事件处理器来处理这些事件。

  1. 协程函数: 定义函数的时候使用 async def funcname
  2. 协程对象:执行协程函数得到协程对象。(执行协程函数创建协程对象,函数代码不会立即执行
  3. 想要执行协程函数内部代码,必须要将协程对象交给事件循环处理。
import asyncio
async def func():
    print('协程')

result = func()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
asyncio.run(result)  # python3.7

await

await + 可等待的对象(协程对象, Future, Task 对象, IO 等待)

遇到耗时操作时挂起当前协程(任务),等 IO 操作完成后在继续往下执行。当前协程挂起时,事件循环可以去执行 其他协程任务 (注意:遇到耗时操作必须要手动挂起 )


import asyncio

async def other():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return 'other over'

async def func():
    print('执行协程函数的内部代码')

    ret = await other()
    ret2 = await other()  # 程序会先 等待 other 执行完在执行此语句。此时事件循环回去执行其他协程任务
    print("other return: ", ret)
    print("other return 2: ", ret2)


if __name__ == '__main__':
    asyncio.run(func())

Task

在事件循环中添加多个任务

Tasks 用于井发调度协程,通过 asyncio.create_task(协程对象)的方式创建 Task 对象,这样可以让协程加入事件 循环中等待被调度执行。除了使用 asyncio.create_task()函数(python3.7)以外,还可以用低层级的 loop.create_task()loop.ensure_ future()函数(loop = asyncio.get_event_loop())。不建议手动实例化 Task 对象。

import asyncio

async def other(temp):
    print('start-', temp)
    await asyncio.sleep(2)
    print('end-', temp)
    return 'other over'

async def func():
    print('执行协程函数的内部代码')

    # 创建task 对象,将当前执行函数任务添加到事件循环(此时 在执行此函数func前, 事件循环已在 .run中创建
    tasks = [asyncio.create_task(other(i), name=f'task{i}') for i in range(3)]

    # 等价于:
    # await tasks[0]
    # ...

    # timeout 允许任务最长等待时间,(超过此时间不再执行此任务 返回pending未完成 ??)。
    # Returns two sets of Future: (done, pending).
    done, pending = await asyncio.wait(tasks, timeout=None)

    print("done: ", done)
    print('pending: ', pending)


if __name__ == '__main__':
    asyncio.run(func())

asyncio.create_task()将当前任务添加到事件循环。注意:在执行此函数 func 前, 事件循环已在 asyncio.run()创建。

如果在外部使用 asyncio.create_task(), 注意需要事件循环的建立。可以考虑直接使用 asyncio.wait([协程函数对象, ]), 他会在内部帮你创建 task 对象

外部创建 task 对象

# 不建议此种实现
import asyncio

async def other(temp):
    print('start-', temp)
    await asyncio.sleep(2)
    print('end-', temp)
    return 'other over'

if __name__ == '__main__':
    tasks = [other(i) for i in range(3)]
    done, pending = asyncio.run(asyncio.wait(tasks))

    print("done: ", done)
    print('pending: ', pending)

协程延伸 Future 对象

asyncio.Future 对象

Task 继承 Future , Task 对象内部 await 结果的处理基于 Future 对象而来的。

import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')

async def main():
    loop = asyncio.get_running_loop()

    # 创建一个任务 Future对象,没有绑定任何行为,则这个任务永远不知到何时结束
    fut = loop.create_future()

    # 创建一个任务 Task对象,绑定了 set_after函数,函数内部会在 2s后给 fut赋值。
    # 即手动设置 Future任务的最终结果,那么fut就可以结束
    await loop.create_task(set_after(fut))

    # 等待 Future队形获取最终结果,否则一直等待
    data = await fut
    print(data)


asyncio.run(main())

concurrent.futures.Future

使用线程池, 进程池实现异步操作时用到的对象

from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
from time import sleep


def func(value):
    sleep(1)
    print(value)


# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)

更多: 进程线程介绍...

concurrent.futures.Future + asyncio 交叉使用( 非 async 函数对 await 的支持)。

可能会存在 交叉使用 协程,进程,线程。如: crm 项目 80%都是基于协程异步编程 + MySQL(不支持) [ 则需要使用:线程,进程做异步编程 ]

import asyncio
import concurrent.futures
from time import sleep

def func():
    sleep(2)
    return '某个耗时操作'

async def main():
    loop = asyncio.get_event_loop()

    # 1. Run in the default loop's executor (默认为 ThreadPoolExecutor)
    # a. 内部会先调用 ThreadPoolExecutor 的 submit方法去线程池中申请一个线程去执行 func函数,并返回一个 concurrent.futures.Future对象。
    # b. 调用 asyncio.wrap_future将 concurrent.futures.Future对象包装为 asyncio.Future对象。( concurrent.futures.Future并不支持 await语法
    fut = loop.run_in_executor(None, func)
    result = await fut
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func)
    #     print('custom thread pool', result)

    # 3. Run in a custom Process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func)
    #     print('custom process pool', result)

asyncio.run(main())

异步迭代器

迭代器:内部实现了 iter 方法和 next 方法的对象

可迭代对象:在类里面实现了 __iter__ 方法并且返回一个迭代器

Python 可迭代对象,迭代器,生成器的区别

异步迭代器:实现了 __aiter__()__anext__()方法的对象。 __anext__必须返回一个 awaitable 对象。 async for会 处理异步送代器的__anext__()方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

异步可送代对象: 可在 async for语句中被使用的对象。必须通过它的 __aiter__()方法返回一个 asynchronous iterator。由 PEP 492 引入。

import asyncio

class Reader:
    """ 自定义异步迭达器,(同时也是异步可迭达对象 """

    def __init__(self):
        self.count = 0

    async def readline(self):
        await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()

        if val is None:
            raise StopAsyncIteration
        return val

async def func():
    # async for只能写在协程函数内
    async for item in Reader():
        print(item)

if __name__ == '__main__':
    asyncio.run(func())

异步的上下文管理器

此种对象通过定义 __aenter__()__aexit__()方法来对 async with语句中的环境进行控制。由 PEP 492 引入

import asyncio

class AsyncContextManager:
    def __init__(self, conn):
        self.conn = conn

    async def do_something(self):
        # 异步操作数据库
        return '..'

    async def __aenter__(self):
        # 异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步关闭数据库连接
        await asyncio.sleep(1)

async def func():
    # async with 必须嵌套在协程函数里面
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

if __name__ == '__main__':
    asyncio.run(func())

一般很少手动实现异步上下文,主要是看别人封装的模块里面是否实现 __aenter__()__aexit__()方法。有则可以使用 async with语句

uvloop

asyncio 的事件循环的替代方案。它的事件循环效率 大于默认 asyncio 的事件循环。(性能与 go 比肩 ?)

import asyncio
import unloop
# uvloop does not support Windows at the moment
asyncio.set_event_loop_policy(unloop.EventloopPolicy())

# 编写 asyncio的代码。与之前代码一致
# 内部的事件循环自动化会变为 uvloop

注意: asgi(支持异步的网关服务接口) 》 uvicorn内部使用的就是 uvloop

异步案例代码:数据库,网络框架,爬虫


相关推荐:

来自系列:多任务,异步

分类 python下文章:

1.0 爬虫的介绍,和requests模块的简单使用

1.1 数据解析的三种方式。正则表达式, bs4, xpath

2.0 多任务(进程,协程,线程)爬虫:验证码识别,返回头储存,ip代理 介绍。 异步是什么,爬虫异步的方式。线程,进程,介绍

2.0.1 协程的 async/await 实现 爬虫 单线程 + 异步协程的实现

3.0 基于selenium 模块的 爬虫操作。 selenium 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。

更多...

评论([[comments.sum]])

发表

加载更多([[item.son.length-2]])...

发表

2020-11 By chuan.

Python-flask & bootstrap-flask

图片外链来自:fghrsh

互联网ICP备案号:蜀ICP备2020031846号