Python. Итераторы, гинераторы и асинхронность
Event-loop
Итак, вы добрались до сердца асинхронных программ в Python — цикла событий. Чтобы понять, как он работает, обратимся к простой реализации, которую предложил Девид Бизли (David Beazley) в 2009 году. Она хороша тем, что не содержит сложных конструкций, которыми сейчас обросли популярные реализации цикла событий на Python. В этом уроке мы разберём, какой код и практики применяются для разработки цикла событий на основе кода Бизли, и как учитывать эти знания при разработке асинхронных приложений на Python. Код уже приведён к современной версии Python.
Начнём с архитектуры цикла событий.
Рассмотрим блоки:
Планировщик (Scheduler)
Корень всей программы.
Обрабатывает задачи в очереди задач и следит за их правильным переключением между собой.
Очередь задач (Task queue)
Здесь собираются новые задачи на исполнение.
Задача (Task)
Основной блок работы цикла событий. В задачах хранится информация о выполняемой корутине. Умеет обрабатывать цепочку вложенных корутин.
Корутина (Coroutine)
Исполняемый код, которым оперирует планировщик задач.
Системный вызов (SystemCall)
Блоки кода, которые расширяют функциональность планировщика.
Корутина для выполнения работы с I/O (I/O-tasks)
В планировщик добавляется специальная задача (Task) для обработки I/O-событий от ОС.
Селектор (Selector)
С ним вы уже знакомы. Он слушает события от ОС и передаёт работу корутинам, которые ждут обработки I/O-сообщений.
Первым стоит рассмотреть работу планировщика. Его основные функции — приём и справедливая обработка списка задач.
import logging
from typing import Generator
from queue import Queue


logger = logging.getLogger(__name__)


class Scheduler:
    def __init__(self):
        self.ready = Queue()
        self.task_map = {}

    def add_task(self, coroutine: Generator) -> int:
        new_task = Task(coroutine)
        self.task_map[new_task.tid] = new_task
        self.schedule(new_task)
        return new_task.tid

    def exit(self, task: Task):
        logger.info('Task %d terminated', task.tid)
        del self.task_map[task.tid]

    def schedule(self, task: Task):
        self.ready.put(task)

    def _run_once(self):
        task = self.ready.get()
        try:
            result = task.run()
        except StopIteration:
            self.exit(task)
            return
        self.schedule(task)

    def event_loop(self):
        while self.task_map:
            self._run_once()
Вся работа происходит в функции event_loop(), которая просто достаёт задачи одну за другой. В функции _run_once() идёт обработка итерации цикла событий, в которой поочерёдно берутся и запускаются задачи для обработки. Если задача не завершилась, то она ставится заново в очередь задач self.ready. Выполненные задачи нужно убрать из планировщика функцией exit().
Для добавления задачи используйте функцию add_task(). Она принимает корутину для выполнения и создаёт с ней задачу в планировщике. Чтобы поставить задачу напрямую в планировщик, необходимо вызвать функцию schedule().
Далее разберёмся с устройством задачи.
import types
from typing import Generator, Union

class Task:
    task_id = 0

    def __init__(self, target: Generator):
        Task.task_id += 1
        self.tid = Task.task_id  # Task ID
        self.target = target  # Target coroutine
        self.sendval = None  # Value to send
        self.stack = []  # Call stack

    # Run a task until it hits the next yield statement
    def run(self):
        while True:
            try:
                result = self.target.send(self.sendval)

                if isinstance(result, types.GeneratorType):
                    self.stack.append(self.target)
                    self.sendval = None
                    self.target = result
                else:
                    if not self.stack:
                        return
                    self.sendval = result
                    self.target = self.stack.pop()

            except StopIteration:
                if not self.stack:
                    raise
                self.sendval = None
                self.target = self.stack.pop()
Сама по себе задача — обёртка над корутиной. У каждой задачи есть свой id, который учитывается в планировщике в словаре task_map. На его заполненность смотрит планировщик при выполнении задач.
Другая особенность задач — возможность выполнения корутин методом run(). Давайте посмотрим, как они выполняются в рамках задачи. Предположим, что есть корутина, которая вызывает другую корутину, а та — третью. Например, вот такой код:
def double(x):
  yield x * x

def add(x, y):
    yield from double(x + y)

def main():
    result = yield add(1, 2)
    print(result)
    yield
Код является модификацией кода Бизли из его выступления. Теперь попробуем выполнить эту цепочку корутин в рамках Task.
>>> task = Task(main())
>>> task.run()
9
Таким же образом будут выполняться и остальные корутины в рамках планировщика.
Осталось расширить планировщик для работы с I/O-операциями. Пользуемся уже знакомым селектором.
import logging
from typing import Generator, Union
from queue import Queue
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


logger = logging.getLogger(__name__)


class Scheduler:
    def __init__(self):
        self.ready = Queue()
        self.selector = DefaultSelector()
        self.task_map = {}

    def add_task(self, coroutine: Generator) -> int:
        new_task = Task(coroutine)
        self.task_map[new_task.tid] = new_task
        self.schedule(new_task)
        return new_task.tid

    def exit(self, task: Task):
        logger.info('Task %d terminated', task.tid)
        del self.task_map[task.tid]

    # I/O waiting
    def wait_for_read(self, task: Task, fd: int):
        try:
            key = self.selector.get_key(fd)
        except KeyError:
            self.selector.register(fd, EVENT_READ, (task, None))

        else:
            mask, (reader, writer) = key.events, key.data
            self.selector.modify(fd, mask | EVENT_READ, (task, writer))

    def wait_for_write(self, task: Task, fd: int):
        try:
            key = self.selector.get_key(fd)
        except KeyError:
            self.selector.register(fd, EVENT_WRITE, (None, task))

        else:
            mask, (reader, writer) = key.events, key.data
            self.selector.modify(fd, mask | EVENT_WRITE, (reader, task))

    def _remove_reader(self, fd: int):
        try:
            key = self.selector.get_key(fd)
        except KeyError:
            pass
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~EVENT_READ
            if not mask:
                self.selector.unregister(fd)
            else:
                self.selector.modify(fd, mask, (None, writer))

    def _remove_writer(self, fd: int):
        try:
            key = self.selector.get_key(fd)
        except KeyError:
            pass
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~EVENT_WRITE
            if not mask:
                self.selector.unregister(fd)
            else:
                self.selector.modify(fd, mask, (reader, None))

    def io_poll(self, timeout: Union[None, float]):
        events = self.selector.select(timeout)
        for key, mask in events:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & EVENT_READ and reader is not None:
                self.schedule(reader)
                self._remove_reader(fileobj)
            if mask & EVENT_WRITE and writer is not None:
                self.schedule(writer)
                self._remove_writer(fileobj)

    def io_task(self) -> Generator:
        while True:
            if self.ready.empty():
                self.io_poll(None)
            else:
                self.io_poll(0)
            yield

    def schedule(self, task: Task):
        self.ready.put(task)

    def _run_once(self):
        task = self.ready.get()
        try:
            result = task.run()
        except StopIteration:
            self.exit(task)
            return
        self.schedule(task)

    def event_loop(self):
        self.add_task(self.io_task())
        while self.task_map:
            self._run_once()
Код значительно разросся, но на самом деле ничего страшного не произошло. Рассмотрим изменения в порядке вызовов. В рамках планировщика добавляем специальную бесконечную задачу io_task перед стартом цикла событий. Эта функция имеет бесконечный цикл внутри и передаёт управление планировщику после вызова выполненных событий из селектора.
Рассмотрим подробнее устройство io_task. Если очередь задач пустая, то timeout для ожидания событий из селектора ставится в режим «до тех пор, пока не будет новых событий». В остальных случаях ставим таймаут 0, чтобы получить все события от ОС сразу же. Такую особенность работы этого метода рассмотрим чуть позже.
Если из селектора пришли новые события, то обрабатываем их и убираем из обработки файловые дескрипторы. Важный момент — хранение данных о задачах в селекторе. Одна и та же задача может ожидать чтения данных и пытаться записать новые данные. Именно поэтому в поле data хранится кортеж (reader, writer).
По сути, event_loop должен предоставлять интерфейс для работы с сокетами. Таких метода всего четыре:
  • wait_for_read,
  • wait_for_write,
  • _remove_reader,
  • _remove_writer.
Эти методы позволяют работать с циклом событий, встроенным в ОС.
Работа с I/O для цикла событий — «пристройка сбоку» для обработки сетевых запросов. То есть основное назначение цикла событий в Python — обработка функций корутин, которые могут никуда не ходить по сети, а работать только с файловой системой.
Осталось разобраться с конструкцией SystemCall. Так как изначально цикл событий больше напоминает работу ОС, должен быть механизм прерываний, чтобы передать управление ОС. В асинхронном коде прерывание обеспечивается с помощью yield. После переключения контекста может вызываться системная функция для исполнения. Например, для создания новых задач можно использовать вот такой код:
class SystemCall:
    def handle(self, sched: Scheduler, task: Task):
        pass


class NewTask(SystemCall):
    def __init__(self, target: Generator):
        self.target = target

    def handle(self, sched: Scheduler, task: Task):
        tid = sched.add_task(self.target)
        task.sendval = tid
        sched.schedule(task)
В Scheduler достаточно добавить небольшой фрагмент кода:
class Scheduler:
  ...
  def _run_once(self):
      task = self.ready.get()
      try:
          result = task.run()
          if isinstance(result, SystemCall):
                result.handle(self, task)
                return
      except StopIteration:
          self.exit(task)
          return
      self.schedule(task)
А в Task добавляем небольшое условие при выполнении корутин:
class Task:
    ...
    def run(self):
        while True:
            try:
                result = self.target.send(self.sendval)
                if isinstance(result, SystemCall):
                    return result
                ...
Что всё это значит? Разберёмся на примере NewTask. Этот класс предоставляет интерфейс для создания новых задач в цикле событий. Такой интерфейс позволяет абстрагировать клиентский код. Это эмуляция защищённой среды ОС, когда последняя предоставляет безопасные методы для работы с ядром. Такие методы не дают клиентскому коду мешать другим программам в ОС. Таким же образом можно сделать KillTask или WaitTask.
Теперь посмотрим, как реализовать echo-сервер на свежеиспечённом цикле событий:
import logging
from socket import socket

logger = logging.getLogger(__name__)

def handle_client(client, addr):
    logger.info('Connection from %s', addr)
    while True:
        data = client.recv(65536)
        if not data:
            break
        client.send(data)
    logger.info('Client closed')
    client.close()


def server(port):
    print("Server starting")
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(("", port))
    sock.listen()
    try:
        while True:
            client, addr = sock.accept()
            yield NewTask(handle_client(client, addr))
    finally:
        sock.close()
        
if __name__ == '__main__':
    shed = Scheduler()
    shed.add_task(server(8000))
    shed.event_loop()
Этот код — доработанная версия от Бизли. Здесь он проводит обработку новых соединений через цикл событий, а не через чистый селектор, как это было ранее.
Осталась последняя проблема — блокирующие операции. К блокирующими операциями в этом коде относятся:
  • client.recv,
  • client.send,
  • client.close,
  • sock.accept.
Из-за них код не может асинхронно обрабатывать события и цикл событий ждёт выполнения на каждой функции. Есть два варианта решения проблемы:
  • Сделать неблокирующими сокеты через socket.setblocking(False).
  • Сделать асинхронную реализацию сокета самостоятельно.
Для лучшего понимания выберем второй путь. Про первый путь уже рассказывалось ранее, кроме того, он есть во всех реализациях циклов событий на Python. Второй путь позволит понять, как синхронный блокирующий код делают асинхронным.
Посмотрим на реализацию асинхронного сокета:
from socket import socket
from typing import Tuple
from . import SystemCall


# Wait for writing
class WriteWait(SystemCall):
    def __init__(self, f):
        self.f = f

    def handle(self, sched, task):
        fd = self.f.fileno()
        sched.wait_for_write(task, fd)


# Wait for reading
class ReadWait(SystemCall):
    def __init__(self, f):
        self.f = f

    def handle(self, sched, task):
        fd = self.f.fileno()
        sched.wait_for_read(task, fd)


class AsyncSocket:
    def __init__(self, sock: socket):
        self.sock = sock

    def accept(self) -> Tuple['AsyncSocket', str]:
        yield ReadWait(self.sock)
        client, addr = self.sock.accept()
        return AsyncSocket(client), addr

    def send(self, buffer: bytes):
        while buffer:
            yield WriteWait(self.sock)
            len = self.sock.send(buffer)
            buffer = buffer[len:]

    def recv(self, maxbytes: int) -> bytes:
        yield ReadWait(self.sock)
        return self.sock.recv(maxbytes)

    def close(self):
        yield self.sock.close()
Чтобы использовать асинхронный сокет, нужно добавить его инстанцирование в функции server, а для всех методов сокета — получать значения асинхронно, используя yield from. Здесь реализованы все четыре функции, которыми пользуется echo-сервер. Блокировка работы обычно происходит по причине, что сокет не готов обрабатывать ту или иную операцию. Чтобы сокет был доступен в любой момент, используются SystemCall функции WriteWait и ReadWait. После выполнения ожидающих функций сокет будет свободен для выполнения операции.
Отличие от setblocking(False) состоит в том, что этот код следит сам за состоянием сокета. При неблокирующем сокете необходимо следить, как передавать управление внутри кода. Этот момент учтём чуть позже, когда будем сравнивать это решение с asyncio.
С использованием асинхронного сокета предыдущий код будет выглядеть так:
import logging
import sys
from socket import socket, AF_INET, SOCK_STREAM

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


def handle_client(client, addr):
    print("Connection from", addr)
    while True:
        data = yield from client.recv(65536)
        if not data:
            break
        yield from client.send(data)
    print("Client closed")
    client.close()


def server(port):
    print("Server starting")
    rawsock = socket(AF_INET, SOCK_STREAM)
    rawsock.bind(("", port))
    rawsock.listen()
    sock = AsyncSocket(rawsock)
    try:
        while True:
            client, addr = yield from sock.accept()
            yield NewTask(handle_client(client, addr))
    finally:
        sock.close()


if __name__ == '__main__':
    shed = Scheduler()
    shed.add_task(server(8000))
    shed.event_loop()