Java. Concurrency
Основы реактивного программирования
Введение в реактивный мир
В предыдущей лекции мы с вами разобрали CompletableFuture — мощный инструмент для работы с одиночными асинхронными задачами. Но что, если у нас не просто одна задача, а непрерывный поток данных? Например, данные с датчиков, уведомления из социальной сети или курс валют в реальном времени.
Представьте себе конвейер на заводе. На один его конец подаются детали (данные), они проходят через различные станции обработки (операторы), и на другом конце получается готовый продукт (результат). Если на какой-то станции обработка замедляется, вся предыдущая часть конвейера должна замедлиться, чтобы не возникло завала.
Именно так работает реактивное программирование. Это парадигма, ориентированная на потоки данных и распространение изменений. Она позволяет создавать эффективные, отказоустойчивые и масштабируемые системы, которые элегантно работают с асинхронными потоками событий.
Что такое реактивное программирование?
Реактивное программирование — это подход к асинхронному программированию, который фокусируется на потоках данных (streams) и распространении изменений (propagation of changes). Вместо того чтобы запрашивать данные (pull-подход), вы подписываетесь на поток данных и реагируете на них по мере их поступления (push-подход).
Ключевые принципы реактивного подхода
Реактивный Манифест описывает четыре ключевых принципа, на которых строится реактивная система:
  • Отзывчивость (Responsive):
    Система должна отвечать на запросы в своевременном режиме.
  • Устойчивость к ошибкам (Resilient)
    Система должна оставаться отзывчивой в случае возникновения ошибок.
  • Эластичность (Elastic)
    Система должна оставаться отзывчивой при изменении нагрузки, масштабируясь ресурсы вверх или вниз.
  • Ориентация на передачу сообщений (Message Driven)
    Компоненты системы общаются друг с другом через асинхронные сообщения.
Базовые концепции реактивных потоков
Давайте разберем основные строительные блоки реактивных потоков.
Publisher и Subscriber
В основе реактивных потоков лежит шаблон «Наблюдатель» (Observer). У него есть два главных действующих лица:
  • Издатель (Publisher)
    Источник данных. Он издает (публикует) данные, сигналы об окончании потока или об ошибке.
  • Подписчик (Subscriber)
    Потребитель данных. Он подписывается на издателя и реагирует на приходящие данные, завершение или ошибки.
Вот как выглядит эта схема:
Архитектура реактивного потока
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   Publisher      │───▶│      Stream      │───▶│   Subscriber     │
│   (Издатель)     │    │ (Поток данных)   │    │  (Подписчик)     │
└──────────────────┘    └──────────────────┘    └──────────────────┘
Обратное давление (Backpressure)
А что если издатель производит данные намного быстрее, чем подписчик их может обработать? В традиционном push-подходе это привело бы к переполнению памяти и падению приложения.
Здесь на помощь приходит обратное давление — это механизм, при котором подписчик может сигнализировать издателю, что тот производит данные слишком быстро, и попросить его замедлиться.
Вернемся к нашей аналогии с конвейером:
Механизм обратного давления

Производитель (Publisher) ---> [Лента конвейера (Stream)] ---> Потребитель (Subscriber)
								^                                    |
								|------------------------------------|
									       (Сигнал: "Помедленнее!")
Типы потоков: Mono и Flux
В мире Java, особенно в экосистеме Spring, самым популярным инструментом для реактивного программирования является Project Reactor. Он вводит два основных типа потоков:
  • Mono<T>
    Представляет поток из 0 или 1 элемента. Идеально подходит для асинхронных задач, которые возвращают один результат, как CompletableFuture. Например, получение пользователя по ID.
  • Flux<T>
    Представляет поток из 0, 1 или множества элементов. Используется для работы с последовательностями данных. Например, получение списка всех товаров в каталоге или обработка событий из очереди
Практические примеры с Project Reactor
Давайте посмотрим, как все это работает на практике. Для работы с Project Reactor вам понадобится зависимость:
<!-- Для Maven -->
<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
		<version>3.5.10</version> <!-- Используйте актуальную версию -->
</dependency>
Создание и обработка потоков
Создавать и обрабатывать потоки в Reactor очень просто и декларативно.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorBasics {
public static void main(String[] args) throws InterruptedException {
// --- Пример с Flux (поток из множества элементов) ---    
    // 1. Создаем Flux из нескольких элементов
    Flux<String> fluxOfStrings = Flux.just("Apple", "Orange", "Banana");

    // 2. Применяем операторы для преобразования данных
    fluxOfStrings
        .filter(fruit -> fruit.length() > 5)      // ← Оставляем только фрукты с длиной имени > 5
        .map(String::toUpperCase)                 // ← Преобразуем в верхний регистр
        .subscribe(                                // ← Подписываемся и обрабатываем конечный результат
            result -> System.out.println("Обработанный фрукт: " + result),
            error -> System.err.println("Ошибка: " + error),
            () -> System.out.println("Поток данных завершен!")
        );

    System.out.println("---");

    // --- Пример с Mono (поток из 0 или 1 элемента) ---

    // 1. Создаем Mono, который будет выполнен асинхронно
    Mono<String> monoOfGreeting = Mono.fromSupplier(() -> {
        // Имитация долгой операции
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "Hello, Reactive World!";
    });

    // 2. Подписываемся и обрабатываем результат
    monoOfGreeting.subscribe(
        greeting -> System.out.println("Приветствие: " + greeting)
    );

    // Ждем завершения асинхронных операций, так как main поток может завершиться раньше
    Thread.sleep(2000);
	}
}
Что здесь происходит?
  • Flux.just()
    Создает поток (Flux) из фиксированного набора элементов.
  • filter()
    Оператор, который пропускает только те элементы, которые удовлетворяют условию. Он возвращает новый Flux.
  • map()
    Оператор, который преобразует каждый элемент в потоке, применяя к нему функцию. Он также возвращает новый Flux.
  • subscribe()
    Это самый важный метод. Без него ничего не произойдет! Реактивные потоки «ленивы" — они начинают издавать данные только тогда, когда на них подписываются.
    • Первый аргумент subscribe — это обработчик для каждого элемента (onNext).
    • Второй — обработчик ошибок (onError).
    • Третий — обработчик завершения потока (onComplete).
  • Mono.fromSupplier()
    Создает Mono, который будет выполнен асинхронно при подписке. Результатом выполнения Supplier будет единственный элемент потока.
Важное правило: реактивный поток не начнет выполнять код до тех пор, пока на него не подпишутся с помощью метода subscribe().
Комбинирование потоков
Как и в CompletableFuture, в Reactor можно элегантно комбинировать потоки.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class ReactorComposition {
public static void main(String[] args) throws InterruptedException {
// --- then (последовательное выполнение) ---    
    Mono<String> firstTask = Mono.just("Первая задача");
    Mono<String> secondTask = Mono.just("Вторая задача");

    firstTask
        .then(secondTask) // ← Ждет завершения firstTask, затем запускает secondTask
        .subscribe(result -> System.out.println("Результат then: " + result));

    // --- merge (слияние потоков) ---

    Flux<String> flux1 = Flux.just("A", "B", "C");
    Flux<String> flux2 = Flux.just("D", "E", "F");

    Flux.merge(flux1, flux2) // ← Сливает элементы из обоих потоков по мере их поступления
        .subscribe(item -> System.out.println("Результат merge: " + item));

    // --- zip (комбинирование элементов попарно) ---

    Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
    Flux<Integer> ages = Flux.just(25, 30, 35);

    Flux.zip(names, ages) // ← Комбинирует i-й элемент из первого потока с i-м элементом из второго
        .subscribe(tuple -> System.out.println("Имя: " + tuple.getT1() + ", Возраст: " + tuple.getT2()));

    // Ждем завершения, так как здесь могут быть асинхронные операции
    Thread.sleep(1000);
}
Что здесь происходит?
  • then()
    Выполняет второй Mono после успешного завершения первого. Результат первого Mono игнорируется, возвращается результат второго.
  • Flux.merge()
    Сливает несколько потоков в один. Элементы будут появляться в результирующем потоке по мере их готовности из исходных потоков (неупорядоченно).
  • Flux.zip()
    Комбинирует элементы из потоков попарно. Он ждет, пока в каждом потоке появится элемент, затем объединяет их в кортеж (Tuple) и выдает его. Результирующий поток завершится, когда завершится самый короткий из исходных потоков.
Обработка ошибок и управление ресурсами
Реактивные программы делают обработку ошибок очень элегантной и декларативной.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorErrorHandling {
public static void main(String[] args) {
// --- Обработка ошибок --- 
    Mono<String> errorMono = Mono.error(new RuntimeException("Что-то пошло не так!"));

    errorMono
        .doOnError(error -> System.err.println("Перехват ошибки в doOnError: " + error.getMessage()))
        .onErrorReturn("Значение по умолчанию") // ← Возвращает альтернативное значение при ошибке
        .subscribe(
            result -> System.out.println("Результат с onErrorReturn: " + result)
        );

    errorMono
        .onErrorResume(error -> { // ← При ошибке выполняет другой реактивный поток
            System.err.println("Перехват ошибки в onErrorResume: " + error.getMessage());
            return Mono.just("Восстановленное значение");
        })
        .subscribe(
            result -> System.out.println("Результат с onErrorResume: " + result)
        );

    // --- Использование finally (doFinally) ---

    Flux<String> resourceFlux = Flux.just("Resource 1", "Resource 2")
        .doFinally(signalType -> { // ← Выполняется при завершении потока (успех, ошибка, отмена)
            System.out.println("Поток завершен с сигналом: " + signalType);
            // Здесь можно освободить ресурсы, например, закрыть соединение с БД
        });

    resourceFlux.subscribe(item -> System.out.println("Получен ресурс: " + item));
	}
}
Что здесь происходит?
  • onErrorReturn (value)
    Если в потоке происходит ошибка, он немедленно завершается и выдает указанное значение value.
  • onErrorResume (function)
    Если в потоке происходит ошибка, вместо него выполняется другой реактивный поток, возвращаемый function. Это позволяет реализовать сложную логику восстановления.
  • doFinally (callback)
    Позволяет выполнить блок кода, когда поток завершается по любой причине (успешно, с ошибкой или из-за отмены). Идеально подходит для очистки ресурсов.
Сравнение CompletableFuture и Project Reactor
Давайте сравним подходы, которые мы изучили.
Преимущества Project Reactor:
  • Работа с потоками данных, а не только одиночными значениями.
  • Глубокая интеграция с экосистемой Spring (Spring WebFlux, Spring Data R2DBC).
  • Декларативный и богатый API для преобразования данных.
  • Встроенная поддержка обратного давления.
Преимущества CompletableFuture:
  • Простота для базовых асинхронных задач.
  • Стандартная часть Java с версии 8.
  • Меньше «магии», легче для понимания новичками.
Заключение
Поздравляю, вы сделали первый шаг в мир реактивного программирования! Мы с вами узнали, что это такое, как работают реактивные потоки и как их использовать на практике с помощью Project Reactor.
Ключевые моменты:
  • Реактивное
    программирование
    Это работа с асинхронными потоками данных по принципу «push».
  • Обратное давление
    Это ключевой механизм, который позволяет потребителю управлять скоростью производителя.
  • Project Reactor
    Предоставляет мощный API для композиции, трансформации и обработки ошибок в потоках.
  • Потоки
    Потоки «ленивы» и начинают работать только после вызова метода subscribe().
  • Mono
    Используется для 0.1 элементов, а Flux — для 0.N.
Что дальше?
Теперь вы готовы использовать реактивный подход для создания более масштабируемых и отказоустойчивых приложений.
  • Изучите Spring WebFlux
    Попробуйте создать простой реактивный REST-контроллер с помощью Spring Boot. Вы увидите, как элегантно Flux и Mono интегрируются в веб-слой.
  • Исследуйте операторы
    В Project Reactor существует огромное количество операторов (take, skip, distinct, scan и многие другие). Изучите их, чтобы понимать всю мощь этой библиотеки.
  • Практикуйтесь
    Попробуйте переписать существующий синхронный или асинхронный код на реактивный. Это лучший способ закрепить знания.
Реактивное программирование может показаться сложным на первый взгляд, но его сила в элегантности и эффективности при работе с асинхронными потоками данных. Удачи в освоении этой мощной технологии
Практическое задание
Для закрепления материала выполните практическое задание в проекте practice/practice-6.
Задача: реализуйте классы и методы так, чтобы все unit-тесты в ReactorTest.java проходили.
Требования:
  • Создавайте и обрабатывайте Flux и Mono потоки
  • Используйте операторы map, filter, flatMap
  • Комбинируйте потоки через merge, zip, concat
  • Обрабатывайте ошибки через onErrorReturn, onErrorResume
  • Правильно подписывайтесь на потоки
Инструкция:
  • Перейдите в директорию practice/practice-6
  • Установите зависимости: mvn install
  • Запустите тесты: mvn test
  • Реализуйте недостающие классы и методы, чтобы все тесты проходили
  • Не изменяйте сами тесты!
Подсказки:
  • Mono
    Для 0−1 элемента, Flux для 0-N элементов
  • Потоки «ленивы»
    Выполнение начинается только после subscribe()
  • map
    Преобразует каждый элемент, filter отфильтровывает элементы
  • flatMap
    Преобразует элемент в поток и «разворачивает» его
  • merge
    Объединяет потоки, zip комбинирует элементы попарно
  • onErrorReturn
    Возвращает значение при ошибке
  • onErrorResume
    Выполняет другой поток