Python. Итераторы, гинераторы и асинхронность
Asyncio
Теперь у вас достаточно знаний, чтобы без труда освоить основную встроенную библиотеку для асинхронного программирования — asyncio.
С версии Python 3.5 в язык добавили специальный синтаксис — async/await. Он позволяет использовать «нативные» корутины, которые теперь считаются частью языка. Они разделяют генераторы от асинхронного кода, что, например, позволяет создавать асинхронные генераторы (PEP 525).
В рамках этого курса осознанно пропускается часть про эволюцию библиотеки asyncio. О ней можно почитать в двух статьях: «Асинхронное программирование в Python» и How the heck does async/await work in Python 3.5.
Так как на практике уже везде используются «нативные» корутины, то сильно погружаться в прошлое необязательно. Однако на собеседованиях вам могут задавать вопросы про внутреннее устройство корутин и отличие «нативной» версии от основанной на генераторах версии. Запомните: у «нативных» корутин нет магических методов __iter__ и __next__ со всеми вытекающими из этого последствиями. Подробнее можно прочитать в PEP 492.
Посмотрим, как выглядит простая программа с использованием async/await.
import random
import asyncio


async def func():
    r = random.random()
    await asyncio.sleep(r)
    return r


async def value():
    result = await func()
    print(result)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(value())
    loop.close()
В целом изменений немного. Переменная loop — это не что иное, как планировщик задач. Он работает по схожим принципам с тем, что рассматривался ранее. Теперь все функции переключаются с помощью await.
Познакомимся с основными функциями asyncio, которые часто встречаются на практике:
  • gather — выполняет список корутин одновременно и дожидается результата выполнения всех корутин.
  • sleep — заставляет корутину уснуть на определённое количество секунд.
  • wait/wait_for — удобные функции, чтобы дождаться выполнения уже запущенной корутины.

Также стоит ознакомиться с основными функциями для работы с event_loop:
  • get_event_loop — получить новый объект event_loop или тот, что уже существует. При этом одновременно может существовать только один объект event_loop в рамках одного потока.
  • run_until_complete/run — удобные функции для запуска и проверки асинхронных функций.
  • shutdown_asyncgens — одна из самых недооценённых функций цикла событий, которая позволяет правильно завершить выполнение цикла событий и всех корутин.
  • call_soon — позволяет запланировать выполнение корутины, но не ждать её выполнения. Таким образом можно вечно ставить на выполнение одну и ту же функцию.

Теперь стоит поговорить про ключевые различия между asyncio и предложенной реализацией цикла событий. Asyncio работает на функциях обратного вызова или колбэках (callback). Этот механизм запускает задачи «честнее», чем текущий планировщик. Каждая корутина по-честному ставится в очередь и исполняется. В простом планировщике переключения не произойдёт, пока вся цепочка корутин не выполнится, что блокирует выполнение остальных задач. Однако, у колбэков есть и свой недостаток — callback hell. Это состояние, когда после вызова каждой функции нужно вызвать ещё одну функцию и ещё одну функцию… Получаются интересные фрагменты кода:
func1.add_callback(
    func2.add_callback(
        func3.add_callback(func4)
    )
)
К счастью, этого удаётся избежать через синтаксис async/await.
await func4()
await func3()
await func2()
await func1()
Такое поведение возможно благодаря введению класса Future. По сути, такие объекты спасают от колбэков и делают код более линейным. В современных версиях Python Future-объекты для нативных корутин не нужны.
Другая особенность скрывается в работе с селектором. Очень часто разработчики используют timeout = 0. Так делают, чтобы переключить работу цикла событий на другие задачи и не тормозить выполнения кода. Для селектора такой таймаут работает так же, как быстрое переключение, чтобы не блокировать работу цикла событий. В этом плане asyncio и реализация в примере работают схожим образом.
Осталось познакомиться с реализацией echo-сервера на asyncio:
import logging
import sys
import asyncio
from asyncio.streams import StreamReader, StreamWriter


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


async def client_connected(reader: StreamReader, writer: StreamWriter):
    address = writer.get_extra_info('peername')
    logger.info('Start serving %s', address)

    while True:
        data = await reader.read(1024)
        if not data:
            break

        writer.write(data)
        await writer.drain()

    logger.info('Stop serving %s', address)
    writer.close()


async def main(host: str, port: int):
    srv = await asyncio.start_server(
        client_connected, host, port)

    async with srv:
        await srv.serve_forever()
        

if __name__ == '__main__':
    asyncio.run(main('127.0.0.1', 8000))
Реализация очень похожа на ту, которая предлагалась на сокетах. Только вместо сокетов asyncio предоставляет удобный интерфейс для работы с данными — стримы. Они позволяют работать с данными без использования колбэков и низкоуровневой работы с сокетами.
Для работы с новыми подключениями у asyncio есть функция start_server, которая отдаёт объект Server. С помощью этого объекта можно запустить вечную обработку новых и существующих соединений через функцию serve_forever. Для каждого нового соединения вызывается функция client_connected, которой передаются два стрима на вход: StreamReader и StreamWriter. Через StreamReader нужно получать данные от клиента. Функции очень похожи на те, которые используются в сокетах (recv -> read). Для передачи данных обратно клиенту используется StreamWriter, который работает чуть хитрее, чем сокет. Для записи данных в стрим используется функция write, которая записывает данные в специальный буфер. Чтобы отправить данные клиенту из буфера, необходимо вызвать асинхронную функцию drain. После обработки соединения стрим на отдачу данных клиентов необходимо закрыть через функцию close.
Ссылки