Python. Итераторы, гинераторы и асинхронность
Асинхронность
Теперь, когда вы определили тип решаемых задач (CPU bound, I/O bound, Memory bound), самое время подробнее разобраться, что такое асинхронное программирование, зачем оно нужно и как оно реализуется на практике.
Асинхронное программирование —
это подход, при котором выполнение задач не блокирует основной поток исполнения программы.

Вместо того чтобы ждать завершения одной операции (например, сетевого запроса или чтения файла), программа может переключаться на выполнение других задач, эффективно используя время простоя. Такой подход особенно полезен для приложений, в которых значительная часть времени уходит на ожидание внешних ресурсов — например, при работе с сетью, базами данных или файловой системой.
До недавнего времени большинство Python-приложений, в том числе написанных на Django, были синхронными. Это означало, что все инструкции выполнялись последовательно: пока одна операция не завершится, следующая не начнётся. Такой подход прост для понимания и отладки, но плохо масштабируется при большом количестве одновременных запросов, особенно если каждый из них требует длительного ожидания ответа от внешних сервисов.
В синхронной модели обычно используется один поток и один процесс, и если одна задача «зависает» (например, долго ждёт ответа от сервера), все остальные задачи вынуждены ждать её завершения. Это приводит к низкой производительности и неэффективному использованию ресурсов, особенно в высоконагруженных веб-приложениях.
С ростом требований к скорости и масштабируемости приложений появилась необходимость в новых парадигмах, которые позволяют обрабатывать множество задач одновременно, не блокируя основной поток исполнения. Асинхронное программирование как раз и решает эту задачу, позволяя запускать и координировать множество операций параллельно (или, точнее, конкурентно) в рамках одного процесса.
Давайте разберёмся, на каких фундаментальных принципах строится асинхронное программирование в Python, какие механизмы и инструменты для этого существуют, и как они позволяют писать эффективный и масштабируемый код.
Конкурентность
Асинхронное программирование тесно связано с понятием конкурентности (concurrency) — способностью программы выполнять несколько задач одновременно, эффективно используя доступные ресурсы. Однако важно понимать, что в большинстве случаев речь идёт не о настоящей параллельности (parallelism), когда задачи действительно выполняются одновременно на разных ядрах процессора, а о псевдопараллельной многозадачности. Почему «псевдо»? Потому что количество одновременно выполняемых задач может значительно превышать количество физических вычислительных ресурсов, например, ядер процессора или потоков операционной системы.
Виды многозадачности
В программировании выделяют два основных подхода к организации многозадачности:
Вытесняющая многозадачность
(preemptive multitasking)
В этом подходе операционная система самостоятельно управляет переключением между задачами (процессами или потоками), основываясь на их приоритетах, времени выполнения и других факторах. ОС может прервать выполнение одной задачи в любой момент и передать управление другой, чтобы обеспечить справедливое распределение ресурсов.

Преимущества:
  • Даже если одна задача «подвисла» или заняла процессор на долгое время, ОС всё равно сможет переключиться на другие задачи, что предотвращает «зависание» всей системы.
  • Упрощает написание кода, не требующего явного управления переключением задач.

Недостатки:
  • Необходимость защищать общие данные между задачами (например, с помощью мьютексов или других примитивов синхронизации), так как переключение может произойти в любой момент, даже посреди выполнения инструкции.
  • Возможны гонки данных (race conditions) и другие проблемы, связанные с параллельным доступом к памяти.
Кооперативная многозадачность
(cooperative multitasking)
Здесь переключение между задачами происходит только в те моменты, когда сама задача явно сообщает, что готова уступить управление другим. Это может быть вызов специальной функции или ключевого слова (например, await в Python).

Преимущества:
  • Проще контролировать, когда именно происходит переключение задач, что снижает вероятность неожиданных ошибок и проблем с синхронизацией данных.
  • Часто не требуется сложная защита общих данных, так как задачи не прерываются «внезапно».

Недостатки:
  • Если одна из задач «забыла» или не захотела уступить управление (например, попала в бесконечный цикл или выполняет долгую синхронную операцию), все остальные задачи будут заблокированы и не смогут выполняться.
  • Требует дисциплины от разработчика: необходимо явно указывать точки переключения.
Как это связано с Python
Весь асинхронный код в Python (например, с использованием asyncio, async/await, библиотек типа aiohttp и др.) построен на принципах кооперативной многозадачности. Это означает, что управление задачами (coroutines, future, tasks) осуществляется не операционной системой, а специальным планировщиком (event loop), который реализован внутри Python. Такой подход позволяет эффективно обрабатывать множество одновременных операций ввода-вывода (I/O bound), не создавая отдельный поток или процесс для каждой задачи.

Важно:
  • Если внутри асинхронной функции вы выполняете долгую синхронную операцию (например, чтение большого файла или сложные вычисления), вы блокируете не только свою задачу, но и весь event loop, что приводит к «зависанию» всех остальных задач.
  • Для эффективной работы асинхронного кода необходимо, чтобы все потенциально блокирующие операции были тоже асинхронными (например, использовать await asyncio.sleep(), await aiohttp.get(), а не их синхронные аналоги).

Таким образом, конкурентность в асинхронном программировании на Python достигается за счёт кооперативной многозадачности, где задачи добровольно уступают управление друг другу, позволяя эффективно использовать ресурсы и обрабатывать множество операций одновременно в рамках одного процесса.
Событийно-ориентированное программирование
В теме про асинхронность невозможно обойти стороной такой важный вид программирования как событийное. Его основная идея — реакция на события. Событием может быть ввод с клавиатуры, получение сетевого соединения от другой программы или переключение между потоками.
Обычно события обрабатываются специальным обработчиком, который часто называется циклом обработки событий. Ранее об обработке событий говорилось в теме про Nginx. Теперь попробуем разобраться, как перейти от синхронного программирования к событийно-ориентированному.
Изобразим синхронное выполнение задач в виде схемы на примере Flask:
В один момент времени Flask может обрабатывать только одно соединение. При этом фактически идёт работа с сокетами. От них никуда не убежать, потому что именно с ними сопряжена работа с сетью во всех программах.
Посмотрим на программу, которая работает по HTTP и отдаёт информацию обо всём HTTP-запросе от клиента с заголовками и телом запроса.
import socket

HOST, PORT = '', 8888


def handle_request(request: bytes) -> bytes:
    request_data = request.decode()
    http_response = f"""HTTP/1.1 200 OK\nContent-Type: text/html\n\n{request_data}"""
    return http_response.encode()


def serve_forever():
    # Устанавливаем TCP-соединение
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
        listen_socket.bind((HOST, PORT))
        listen_socket.listen()
        print('Serving HTTP on port {port} ...'.format(port=PORT))

        while True:
            client_connection, client_address = listen_socket.accept()
            with client_connection:
                request = client_connection.recv(1024)  # Получаем информацию от клиента
                http_response = handle_request(request)
                client_connection.sendall(http_response)


if __name__ == '__main__':
    serve_forever()
По сути, этот код — простейший HTTP-сервер. Почти как Flask, только проще :)
Разберёмся подробнее с кодом. Первой идёт секция:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
    listen_socket.bind((HOST, PORT))
    listen_socket.listen()
Здесь создаётся сокет — программный интерфейс для обеспечения обмена данными между процессами. В примере обмен данными происходит между процессом-клиентом и процессом-сервером. Роль канала передачи данных выполняет сеть. Сокеты упрощают разработчикам работу по передаче сообщений: они инкапсулируют внутри себя всю логику работы с сетью и передачей пакетов данных.
Перейдём к объявлению сокета. Он работает поверх TCP-протокола с настройкой socket.SOCK_STREAM и совершает обмен данными, используя адресацию IPv4 и настройку socket.AF_INET. Для приёма сообщений используется порт 8888 (PORT). Его будет использовать клиент, чтобы сказать ОС, куда именно посылать запросы из сети.
Переменная HOST указывает, к какому IP-адресу должен обратиться клиент, чтобы отправить данные в этот сокет. Внутри системы каждый IP-адрес принадлежит какому-нибудь сетевому интерфейсу. Это может быть физическое сетевое устройство (например, сетевая карта или маршрутизатор) или виртуальное устройство (например, как у Docker или у VirtualBox), которое позволяет обмениваться данными по сети между процессами. К сетевым интерфейсам могут относить:
Физические интерфейсы
Порты в компьютерах, Wi-Fi-адаптеры и прочие устройства. Обычно на Linux-системах они называются eno1, enp0s3 для ethernet-соединения и wlan0 или wlp5s0 для Wi-Fi.
Виртуальные интерфейсы
Существуют только на уровне операционной системы и не привязаны к физическому адаптеру. Первичный виртуальный интерфейс — петлевой (loopback interface). Он используется для передачи данных по сети между процессами внутри одного компьютера.

Стандартно использует подсеть 127.0.0.0/8 (адреса от 127.0.0.0 до 127.255.255.255). Такие соединения доступны только внутри одного физического компьютера и не видны внешним клиентам. В этот диапазон входит и известный вам localhost (127.0.0.1) — адрес обратной связи с компьютером.
Для сокета используется настройка HOST = '' или 0.0.0.0, которая означает, что сокет может принимать данные с любого IP-адреса, а, следовательно, и интерфейса.
После всех настроек сокет нужно перевести в режим прослушки — listen_socket.listen(). Тогда у него появится возможность принимать запросы от других процессов. Такой сокет ещё называют серверным.
Далее идёт инфраструктурный код работы с данными, которые будут прилетать в сокет:
while True:
    client_connection, client_address = listen_socket.accept()
    with client_connection:
      request = client_connection.recv(1024)  # Получаем информацию от клиента
      http_response = handle_request(client_connection)
      client_connection.sendall(http_response)
Тут всё проще. Код дойдёт до строчки с кодом listen_socket.accept(), и будет ждать, пока кто-то не присоединится к серверу. После соединения система отдаст объект клиентского сокета client_connection, по которому можно будет отправить данные клиенту.
Далее получаем от клиента данные пачками по 1024 байта и передаём сообщение в обработку. После обработки возвращаются байты, которые можно отправить обратно клиенту через команду client_connection.sendall(http_response). Всё. Линейный код и никакой магии :)
Неблокирующий ввод/вывод
Код из примера имеет один недостаток — он обрабатывает только один запрос от одного клиента в любой момент времени. Это очень маленькая пропускная способность, особенно если нужно ходить в БД или другой сервис. Поэтому нужен способ обрабатывать несколько соединений одновременно.
К счастью, помимо процессов и потоков есть другие способы обрабатывать сообщения. Познакомимся с первым базовым способом асинхронной обработки сообщений в рамках сокетов.
В сокетах есть возможность сделать работу с функциями сокета неблокирующей. Это значит, что при вызове accept(), recv(), send() и им подобных, код не будет находиться в режиме ожидания выполнения, а будет выполняться дальше. Такой подход позволит обрабатывать больше сообщений, но для получения ответов от функций сокета потребуется дополнительный код, который будет перехватывать события завершения обработки.
Чтобы сделать сокет неблокирующим, достаточно добавить строчку кода listen_socket.setblocking(False):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    listen_socket.bind((HOST, PORT))
    listen_socket.listen()
    listen_socket.setblocking(False)
Сокет использует для своей работы файловые дескрипторы, но в некоторых случаях ему может не хватить объёма портов — максимум 65 535. Поэтому необходимо включить опцию переиспользования портов listen_socket.setsockopt(socket.SOL_SOCKET, socket. SO_REUSEADDR, True). Она позволит подключать больше клиентов, чем портов, так как файловые дескрипторы не имеют ограничения по количеству.
Под капотом сокеты работают с событиями операционной системы, поэтому нужно это использовать для кода на Python. Можно создать очень много сокетов и опрашивать их по кругу каждый раз. Но при множестве соединений это станет неэффективным вариантом. Поэтому разработчики придумали способ сократить нагрузку на ОС — поллинг. По сути это метод отслеживания новых событий ОС и реакции на них.
Если дать более общее определение, то поллинг — это способ обработки событий в событийно-ориентированных системах. Такой подход позволяет существенно снизить затраты на обработку каждого нового события и снизить задержку между получением события и его обработкой.
В основных ОС есть встроенная функция select(), которая позволяет выполнять поллинг событий ОС. Сигнатура вызова выглядит так:
read_sockets, write_sockets, error_sockets = select.select(read_file_descr, write_file_descr, exception_file_descr, timeout)
Рассмотрим подробнее входные параметры и ответ функции select():
read_file_descr
Все файловые дескрипторы (сокеты), по которым нужно завершить чтение.
write_file_descr
Все файловые дескрипторы (сокеты), по которым нужно завершить запись.
exception_file_descr
Все файловые дескрипторы (сокеты), по которым появились ошибки.
read_sockets
Все сокеты, по которым завершилось чтение, из read_file_descr.
write_sockets
Все сокеты, по которым завершилась запись, из write_file_descr.
error_sockets
Все сокеты, по которым появились ошибки, из exception_file_descr.
timeout
Установка времени ожидания событий. Если оно не установлено, то система ждёт первого события. Чтобы не блокировать ОС, лучше ставить таймаут.
Объединим полученные знания для написания простого неблокирующего echo-сервера на сокетах:
import logging
import selectors
import socket
import sys

HOST, PORT = '', 8000  # Порт сервера

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


def new_connection(selector: selectors.BaseSelector, sock: socket.socket):
    new_conn, address = sock.accept()
    logger.info('accepted new_conn from %s', address)
    new_conn.setblocking(False)

    selector.register(new_conn, selectors.EVENT_READ, read_callback)


def read_callback(selector: selectors.BaseSelector, sock: socket.socket):
    data = sock.recv(1024)
    if data:
        sock.send(data)
    else:
        logger.info('closing connection %s', sock)
        selector.unregister(sock)
        sock.close()


def run_iteration(selector: selectors.BaseSelector):
    events = selector.select()
    for key, mask in events:
        callback = key.data
        callback(selector, key.fileobj)


def serve_forever():
    """
    Метод запускает сервер на постоянное прослушивание новых сообщений
    """
    with selectors.SelectSelector() as selector:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            server_socket.bind((HOST, PORT))
            server_socket.listen()
            server_socket.setblocking(False)
            logger.info('Server started on port %s', PORT)

            selector.register(server_socket, selectors.EVENT_READ, new_connection)

            while True:
                run_iteration(selector)


if __name__ == '__main__':
    serve_forever()
Разберём код по частям. Он начинается с функции serve_forever(). В ней создаётся серверный сокет, который объявляется неблокирующим. Далее идёт интересная строка кода:
selector.register(server_socket, selectors.EVENT_READ, new_connection)
Читается это выражение так: «Зарегистрировать каждое событие получения новых данных по серверному сокету и вызвать функцию new_connection». Другими словами, то каждый раз, когда к серверу отправляют новую пачку данных и её получает ОС, она отправляет событие в программу, а та вызывает функцию new_connection.
Часть программы, которая умеет правильно обрабатывать события по сокетам от ОС, называется селектором. Селектор — это обёртка над select(), которая инкапсулирует логику работы с событиями ОС. Для разных ОС выбирается свой тип селектора. Подробнее об этих типах поговорим чуть позже.
У селектора существует два основных события для обработки:
EVENT_READ (1)
Событие, когда сокет готов принимать данные.
EVENT_WRITE (2)
Событие, когда данные были отправлены из сокета или когда сокет снова готов для записи данных.
По факту в каждой ОС набор событий шире, но они не требуются для работы echo-сервера и для обработки в асинхронных программах. Подробнее про разные виды событий можно почитать в документации.
К сожалению, про селекторы в официальной документации Python написано достаточно формально. Авторы недостаточно подробно объясняют, как работать с этим модулем обычному программисту. Но мы постараемся закрыть этот небольшой пробел в знаниях.
Если код работает исключительно с сокетами, как, например, в echo-сервере, то одной из итераций работы программы будет получение событий по зарегистрированным сокетам от ОС. Простая реализация итерации обработки находится в методе run_iteration():
def run_iteration(selector: selectors.BaseSelector):
    events = selector.select()
    for key, mask in events:
        callback = key.data
        callback(selector, key.fileobj)
Этот код забирает все события от ОС и достаёт метод для обработки. Переменная events содержит кортеж из двух элементов:
Ключ SelectorKey, который содержит информацию:
  • о зарегистрированном сокете fileobj;
  • закреплённым за сокетом файловым дескриптором fd;
  • событиях, которые ожидаются для сокета events;
  • данных, которые передали при регистрации data.
В текущем примере данные — это функция, которую нужно вызвать при наступлении события.

Маска mask, которая может принимать одно из четырёх основных значений:
  • 0 — не произошло ни одного из событий;
  • 1 — произошло событие EVENT_READ;
  • 2 — произошло событие EVENT_WRITE;
  • 3 — произошло оба события EVENT_READ и EVENT_WRITE.
Далее для каждого события вызывается зарегистрированная функция с двумя параметрами: текущий селектор и сокет. Селектор нужен, чтобы регистрировать клиентские сокеты для обработки событий, а сам сокет — чтобы принимать новые соединения или обрабатывать данные.
Запустим этот сервер.
python echo_server.py
Сервер запустится на 8000-ом порту. К нему можно подключиться с помощью утилиты telnet. Если её нет, то вы быстро найдёте, как её поставить на вашу ОС.
Чтобы подключиться к серверу, запускаем консоль и пишем:
telnet 127.0.0.1 8000
После чего вы увидите:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Далее можно ввести любой текст, а сервер его продублирует.
Чтобы выйти из telnet, нужно ввести комбинацию клавиш Ctrl + ] и нажать Enter, потом нажать Ctrl + C и опять нажать Enter.
У echo-сервера из примера используется два важных концепта:
  • цикл событий (event-loop) для обработки неблокирующих сокетов;
  • callback для вызова последующего кода.
Эти концепты заложены в основу почти всех асинхронных библиотек, которые будут вам встречаться в Python. Теперь пришло время познакомиться с циклами событий.