共计 7037 个字符,预计需要花费 18 分钟才能阅读完成。
一、概述
-
asyncio 模块
是 python3.4 版本引入的标准库,直接内置了对异步 IO 的操作
-
编程模式
是一个消息循环,我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步 IO
-
说明
到目前为止实现协程的不仅仅只有 asyncio,tornado 和 gevent 都实现了类似功能
-
关键字的说明
关键字 说明 event_loop 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数 coroutine 协程对象,指一个使用 async 关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 task 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态 future 代表将来执行或没有执行的任务的结果,它和 task 上没有本质上的区别 async/await python3.5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口
二、asyncio 基本使用
-
定义一个协程
import asyncio import time # 通过 async 关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中 async def run(x): print("waiting:%d"%x) time.sleep(x) print("结束 run") # 得到一个协程对象 coroutine = run(2) # 创建一个消息循环 # 注意:真实是在 asyncio 模块中获取一个引用 loop = asyncio.get_event_loop() # 将协程对象加入到消息循环 loop.run_until_complete(coroutine)
-
创建一个任务
import asyncio import time async def run(x): print("waiting:%d"%x) time.sleep(x) print("结束 run") coroutine = run(2) # 创建任务 task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() # 协程对象加入到消息循环中,协程对象不能直接运行的,在注册消息循环时 run_until_complete 方法将加入的协程对象包装成一个任务,task 对象时 Future 类的子类对象,保存协程运行后的状态,用于未来获取协程的结果 # loop.run_until_complete(coroutine) # 将任务加入到消息循环 loop.run_until_complete(task)
-
绑定回调
回调:不需要手动调用,触发某种条件才会调用
import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络 IO asyncio.sleep(5) data = "'%s' 的数据 "%(url) print("给你数据") return data # 定义一个回调函数(不需要手动调用,触发某种条件才会调用) def call_back(future): print("call_back:", future.result()) coroutine = run("百度") # 创建一个任务对象 task = asyncio.ensure_future(coroutine) # 给任务添加回调,在任务结束后调用回调函数 task.add_done_callback(call_back) loop = asyncio.get_event_loop() loop.run_until_complete(task) print("-------main------") while 1: time.sleep(2)
注意:asyncio.sleep(5) 会报 RuntimeWarning
-
阻塞和 await
async 可以定义协程,使用 await 可以针对耗时操作进行挂起,就与生成器的 yield 一样,函数交出控制权。协程遇到 await,消息循环会挂起该协程,执行别的协程,直到其他协程也会挂起或者执行完毕,在进行下一次执行
import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络 IO await asyncio.sleep(5) data = "'%s' 的数据 "%(url) print("给你数据") return data # 定义一个回调函数(不需要手动调用,触发某种条件才会调用) def call_back(future): print("call_back:", future.result()) coroutine = run("百度") task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) loop = asyncio.get_event_loop() loop.run_until_complete(task) print("-------main------")
三、多任务
-
同步
同时请求 ” 百度 ”,“阿里”,“腾讯”, “ 新浪 ” 四个网站,假设响应时长均为 2 秒
import time def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络 IO time.sleep(2) data = "'%s' 的数据 "%(url) return data if __name__ == "__main__": t1 = time.time() for url in ["百度", "阿里", "腾讯", "新浪"]: print(run(url)) t2 = time.time() print("总耗时:%.2f"%(t2-t1))
-
异步
同时请求 ” 百度 ”,“阿里”,“腾讯”, “ 新浪 ” 四个网站,假设响应时长均为 2 秒
import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s' 的数据 "%(url) return data def call_back(future): print("call_back:", future.result()) if __name__ == "__main__": loop = asyncio.get_event_loop() tasks = [] t1 = time.time() for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) # 同时添加 4 个异步任务 loop.run_until_complete(asyncio.gather(*tasks)) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))
四、协程嵌套
使用 async 可以定义协程,协程用于耗时的 io 操作,我们也可以封装更多的 io 操作过程,这样就实现了嵌套的协程,即一个协程中 await 了另外一个协程,如此连接起来
import time
import asyncio
async def run(url):
print("开始向'%s'要数据……"%(url))
await asyncio.sleep(2)
data = "'%s' 的数据 "%(url)
return data
def call_back(future):
print("call_back:", future.result())
async def main():
tasks = []
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
# task.add_done_callback(call_back)
tasks.append(task)
# #1、可以没有回调函数
# dones, pendings = await asyncio.wait(tasks)
# #处理数据,类似回调,建议使用回调
# for t in dones:
# print("数据:%s"%(t.result()))
# #2、可以没有回调函数
# results = await asyncio.gather(*tasks)
# # 处理数据,类似回调,建议使用回调
# for result in results:
# print("数据:%s"%(result))
# 3、
# return await asyncio.wait(tasks)
# 4、
# return await asyncio.gather(*tasks)
# 5、
# for t in asyncio.as_completed(tasks):
# await t
# 6、
for t in asyncio.as_completed(tasks):
# 可以没有回调
result = await t
print("数据:%s"%(result))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
t1 = time.time()
#1、
# loop.run_until_complete(main())
#2、
# loop.run_until_complete(main())
# # 3、
# dones, pendings = loop.run_until_complete(main())
# #处理数据,类似回调,建议使用回调
# for t in dones:
# print("数据:%s"%(t.result()))
# 4、
# results = loop.run_until_complete(main())
# for result in results:
# print("数据:%s"%(result))
# 5、
# loop.run_until_complete(main())
# 6、
loop.run_until_complete(main())
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
整理协程嵌套
import time
import asyncio
async def run(url):
print("开始向'%s'要数据……"%(url))
await asyncio.sleep(2)
data = "'%s' 的数据 "%(url)
return data
def call_back(future):
print("call_back:", future.result())
async def main():
tasks = []
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
await asyncio.wait(tasks)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
t1 = time.time()
loop.run_until_complete(main())
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
五、消息循环在另一个线程中启动
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被 block
import asyncio
import threading
import time
def run(url):
print("开始向'%s'要数据……"%(url))
time.sleep(2)
data = "'%s' 的数据 "%(url)
print("结束请求")
return data
def start_loop(loop):
# 启动消息循环
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
# 创建消息循环 (类似死循环)
# 注意:此时消息循环没有启动
loop = asyncio.get_event_loop()
threading.Thread(target=start_loop, args=(loop,)).start()
t1 = time.time()
# 给消息循环添加任务
# loop.run_until_complete(create_tasks())
loop.call_soon_threadsafe(run, "百度")
loop.call_soon_threadsafe(run, "腾讯")
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
六、asyncio 终极使用
使用到协程嵌套与消息循环在另一个线程中启动相关联
import asyncio
import threading
async def run(url):
print("开始向'%s'要数据……"%(url))
await asyncio.sleep(2)
data = "'%s' 的数据 "%(url)
print("结束请求")
return data
def call_back(future):
print("call_back:", future.result())
async def create_tasks():
tasks = []
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
await asyncio.wait(tasks)
def start_loop(loop):
# 启动消息循环
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
# 创建消息循环 (类似死循环)
# 注意:此时消息循环没有启动
loop = asyncio.get_event_loop()
threading.Thread(target=start_loop, args=(loop,)).start()
# 给消息循环添加任务
asyncio.run_coroutine_threadsafe(create_tasks(), loop)
# asyncio.run_coroutine_threadsafe(run("百度"), loop)
# asyncio.run_coroutine_threadsafe(run("腾讯"), loop)
# asyncio.run_coroutine_threadsafe(run("阿里"), loop)
# asyncio.run_coroutine_threadsafe(run("新浪"), loop)
七、获取网页信息
import asyncio
import threading
async def run(url):
print("开始加载'%s'页面……" % (url))
# 发起链接,耗时 IO
connet = asyncio.open_connection(url, 80)
reader, writer = await connet
# 链接成功
# 发起请求,耗时 IO
header = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n"%(url)
writer.write(header.encode("utf-8"))
await writer.drain()
# 接收数据
with open(url + ".html", "wb") as fp:
while True:
line = await reader.readline()
if line == b"\r\n":
break
else:
fp.write(line)
fp.flush()
async def create_tasks():
tasks = []
for url in ["www.baidu.com", "www.zutuanxue.com", "www.sina.com.cn"]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
tasks.append(task)
await asyncio.wait(tasks)
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
threading.Thread(target=start_loop, args=(loop,)).start()
asyncio.run_coroutine_threadsafe(create_tasks(), loop)