Архитектура реактивного потока
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Publisher │───▶│ Stream │───▶│ Subscriber │
│ (Издатель) │ │ (Поток данных) │ │ (Подписчик) │
└──────────────────┘ └──────────────────┘ └──────────────────┘ Механизм обратного давления
Производитель (Publisher) ---> [Лента конвейера (Stream)] ---> Потребитель (Subscriber)
^ |
|------------------------------------|
(Сигнал: "Помедленнее!") <!-- Для Maven -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.10</version> <!-- Используйте актуальную версию -->
</dependency> import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorBasics {
public static void main(String[] args) throws InterruptedException {
// --- Пример с Flux (поток из множества элементов) ---
// 1. Создаем Flux из нескольких элементов
Flux<String> fluxOfStrings = Flux.just("Apple", "Orange", "Banana");
// 2. Применяем операторы для преобразования данных
fluxOfStrings
.filter(fruit -> fruit.length() > 5) // ← Оставляем только фрукты с длиной имени > 5
.map(String::toUpperCase) // ← Преобразуем в верхний регистр
.subscribe( // ← Подписываемся и обрабатываем конечный результат
result -> System.out.println("Обработанный фрукт: " + result),
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Поток данных завершен!")
);
System.out.println("---");
// --- Пример с Mono (поток из 0 или 1 элемента) ---
// 1. Создаем Mono, который будет выполнен асинхронно
Mono<String> monoOfGreeting = Mono.fromSupplier(() -> {
// Имитация долгой операции
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello, Reactive World!";
});
// 2. Подписываемся и обрабатываем результат
monoOfGreeting.subscribe(
greeting -> System.out.println("Приветствие: " + greeting)
);
// Ждем завершения асинхронных операций, так как main поток может завершиться раньше
Thread.sleep(2000);
}
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class ReactorComposition {
public static void main(String[] args) throws InterruptedException {
// --- then (последовательное выполнение) ---
Mono<String> firstTask = Mono.just("Первая задача");
Mono<String> secondTask = Mono.just("Вторая задача");
firstTask
.then(secondTask) // ← Ждет завершения firstTask, затем запускает secondTask
.subscribe(result -> System.out.println("Результат then: " + result));
// --- merge (слияние потоков) ---
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.merge(flux1, flux2) // ← Сливает элементы из обоих потоков по мере их поступления
.subscribe(item -> System.out.println("Результат merge: " + item));
// --- zip (комбинирование элементов попарно) ---
Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);
Flux.zip(names, ages) // ← Комбинирует i-й элемент из первого потока с i-м элементом из второго
.subscribe(tuple -> System.out.println("Имя: " + tuple.getT1() + ", Возраст: " + tuple.getT2()));
// Ждем завершения, так как здесь могут быть асинхронные операции
Thread.sleep(1000);
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorErrorHandling {
public static void main(String[] args) {
// --- Обработка ошибок ---
Mono<String> errorMono = Mono.error(new RuntimeException("Что-то пошло не так!"));
errorMono
.doOnError(error -> System.err.println("Перехват ошибки в doOnError: " + error.getMessage()))
.onErrorReturn("Значение по умолчанию") // ← Возвращает альтернативное значение при ошибке
.subscribe(
result -> System.out.println("Результат с onErrorReturn: " + result)
);
errorMono
.onErrorResume(error -> { // ← При ошибке выполняет другой реактивный поток
System.err.println("Перехват ошибки в onErrorResume: " + error.getMessage());
return Mono.just("Восстановленное значение");
})
.subscribe(
result -> System.out.println("Результат с onErrorResume: " + result)
);
// --- Использование finally (doFinally) ---
Flux<String> resourceFlux = Flux.just("Resource 1", "Resource 2")
.doFinally(signalType -> { // ← Выполняется при завершении потока (успех, ошибка, отмена)
System.out.println("Поток завершен с сигналом: " + signalType);
// Здесь можно освободить ресурсы, например, закрыть соединение с БД
});
resourceFlux.subscribe(item -> System.out.println("Получен ресурс: " + item));
}
}