
Demystifying Python Async Concurrency


· 13 min read

This article starts from synchronous Python and evolves it, step by step, into asynchronous Python. Every step along the way implements the same job: a crawler built on raw sockets that fetches the titles of several Baidu search-result pages.

1. Synchronous Execution

The simplest approach: fetch the given URL list synchronously, one request after another.

  • sync.py
from urllib.parse import urlparse
import re
import socket


urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []


def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096

    sock = socket.socket()
    sock.connect((hostname, port))
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    sock.sendall(get.encode('utf8'))  # sendall retries until the whole request is sent
    response = b''
    chunk = sock.recv(buffersize)
    while chunk:  # Connection: close, so read until the server closes the socket
        response += chunk
        chunk = sock.recv(buffersize)
    parse(url, response)


if __name__ == '__main__':
    for target in urls:
        fetch(target)
    print(results)
    
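To see how slow this is, a minimal timing wrapper can replace the entry point above (a sketch; it reuses urls, fetch and results from sync.py, and the numbers will vary with the network):

  • sync.py (timed variant, sketch)
import time

if __name__ == '__main__':
    start = time.time()
    for target in urls:
        fetch(target)
    # ten sequential requests: total time is roughly the sum of ten round trips
    print(f'{len(results)} results in {time.time() - start:.2f}s')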

2. Multi-process Execution

Executing the ten fetches one after another inside a single program is too slow, so one fix is to launch ten copies of the same program. Once the number of processes exceeds the number of CPU cores, though, process switching becomes unavoidable, and because processes are heavyweight the number of tasks this approach can scale to is small.

  • process.py
from concurrent import futures
from urllib.parse import urlparse
import re
import socket


urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]


def parse(url, response):
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    # worker processes do not share memory with the parent, so we print here
    # instead of appending to a shared results list
    print((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096

    sock = socket.socket()
    sock.connect((hostname, port))
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    sock.sendall(get.encode('utf8'))
    response = b''
    chunk = sock.recv(buffersize)
    while chunk:
        response += chunk
        chunk = sock.recv(buffersize)
    parse(url, response)


def process_way():
    global urls
    workers = 10
    with futures.ProcessPoolExecutor(workers) as executor:
        for target in urls:
            executor.submit(fetch, target)


if __name__ == '__main__':
    process_way()
    
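Since the workers cannot append to a list in the parent's memory, parse prints instead of collecting. To gather results in the parent, let each worker return its value; a minimal sketch (it assumes fetch is modified to return (url, title) rather than print it):

  • process.py (collecting variant, sketch)
def process_way():
    workers = 10
    with futures.ProcessPoolExecutor(workers) as executor:
        # map pickles each URL over to a worker and gathers the return
        # values back in the parent, in input order
        results = list(executor.map(fetch, urls))
    print(results)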

3. Multi-threaded Execution

A thread's data structures are more lightweight than a process's, and one process can host many threads, so multi-threading is another way to attack the concurrency problem. Threads come with their own issue, though: CPython's GIL (Global Interpreter Lock) means that at any moment only one thread in a process is executing Python bytecode, and switching between threads is still relatively expensive. Coroutine switches, by contrast, happen entirely inside one thread, which is why coroutines beat threads for this kind of IO-bound work.

  • thread.py
from concurrent import futures
from urllib.parse import urlparse
import re
import socket
import threading


urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []
lock = threading.Lock()


def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    with lock:
        results.append((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096

    sock = socket.socket()
    sock.connect((hostname, port))
    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    sock.sendall(get.encode('utf8'))
    response = b''
    chunk = sock.recv(buffersize)
    while chunk:
        response += chunk
        chunk = sock.recv(buffersize)
    parse(url, response)


def thread_way():
    global urls
    workers = 10
    with futures.ThreadPoolExecutor(workers) as executor:
        for target in urls:
            executor.submit(fetch, target)


if __name__ == '__main__':
    thread_way()
    print(results)
    
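The GIL claim above is easy to verify: a CPU-bound function gains nothing from extra threads, while the IO-bound fetch above does. A minimal sketch:

  • gil_demo.py (sketch)
from concurrent import futures
import time


def count(n=10_000_000):
    # pure Python bytecode: the thread holds the GIL the whole time
    while n:
        n -= 1


if __name__ == '__main__':
    start = time.time()
    for _ in range(4):
        count()
    print(f'sequential: {time.time() - start:.2f}s')

    start = time.time()
    with futures.ThreadPoolExecutor(4) as executor:
        for _ in range(4):
            executor.submit(count)
    print(f'4 threads:  {time.time() - start:.2f}s  # about the same, or slower')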

4. Non-blocking Execution

This version differs from the synchronous one in two places. First, sock.setblocking(False) makes every subsequent operation on the socket non-blocking. Establishing the connection in sock.connect((hostname, port)) takes time, and since we have no way of knowing when it completes, we must keep retrying sock.send(get) until it succeeds. Second, chunk = sock.recv(buffersize) works the same way: in non-blocking mode there is no way to know when data becomes readable, so we spin in a loop. Throughout all this retrying the CPU is idle and the waiting time is never put to use, so the total run time comes out about the same as the synchronous version.

  • no_blocking.py
from urllib.parse import urlparse
import re
import socket


urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []


def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))


def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096

    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((hostname, port))
    except BlockingIOError:
        # a non-blocking connect raises immediately; the handshake
        # continues in the background
        pass

    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    get = get.encode('utf8')

    while True:
        try:
            sock.send(get)  # fails until the connection is established
            break
        except OSError:
            pass  # busy-wait: keep retrying, burning CPU

    response = b''
    while True:
        try:
            chunk = sock.recv(buffersize)
            while chunk:
                response += chunk
                chunk = sock.recv(buffersize)
            break
        except OSError:
            pass  # busy-wait again: no way to know when data is ready

    parse(url, response)


if __name__ == '__main__':
    for target in urls:
        fetch(target)
    print(results)
    

5. Callback-based Execution


Callbacks can reclaim the CPU time that no_blocking wastes, by handing the monitoring of IO events over to the OS. We wrap sending and receiving in standalone functions and register them with a selector (backed by epoll on Linux). As soon as epoll observes that a file descriptor is ready for reading or writing, it notifies the application, and the application invokes the handler that was registered earlier. Since each URL keeps producing new file descriptors to register with the selector, the program also needs a loop that checks the selector for readiness and, once a descriptor is ready, directly invokes the next step (the freshly registered function: send data, receive data).

  • callback.py
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket


selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]


class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.buffersize = 4096
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global selector
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.hostname, self.port))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key):
        global selector
        selector.unregister(key.fd)
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key):
        global selector
        global stopped
        chunk = self.sock.recv(self.buffersize)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls.remove(self.url)
            stopped = True if not urls else False
            self.parse()


def event_loop():
    global selector
    global stopped
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback(key)


if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        spider.fetch()
    event_loop()
    print(Spider.results)
    

6. Generator-based Coroutines

Generator-based coroutines are a distinctly better solution. They exploit a generator's ability to suspend and resume to implement concurrency, and they fix the tangled control flow of the callback version with the following pieces:

  • Future object: a Future holds a result that will arrive later. One URL corresponds to one task, and between its start and its final result the task runs into slow operations; at each one, the task is suspended, its progress so far is stored in a Future, and when the task is next woken the result is sent back into the generator.
  • Task object: a Task corresponds to one concrete job, one per URL, and it drives that job's execution.
  • fetch method: this is the actual task logic. Whenever fetch needs to suspend, it creates a fresh Future, yields to suspend itself, and registers a callback with the selector; the registered callback assigns the ready value to the Future once the file descriptor becomes ready, after which execution continues.
  • Event loop: exactly as in the callback version, an event loop is needed to process ready file descriptors.

Main program: the main program keeps creating new tasks. The whole call chain runs as follows (a stripped-down sketch of the mechanics appears after the list):

  • 1. The first pass of the for loop creates a Spider generator and hands it to a Task. The Task activates the generator, running it up to its first yield; by that point the generator has created a Future, registered a selector callback, and yielded the Future back to the Task, which stores its own step method on the Future as the function to run on wake-up.
  • 2. The second pass of the loop does the same, suspending at the first yield.
  • 3. This repeats until every task has started. The whole phase is fast, because none of these operations perform IO.
  • 4. The event loop then takes over: whenever one of the started tasks has a ready file descriptor, its callback is processed.
  • 5. Note: the moment a Future is woken is determined by the selector's file descriptors; when a descriptor becomes ready, the registered function is called, and the coroutine resumes.
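
Stripped of sockets and the selector, the Future/Task hand-off looks like this (a minimal sketch mirroring the classes in generator.py below; the pending attribute is added here only so the demo can play the role of the event loop):

  • future_task_demo.py (sketch)
class Future:

    def __init__(self):
        self.result = None
        self.callbacks = []

    def set_result(self, result=None):
        self.result = result
        for func in self.callbacks:
            func(self)


def coro():
    future = Future()
    result = yield future  # suspend here; Task has attached step() as callback
    print('resumed with', result)


class Task:

    def __init__(self, gen):
        self.gen = gen
        self.pending = None
        self.step(Future())  # prime the generator up to its first yield

    def step(self, future):
        try:
            next_future = self.gen.send(future.result)
        except StopIteration:
            self.pending = None
            return
        next_future.callbacks.append(self.step)
        self.pending = next_future


task = Task(coro())               # coro is now parked at its yield
task.pending.set_result('hello')  # the "IO event": prints "resumed with hello"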

  • generator.py
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket


selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]


class Future:

    def __init__(self):
        self.result = None
        self.callbacks = []

    def add_done_callback(self, func):
        self.callbacks.append(func)

    def set_result(self, result=None):
        self.result = result
        for func in self.callbacks:
            func(self)


class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global urls
        global stopped
        global selector
        buffersize = 4096
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.hostname, self.port))
        except BlockingIOError:
            pass
        future = Future()

        def on_connected():
            future.set_result()

        selector.register(self.sock.fileno(), EVENT_WRITE, on_connected)
        yield future

        selector.unregister(self.sock.fileno())
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())

        while True:
            future = Future()

            def on_readable():
                future.set_result(self.sock.recv(buffersize))

            selector.register(self.sock.fileno(), EVENT_READ, on_readable)
            chunk = yield future
            selector.unregister(self.sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls.remove(self.url)
                stopped = True if not urls else False
                self.parse()
                break


class Task:

    def __init__(self, coroutine):
        self.coroutine = coroutine
        f = Future()
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coroutine.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def event_loop():
    global stopped
    global selector
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback()


if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        Task(spider.fetch())
    event_loop()
    print(Spider.results)
    

7. Simplifying the Generator Syntax

The arrival of yield from (PEP 380, Python 3.3) simplified generator syntax. It does two main things, both demonstrated in the short sketch after this list:

  • It lets a generator delegate to a nested generator directly with yield from, rather than iterating it in a loop and re-yielding every value.
  • It opens a two-way channel between the subgenerator and the outer generator's caller, so the two can communicate directly.
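
Both behaviors fit in a short standalone sketch, independent of the crawler:

  • yield_from_demo.py (sketch)
def inner():
    received = yield 'from inner'     # yielded value travels out to the caller
    return f'inner got {received!r}'  # return value becomes the yield from result


def outer():
    result = yield from inner()  # delegation: no manual loop, two-way channel
    yield result


gen = outer()
print(next(gen))         # -> from inner        (yielded straight through)
print(gen.send('ping'))  # -> inner got 'ping'  (sent straight in, then returned)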

Extracting all the code that is unrelated to the business logic into helpers then yields the yield from version of the crawler.

  • yield_from.py
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket


selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
buffersize = 4096


class Future:

    def __init__(self):
        self.result = None
        self.callbacks = []

    def add_done_callback(self, func):
        self.callbacks.append(func)

    def set_result(self, result=None):
        self.result = result
        for func in self.callbacks:
            func(self)

    def __iter__(self):
        yield self
        return self.result


def connect(sock, address):
    global selector
    future = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        future.set_result()

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from future
    selector.unregister(sock.fileno())


def read(sock):
    future = Future()

    def on_readable():
        future.set_result(sock.recv(buffersize))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from future
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = b''
    chunk = yield from read(sock)
    while chunk:
        response += chunk
        chunk = yield from read(sock)
    return response


class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global urls
        global stopped
        yield from connect(self.sock, (self.hostname, self.port))
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())
        self.response = yield from read_all(self.sock)
        urls.remove(self.url)
        stopped = True if not urls else False
        self.parse()


class Task:

    def __init__(self, coroutine):
        self.coroutine = coroutine
        future = Future()
        self.step(future)

    def step(self, future):
        try:
            next_future = self.coroutine.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def event_loop():
    global stopped
    global selector
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback()


if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        Task(spider.fetch())
    event_loop()
    print(Spider.results)
    

8. The Asynchronous IO Framework: asyncio

  • asyncio is the asynchronous IO framework introduced in Python 3.4. It provides the infrastructure for writing single-threaded concurrent code with coroutine-based async IO. Its core components are the event loop, coroutines, Tasks, and Futures, along with a number of supporting and auxiliary modules (a minimal stream-based example follows this list).
  • The decorator @asyncio.coroutine marked a function that uses yield from as a coroutine (it was deprecated in Python 3.8 and removed in 3.11).
  • The async/await syntax is a refinement of yield from; coroutines written with it are called native coroutines. The two styles shared a common underlying implementation and were mutually compatible.
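
Before the aiohttp version in async_await.py, the same crawler can be written against asyncio's stream API alone, with no third-party dependency. A minimal sketch (assumes Python 3.7+ for asyncio.run):

  • asyncio_streams.py (sketch)
import asyncio
import re


async def fetch(wd):
    # open_connection replaces the manual non-blocking connect/register dance
    reader, writer = await asyncio.open_connection('www.baidu.com', 80)
    writer.write((f'GET /s?wd={wd} HTTP/1.1\r\n'
                  f'Connection: close\r\nHost: www.baidu.com\r\n\r\n').encode())
    await writer.drain()
    response = await reader.read()  # Connection: close, so read until EOF
    writer.close()
    search = re.search(r'<title>(.*)</title>',
                       response.decode('utf8', errors='ignore'))
    title = search.group(1) if search else ''
    return f'http://www.baidu.com/s?wd={wd}', title


async def main():
    print(await asyncio.gather(*(fetch(i) for i in range(10))))


asyncio.run(main())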

  • async_await.py
import asyncio
import aiohttp
import re


urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
loop = asyncio.get_event_loop()
results = []


def parse(url, response):
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))


async def fetch(url):
    # aiohttp has deprecated the loop= argument; the session uses the running loop
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response = await response.read()
            parse(url, response)


if __name__ == '__main__':
    tasks = [fetch(target) for target in urls]
    # on Python 3.7+, wrapping the gather in a coroutine and calling
    # asyncio.run() is the preferred entry point
    loop.run_until_complete(asyncio.gather(*tasks))
    print(results)
    

9. A Simple Synchronous Socket Server
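
A baseline for comparison: a blocking echo server. It serves exactly one client at a time, blocking first on accept and then on recv, so a second client must wait until the first disconnects.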

  • socket_server_sync.py
import socket


class EchoServer:

    def __init__(self, host, port):
        self.sock = socket.socket()
        self.host = host
        self.port = port
        self.buffersize = 4096

    def run(self):
        self.sock.bind((self.host, self.port))
        self.sock.listen(1)
        print(f'Running on http://{self.host}:{self.port}/')
        while True:
            conn, address = self.sock.accept()
            print('accepted', conn, 'from', address)
            while True:
                chunk = conn.recv(self.buffersize)
                if not chunk:
                    break
                print('echoing', repr(chunk), 'to', conn)
                conn.sendall(chunk)


echo_server = EchoServer('localhost', 8888)
echo_server.run()

10. A Simple Asynchronous Socket Server
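
The same echo server rebuilt on the callback pattern from section 5: every socket is non-blocking, and an event loop dispatches to on_accept, on_read, or on_write as the selector reports descriptors ready, so a single thread can serve many clients at once.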

  • socket_server_async.py
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
import socket


class EventLoop:

    def __init__(self, selector=None):
        self.selector = selector if selector else DefaultSelector()

    def run_forever(self):
        while True:
            events = self.selector.select()
            for key, mask in events:
                if mask == EVENT_READ:
                    callback = key.data  # means on_read or on_accept
                    callback(key.fileobj)
                else:
                    callback, msg = key.data  # means on_write
                    callback(key.fileobj, msg)


class EchoServer:

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self.loop = loop
        self.sock = socket.socket()
        self.buffersize = 4096

    def on_accept(self, sock):
        conn, address = sock.accept()
        print('accepted', conn, 'from', address)
        conn.setblocking(False)
        self.loop.selector.register(conn, EVENT_READ, self.on_read)

    def on_read(self, conn):
        msg = conn.recv(self.buffersize)
        if msg:
            print('echoing', repr(msg), 'to', conn)
            self.loop.selector.modify(conn, EVENT_WRITE, (self.on_write, msg))
        else:
            print('closing', conn)
            self.loop.selector.unregister(conn)
            conn.close()

    def on_write(self, conn, msg):
        conn.sendall(msg)
        self.loop.selector.modify(conn, EVENT_READ, self.on_read)

    def run(self):
        self.sock.bind((self.host, self.port))
        self.sock.setblocking(False)
        self.sock.listen(128)
        print(f'Running on http://{self.host}:{self.port}/')
        self.loop.selector.register(self.sock, EVENT_READ, self.on_accept)
        self.loop.run_forever()


event_loop = EventLoop()
echo_server = EchoServer('localhost', 8888, event_loop)
echo_server.run()

11. A Socket Client
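
A small interactive client for exercising either server: each line you type is sent to the server and the echoed reply is printed; an empty line exits. Start socket_server_sync.py or socket_server_async.py first, then run the client in another terminal.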

  • socket_client.py
import socket


def main(host, port):
    client = socket.socket()
    client.connect((host, port))

    while True:
        data = input('>')
        if not data:
            break
        client.send(data.encode())
        chunk = client.recv(8192)
        if not chunk:
            break
        print(chunk)


if __name__ == '__main__':
    main('127.0.0.1', 8888)
    