Python Async Concurrency, Demystified
This article starts from synchronous Python and evolves, step by step, toward fully asynchronous code. Every stage solves the same problem: a crawler built on raw sockets that fetches the result pages of several Baidu searches.
1. Synchronous Execution
The simplest approach: crawl the URL list sequentially, one blocking request at a time.
- sync.py
from urllib.parse import urlparse
import re
import socket

urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []


def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096
    sock = socket.socket()
    sock.connect((hostname, port))
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    sock.send(get.encode('utf8'))
    response = b''
    chunk = sock.recv(buffersize)
    while chunk:
        response += chunk
        chunk = sock.recv(buffersize)
    parse(url, response)


if __name__ == '__main__':
    for target in urls:
        fetch(target)
    print(results)
2. Multiprocess Execution
Fetching the ten URLs one after another inside a single program is slow, so launch ten copies of the program instead. The catch: as soon as the number of processes exceeds the number of CPU cores, the OS has to context-switch between them, and because each process is heavyweight, the number of concurrent tasks this approach can sustain is small.
- process.py
from concurrent import futures
from urllib.parse import urlparse
import re
import socket

urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]


def parse(url, response):
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    print((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096
    sock = socket.socket()
    sock.connect((hostname, port))
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    sock.send(get.encode('utf8'))
    response = b''
    chunk = sock.recv(buffersize)
    while chunk:
        response += chunk
        chunk = sock.recv(buffersize)
    parse(url, response)


def process_way():
    global urls
    workers = 10
    with futures.ProcessPoolExecutor(workers) as executor:
        for target in urls:
            executor.submit(fetch, target)


if __name__ == '__main__':
    process_way()
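One detail worth noting: process.py prints inside parse instead of appending to a shared results list, because each worker process has its own address space, so a global list mutated in a child is invisible to the parent. To collect results in the parent, return them from the worker and let the executor ferry them back. A minimal sketch of the pattern (square is an illustrative stand-in for any picklable worker function such as fetch):

    from concurrent import futures

    def square(n):
        # stand-in for fetch: any picklable top-level function works
        return n * n

    if __name__ == '__main__':
        with futures.ProcessPoolExecutor(4) as executor:
            # map pickles arguments out to the workers and results
            # back to the parent, preserving input order
            print(list(executor.map(square, range(10))))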
3. Multithreaded Execution
A thread's data structures are lighter-weight than a process's, and one process can host many threads, so multithreading is another way to attack the concurrency problem. Threads have their own issues, though. CPython's GIL (Global Interpreter Lock) means that at any moment only one thread per process is executing Python bytecode, and switching between threads still consumes OS-level resources. A coroutine switch, by contrast, happens entirely inside one thread, which is why coroutines beat threads for this kind of workload. (The sketch after thread.py below shows the GIL's effect in isolation.)
- thread.py
from concurrent import futures
from urllib.parse import urlparse
import re
import socket
import threading

urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []
lock = threading.Lock()


def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    with lock:
        results.append((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096
    sock = socket.socket()
    sock.connect((hostname, port))
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    sock.send(get.encode('utf8'))
    response = b''
    chunk = sock.recv(buffersize)
    while chunk:
        response += chunk
        chunk = sock.recv(buffersize)
    parse(url, response)


def thread_way():
    global urls
    workers = 10
    with futures.ThreadPoolExecutor(workers) as executor:
        for target in urls:
            executor.submit(fetch, target)


if __name__ == '__main__':
    thread_way()
    print(results)
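To see the GIL's effect in isolation, here is a minimal sketch with CPU-bound work (timings vary by machine): two threads splitting a countdown take roughly as long as one thread doing all of it, because only one thread executes bytecode at a time. Our crawler is unaffected by this because it is I/O-bound; CPython releases the GIL while a thread blocks on a socket.

    import threading
    import time

    N = 10_000_000

    def countdown(n):
        # pure CPU work; the GIL is only handed over at the
        # interpreter's periodic check intervals
        while n:
            n -= 1

    start = time.perf_counter()
    countdown(N)
    print('one thread: ', time.perf_counter() - start)

    start = time.perf_counter()
    threads = [threading.Thread(target=countdown, args=(N // 2,))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('two threads:', time.perf_counter() - start)  # roughly the same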
4. Non-blocking Execution
This version differs from the synchronous one in two places. First, sock.setblocking(False) makes every subsequent operation on the socket non-blocking. Because establishing a connection takes time and a non-blocking sock.connect((hostname, port)) returns immediately, sock.send(get) has to be retried in a loop until it succeeds, since we have no way of knowing when the connection is actually up. The same reasoning applies to chunk = sock.recv(buffersize): in non-blocking mode we cannot know when data will arrive, so we keep polling in a loop. Throughout all this retrying the CPU sits idle, doing no useful work, which is why the total runtime is about the same as the synchronous version.
- no_blocking.py
from urllib.parse import urlparse
import re
import socket

urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []


def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((hostname, port))
    except BlockingIOError:
        # non-blocking connect returns immediately; the TCP
        # handshake continues in the background
        pass
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    get = get.encode('utf8')
    while True:
        try:
            # raises OSError until the connection is established
            sock.send(get)
            break
        except OSError:
            pass
    response = b''
    while True:
        try:
            # raises BlockingIOError (a subclass of OSError)
            # whenever no data has arrived yet
            chunk = sock.recv(buffersize)
            while chunk:
                response += chunk
                chunk = sock.recv(buffersize)
            break
        except OSError:
            pass
    parse(url, response)


if __name__ == '__main__':
    for target in urls:
        fetch(target)
    print(results)
5. Callback-based Execution
Callbacks can reclaim the CPU time the non-blocking version wasted, by delegating the monitoring of I/O events to the OS. We wrap the sending and reading steps in standalone functions and register them with a selector (backed by epoll on Linux); once epoll observes that a file descriptor has become readable or writable, it notifies the application, which invokes the handler that was registered earlier. As the crawl progresses, each URL keeps producing new file descriptors to register with the selector, so the program needs a loop that checks whether any selector entries are ready and, when they are, directly calls the next step (the freshly registered functions: send data, receive data).
- callback.py
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket

selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]


class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.buffersize = 4096
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global selector
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.hostname, self.port))
        except BlockingIOError:
            pass
        # writable means the connection has been established
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key):
        global selector
        selector.unregister(key.fd)
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key):
        global selector
        global stopped
        chunk = self.sock.recv(self.buffersize)
        if chunk:
            self.response += chunk
        else:
            # empty chunk: the server closed the connection
            selector.unregister(key.fd)
            urls.remove(self.url)
            stopped = not urls
            self.parse()


def event_loop():
    global selector
    global stopped
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback(key)


if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        spider.fetch()
    event_loop()
    print(Spider.results)
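This works, but the logic of a single request is now smeared across fetch, connected, and read_response, with shared state parked on self; following the control flow means stitching callbacks together in your head. The coroutines in the next section untangle exactly that.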
6. Generator-based Coroutines
Coroutines built on generators are a markedly better solution. They exploit a generator's ability to suspend and resume to implement concurrency, and they fix the tangled callback logic above with the following pieces:
- Future object: a Future holds a result that will arrive in the future. Each URL maps to one task, and between start and finish that task hits slow operations; whenever it must wait, the task suspends, parking its progress so far in a Future. The next time the task is woken, the result is sent back into the generator.
- Task object: a Task corresponds to one concrete task, one per URL, and drives that task's execution.
- fetch method: the actual task logic. Whenever fetch needs to suspend, it creates a fresh Future, registers with the selector the function to run on wake-up, and then yields. The registered function assigns the data read from the now-ready file descriptor to the Future, after which execution continues where it left off.
- Event loop: as in the callback version, an event loop dispatches on file descriptors that have become ready.
- Main program: the main program keeps creating new tasks. The whole call chain runs as follows (a minimal sketch of the driving mechanism appears after this list):
- 1. The for loop begins. The first iteration creates a Spider generator and hands it to a Task, which primes the generator, running it up to its first yield. Along the way the generator creates a Future, registers a selector callback, and yields the Future back to Task, which stores its own step function on the Future as the next thing to run.
- 2. The second iteration proceeds exactly like the first: the new task runs up to its first yield and suspends.
- 3. This repeats until every task has started. The whole phase is fast, because none of these operations perform any actual I/O.
- 4. The event loop takes over: whenever one of the started tasks has a ready file descriptor, its callback is processed.
- 5. Note: when a Future's task gets woken is determined by the selector's file descriptors. As soon as a descriptor becomes ready, the registered function fires and the task resumes.
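Here is the promised minimal sketch of the driving mechanism, stripped of all I/O (names are illustrative): the caller plays the role of Task.step, priming the generator and then sending results into it until it raises StopIteration.

    def task_logic():
        # run up to the yield, hand a placeholder to the driver,
        # then suspend until the driver sends a result back in
        result = yield 'pending future'
        print('resumed with:', result)

    gen = task_logic()
    placeholder = gen.send(None)        # prime: run to the first yield
    print('suspended on:', placeholder)
    try:
        gen.send('data from selector')  # wake up, as Task.step does
    except StopIteration:
        pass                            # the task logic has finished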
- generator.py
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket

selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]


class Future:
    def __init__(self):
        self.result = None
        self.callbacks = []

    def add_done_callback(self, func):
        self.callbacks.append(func)

    def set_result(self, result=None):
        self.result = result
        for func in self.callbacks:
            func(self)


class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global urls
        global stopped
        global selector
        buffersize = 4096
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.hostname, self.port))
        except BlockingIOError:
            pass
        future = Future()

        def on_connected():
            future.set_result()

        selector.register(self.sock.fileno(), EVENT_WRITE, on_connected)
        yield future  # suspend until the socket becomes writable
        selector.unregister(self.sock.fileno())
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())
        while True:
            future = Future()

            def on_readable():
                future.set_result(self.sock.recv(buffersize))

            selector.register(self.sock.fileno(), EVENT_READ, on_readable)
            chunk = yield future  # suspend until data arrives
            selector.unregister(self.sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls.remove(self.url)
                stopped = not urls
                self.parse()
                break


class Task:
    def __init__(self, coroutine):
        self.coroutine = coroutine
        f = Future()
        self.step(f)

    def step(self, future):
        try:
            # wake the generator with the last result; it runs until
            # it yields the next Future
            next_future = self.coroutine.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def event_loop():
    global stopped
    global selector
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback()


if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        Task(spider.fetch())
    event_loop()
    print(Spider.results)
7. Simplifying the Generator Syntax
The arrival of yield from (PEP 380, Python 3.3) simplified generator syntax. It provides two things:
- Nested generators no longer need to be drained with an explicit loop around yield; the outer generator simply writes yield from.
- It opens a bidirectional channel between the subgenerator and the outer generator's caller, so the two can communicate directly.
Factoring the I/O plumbing that has nothing to do with business logic into its own functions (connect, read, read_all below) then gives us the yield from version of the crawler.
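First, a minimal standalone sketch of those two features (all names here are illustrative):

    def sub():
        x = yield 'ready'           # receives what the outer caller sends
        return x * 2                # becomes the value of `yield from sub()`

    def delegator():
        doubled = yield from sub()  # no manual iteration loop needed
        yield doubled

    gen = delegator()
    print(gen.send(None))  # 'ready' -- travelled straight up from sub()
    print(gen.send(21))    # 42 -- sub() returned 42 through the channel

The caller never mentions sub(): gen.send(21) lands directly in the subgenerator, and sub's return value surfaces as the value of the yield from expression inside delegator.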
- yield_from.py
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket

selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
buffersize = 4096


class Future:
    def __init__(self):
        self.result = None
        self.callbacks = []

    def add_done_callback(self, func):
        self.callbacks.append(func)

    def set_result(self, result=None):
        self.result = result
        for func in self.callbacks:
            func(self)

    def __iter__(self):
        # `yield from future` suspends the coroutine here; the value
        # set by set_result() becomes the value of the expression
        yield self
        return self.result


def connect(sock, address):
    global selector
    future = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        future.set_result()

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from future
    selector.unregister(sock.fileno())


def read(sock):
    future = Future()

    def on_readable():
        future.set_result(sock.recv(buffersize))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from future
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = b''
    chunk = yield from read(sock)
    while chunk:
        response += chunk
        chunk = yield from read(sock)
    return response


class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global urls
        global stopped
        yield from connect(self.sock, (self.hostname, self.port))
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())
        self.response = yield from read_all(self.sock)
        urls.remove(self.url)
        stopped = not urls
        self.parse()


class Task:
    def __init__(self, coroutine):
        self.coroutine = coroutine
        future = Future()
        self.step(future)

    def step(self, future):
        try:
            next_future = self.coroutine.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def event_loop():
    global stopped
    global selector
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback()


if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        Task(spider.fetch())
    event_loop()
    print(Spider.results)
8. The asyncio Async I/O Framework
- asyncio is the asynchronous I/O framework introduced in Python 3.4. It provides the infrastructure for writing single-threaded concurrent code with coroutines over async I/O. Its core components are the event loop, coroutines, tasks, and futures, along with a number of supporting and helper modules.
- The decorator @asyncio.coroutine marks a function that uses yield from as a coroutine. (It was later deprecated in Python 3.8 and removed in 3.11 in favor of native coroutines.)
- The async/await syntax is a refinement of yield from; functions using it are called native coroutines. The async/await and yield from styles share the same underlying implementation and are mutually compatible, as the interop sketch below shows.
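A minimal interop sketch (runnable on Python 3.4 through 3.10, before the decorator's removal):

    import asyncio

    @asyncio.coroutine
    def old_style():
        # generator-based coroutine, suspended with yield from
        yield from asyncio.sleep(0.1)
        return 'old'

    async def new_style():
        # a native coroutine can await the generator-based one directly
        return await old_style() + '/new'

    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(new_style()))  # -> old/new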
- async_await.py
import asyncio
import aiohttp
import re

urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
loop = asyncio.get_event_loop()
results = []


def parse(url, response):
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))


async def fetch(url):
    # the explicit loop argument targets older aiohttp releases;
    # current versions discover the running loop themselves
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as response:
            response = await response.read()
            parse(url, response)


if __name__ == '__main__':
    tasks = [fetch(target) for target in urls]
    loop.run_until_complete(asyncio.gather(*tasks))
    print(results)
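For completeness: on Python 3.7+, the explicit loop management above is usually replaced by asyncio.run, and a single ClientSession is shared across requests instead of opening one per URL. A sketch under those assumptions (it reuses urls and parse from async_await.py above):

    import asyncio
    import aiohttp

    async def fetch(session, url):
        async with session.get(url) as response:
            parse(url, await response.read())

    async def main():
        # one shared session: connection pooling across all requests
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(*(fetch(session, u) for u in urls))

    asyncio.run(main())  # creates, runs, and closes the loop for you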
9. A Simple Synchronous Socket Server
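A baseline echo server built on blocking accept and recv. Note that it serves exactly one connection at a time: while the inner loop is echoing for one client, accept() is never reached, so other clients must wait.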
- socket_server_sync.py
import socket


class EchoServer:
    def __init__(self, host, port):
        self.sock = socket.socket()
        self.host = host
        self.port = port
        self.buffersize = 4096

    def run(self):
        self.sock.bind((self.host, self.port))
        self.sock.listen(1)
        print(f'Running on http://{self.host}:{self.port}/')
        while True:
            conn, address = self.sock.accept()
            print('accepted', conn, 'from', address)
            while True:
                chunk = conn.recv(self.buffersize)
                if not chunk:
                    break
                print('echoing', repr(chunk), 'to', conn)
                conn.sendall(chunk)


echo_server = EchoServer('localhost', 8888)
echo_server.run()
10. A Simple Asynchronous Socket Server
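The same echo server rebuilt around a selector: every socket is non-blocking, and a hand-rolled event loop dispatches the on_accept, on_read, and on_write callbacks as descriptors become ready, so many clients can be served concurrently in a single thread.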
- socket_server_async.py
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
import socket


class EventLoop:
    def __init__(self, selector=None):
        self.selector = selector if selector else DefaultSelector()

    def run_forever(self):
        while True:
            events = self.selector.select()
            for key, mask in events:
                if mask == EVENT_READ:
                    callback = key.data  # on_read or on_accept
                    callback(key.fileobj)
                else:
                    callback, msg = key.data  # on_write
                    callback(key.fileobj, msg)


class EchoServer:
    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self.loop = loop
        self.sock = socket.socket()
        self.buffersize = 4096

    def on_accept(self, sock):
        conn, address = sock.accept()
        print('accepted', conn, 'from', address)
        conn.setblocking(False)
        self.loop.selector.register(conn, EVENT_READ, self.on_read)

    def on_read(self, conn):
        msg = conn.recv(self.buffersize)
        if msg:
            print('echoing', repr(msg), 'to', conn)
            self.loop.selector.modify(conn, EVENT_WRITE, (self.on_write, msg))
        else:
            print('closing', conn)
            self.loop.selector.unregister(conn)
            conn.close()

    def on_write(self, conn, msg):
        conn.sendall(msg)
        self.loop.selector.modify(conn, EVENT_READ, self.on_read)

    def run(self):
        self.sock.bind((self.host, self.port))
        self.sock.setblocking(False)
        self.sock.listen(128)
        print(f'Running on http://{self.host}:{self.port}/')
        self.loop.selector.register(self.sock, EVENT_READ, self.on_accept)
        self.loop.run_forever()


event_loop = EventLoop()
echo_server = EchoServer('localhost', 8888, event_loop)
echo_server.run()
11. A Socket Client
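A small interactive client for exercising either server: type a line and it is echoed back; an empty line quits.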
- socket_client.py
import socket


def main(host, port):
    client = socket.socket()
    client.connect((host, port))
    while True:
        data = input('>')
        if not data:
            break
        client.send(data.encode())
        chunk = client.recv(8192)
        if not chunk:
            break
        print(chunk)


if __name__ == '__main__':
    main('127.0.0.1', 8888)