Kyle's Notebook

Python 异步编程总结

Word count: 3.9kReading time: 18 min
2019/10/04

Python 异步编程总结

Select,回调,事件循环

对于 Python 异步编程,需要先搞清楚以下这些概念:

  • 回调函数:提供函数供一定条件满足后调用(回调函数中都是非 I/O 操作,性能很高);

  • 事件循环:不断循环列表请求句柄状态,发现状态变化时执行回调函数;

  • 单线程:驱动程序运行的 loop 是单线程运行(不会有内存消耗和切换问题)、非阻塞的,只会对就绪句柄执行回调函数,不会等待 I/O(除非所有句柄都在等待)。

以一段简单的爬虫代码说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import time
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector() # select 函数更高层次的封装,根据环境可以自动选择 select、poll 或 epoll
urls = []
stop = False

class Fetcher:
def connected(self, key):
'''
回调函数
:param key:
:return:
'''
# 执行回调函数时,首先要对句柄取消注册
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))

# 注册句柄,监听读状态,执行回调函数 readable
selector.register(self.client.fileno(), EVENT_READ, self.readable)

def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd) # 数据读取完成
data = self.data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True

def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"

# 建立 socket 连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# select 需要非阻塞 I/O
self.client.setblocking(False)
try:
self.client.connect((self.host, 80))
except BlockingIOError as e:
pass

# 注册句柄,当发生写事件时,执行回调函数 connected
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
# 事件循环,不停请求 socket 的状态并调用对应的回调函数
# select 本身是不支持 register 模式(selector 是对 select 的封装,提供了 register)
# socket 状态变化以后的回调由程序员完成
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data # 执行注册时执行的回调函数
call_back(key)

# 异步
fetcher = Fetcher()
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
urls.append(url)
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print(time.time() - start_time)

# 同步(注意 self.client.setblocking(True))
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
get_(url)
print(time.time() - start_time)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import inspect
import socket

def get_socket_data(): # 模拟从 socket 中获取数据,唤醒 downloader
yield "ywh" # 如发生异常,则会抛出给 downloader

def downloader(url): # 主方法中不能添加耗时操作
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)

try:
client.connect((host, 80)) # 阻塞不会消耗 cpu
except BlockingIOError as e:
pass

selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
source = yield from get_socket_data() # 暂停,直到 socket 获取到数据再往下执行
html_data = source.decode("utf8").split("\r\n\r\n")[1]
print(html_data)

def download_html(html):
html = yield from downloader()

if __name__ == "__main__":
# 协程的调度:事件循环 + 协程模式(单线程)
pass

协程

使用以上 I/O 模型 API 存在以下问题:

  • 回调:代码可读性差,共享状态管理困难,异常处理困难;

  • 多线程:线程间同步、锁并发性能差,线程创建消耗内存大、切换开销大;

  • 同步:并发度低。

因此可考虑使用协程:

  • 采用同步的方式编写异步(事件循环 + I/O 多路复用)代码代替回调,使用单线程切换任务(不再需要锁);

  • 自主编写调度函数,并发性能远高于线程间切换;

  • 调度函数有多个入口:遇到 I/O 操作把当前函数暂停、切换到另一个函数执行,在适当时候恢复。

  • 使用生成器(见“迭代器,生成器”)结合事件循环可实现协程;

  • 协程 + 事件循环的效率不比回调 + 事件循环高,其目的在于简便地解决回调复杂的问题。

为了将语义变得更加明确,Python 3.5 后引入了 async 和 await 关键词用于定义原生协程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import types

async def downloader(url): # 使用原生协程
return "ywh"

@types.coroutine
def downloader(url): # 使用生成器实现协程
yield "ywh"

async def download_url(url): # async 和 await 必须成对使用
html = await downloader(url) # await:执行费时操作(生成器不能直接用于 await,要加上装饰器或 async)
return html


if __name__ == "__main__":
coro = download_url("http://www.test.com")
# next(None)
coro.send(None) # 使用原生协程只能使用 send(None)

使用生成器实现协程:

  • 一般的生成器只能作为生产者,实现为协程则可以消费外部传入的数据;

  • 使用 value = yield from xxx 的生成器表示返回值给调用方、且调用方通过 send 方法传值给生成器函数;

  • 主函数中不能添加耗时的逻辑,如把I/O操作通过 yield from 做异步处理;

  • 最终实现通过同步的方式编写异步代码:在适当时候暂停、恢复启动函数。

获取生成器的状态:

1
2
3
4
5
6
7
8
9
10
import inspect
def gen_func():
yield 1
return "ywh"

if __name__ == "__main__":
gen = gen_func()
print(inspect.getgeneratorstate(gen))
next(gen)
print(inspect.getgeneratorstate(gen))

实现协程:

1
2
3
def gen_func():
value = yield 1
return "ywh"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import inspect
import socket

def get_socket_data(): # 模拟从 socket 中获取数据,唤醒 downloader
yield "ywh" # 如发生异常,则会抛出给 downloader

def downloader(url): # 主方法中不能添加耗时操作
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)

try:
client.connect((host, 80)) # 阻塞不会消耗 cpu
except BlockingIOError as e:
pass

selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
source = yield from get_socket_data() # 暂停,直到 socket 获取到数据再往下执行
html_data = source.decode("utf8").split("\r\n\r\n")[1]
print(html_data)

def download_html(html):
html = yield from downloader()

if __name__ == "__main__":
# 协程的调度:事件循环 + 协程模式(单线程)
pass

asyncio 简介

asyncio 模块:asyncio 是 Python 用于解决异步 I/O 编程的整套解决方案。

  • 高并发编程三个要素:事件循环+ I/O 多路复用 + 回调函数(驱动生成器,即协程);

  • 包括各种特定系统实现的模块化事件循环(select、poll、epoll);

  • 传输和协议抽象;

  • 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持;

  • 模仿 futures 模块但适用于事件循环使用的 Future 类;

  • 基于 yield from 的协议和任务,可以顺序的方式编写并发代码;

  • 必须使用一个将产生阻塞 I/O 的调用时,有接口可以把这个事件转义到线程池;

  • 模仿 threading 模块中的同步原语、可以用在单线程的协程之间;

  • 关键词 async 定义协程,await 异步调用。

基于asyncio的框架:tornado(实现可直接部署的 web 服务器)、gevent、twisted(scrapy,django channels),使用这些框架必须有对应的异步驱动支持(如 tornado 中使用 pymysql 则不能异步)。

asyncio 具备以下特点:

  • 单线程,所有的函数调用都在 loop 中执行(如执行耗时操作则会等待完成后才执行下一个);

  • 使用 await asyncio.sleep(2)time.sleep(2) 的区别是前者会立即返回一个 Future 对象,下次循环判断是否已经过 2s,而不是阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import time

# 定义异步函数
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
# time.sleep(5) # 协程是单线程执行,time.sleep 是同步阻塞接口,不应在协程中实现
print("end get url")

start_time = time.time()
loop = asyncio.get_event_loop() # 创建事件循环(单线程:所有的函数调用都在 loop 中执行)
tasks = [
get_html("http://www.test.com") for i in range(10)
]
loop.run_until_complete(asyncio.wait(tasks)) # 类似 join,等待协程执行完成才往下执行
print(time.time() - start_time)

如何获取协程返回值?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time
from functools import partial

async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "ywh"

def callback(url, future):
print(url)
print("send email to ywh")

start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.test.com"))
task = loop.create_task(get_html("http://www.test.com")) # 创建一个任务,返回 Future 对象
task.add_done_callback(
partial(callback, "http://www.test.com") # 把 callback 包装可接收参数的偏函数
) # 执行 get_html,完成后调用回调函数 callback,最后再返回 get_html 的结果
loop.run_until_complete(task)
print(task.result()) # 获取 Future 对象的结果

wait 与 gather:gather 是更高层次的封装,可以将 task 分组管理;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")


if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("http://www.test.com") for i in range(10)]
# loop.run_until_complete(asyncio.gather(*tasks))
# print(time.time() - start_time)

group1 = [get_html("http://projectsedu.com") for i in range(2)]
group2 = [get_html("http://www.test.com") for i in range(2)]

group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
group2.cancel()

loop.run_until_complete(asyncio.gather(group1, group2))
print(time.time() - start_time)

协程的取消,嵌套

1
2
3
4
5
import asyncio

loop = asyncio.get_event_loop()
loop.run_forever() # 一直运行不会停止
loop.run_until_complete() # 运行指定协程后停止

取消future(task)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time

async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))

tasks = [get_html(2), get_html(3), get_html(4)] # 模拟三个执行时长不同的任务
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e: # 人为制造取消信号:捕捉键盘 ctrl + c 异常
all_tasks = asyncio.Task.all_tasks() # 不需要传入loop:自动从events.get_event_loop 中获取 loop,并获取 loop 中的所有 task
for task in all_tasks: # 获取所有 tasks
print(task.cancel()) # 取消 task,返回取消结果
loop.stop()
loop.run_forever() # loop 调用 stop 后必须重新调用 run_forever,否则会抛出异常
finally:
loop.close()

嵌套协程的调度过程(task -> print_sum -> compute):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

# print_sum 协程嵌套 await_compute 协程
async def compute(x, y):
print("Compute %s + %s ..." %(x, y))
await asyncio.sleep(1.0) # 3
return x + y # 4

async def print_sum(x, y):
result = await compute(x, y) # 2

print("%s + %s = %s" %(x, y, result)) # 5

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2)) # 1
loop.close()
  • 创建 loop 和需要提交到 loop 的 task(通过 task 驱动协程执行),执行;

  • 协程中的 await 相当于 yield from,在 task 和 compute 子协程之间建立一个通道,此时进入 compute 调度,print_sum 暂停;

  • 子协程 compute 中的 await 表示暂停,不经过 print_sum、直接返回给 task 再返回给 loop,等待1s;

  • 1s 过后,task 经通道询问 compute,compute 计算好结果值会抛出异常(StopIteration)并返回计算结果,compute 协程标记完成;

  • print_sum 捕捉到 compute 的异常、提取结果值,最后会把异常抛出给 task,print_sum 协程标记完成。

call_at,call_soon,call_later,call_soon_threadsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

def callback(sleep_times, loop):
print("success time {}".format(loop.time()))

def stoploop(loop):
loop.stop()

loop = asyncio.get_event_loop()
now = loop.time()
loop.call_at(now + 2, callback, 2, loop) # 当前时间的 2s 后执行
loop.call_at(now + 1, callback, 1, loop)
loop.call_at(now + 3, callback, 3, loop)
# loop.call_soon(stoploop, loop) # 退出循环
loop.call_soon(callback, 4, loop) # 立刻执行(在队列中等待到下一个循环即执行),另外还有 call_later

# 处理共享变量的线程安全问题可以使用 call_soon_threadsafe
loop.run_forever()

如何在协程中集成线程池?

在协程中集成阻塞 I/O:对于阻塞的库和接口(如 pymysql ),协程中要使用多线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
import time
from urllib.parse import urlparse

def get_url(url):
url = urlparse(url)
host = url.netloc
path = url.path
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client.setblocking(False)
client.connect((host, 80))
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
data = b""
while True:
d = client.recv(1024)
if d:
data += d
else:
break

data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()

start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
task = loop.run_in_executor(executor, get_url, url) # 指定线程池,将阻塞 I/O 操作放入 loop 中,不影响系统运行
# 把线程中的 future 包装成协程的 future
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print("last time:{}".format(time.time() - start_time))

模拟 Http 请求

requests 是同步的 http 请求模块,在 asyncio 不能达到异步的效果,而 asyncio 本身没有提供 http 协议的接口,可以使用 aiohttp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
import asyncio
import socket
from urllib.parse import urlparse

async def get_url(url):
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"

# 建立 socket 连接比较费时,使用 await
reader, writer = await asyncio.open_connection(host, 80)
writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
all_lines = []
async for raw_line in reader: # 异步化的 for 循环
data = raw_line.decode("utf8")
all_lines.append(data)
html = "\n".join(all_lines)
return html

async def main():
tasks = []

for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
tasks.append(asyncio.ensure_future(get_url(url)))

for task in asyncio.as_completed(tasks):
result = await task
print(result)

if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('last time:{}'.format(time.time() - start_time))

future 和 task

见 asyncio 源码

同步与通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import aiohttp
import asyncio
from asyncio import Lock, Queue # await,区别于多线程的 Queue(阻塞)

cache = {}
queue = [] # 协程是单线程,因此使用 list、dict 就可以实现通信,而不会有线程安全问题
lock = Lock()

async def get_stuff(url):
# 如果没有同步机制,此处可能会被两个协程都对同一个 url 发起请求(耗时、被反爬)
async with lock: # 加锁,避免 parse_stuff 和 use_stuff 同时执行这部分代码
# 由__await__、__aenter__实现,等价于await lock.acquire()、lock.release()
if url in cache:
return cache[url]
stuff = await aiohttp.request('GET', url)
cache[url] = stuff
return stuff

async def parse_stuff():
stuff = await get_stuff()

async def use_stuff():
stuff = await get_stuff()

tasks = [parse_stuff(), use_stuff()] # 当协程函数中没有 await,则会按加入 tasks 顺序执行

实例:基于 asyncio 协程的高并发爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuery

# https://www.lfd.uci.edu/


stop_flag = False
start_url = "http://www.jobbole.com/"
waitting_urls = []
seen_urls = set()
sem = asyncio.Semaphore(3)


async def fetch(url, session):
"""
发送 http请求
:param url:
:return:
"""
async with sem: # 并发度控制
await asyncio.sleep(1) # 爬取速度控制
try:
async with session.get(url) as resp:
print('url statis: {0}'.format(resp.status))
if resp.status in [200, 201]:
data = await resp.text()
return data
except Exception as e:
print(e)


def extract_urls(html):
"""
从请求页面中获取下次要请求 url
:param html:
:return:
"""
urls = []
pq = PyQuery(html)
for link in pq.items('a'):
url = link.attr('href')
if url and url.startswith('http') and url not in seen_urls:
urls.append(url)
waitting_urls.append(url)
return urls


async def article_handler(url, session, pool):
"""
获取文章详情并解析入库
:param url:
:param session:
:return:
"""
html = await fetch(url, session)
seen_urls.add(url)
extract_urls(html)
pq = PyQuery(html)
title = pq('title').text() # 省略其他字段
print(title)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("sql")
insert_sql = """
INSERT INTO xxx
"""
print(cur.description)
await cur.execute(insert_sql)


async def init_urls(url, session):
"""
解析页面,
:param url:
:param session:
:return:
"""

html = await fetch(url, session)
seen_urls.add(url)
extract_urls(html)


async def consumer(pool, session):
# async with aiohttp.ClientSession() as session: # 发送 http 请求需要的 session
while not stop_flag:
if len(waitting_urls) == 0:
await asyncio.sleep(0.5)
continue
url = waitting_urls.pop()
print('start get url: ' + url)

# 详情页协程,解析页面内容、入库
if re.match('http://.*?jobbole.com/\d+/', url):
if url not in seen_urls:
asyncio.ensure_future(article_handler(url, session, pool))

# 非详情页协程,进一步提取出详情页的url
else:
if url not in seen_urls:
asyncio.ensure_future(init_urls(url, session))


async def main(loop):
# 等待 MySQL 连接池建立
pool = await aiomysql.create_pool(
host='', port='', user='', password='', db='mysql', loop=loop, charset='utf8', autocommit=True
)
async with aiohttp.ClientSession() as session: # 发送 http 请求需要的 session
html = await fetch(start_url, session)
seen_urls.add(start_url)
extract_urls(html)

# consumer 协程从 url 获取,动态向 asyncio 提交 article_handler 和 init_urls 协程
asyncio.ensure_future(consumer(pool, session))


if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
loop.run_forever()
CATALOG
  1. 1. Python 异步编程总结
    1. 1.1. Select,回调,事件循环
    2. 1.2. 协程
    3. 1.3. asyncio 简介
    4. 1.4. 如何获取协程返回值?
    5. 1.5. 协程的取消,嵌套
    6. 1.6. call_at,call_soon,call_later,call_soon_threadsafe
    7. 1.7. 如何在协程中集成线程池?
    8. 1.8. 模拟 Http 请求
    9. 1.9. future 和 task
    10. 1.10. 同步与通信