A Summary of Python Asynchronous Programming

Select, Callbacks, and the Event Loop

For Python asynchronous programming, the following concepts need to be clear first:
Callback: a function handed over to be invoked once a certain condition is met (callbacks contain only non-I/O work, so they execute very quickly);
Event loop: repeatedly polls the registered handles (file descriptors) and, whenever a status change is detected, runs the corresponding callback;
Single thread: the loop that drives the program runs in a single thread (no per-thread memory overhead or context-switch cost) and is non-blocking: it only runs callbacks for handles that are ready and never waits on I/O (unless every handle is still waiting).
The following simple crawler illustrates these ideas:
```python
import time
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
urls = []
stop = False


class Fetcher:
    def connected(self, key):
        """Callback: the socket is writable, i.e. the connection is established."""
        selector.unregister(key.fd)
        self.client.send(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n"
            .format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        """Callback: the socket is readable."""
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            pass
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
    # event loop: poll for ready handles and run their callbacks
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)


start_time = time.time()
for url in range(20):
    url = "http://shop.projectsedu.com/goods/{}/".format(url)
    urls.append(url)
    fetcher = Fetcher()
    fetcher.get_url(url)
loop()
print(time.time() - start_time)

# The original notes also time a synchronous version for comparison
# (its get_ function is not defined in this listing):
# start_time = time.time()
# for url in range(20):
#     url = "http://shop.projectsedu.com/goods/{}/".format(url)
#     get_(url)
# print(time.time() - start_time)
```
Coroutines

Using the I/O-model APIs above (callbacks plus an event loop) has well-known problems: callbacks nest deeply and become hard to read, exceptions are awkward to propagate, and state has to be shared across callbacks.
Coroutines address these issues:
Write asynchronous code (event loop + I/O multiplexing) in a synchronous style instead of with callbacks, switching between tasks inside a single thread (so locks are no longer needed);
Scheduling is done by our own code, so task switches are far cheaper than thread context switches and concurrency scales much better;
A schedulable function has multiple entry points: when it hits an I/O operation it is paused, control switches to another function, and it is resumed at the appropriate time (see the sketch after this list).
Coroutines can be implemented with generators (see "Iterators and Generators") combined with an event loop;
Coroutines + event loop is not more efficient than callbacks + event loop; its purpose is to solve the complexity of callback-style code in a simple way.
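As a rough sketch of this single-thread pause/resume idea (plain generators standing in for coroutines, and a hypothetical round-robin run() in place of a real event loop):

```python
def task(name, steps):
    # each `yield` is a point where this "coroutine" gives up control,
    # i.e. where a real task would be waiting on I/O
    for i in range(steps):
        print(name, "step", i)
        yield


def run(tasks):
    # trivial round-robin scheduler: resume each task in turn until all finish
    while tasks:
        current = tasks.pop(0)
        try:
            next(current)
            tasks.append(current)
        except StopIteration:
            pass


run([task("a", 2), task("b", 3)])
```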
To make the semantics more explicit, Python 3.5 introduced the async and await keywords for defining native coroutines:
```python
import types


async def downloader(url):
    # native coroutine defined with async def
    return "ywh"


@types.coroutine
def downloader(url):
    # equivalent generator-based coroutine (overrides the definition above;
    # either form can be awaited)
    yield "ywh"


async def download_url(url):
    html = await downloader(url)
    return html


if __name__ == "__main__":
    coro = download_url("http://www.test.com")
    coro.send(None)  # drive the coroutine by hand (an event loop normally does this)
```
Implementing coroutines with generators:
An ordinary generator only produces values; implemented as a coroutine it can also consume data passed in from outside;
A generator written as value = yield ... (or value = yield from sub_gen()) hands a value out to the caller, and the caller can push a value back into the generator with send();
The driving (main) function must not contain time-consuming logic; I/O operations should instead be handed off asynchronously via yield from;
The end result is asynchronous code written in a synchronous style: the function is paused and resumed at the right moments.
Inspecting a generator's state:
```python
import inspect


def gen_func():
    yield 1
    return "ywh"


if __name__ == "__main__":
    gen = gen_func()
    print(inspect.getgeneratorstate(gen))  # GEN_CREATED
    next(gen)
    print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
```
Implementing a coroutine:
```python
def gen_func():
    # `value` receives whatever the caller passes in via send()
    value = yield 1
    return "ywh"
```
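Driving the generator above by hand shows both directions of the channel (the printed values are only illustrative):

```python
gen = gen_func()
print(next(gen))         # 1: run to the first yield
try:
    gen.send("hello")    # resumes the generator; `value` is now "hello"
except StopIteration as e:
    print(e.value)       # "ywh": the return value travels inside StopIteration
```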
```python
import socket
from urllib.parse import urlparse

def get_socket_data():
    # placeholder sub-generator: a real version would yield until the socket
    # is readable and return the raw HTTP response
    yield
    return b"HTTP/1.1 200 OK\r\n\r\nywh"


def downloader(url):
    url = urlparse(url)
    host = url.netloc
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)
    try:
        client.connect((host, 80))
    except BlockingIOError:
        pass
    # registration with a selector is omitted here; hand control back to the
    # event loop via yield from until the response data is ready
    source = yield from get_socket_data()
    html_data = source.decode("utf8").split("\r\n\r\n")[1]
    print(html_data)


def download_html(url):
    html = yield from downloader(url)


if __name__ == "__main__":
    pass
```
Introduction to asyncio

The asyncio module is Python's complete solution for asynchronous I/O programming.
High-concurrency programming rests on three elements: an event loop + I/O multiplexing + callbacks (which drive generators, i.e. coroutines). asyncio provides:
A pluggable event loop with various system-specific implementations (select, poll, epoll);
Transport and protocol abstractions;
Concrete support for TCP, UDP, SSL, subprocesses, delayed calls, and more;
A Future class modeled on the one in concurrent.futures but adapted for use with the event loop;
Coroutines and tasks based on yield from, so concurrent code can be written in a sequential style;
An interface for handing work off to a thread pool when a blocking I/O call is unavoidable;
Synchronization primitives modeled on the threading module, usable between coroutines within a single thread;
The async keyword defines a coroutine, and await makes an asynchronous call.
Event-loop-based async frameworks include tornado (which ships a directly deployable web server), gevent, and twisted (used by scrapy and django channels). To stay asynchronous, these frameworks need matching async drivers; for example, using the blocking pymysql inside tornado means the database access is not asynchronous.
The following example shows these characteristics in action (10 coroutines finish in about 2 seconds):
```python
import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)  # non-blocking sleep stands in for real I/O
    print("end get url")


start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("http://www.test.com") for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
print(time.time() - start_time)  # roughly 2s for all 10 coroutines
```
How to get a coroutine's return value?

```python
import asyncio
import time
from functools import partial


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "ywh"


def callback(url, future):
    # a done-callback always receives the finished future as its last argument
    print(url)
    print("send email to ywh")


start_time = time.time()
loop = asyncio.get_event_loop()
task = loop.create_task(get_html("http://www.test.com"))
task.add_done_callback(partial(callback, "http://www.test.com"))
loop.run_until_complete(task)
print(task.result())  # "ywh"
```
wait vs. gather: gather is a higher-level wrapper and can manage tasks in groups;
```python
import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    group1 = [get_html("http://projectsedu.com") for i in range(2)]
    group2 = [get_html("http://www.test.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    # gather returns a future, so a whole group can be cancelled at once;
    # note that the cancellation makes run_until_complete raise CancelledError
    group2.cancel()
    loop.run_until_complete(asyncio.gather(group1, group2))
    print(time.time() - start_time)
```
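Beyond grouping, the two also differ in how results come back: gather returns the results directly and in argument order, while wait returns (done, pending) sets of futures. A minimal sketch (the URLs are placeholders):

```python
import asyncio


async def get_html(url):
    await asyncio.sleep(1)
    return "html of " + url


async def main():
    urls = ["http://a.test", "http://b.test"]
    # gather: results come back directly, in the order the awaitables were passed
    results = await asyncio.gather(*[get_html(u) for u in urls])
    print(results)
    # wait: returns futures; results have to be pulled out with .result()
    done, pending = await asyncio.wait([get_html(u) for u in urls])
    print([t.result() for t in done])


asyncio.get_event_loop().run_until_complete(main())
```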
Cancelling and Nesting Coroutines

```python
import asyncio

loop = asyncio.get_event_loop()
# loop.run_forever()             # runs until loop.stop() is called
# loop.run_until_complete(coro)  # runs only until the given coroutine/future finishes
```
Cancelling futures (tasks):
```python
import asyncio
import time


async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


tasks = [get_html(2), get_html(3), get_html(4)]
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    all_tasks = asyncio.Task.all_tasks()
    for task in all_tasks:
        print(task.cancel())
    loop.stop()
    loop.run_forever()  # the loop must run again so the cancellations can take effect
finally:
    loop.close()
```
How nested coroutines are scheduled (task -> print_sum -> compute):
```python
import asyncio


async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
```
Create the loop and the task submitted to it (the task drives the coroutine), then run;
await inside a coroutine behaves like yield from: it opens a channel between the task and the compute sub-coroutine, scheduling enters compute, and print_sum is paused;
The await inside the compute sub-coroutine means pause: control returns directly to the task and then to the loop, bypassing print_sum, and the loop waits 1 s;
After 1 s, the task resumes compute through the channel; compute finishes, raising StopIteration that carries the computed result, and the compute coroutine is marked done;
print_sum catches compute's StopIteration and extracts the result, then finishes and raises its own StopIteration up to the task; the print_sum coroutine is marked done.
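The same relay can be sketched with plain generators standing in for the print_sum/compute chain (the 1-second sleep is replaced by a bare yield):

```python
def compute(x, y):
    yield            # stands in for `await asyncio.sleep(1.0)`: control goes all the way up
    return x + y     # wrapped into StopIteration(x + y) when the generator finishes


def print_sum(x, y):
    # yield from opens the channel: the driver's send() reaches compute directly,
    # and compute's StopIteration value becomes the value of this expression
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))


driver = print_sum(1, 2)
driver.send(None)        # run up to compute's yield (the "sleep")
try:
    driver.send(None)    # "1 second later": compute returns, print_sum finishes
except StopIteration:
    pass
```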
call_at, call_soon, call_later, call_soon_threadsafe

```python
import asyncio


def callback(sleep_times, loop):
    print("success time {}".format(loop.time()))


def stoploop(loop):
    loop.stop()


loop = asyncio.get_event_loop()
now = loop.time()
# call_at schedules a callback at an absolute loop time,
# call_soon schedules it for the next iteration of the loop
loop.call_at(now + 2, callback, 2, loop)
loop.call_at(now + 1, callback, 1, loop)
loop.call_at(now + 3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever()
```
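The listing above only exercises call_at and call_soon; a minimal sketch of the remaining two (the delays are arbitrary):

```python
import asyncio
import threading


def callback(tag, loop):
    print(tag, loop.time())


loop = asyncio.get_event_loop()
# call_later schedules relative to now (here: after 1 second)
loop.call_later(1, callback, "later", loop)


def from_other_thread():
    # call_soon_threadsafe is the safe way to schedule a callback
    # from a different thread; it also wakes the event loop up
    loop.call_soon_threadsafe(callback, "threadsafe", loop)


threading.Timer(0.5, from_other_thread).start()
loop.call_later(2, loop.stop)   # stop the demo after 2 seconds
loop.run_forever()
```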
How to integrate a thread pool into coroutines? To use blocking I/O (blocking libraries and interfaces such as pymysql) from coroutine code, the work has to run in threads, via the loop's run_in_executor:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
import time
from urllib.parse import urlparse


def get_url(url):
    # plain blocking HTTP request over a raw socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))
    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n"
                .format(path, host).encode("utf8"))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
tasks = []
for url in range(20):
    url = "http://shop.projectsedu.com/goods/{}/".format(url)
    # run_in_executor wraps the blocking call in a future the loop can await
    task = loop.run_in_executor(executor, get_url, url)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print("last time:{}".format(time.time() - start_time))
```
Simulating HTTP Requests

requests is a synchronous HTTP module, so it gains nothing from asyncio, and asyncio itself does not provide an HTTP-protocol API. You can use aiohttp, or build the request directly on asyncio's stream API as below:
```python
import time
import asyncio
from urllib.parse import urlparse


async def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    # open_connection returns non-blocking StreamReader/StreamWriter objects
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n"
                 .format(path, host).encode("utf8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "\n".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('last time:{}'.format(time.time() - start_time))
```
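Since the prose mentions aiohttp while the listing uses raw asyncio streams, here is a minimal aiohttp sketch of the same kind of request (the URL is a placeholder):

```python
import asyncio
import aiohttp


async def fetch(url):
    # one ClientSession would normally be shared across many requests
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


loop = asyncio.get_event_loop()
html = loop.run_until_complete(fetch("http://www.example.com/"))
print(html[:200])
```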
For Future and Task, see the asyncio source code (Task is a Future subclass that drives a coroutine).
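A short illustration of that relationship (the values here are arbitrary):

```python
import asyncio


async def coro():
    return 42


loop = asyncio.get_event_loop()
task = loop.create_task(coro())          # a Task wraps and drives the coroutine
print(isinstance(task, asyncio.Future))  # True: Task subclasses Future
loop.run_until_complete(task)
print(task.result())                     # 42, stored on the future once done
```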
Synchronization and Communication

```python
import aiohttp
import asyncio
from asyncio import Lock, Queue

cache = {}
lock = Lock()


async def get_stuff(url):
    # the lock ensures only one coroutine fetches a given url;
    # the rest wait and then read it from the cache
    async with lock:
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request('GET', url)
        cache[url] = stuff
        return stuff


async def parse_stuff(url):
    stuff = await get_stuff(url)


async def use_stuff(url):
    stuff = await get_stuff(url)


tasks = [parse_stuff("http://test.com"), use_stuff("http://test.com")]
```
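The listing imports Queue but never uses it; a minimal producer/consumer sketch with asyncio.Queue (names and sizes are illustrative):

```python
import asyncio


async def producer(queue):
    for i in range(3):
        await queue.put(i)        # suspends if the queue is full
        await asyncio.sleep(0.1)
    await queue.put(None)         # sentinel telling the consumer to stop


async def consumer(queue):
    while True:
        item = await queue.get()  # suspends until an item is available
        if item is None:
            break
        print("got", item)


async def main():
    queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(producer(queue), consumer(queue))


asyncio.get_event_loop().run_until_complete(main())
```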
Example: a High-Concurrency Crawler Built on asyncio Coroutines
```python
import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuery

stop_flag = False
start_url = "http://www.jobbole.com/"
waitting_urls = []
seen_urls = set()

# limit the number of concurrent requests
sem = asyncio.Semaphore(3)


async def fetch(url, session):
    """Send an HTTP request and return the page body."""
    async with sem:
        await asyncio.sleep(1)
        try:
            async with session.get(url) as resp:
                print('url status: {0}'.format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)


def extract_urls(html):
    """Collect the URLs to crawl next from the fetched page."""
    urls = []
    pq = PyQuery(html)
    for link in pq.items('a'):
        url = link.attr('href')
        if url and url.startswith('http') and url not in seen_urls:
            urls.append(url)
            waitting_urls.append(url)
    return urls


async def article_handler(url, session, pool):
    """Fetch an article page, parse it, and store it in MySQL."""
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    pq = PyQuery(html)
    title = pq('title').text()
    print(title)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            insert_sql = """
            INSERT INTO xxx
            """
            await cur.execute(insert_sql)
            print(cur.description)


async def init_urls(url, session):
    """Fetch a listing page and harvest the links on it."""
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def consumer(pool, session):
    while not stop_flag:
        if len(waitting_urls) == 0:
            await asyncio.sleep(0.5)
            continue
        url = waitting_urls.pop()
        print('start get url: ' + url)
        if re.match('http://.*?jobbole.com/\d+/', url):
            if url not in seen_urls:
                asyncio.ensure_future(article_handler(url, session, pool))
        else:
            if url not in seen_urls:
                asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    # aiomysql connection pool; fill in real connection details (3306 is the MySQL default)
    pool = await aiomysql.create_pool(
        host='', port=3306, user='', password='', db='mysql',
        loop=loop, charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool, session))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
```