04.
Java
Сoncurrency
уроки
Введение в многопоточное программирование
На заре IT-индустрии все программы выполнялись от начала до конца, и у каждой был полный доступ ко всем ресурсам машины. Операционные системы же позволили запускать несколько программ в рамках индивидуальных процессов.
Процесс - это изолированный и независимый юнит, которому операционная система выделяет определенный набор ресурсов (память, файловые дескрипторы и так далее).

Такой подход позволил не только более эффективно расходовать ресурсы вычислительного устройства, но и повысил качество и удобство разработки.
Ведь гораздо проще написать несколько программ, каждая из которых выполняет отдельную задачу, вместо того чтобы пытаться уместить всю бизнес-логику в рамках одной единицы.

На протяжении долгого времени программы выполнялись исключительно последовательно.
Программирование в таком ключе является простым и интуитивно понятным, ведь это похоже на то, как люди работают в реальной жизни. Однако это не всегда является достаточно эффективным.

Предположим, что мы пишем программу по имитации утренней рутины человека.
Необходимо выполнить следующий набор действий:
  1. Встать с кровати.
  2. Принять душ.
  3. Выпить кофе.
  4. Отутюжить рубашку.
  5. Пойти на работу.
Перед тем как выпить кофе, необходимо его сварить. В случае последовательного выполнения, человек будет ждать и ничего не делать, пока кофе не будет готов. Но можно было в это время приготовить одежду.

По тем же причинам, по которым появились процессы, была предложена концепция потоков.
Они выполняют схожую роль: позволяют утилизировать ресурсы более эффективно.

Схематично отношение между процессами и потокам можно изобразить следующим образом:
Потоки делят общую память с процессом, к которому они относятся.
Их иногда называют легковесными процессами.
Из-за того что все потоки одного процесса имеют доступ к одним и тем же переменным и объектам, требуются дополнительные инструменты синхронизации, которые позволят гарантировать потокобезопасность.

Потокобезопасность — это признак программного обеспечения, который гарантирует корректность выходных результатов, как при последовательном выполнении, так и при конкурентном.

Писать правильно работающие программы — сложно. Писать правильно работающие многопоточные программы — еще сложнее. В последующих уроках мы с вами убедимся в этом на практике.
Потокобезопасность
Класс является потокобезопасным, если он ведет себя корректно при доступе из нескольких потоков, независимо от чередования выполнения этих потоков и без дополнительной синхронизации или другой координации со стороны вызывающего кода.
Класс без сохранения состояния (stateless)
Рассмотрим пример простого класса с одним методом, который умножает число на два:
class NumberMultiplier {
    public long multipleByTwo(int number) {
        final int result = number * 2;
        return result;
    }
}
Данный класс не имеет полей и не ссылается на поля в других классах. Вычисляемое значение находится в локальной переменной, которая хранится в стеке исполняющего потока и доступна только ему.
Один поток, работающий с NumberMultiplier не сможет повлиять на работу другого потока, работающего с тем же NumberMultiplier, поскольку потоки не имеют общего состояния.

Важный вывод из этого: классы без состояний — потокобезопасны.
Класс с состоянием
Предположим, нам нужно реализовать класс-счётчик.
Для хранения счётчика между вызовами нам понадобится поле в классе:
class Counter {
    private long count;

    public long increment() {
        return ++count;
    }
    
    public long getCount() {
        return count;
    }
}
Данный класс не будет потокобезопасным. Дело в том, что операция ++count в методе increment — это не одна неделимая операция. Подобный инкремент — это лишь краткая запись трёх операций: получение значения, добавление единицы и запись обновлённого значения. Посмотрите на пример кода ниже. Он аналогичен примеру с оператором ++count.
class Counter {
    private long count;

    public long increment() {
        int temp = count;
        temp = temp + 1;
        count = temp;
        return count;
    }
    
    public long getCount() {
        return count;
    }
}
Если 2 потока одновременно вызовут метод increment у одного экземпляра Counter, то может произойти следующее (предположим, что текущее значение счётчика - 1).
  • Поток А и поток Б одновременно вызывают метод increment.
  • Оба потока получают текущее значение - 1.
  • Оба потока увеличивают это значение на 1. У обоих получается 2.
  • Затем оба потока записывают значение 2 в счётчик.
В результате счётчик равен 2, то есть один инкремент потерялся.

Для подобных ситуаций есть термин: состояние гонки (race condition).

Чаще всего состояние гонки получается, если сперва выполняется некая проверка, а затем принимается решение о том, что делать дальше. Если между этими шагами результаты проверки устареют, то программа поведёт себя некорректно. Данный тип состояния гонки называется check-then-act.
Решение проблемы
Для того чтобы класс Counter вёл себя корректно при многопоточной работе, необходим способ предотвратить использование поля count другими потоками, пока мы находимся в процессе его изменения. Этого можно добиться разными способами.
Атомарность
В пакете java.util.concurrent.atomic содержатся классы атомарных переменных (atomic variable).
Благодаря им возможно атомарно (в одно действие) изменять состояние переменных. При замене типа счетчика с long на AtomicLong все действия по его чтению и изменению станут атомарными, а потому один поток, работающий со счётчиком, не сможет повлиять на работу другого потока.
class AtomicCounter {
    private AtomicLong count;

    public long increment() {
        return count.incrementAndGet();
    }

    public long getCount() {
        return count.get();
    }
}
Блокировка
Использование атомарных переменных не всегда может сделать класс потокобезопасным.
В качестве примера рассмотрим следующий класс:
public class Incrementer {
    private AtomicLong value;
    private AtomicLong latestDelta;

    public void increment(long delta) {
        value.addAndGet(delta);
        latestDelta.set(delta);
    }
}
В поле value он хранит число, которое можно увеличивать при помощи метода increment. А в поле latestDelta хранит последнее значение, на которое было увеличено поле value.

При работе с одним объектом этого класса из нескольких потоков может возникнуть ситуация, когда поток А начал работать чуть раньше, увеличил value, а затем немного притормозил (например, потому что запустился Garbage Collector). Поток Б начал работать чуть позже, но выполнил весь метод increment без задержек и после этого поток А продолжил свою работу.

В результате в latestDelta будет delta из потока А, хотя фактически значение value последним изменил поток Б.
Чтобы исправить эту проблему, необходимо, чтобы оба действия в методе increment выполнялись как одна неделимая операция, т.е. чтобы эти действия были атомарными.

Для этого в Java есть механизм блокировок — блок synchronized. Данный блок состоит из двух частей: кода, который должен выполняться атомарно, и объекта, который будет являться блокировкой.
synchronized (lock) {
    // код
}
В качестве блокировки может выступать любой Java-объект.

Код, расположенный внутри блока synchronized и защищённый одним и тем же объектом, в один момент времени может исполняться только одним потоком.
Иными словами, только один поток может "войти" внутрь sychronized. Остальные потоки вынуждены ждать. Однако, если блок synchronized будет защищён разными объектами, то возможно одновременное выполнение, например:
// Thread 1: doSomething(new Object());
// Thread 2: doSomething(new Object());

public void doSomething(Object lock) {
    synchronized (lock) {
       // ... 
    }
}
В таком варианте разные потоки смогут зайти в данный блок вместе.

Можно использовать один и тот же объект в качестве блокировки в нескольких разных блоках synchronized.
В таком случае эти блоки будут атомарны относительно друг друга, т.е. в один момент времени будет выполняться только один из этих блоков.

Добавим блок synchronized в метод increment:
public void increment(long delta) {
    synchronized (this) {
        count.addAndGet(delta);
        latestIncrement.set(delta);
    }
}
Ключевое слово synchronized можно также использовать в объявлении метода:
public synchronized void increment(long delta) {
    count.addAndGet(delta);
    latestIncrement.set(delta);
}
Когда synchronized стоит в объявлении метода, это означает, что весь код в этом методе «обёрнут» блоком synchronized, а его блокировкой является сам объект, чей метод вызывается (фактически, this). В нашем примере эти два варианта равнозначны.

Если synchronized указать в объявлении статического метода, тогда блокировка будет не на объект-экземпляр класса, а на сам класс.

Бывают ситуации, когда в качестве блокировки требуется использовать «внешний» объект.
В таких случаях использование synchronized в объявлении метода не подходит.

Также стоит заметить, что при использовании блока synchronized не обязательно применять классы из java.util.concurrent.atomic. Можно использовать простые примитивы, корректность кода от этого не изменится. Посмотрите на пример кода ниже:
public class Incrementer {
    private long value;
    private long latestDelta;

    public synchronized void increment(long delta) {
        value += delta;
        latestDelta = delta;
    }
}
Область видимости
Давайте поговорим немного о том, как устроена работа с памятью в железе и в Java.

Обычно у компьютера есть несколько ядер процессора, в которых выполняются вычисления.
Если ядру нужна какая-то информация из памяти, то он сначала будет искать внутри своих регистров, затем — в своих кешах, и только потом — в оперативной памяти.

Теперь представим, что в нашем Java-приложении запущены два потока, и так случилось, что один запущен на одном ядре, второй — на другом. Оба потока вычитали значение переменной из главной памяти. Первый поработал с этой переменной и перезаписал ее значение в главную память. Увидит ли это новое значение второй поток? Не всегда, он может использовать старое значение переменной, записанное в регистре или кеше второго ядра.

Как же быть? Как сделать так, чтобы один поток видел результат работы другого?
Разберемся, как эта задача решается в Java.
happens-before
happens-before — абстракция, которая определяет порядок выполнения операций.
Ее смысл, заключается в следующем: если действие A happens-before действия B, то действие B в своем потоке будет видеть изменения, сделанные действием A, даже в другом потоке.

happens-before задает порядок операций только в двух потоках, действия в других потоках могут быть неупорядоченны. В пределах одного потока все операции выполняются в логическом порядке, описанным в программе.
volatile
 volatile int sharedVar = 3;
volatile указывает на то, что значение переменной нужно читать и писать в основную память, в обход кешей и регистров ядра процессора.
Это ключевое слово имеет смысл использовать с примитивами, так как при употреблении со ссылочными типами синхронизовано будет только само значение ссылки, а не данные, на которые она указывает.

Хотя Value Objectне является примитивом, для него volatile тоже решает проблему видимости.

volatile также предоставляет happens-before гарантию: запись volatile переменной happens-before последующих чтений этой переменной.

Важно, что volatile решает проблему видимости данных, но не проблемы конкурентного доступа:
нет гарантий для атомарного изменения переменной. В частности, если для изменения переменной нужно узнать ее предыдущее значение, может создаться race condition. Потому что между чтением волатильной переменной и ее записью может произойти другая операция над этой переменной.
    private volatile int sharedVar = 3;

    public void updateSharedVar(){
        sharedVar += 3;
    }
sharedVar += 3 состоит из трех операций: чтение текущего значения переменной, инкремент и запись нового значения. Все эти три действия не собраны в одну атомарную операцию, поэтому может произойти, например, такое:
 private volatile int sharedVar = 3;
    
    thread 1: sharedVar += 3;
              // thread 1 reads 3
    thread 2: sharedVar += 3;
              // thread 2 reads 3
              // thread 1 increments, writes 6
              // thread 2 increments, writes 6
              // sharedVar = 6, not 9
Если же изменение значения состоит только из одной операции, то race condition не происходит:
    private volatile int sharedVar = 3;

    public void updateSharedVar(int value){
        sharedVar = value;
    }
В этом случае запись нового значения sharedVar не требует его предварительного чтения, поэтому гарантий volatile нам хватает.
synchronized
synchronized тоже дает гарантию happens-before: выход из synchronized блока happens-before последующего входа в synchronized блок на том же мониторе.

Причем при входе синхронизированный блок обновляет локальную память из главной, а при выходе обновляет главную память из локальной. И поэтому все блоки синхронизированные на одном и том же мониторе, видят изменения синхронизированных блоков, исполненных до них.

Перепишем пример выше с использованием synchronized:
    private int sharedVar = 3;

    public synchronized void updateShareVar(){
        sharedVar += 3;
    }
Заметим, что мы убрали volatile для переменной, так как синхронизация уже позволяет sharedVar быть видимой для соседних потоков, которые используют синхронизацию на тот же объект.
final
 private final int constant = 42;
Все final поля видимы другим потокам. Если в объекте все поля final, то его можно считать иммутабельным, и такой объект тоже могут видеть другие потоки без использования методов синхронизации.

Важно, что в Java существуют механизмы изменения final полей, например, reflecion.
Изменения, сделанные над финальными полями могут быть не видимы, так как компилятор в целях оптимизации может заменить такое поле константой и всегда возвращать ее значение, игнорируя модификацию оригинального значения.
Прерывание потоков
Иногда возникают такие ситуации, что результат выполнения задачи нас более не интересует.
В этом случае имеет смысл отменить ее, чтобы освободить ресурсы машины. Предположим, что у нас есть задача, которая генерирует последовательность из простых чисел.
class PrimeGenerator implements Runnable {
  private final List<BigInteger> primes = new CopyOnWriteArrayList<BigInteger>();
  
  @Override
  public void run() {
    BigInteger p = BigInteger.ONE;
    while (true) {
      p = p.nextProbablePrime();
      primes.add(p);
    }
  }
  
  public List<BigInteger> getPrimes() {
    return primes;
  }
}

...
    
PrimeGenerator primeGenerator = new PrimeGenerator();
Thread t = new Thread(primeGenerator);
t.start();
В данном случае поток будет генерировать простые числа бесконечно.
Но что если мы хотим прервать его после того, как было сгенерировано определенное количество данных?

Класс Thread предоставляет метод stop(), однако он почти сразу был объявлен как deprecated.
Дело в том, что stop() освобождает все мониторы (locks), которые были заняты данным потоком. Если в процессе выполнения операции объект, доступ к которому был ограничен с помощью монитора, находится в неконсистентном состоянии, есть вероятность, что другие потоки так же увидят его в «поврежденном» виде. Чтобы избегать таким ошибок, метод stop() не рекомендован к использованию.

Вместо этого в Java есть механизм прерываний. Строго говоря, поток не будет автоматически прерван. Отправится лишь запрос на это действие, обработка которого возлагается на плечи разработчика.

Давайте внедрим прерывание для PrimeGenerator.
class PrimeGenerator implements Runnable {
  private final List<BigInteger> primes = new CopyOnWriteArrayList<BigInteger>();
  
  @Override
  public void run() {
    BigInteger p = BigInteger.ONE;
    while (!Thread.currentThread().isInterrupted()) {
      p = p.nextProbablePrime();
      primes.add(p);
    }
  }
  
  public List<BigInteger> getPrimes() {
    return primes;
  }
}

...
    
PrimeGenerator primeGenerator = new PrimeGenerator();
Thread t = new Thread(primeGenerator);
t.start();
// do some logic
t.interrupt();
Теперь мы проверяем флаг прерывания и выполняем бизнес-логику до тех пор, пока он не будет выставлен в true.
Прерывание потока происходит при вызове t.interrupt()
InterruptedException
В описанном выше примере простые числа добавляются в обычный список. Что если нам требуется запрашивать результаты по одному по мере их поступления? Для этого можно использовать BlockingQueue.
class PrimeGenerator implements Runnable {
  private final BlockingQueue<BigInteger> primes;
  
  public PrimeGenerator(BlockingQueue<BigInteger> primes) {
    this.primes = primes;
  }
  
  @Override
  public void run() {
    BigInteger p = BigInteger.ONE;
    try {
      while (!Thread.currentThread().isInterrupted()) {
        p = p.nextProbablePrime();
        primes.put(p);
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

...
PrimeGenerator primeGenerator = new PrimeGenerator(someBlockingQueue);
Thread t = new Thread(primeGenerator);
t.start();
// do some logic
t.interrupt();
Метод BlockingQueue.put является блокирующим (например, в очереди может быть максимальное количество элементов). Если же во время ожидания put произошло прерывание потока, выбрасывается InterruptedException.
Это поведение является паттерном для многих методов в Java, которые предполагают работу в конкурентной среде. Во-первых, благодаря этому соблюдается принцип fail-fast. А во-вторых, ответственность по логике прерывания делегируется коду выше по стеку вызова.
При вызове метода, который выбрасывает InterruptedException,
есть два возможных варианта поведения.
  1. Пробросить исключение далее.
  2. Отловить его и поставить флаг прерывания в true.
В данном примере используется второй вариант.
Флаг прерывания должен быть обязательно выставлен, так как мы не знаем, как код выше по цепочке реагирует на прерывания. «Проглатывание» исключений может привести к неожиданным багам.

Стоит отметить, что конкретно в данном случае выставлять
повторно флаг прерывания необязательно.
Дело в том, что после выхода из метода run поток так же завершает свое выполнение. Соответственно, кода выше по цепочке нет и отсутствия флага прерывания не приведет к ошибкам. Но для того чтобы избежать случайных ошибок, рекомендуется всегда восстанавливать значение свойства interrupted.
Synchronized collections
Перейдем к потокобезопасным инструментам для работы с коллекциями. Начнем с синхронизированных коллекций. Удобно, что API этих коллекций остается таким же, как и у обычных.

Помимо обычных непотокобезопасных классов из java.util: HashMap, ArrayList, Hashset, есть еще коллекции, которые можно получить с помощью методов:
  • Collections.synchronizedCollection(Collection<T> c)
  • Collections.synchronizedList(List<T> l)
  • Collections.synchronizedMap(Map<K, V> m)
  • Collections.synchronizedSet(Set<T> s)
Эти методы возвращают синхронизированную wrapper над вашей коллекцией, методы которой обернуты synchronized. Вот, например, несколько методов класса SynchronizedCollection:
    SynchronizedCollection(Collection<E> c) {
            this.c = Objects.requireNonNull(c);
            mutex = this;
    }

    public boolean add(E e) {
        synchronized (mutex) {return c.add(e);}
    }
    public boolean remove(Object o) {
        synchronized (mutex) {return c.remove(o);}
    }

    public boolean containsAll(Collection<?> coll) {
        synchronized (mutex) {return c.containsAll(coll);}
    }
    public boolean addAll(Collection<? extends E> coll) {
        synchronized (mutex) {return c.addAll(coll);}
    }
Здесь синхронизация происходит на один и тот же объект mutex, поэтому при конкурентном обращении два потока не смогут одновременно выполнять разные методы одного экземпляра коллекции: один из них будет ждать пока другой закончит работать.
Итерация по коллекции
Давайте представим, что мы в одном потоке итерируемся по коллекции, а в соседнем потоке произошло некоторое изменение той же коллекции, что будет?

Ответ зависит от того, какой мы используем итератор, есть как минимум два вида:
  • fail-fast iterator — при конкурентном изменении коллекции во время итерирования, этот итератор бросает исключение ConcurrentModificationException.
  • fail-safe iterator — не бросает исключение при конкурентном изменении коллекции во время итерирования.
В синхронизированной коллекции используется первый тип итератора.
То есть нам остается только надеяться, что никакой другой поток не изменит коллекцию, пока мы с ней работаем. Но можно самим обернуть всю итерацию в блок synchronized:
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    synchronizedList.addAll(List.of(1, 2, 3, 4, 5));
    
    synchronized (synchronizedList) {
        for (Integer element : synchronizedList) {
            //some logic
        }
    }
Тогда заблокируется вся коллекция до тех пор, пока не отработает итерация по ней.
Это решение может повлиять на производительность, ведь коллекции могут быть большими.
Производительность
Подход, используемый в синхронизированных коллекциях, делает их потокобезопасными, но крайне медленными, так как одновременно может взаимодействовать с коллекцией только один поток. Другие же потоки ждут своей очереди на исполнение.
Конкурентные коллекции
Конкурентные коллекции, содержащиеся в пакете java.util.concurrent, предоставляют лучшую производительность, чем синхронизированные коллекции. Давайте посмотрим, какими механизмами это достигается.
Устройство коллекций
ConcurrentHashMap
Эта коллекция не ограничивает число потоков, одновременно читающих данные. Для записи, происходит блокировка данных, но блокируется не вся коллекция целиком. Дело в том, что ConcurrentHashMap разделена на сегменты (по умолчанию их 16), и если нужно записать данные в какой-то один сегмент, то и блокируется один сегмент.

Такой подход позволяет увеличить количество
и одновременно читающих и одновременно пишущих потоков.

ConcurrentHashMap использует итератор, который не бросает исключение ConcurrentModificationException при конкурентном изменении коллекции.
Этот итератор использует состояние коллекции на момент начала итерации и даже может подхватывать новое конкурентно измененное состояние, но такое поведение не гарантируется.
CopyOnWriteArrayList, CopyOnWriteArraySet
Две данные коллекции являются иммутабельными, то есть после создания их невозможно поменять, и чтобы модифицировать коллекцию, нужно создать новый экземпляр и только потом записать новые данные.

Именно из-за иммутабельности нет необходимости синхронизовывать разные версии коллекций и не нужно производить дополнительные блокировки для чтения и итерации. Итератор этих коллекций отображает состояние на момент начала итерации и не знает ничего о новых записях.

CopyOnWriteArrayList, CopyOnWriteArraySet выгодно использовать только в случаях, когда чтений коллекции значительно больше, чем ее изменений, потому что копирование всех данных для их последующего изменения — очень дорогая операция как для памяти, так и вычислительно.
Сравнение
с Синхронизированными коллекциями
Итак, конкурентные коллекции в целом производительнее, чем синхронные, и зачастую стоит выбирать именно их.

Но синхронизированные коллекции предоставляют более строгие гарантии: ни чтение, ни итератор не может вывести "неактуальные" данные. Если вам критично использовать самое свежее состояние коллекции, можно выбрать синхронизированные, но при этом помнить об их влиянии на производительность.
Сложные коллекторы в Stream API
В одном из предыдущих занятий мы познакомились с такой терминальной операцией, как коллектор и увидели несколько базовых реализаций. В этом уроке рассмотрим более сложные варианты.
groupingBy
groupingBy — это коллектор, который работает аналогично выражению GROUP BY в SQL. Чтобы использовать его, нам нужно указать по какому свойству нужно группировать элементы. В результате элементы стрима будут сгруппированы по этому свойству и помещены в Map.

Предположим, у нас есть список объектов класса Book:
class Book {
    private String author;
    private String title;
    private String genre;
    private Integer pageCount;
    // конструктор, геттеры, сеттеры
}

List<Book> books = List.of(
        new Book("Пушкин", "Евгений Онегин", "Роман", 352),
        new Book("Лермонтов", "Герой нашего времени", "Роман", 224),
        new Book("Гоголь", "Мёртвые души", "Поэма", 560),
        new Book("Гоголь", "Шинель", "Повесть", 64));
Простая группировка
Для группировки книг по жанру:
Map<String, List<Book>> byGenre = books.stream()
        .collect(groupingBy(Book::getGenre));

// byGenre = {Поэма=['Мёртвые души'], Роман=['Евгений Онегин', 'Герой нашего времени'], Повесть=['Шинель']} 
Если нужно получить сумму из свойств сгруппированных элементов, то можно воспользоваться такими методами:
summingInt() 
summingLong() 
summingDouble() 
Например, для получения суммарного количества страниц во всех книгах в определённом жанре, можно написать такой код:
Map<String, Integer> totalLengthByGenre = books.stream()
        .collect(groupingBy(Book::getGenre,
                summingInt(Book::getPageCount)));

// totalLengthByGenre = {Поэма=560, Роман=576, Повесть=64}
Можно определить среднее количество страниц в книге в определённом жанре:
Map<String, Double> averageLength = books.stream()
        .collect(groupingBy(Book::getGenre,
                averagingInt(Book::getPageCount)));
System.out.println("averageLength: " + averageLength);

// averageLength: {Поэма=560.0, Роман=288.0, Повесть=64.0}
Можно собрать статистику о количестве страниц в книгах по жанрам:
Map<String, IntSummaryStatistics> statisticsByGenre = books.stream()
        .collect(groupingBy(Book::getGenre,
                summarizingInt(Book::getPageCount)));
/*
statisticsByGenre = {
  Поэма=IntSummaryStatistics{
    count=1, sum=560, min=560, average=560.000000, max=560}, 
  Роман=IntSummaryStatistics{
    count=2, sum=576, min=224, average=288.000000, max=352}, 
  Повесть=IntSummaryStatistics{
    count=1, sum=64, min=64, average=64.000000, max=64}
}
*/
Можно найти самую большую книгу у каждого автора:
Map<String, Optional<Book>> maxLengthByAuthor = books.stream()
        .collect(groupingBy(Book::getAuthor,
                maxBy(Comparator.comparingInt(Book::getPageCount))));

// maxLengthByAuthor = {Пушкин=Optional['Евгений Онегин'], Гоголь=Optional['Мёртвые души'], Лермонтов=Optional['Герой нашего времени']}
Обратите внимание, что в результате мы получаем Map, состоящий из Optional.
Дело в том, что maxBy и minBy рассчитаны на работу с пустыми коллекциями. Соответственно максимума (или минимума) в пустой коллекции не будет. Отсюда и получается Optional.
mapping
Сгруппированные элементы можно преобразовать, применив дополнительный коллектор mapping.
К примеру, если нужно получить все жанры, в которых работал автор:
Map<String, Set<String>> genresByAuthor = books.stream()
        .collect(groupingBy(Book::getAuthor,
                mapping(Book::getGenre, toSet())));

// genresByAuthor = {Пушкин=[Роман], Гоголь=[Поэма, Повесть], Лермонтов=[Роман]}
Первый параметр mapping — это функция преобразования элемента стрима, которая работает аналогично Stream#map.

А второй параметр — это коллектор.
filtering
Сгруппированные элементы можно отфильтровать, применив дополнительный коллектор filtering. Например, чтобы получить короткие (количество страниц < 100) книги по каждому автору:
Map<String, Set<Book>> shortBooks = books.stream()
        .collect(groupingBy(Book::getAuthor,
                filtering(b -> b.getPageCount() < 100,
                        toSet())));

// shortBooks = {Пушкин=[], Гоголь=['Шинель'], Лермонтов=[]}
Может возникнуть вопрос: почему вместо этого способа нельзя просто отфильтровать элементы в стриме через метод filter, а затем вызвать groupingBy? Дело в том, что в таком случае до этапа группировки дойдут не все элементы. Поэтому если у какого-либо автора не окажется коротких произведений, то информация по этому автору не попадёт в результирующую Map. На наших данных результат был бы таким:
// shortBooks = {Гоголь=['Шинель']}
flatMapping
Допустим, в класс Book мы добавили поле comments для хранения комментариев:
class Book {
    ...
    private List<String> comments = new ArrayList<>();
    ...
И у нескольких книг они заполнены:
onegin.setComments(List.of("Отлично!", "Хорошо"));
shinel.setComments(List.of("Нормально", "Хорошо", "Отлично!"));
Чтобы получить все комментарии по всем книгам автора, можно использовать такой код:
Map<String, List<List<String>>> collect = books.stream()
        .collect(groupingBy(Book::getAuthor,
                mapping(Book::getComments,
                        toList())));
Но в этом случае получается список списков (List<List<String>>), что не всегда удобно.
Если нам нужно получить все комментарии одним списком, то нужно использовать метод flatMapping, который по сути объединяет несколько стримов в один:
Map<String, List<String>> authorComments = books.stream()
        .collect(groupingBy(Book::getAuthor,
                flatMapping(book -> book.getComments().stream(),
                        toList())));

// authorComments = {Пушкин=[Отлично!, Хорошо], Гоголь=[Нормально, Хорошо, Отлично!], Лермонтов=[]}
Пользовательские коллекторы в Stream API
В Stream API реализовано множество коллекторов (Collectors): toList, toSet, toMap и т.д.
Однако порой требуется что-то необычное и штатных коллекторов уже не хватает.

Для таких случаев Java даёт возможность написать свой коллектор.

Предположим, что у нас есть стрим статей Stream<Article>, а в статьях есть метод getWordCount, который возвращает количество слов в этой статье. И нам нужен такой коллектор, который просуммирует количество слов во всех этих статьях. Давайте создадим его.
Устройство коллектора
Коллектор — это класс, реализующий интерфейс Collector, поэтому для создания своего коллектора мы можем либо имплементировать данный интерфейс стандартным образом, либо воспользоваться методом Collector.of, который облегчает этот процесс. Второй способ короче — воспользуемся им.

Основные методы интерфейса Collector, которые нам нужны — следующие:

  • Supplier<A> supplier() — создаёт объект, в котором будет храниться результат работы коллектора (контейнер):
() -> new MutableInteger(0)
В методе supplier необходимо создать контейнер для результата, который в ходе работы стрима будет передаваться в качестве аргумента в метод accumulator.
В методе accumulator будет обновляться значение в контейнере. Обычный Integer в качестве контейнера не подойдёт, т.к. в таком случае при выходе из метода accumulator исходное значение контейнера не изменится.

Создадим класс-контейнер:
class MutableInteger {
    private int value;
    public MutableInteger(int value) {
        this.value = value;
    }
    public void add(int value) {
        this.value += value;
    }
    public int getValue() {
        return this.value;
    }
}
  • BiConsumer<A, T> accumulator() — возвращает функцию, которая будет использоваться для обновления результата в контейнере на основе элемента стрима. В нашем случае — добавит количество слов в статье:
(accumulator, article) -> accumulator.add(article.getWordCount())
  • BinaryOperator<A> combiner() — возвращает функцию, которая будет использоваться при объединении нескольких контейнеров с результатами:
(left, right) -> {
    left.add(right.getValue()); 
    return left;
}
Остановимся отдельно на последнем пункте.
При последовательной обработке элементов стрима методы supplier и accumulator отработают нормально. Но для того чтобы поддерживать параллельную работу стрима, нам нужна функция объединения нескольких результатов. Дело в том, что в случае параллельной работы, стрим делится на части и каждая часть обрабатывается параллельно. А в конце все полученные результаты объединяются в один при помощи функции объединения, которая описывается в методе combiner.
Реализация
    Collector<Article, Integer, Integer> collector = Collector.of(
        () -> new MutableInteger(0),
        (accumulator, article) -> accumulator.add(article.getWordCount()),
        (left, right) -> {
            left.add(right.getValue());
            return left;
        }
    );
Обратите внимание: combiner вызывается только для параллельных стримов, поэтому если заранее известно, что со стримом будет вестись только последовательная работа, то данные метод можно не реализовывать.

Если в конце работы коллектора нам нужно выполнить какие-либо преобразования над контейнером с результатом, то для этого есть четвёртый метод в интерфейсе Collector finisher.

В нашем случае нам необходимо превратить MutableInteger в int.
Обновим код коллектора:
    Collector<Article, Integer, Integer> collector = Collector.of(
        () -> new MutableInteger(0),
        (accumulator, article) -> accumulator.add(article.getWordCount()),
        (left, right) -> {
            left.add(right.getValue());
            return left;
        },
        accumulator -> accumulator.getValue()
    );
Оптимизация
При создании коллектора можно указать его характеристики, которые будут использоваться для внутренней оптимизации работы коллектора. Эти характеристики можно задать в пятом методе интерфейса Collector characteristics. А в Collectors.of эта информация передаётся через varargs:
Collector.of(  
  // supplier,
  // accumulator,
  // combiner,
  // finisher, 
  Collector.Characteristics.CONCURRENT,
  Collector.Characteristics.IDENTITY_FINISH,
  // ...
);
Существует 3 характеристики:
  • CONCURRENT — Указывает, что контейнер с результатом может использоваться при параллельной работе.
  • UNORDERED — Указывает, результат работы коллектора не учитывает исходный порядок элементов.
  • IDENTITY_FINISH — Указывает, что функция finisher возвращает контейнер с результатом без каких-либо преобразований, следовательно, её можно не вызывать.
Producer-Consumer
Producer-Consumer (производитель-потребитель, издатель-подписчик) — это паттерн, который позволяет разделить задачу и ее исполнителя. Это реализуется с помощью дополнительного звена — очереди. Задачи помещаются в очередь производителями, а потребители могут их считывать и выполнять те или иные действия.

Общая схема паттерна выглядит следующим образом:
В однопоточной среде реализация этого паттерна тривиальна.
Можно использовать LinkedList. Producer добавляет элементы в начало, а consumer читает и удаляет с конца. Но как вы могли догадаться, наибольшую ценность такой подход имеет в многопоточной среде, где разные производители и потребители могут работать независимо друг от друга.

Java определяет интерфейс BlockingQueue как раз для такого случая.
Контракт гарантирует, что все имплементации можно безопасно использовать в многопоточной среде.

Вот некоторые стандартные реализации BlockingQueue:
  1. ArrayBlockingQueue — стандартная очередь ограниченного размера формата FIFO (first-in, first-out).
  2. LinkedBlockingQueue — FIFO-очередь, которая использует связный список для хранения элементов. По умолчанию максимальное количество элементов равно Integer.MAX_VALUE. При необходимости может быть ограничено.
  3. PriorityBlockingQueue — очередь с приоритетами. Когда потребитель запрашивает следующий элемент, то он получает тот, чей приоритет является максимальным. Приоритет считается с помощью Comparator.
  4. SynchronousQueue. Строго говоря, это реализация не является очередью, так как не хранит в себе никаких элементов. Вместо этого при добавлении нового элемента producer блокируется до тех пор, пока не появится consumer, который готов забрать элемент. Это можно сравнить с раздачей рекламных буклетов. Пока текущую листовку кто-то не возьмет, промоутер не достанет следующую.
Теперь давайте рассмотрим реальный пример, в котором BlockingQueue может быть полезна.
Предположим, что мы стриминговый сервис по прослушиванию музыки. Чтобы получить доступ к системе, новые пользователи должны купить подписку.
class SubscriptionService {

  private final TransactionTemplate transaction;

  public void buySubscription(User user) {
    transaction.execute(() -> {
      user.checkSubscriptionStatus();
      user.applySubscription();
    });
  }
}
Аналитики сообщили нам, что они хотят получать информацию о каждой подписке, которую приобретают пользователи. Данные должны складывать в специальную систему аудита.

Конечно, мы могли бы добавить эту функциональность непосредственно в метод buySubscription.
class SubscriptionService {

  private final TransactionTemplate transaction;
  private final AuditService auditService;

  public void buySubscription(User user) {
    transaction.execute(() -> {
      user.checkSubscriptionStatus();
      user.applySubscription();
    });

    auditService.notifyAboutBoughSubscription(user);
  }
}
Однако здесь мы нарушаем Single-Responsibility Principle(SRP) и Open-Closed Principle(OCP).
Возможно, в будущем понадобится проводить дополнительные действия при покупке подписки. В этом случае придется постоянно вносить изменения в метод buySubscription. Это не только усложнит разработку, но и тестирование.

Лучшим вариантом будет использовать промежуточное звено — очередь. Вызывающему коду необязательно знать, как конкретно будет обработан его запрос. Достаточно лишь добавить новый элемент.
class SubscriptionService {

  private final TransactionTemplate transaction;
  private final BlockingQueue<SubscribtionBoughtEvent> queue;

  public void buySubscription(User user) throws InterruptedException {
    transaction.execute(() -> {
      user.checkSubscriptionStatus();
      user.applySubscription();
    });

    queue.put(new SubscribtionBoughtEvent(user));
  }
}

class SubscriptionBoughtEventListener {

  private final BlockingQueue<SubscribtionBoughtEvent> queue;
  private final AuditService auditService;

  public void notifyAboutBoughtSubscription() throws InterruptedException {
    while (true) {
      SubscribtionBoughtEvent event = queue.take();
      auditService.notifyAboutBoughSubscription(event);
    }
  }
}
Теперь бизнес-операция по аудированию факта покупки подписки отделена от ее приобретения.
Это дает возможность добавлять нам новые события и слушателей.
Stream API
Java 8 предоставила нам возможность писать некоторые вещи в красивом и лаконичном функциональном стиле. В релизе есть такие функциональные вещи, как лямбда-функции, функциональные интерфейсы и, сегодняшняя тема, Stream-ы.

Стримы позволяют нам компактно и понятно описать обработку последовательности элементов.
Давайте посмотрим, что и как можно сделать, используя стримы.
Порядок исполнения
Стримы несколько похожи на конвейеры на заводе. Что нужно, чтобы получить конвейер готовый к работе?

Во-первых, нам нужны вещи или данные, с которыми конвейер будет работать: детали машин, числа, строки. Поэтому первым делом нам нужно настроить поток элементов.

Во-вторых, нужно что-то с этими вещами сделать: смешать, отфильтровать, преобразовать. На этом этапе нам на помощь приходят промежуточные операции, логично, что на промежуточных операциях конвейер не заканчивается. Например, мы покрасили деталь машины, нам нужно передать ее дальше, чтобы ее можно было посушить. Если говорить чуть более технично, это значит, что любые промежуточные операции принимают Stream<T> и возвращают Stream<T>.

Наконец, когда мы закончили с обработкой элементов, нам нужно их собрать, упаковать или разослать клиентам. Здесь мы используем терминальные операции: они приводят элементы стрима к финальному виду и закрывают стрим. После того как мы собрали нашу машину, уже нет потока деталей, и нельзя с ними ничего сделать, так же мы не можем обратиться к стриму после его закрытия.
Как создать стрим
Есть множество вариантов того, как можно создать стрим. Рассмотрим самые часто употребимые из них.
Из существующей коллекции
Допустим, у нас в коде уже есть коллекция элементов, которые мы хотим обработать:
    List<String> strings = List.of("a", "b", "c");
    Stream<String> stream = strings.stream();
Генератор
Если мы хотим создать поток элементов, то можно использовать генератор. Генератору на вход нужно дать так называемый Supplier, единственной задачей которого является предоставление нового элемента стриму:
    // сгенерирует 10 приветствий Марку
    Stream.generate(() -> "Hi, Mark!")
        .limit(10);   

    // сгенерирует 3 случайных числа с плавающей точкой 
    Stream.generate(new Random()::nextFloat)
        .limit(3);
Заметим, что в конце мы поставили limit(), иначе бы мы сгенерировали бесконечный поток элементов.

Стрим из примитивов
В Java есть возможность гибкого получения стрима для Int и Long.
Можно задать начальный и конечный элемент с помощью методов range(firstElement, lastElement) и rangeClosed(firstElement, lastElement), первый исключает lastElement из стрима, второй - включает.
    IntStream.range(0, 3);           // 0, 1, 2
    LongStream.rangeClosed(0L, 3L);  // 0L, 1L, 2L, 3L
Билдер
Со стримами можно использовать паттерн builder и наполнять стрим постепенно:
    Stream<Integer> builtStream = Stream.<Integer>builder()
        .add(1)
        .add(2)
        .add(3)
        .build();

    Stream<Object> builtStream = Stream.builder()
        .add(1)
        .add("hi")
        .add(List.of(1, 2, 3))
        .build();
Здесь стоит обратить внимание на то, что в первой строчке мы указываем тип данных, который мы будем класть в стрим, иначе автоматически подставится тип Object, и проверка типов элементов не будет осуществляться, как во втором случае.

Также нужно не забыть "собрать" наш стрим методом build(), в противном случае будет тип не Stream<DataType>, а Stream.Builder<DataType>.
Промежуточные методы
Отлично, мы теперь умеем делать стримы. Давайте посмотрим, как устроена обработка элементов.

После того как мы получили поток элементов, можно этот поток преобразовать с помощью промежуточных (intermediate) методов. Эти методы "знают", как применить функцию, которую мы передаем как аргумент, к элементам стрима. То есть разработчик только описывает, что сделать с элементами, а как это делать - ответственность промежуточных методов.

Разберем на примерах.

filter
Методу filter() нужно указать функцию Predicate: она требует один аргумент и должна возвращать boolean. Если для какого-то элемента предикат вернул true, он остается в потоке, false — удаляется.
    Set<String> pizzaIngredients = Set.of("tomato", "flour", "water", "cheese");
    pizzaIngredients.stream()
        .filter(ingredient -> ingredient.contains("e"));
        // water, cheese
sorted
Метод sorted() принимает в качестве аргумента некоторый Comparator: это функция, которая указывает, как сравнивать два элемента. Интересно, что у всех классов, имплементирующих интерфейс Comparable, уже есть встроенный компаратор. Это значит, что для строчек, разных типов чисел и почти всех классов стандартной библиотеки необязательно писать свои правила сравнения.
    Set<String> pizzaIngredients = Set.of("tomato", "flour", "water", "cheese");
    pizzaIngredients.stream()
        .sorted()
        .forEach(System.out::println);
        // cheese, flour, tomato, water
        
    Stream.of(2, 3, 1, 0)
        .sorted()
        .forEach(System.out::println);
        // 0, 1, 2, 3
Давайте посмотрим, как выглядит компаратор для int:
    public static int compare(int x, int y) {
        return (x < y) ? -1 : ((x == y) ? 0 : 1);
    }
Если числа равны нужно вернуть 0, если первое больше второго — 1, если второе больше первого — -1.
Точно так же пишут и другие компараторы, нужно только определить, что больше, а что меньше.

Напишем свой собственный компаратор. Допустим, есть такой класс:
    public class Pizza implements Comparable<Pizza> {
        private String name;
        private List<String> ingredients;

        //other methods
    }
Давайте, лучше ("больше") будет та пицца, в которой есть ананас, если в обеих пиццах есть ананас, то они одинаковые. Добавим в наш класс следующий метод:
    @Override
    public int compareTo(Pizza otherPizza) {
        return hasPineapple(this) == hasPineapple(otherPizza) ? 0 
            : (hasPineapple(this) ? 1 : -1);
    }

    private boolean hasPineapple(Pizza pizza) {
        return pizza.ingredients.contains("pineapple");
    }
Теперь сравним две пиццы:
    Pizza margarita = new Pizza("Margarita", List.of("tomato", "flour", "water", "cheese"));
    Pizza hawaiian = new Pizza("Hawaiian", List.of("tomato", "flour", "water", "cheese", "pineapple"));
    
    Stream.of(margarita, hawaiian)
        .sorted();
    // Margarita, Hawaiian
Итак, пицца с ананасом лучше, поэтому она идет позднее.

map
map принимает функцию-преобразование с одним аргументом, в которой описывается, как сделать из элемента другой элемент.
    IntStream.rangeClosed(0, 3)
        .map(e -> e * 2);       // 0, 2, 4, 6
Терминальные методы
reduce
Операция reduce() принимает очень интересную функцию. Она требует два аргумента, первый представляет собой элемент нашего потока, второй аргумент — аккумулятор, куда мы собираем все наши элементы. Сама функция — правило, как сложить первый элемент в аккумулятор.

Давайте просуммируем все элементы нашего потока:
    IntStream.range(0, 4)
        .reduce((el, acc) -> el + acc); // 6
collect
Позволяет собрать стрим в коллекцию, например, Set или List. Удобно, что уже есть готовые методы для создания коллекции из потока. Посмотрим некоторые примеры.
    Set<Integer> integerSet = Stream.of(1, 2, 3, 4, 5, 4, 3, 2, 1)
        .collect(Collectors.toSet()); // [1, 2, 3, 4, 5]
Теперь сделаем мапу с ключами — элементами и значениям — количество вхождений элемента.
    Map<Integer, Integer> integerMap = Stream.of(1, 1, 1, 2, 3, 3, 3, 3, 3)
        .collect(Collectors.toMap(e -> e, e -> 1, Integer::sum));      
            // {1=3, 2=1, 3=5}
В Collectors.toMap первым аргументом указали функцию, которая устанавливает ключи в мапе, в данном случае — это просто наши элементы потока. Вторым аргументом — функцию, которая устанавливает значения: нам нужно, чтобы при каждом вхождении элемента в значение клалась 1, третий аргумент отвечает за то, что делать с той 1, которую нам дала вторая функция, здесь мы просто суммируем.

Когда обработалась первая единица, по ключу 1 записалась 1, когда обработалась вторая единица, вторая функция e -> 1 тоже пытается записать 1 по ключу 1, и происходит коллизия, потому что у нас уже есть значение по этому ключу. В таком случае будет вызываться третья функция, которая принимает два аргумента: уже установленное значение и новое. Результат выполнения это функции будет записан в значение по ключу, в данном примере sum(1,1).

Есть еще много терминальных операций, мы посмотрели самые распространенные.
Ленивость промежуточных операций
Вероятно, самая главна фича стримов — ленивость промежуточных операций. Она заключается в том, что промежуточные операции будут исполнены, только если этого требует терминальная операция. Посмотрим на примерах:
    Set.of("tomato", "flour", "water", "cheese", "pineapple")
        .stream()
        .filter(e -> {
            System.out.println("I did something");
            return e.contains("e");
    });
Как вы думаете, сколько раз в консоль будет выведено сообщение?
Ответ: ни разу. Потому что у нас нет терминальной операции. Давайте добавим!
    Set.of("tomato", "flour", "water", "cheese", "pineapple")   
        .stream()
        .filter(e -> {
            System.out.println("I did something");
            return e.contains("e");
    })
        .collect(Collectors.toList());
В этом случае наше сообщение будет напечатано уже все 5 раз.
А если добавить еще один фильтр?
    Set.of("tomato", "flour", "water", "cheese", "pineapple")
            .stream()
            .filter(e -> {
                System.out.println("I did something");
                return e.contains("e");
            })
            .filter(e -> {
                System.out.println("I did something too");
                return e.startsWith("p");
            })
            .collect(Collectors.toList());
Сообщение для второго фильтра напечатается только для тех элементов, которые прошли первый фильтр.
Почему стримы?
Теперь, когда мы разобрались с теорией, давайте поймем, зачем вообще использовать стримы, как и когда они помогают?
  1. Stream API — удобный инструмент для декларативного описания обработки данных.
  2. Используя стримы, можно легко работать с большими данными из разных источников, например, из файлов или баз данных, не беспокоясь при этом о затраченной памяти.
  3. Ленивость промежуточных методов оптимизирует количество необходимых операций.
  4. С помощью стримов можно без труда параллелить обработку. Как? Об этом сейчас и поговорим.
Параллелизм
Мы с вами успели рассмотреть, как работать со стримами, которые обрабатывают элементы последовательно один за другим. Теперь разберемся, как устроены параллельные стримы: те, которые могут работать с элементами в разных потоках.

Параллельный стрим из последовательного
Итак, допустим у нас есть обычный последовательный стрим.
Сделать из него параллельный можно с помощью метода parallel().
    Stream.of(1, 2, 3)
            .parallel()
            .map(e -> e * 2)
            .forEach(System.out::println); // 2, 6, 4
Параллельный стрим можно получить не только из последовательного стрима, но и из коллекции, используя метод parallelStream():
    List.of(1, 2, 3)
            .parallelStream()
            .map(e -> e * 2)
            .forEach(System.out::println); // 6, 4, 2
Посмотрите на порядок вывода: при использовании параллельных стримов нет гарантии, что обрабатываться элементы будут в том же порядке, в котором они указаны. Более того, порядок вывода будет меняться от запуска к запуску.

Последовательный стрим из параллельного
Stream API нам также позволяет собрать параллельные потоки в один. Для этого достаточно указать sequential():
    List.of(1, 2, 3)
            .parallelStream()
            .map(e -> e * 2)
            .sequential()
            .forEach(System.out::println); //2, 4, 6
В этом случае порядок вывода элементов будет совпадать с порядком их ввода.

Методы parallel() и sequential() — промежуточные операции, они лишь указывают терминальной операции, как обрабатывать элементы, и сами не создают параллельных или последовательных потоков. Вот реализация этих методов:
    public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
    }

    public final S sequential() {
        sourceStage.parallel = false;
        return (S) this;
    }
Здесь был поменян булевый флажок sorceStage.parallel и не было никаких преобразований стримов.

Внутреннее устройство
Когда-то давным-давно, до Java 8, в синтаксисе языка еще не было Stream API, и для создания параллельных потоков разработчики использовали fork-join framework.
Сегодня он скрыт за Stream API, и программистам уже необязательно писать на fork-join framework-е, хотя с ним все же стоит быть знакомым для корректного использования параллелизма в стримах.

Как мы уже видели, parallel() и parallelStream() запускает обработку в отдельных потоках.
За организацию потоков отвечает ForkJoinPool, по умолчанию количество потоков в пуле равно количеству ядер.
Если все потоки ForkJoinPool заняты, то методы parallel() и parallelStream() вернут не параллельный стрим, а последовательный.
Чтобы проверить, какой у вас стрим можно использовать метод isParallel().

Как сделать, чтобы гарантированно распараллелить стрим? Просто — создать свой пул потоков.
    ForkJoinPool pool = new ForkJoinPool(3);
    List<Integer> list = List.of(1, 2, 3);

    pool.submit(() -> list.parallelStream()
                          .map(e -> e * 2)
                          .forEach(System.out::println)
            ).get();
    pool.shutdown();
Use cases
Несмотря на внешнюю лаконичность параллельных стримов, они могут сильно влиять на производительность: потоки вашего стрима могут отнимать процессорное время у соседних потоков, частое переключение между потоками ведет к частой смене контекста исполнения.

Поэтому использовать параллельные стримы нужно только при сильной необходимости и с подходящими задачами.
Какие задачи можно считать подходящими?
  • те, что не зависят от соседних задач и, которым необходим только один элемент стрима
  • те, что используют источник данных, который можно вычитывать в несколько потоков
  • те, которые тратят много времени для работы с каждым элементом, иначе расходы на обслуживание потоков могут "съесть" прирост производительности от параллельности
  • те, что не требуют больших затрат на соединение результатов работы каждого потока
Управление задачами
При построении программных систем мы так или иначе выстраиваем логику выполнения определенных задач. Это может быть запрос в базу данных, выполнение сложного алгоритма, отправка уведомления другому серверу и так далее. Предположим, что мы пишем собственный веб-сервер. Наивная реализация может выглядеть следующим образом:
class WebServer {

  public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(80);
    while (true) {
      Socket connection = socket.accept();
      handleRequest(connection);
    }
  }
}
В данном случае все запросы обрабатываются последовательно в однопоточном режиме.
Следующая задача начинает выполняться только после завершения предыдущей. Понятно, что подобная реализация будет не эффективной. Но что если каждая задача будет выполняться в отдельном потоке?
class WebServer {

  public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(80);
    while (true) {
      Socket connection = socket.accept();
      new Thread(() -> handleRequest(connection)).start();
    }
  }
}
На каждый новый запрос создается новый поток. Такой вариант действительно даст прирост в производительности. Однако у бесконтрольного создания потоков есть ряд недостатков.
  1. Создание нового потока является дорогой операцией с точки зрения ОС.
  2. Активные потоки потребляют значительное количество ресурсов (особенно памяти).
  3. Максимальное количество потоков в ОС ограниченно. Теоретически при большом количество запросов программа может завершиться аварийно.
Чтобы избежать подобных проблем, используют пулы потоков.
Это объекты, которые инкапсулируют в себе определенное количество потоков и предоставляют интерфейс по запуску задач. Но после завершения задачи поток не удаляется, а остается внутри и может переиспользоваться для следующих запросов.

Начиная с Java 5, предоставляется интерфейс ExecutorService, который стандартизирует API для запуска задач на пуле потоков.

Future
Когда мы используем ExecutorService, важно понимать значение такого объекта как Future.
Future<Response> future = executorService.submit(() -> sendRequestToRemoteServer());
Future олицетворяет контейнер, значение в котором либо уже присутствует, либо появится со временем.
Чтобы получить результат, достаточно вызвать метод get.
Future<Response> future = executorService.submit(() -> sendRequestToRemoteServer());
...
Response response = future.get();
Однако проблема в том, что метод get является блокирующим. Это значит, что поток будет в ожидании результата до тех пор, пока он не появится. Поэтому есть вариация get с timeout.
Future<Response> future = executorService.submit(() -> sendRequestToRemoteServer());
...
Response response = future.get(3, TimeUnit.SECONDS);
В данном случае результат будет ожидаться не более трех секунд. В противном выбросится TimeoutException.

Стандартные реализации ExecutorService
В классе Executors также присутствует несколько имплементаций ExecutorService, которые покрывают большую часть потребностей в контексте многопоточного программирования. Вот некоторые из них:
  1. newFixedThreadPool
  2. newCachedThreadPool
  3. newScheduledThreadPool

Fixed Thread Pool
Fixed Thread Pool — это наиболее простая в понимании концепция. При вызове newFixedThreadPool создается объект с фиксированным количеством потоков (количество передается в качестве параметра). При поступлении новой задачи из пула запрашивается свободный поток. Если таковой отсутствует, задача ставится в очередь.
Эта реализация ExecutorService является наиболее популярной и используется чаще всего.

Cached Thread Pool
Кэшированный пул потоков, в отличие от фиксированного, не ставит верхнюю границу на количество потоков. Вместо этого используется следующий алгоритм:
  1. Если есть свободный поток, задача присваивается ему
  2. Если нет, создается новый поток
Эта реализация подходит для тех случаев, когда не ожидается «всплесков» нагрузки. Потому что в противном случае может быть создано чересчур много потоков, что отрицательно скажется на производительности.
Строго говоря, максимальное количество потоков в Cached Thread Pool не является бесконечным. Оно равно Integer.MAX_VALUE.
Schedule Thread Pool
Вызов newScheduledThreadPool возвращает интерфейс ScheduledExecutorService, который является расширением ExecutorService. ScheduledExecutorService позволяет запускать задачи с заданной задержкой и интервалом.

Выключение ExecutorService
После выполнения необходимых задач, ExecutorService требуется выключить. Для этого методы:
  1. shutdown
  2. shutdownNow
  3. awaitTermination
shutdown отправляет запрос на выключение, после которого новые задачи более не принимаются. Поведение shutdownNow похоже, однако также происходит попытка прервать все задачи, которые сейчас выполняются.
А awaitTermination ждет указанное количество времени, пока ExecutorService не будет выключен.
CompletableFuture
Future
Future API, введенная в Java 5, предоставила возможность асинхронного программирования. С помощью нее можно запускать задачи параллельно в соседних потоках, главный же поток не блокируется на время выполнения задачи, а только получает уведомление о статусе работы.

Future — самая важная сущность API и представляет собой ссылку на результат выполнения асинхронной задачи.
С помощью методов isDone, isCanceled можно проверить состояние задачи, а с помощью get() получить результат.
    // Какая-то долгая и сложная задача, которая возвращает Future<Integer>
    Future<Integer> future = new Counter.countSomethingBig();

    while(!future.isDone()) {
        System.out.println("Counting...");
        Thread.sleep(300);
    }

    Integer result = future.get();
Future — большой шаг в сторону удобства работы с асинхронностью, но у нее есть несколько недостатков.
  • нельзя завершить задачу самостоятельно
  • метод get() блокирует поток до момента получения результата
  • нельзя скомбинировать несколько Future
  • нельзя выстроить цепочку из фьюч для последовательного выполнения
Поэтому в Java 8 добавили более удобную надстройку над Future — CompletableFuture.

CompletableFuture
Новая CompletableFuture уже умеет решать все указанные выше проблемы.
Давайте разберем, как работать с ней, и как она борется с этими недостатками.

Создание задачи

supplyAsync
Если нужно выполнить асинхронно задачу и вернуть результат — есть метод supplyAsync:
    CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> GroundControl.getStatus());
runAsync
Если нам нужно выполнить какую-то работу без возврата результата выполнения, можно воспользоваться runAsync:
    CompletableFuture<Void> completableFuture = CompletableFuture
            .runAsync(() -> MajorTom.sendStatus());
Executor
Мы уже знаем, что supplyAsync и runAsync запускают задачи в новом потоке. Но откуда берется новый поток?
Ответ: он получается из общего ForkJoinPool.commonPool(). Но мы можем использовать и свой пул:
    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> completableFuture = CompletableFuture
            .runAsync(() -> MajorTom.sendStatus(), executor);
Завершение задачи
Одна из центральных возможностей, которая даже фигурирует в названии — CompletableFuture.
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    try {
        //заблокирует главный поток на 100 миллисекунд
        completableFuture.get(100, TimeUnit.MILLISECONDS);
    } catch (Exception e){
        logger.warn("Completing future with default value", e);
    } finally {
        completableFuture.complete("Устал ждать");
    }
Ура, мы смогли завершить фьючу руками и вернуть результат "Устал ждать".

Обработка результата
У CompletableFuture метод get() тоже блокирующий, как и у обычной фьючи. Но зато мы можем навесить на CompletableFuture колбек, который вызовется, когда вернется результат задачи.

thenApply
    CompletableFuture<String> createBalanceInfo = CompletableFuture.supplyAsync(() -> {
        long amountOfMoney = Counter.countMoney();
        return amountOfMoney;
    }).thenApply(amountOfMoney -> {
        return "There is a lot of money:" + amountOfMoney;
    });

    logger.info(createBalanceInfo.get());
thenApply возвращает CompletableFuture<T>, поэтому можно сделать цепочку из thenApply, которые последовательно будут преобразовывать результат.

thenAccept
thenAccept в отличие от thenApply не возвращает результата выполнения, он просто делает некоторую полезную работу:
    CompletableFuture<Void> createBalanceInfo = CompletableFuture.supplyAsync(() -> {
        long amountOfMoney = Counter.countMoney();
        return amountOfMoney;
    }).thenAccept(amountOfMoney -> send(amountOfMoney));
thenRun
То же, что и thenAccept, только не имеет доступа к результату задачи.
    CompletableFuture<Void> createBalanceInfo = CompletableFuture.supplyAsync(() -> {
        long amountOfMoney = Counter.countMoney();
        return amountOfMoney;
    }).thenRun(() -> logger.info("Created balance info"));
async методы
thenApply, thenAccept, thenRun выполняются в том же потоке, в котором и сама CompletableFuture, но если мы хотим запустить их в отдельных потоках — мы можем воспользоваться async методами: CompletableFuture, предоставляет целых две реализации асинхронных методов для всех колбеков.
    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
    
    public <U> CompletableFuture<U> thenApplyAsync(              
        Function<? super T,? extends U> fn) {
        return uniApplyStage(defaultExecutor(), fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }
Перепишем пример сверху на async:
    CompletableFuture<String> createBalanceInfo = CompletableFuture.supplyAsync(() -> {
        long amountOfMoney = Counter.countMoney();
        return amountOfMoney;
    }).thenApplyAsync(amountOfMoney -> {
        return "There is a lot of money:" + amountOfMoney;
    });

    logger.info(createBalanceInfo.get());
Теперь колбек будет выполняться в новом потоке, полученным из пула ForkJoinPool.commonPool(). Если мы хотим получить поток из своего пула, это можно сделать, указав его вторым аргументом в thenXXXAsync:
    Executor executor = Executors.newFixedThreadPool(2);

    CompletableFuture<String> createBalanceInfo = CompletableFuture.supplyAsync(() -> {
        long amountOfMoney = Counter.countMoney();
        return amountOfMoney;
    }).thenApplyAsync(amountOfMoney -> {
        return "There is a lot of money:" + amountOfMoney;
    }, executor);
Комбинирование результатов

Комбинирование двух задач
Допустим у нас есть две зависимые задачи:
    CompletableFuture<ClientInfo> getClientInfo(String clientId) {
        return CompletableFuture.supplyAsync(() -> ClientService.getClientInfo(clientId));
    }

    CompletableFuture<BalanceInfo> getBalanceInfo (ClientInfo clientInfo){
        return CompletableFuture.supplyAsync(() -> {
            int balanceId = clientInfo.getBalanceId();
            return BalanceService.getBalanceInfo(balanceId);
        });
    }
Мы хотим, чтобы вторая запускалась с результатами первой, можно использовать метод thenCompose():
    CompletableFuture<BalanceInfo> balanceInfo = getClientInfo(clientId)
            .thenCompose(clientInfo -> getBalanceInfo(clientInfo));
Если же у нас есть две независимые задачи, и нам нужно обработать их результат, то можно вызвать метод thenCombine():
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Ground Control to ")
            .thenCombine(CompletableFuture.supplyAsync(() -> "Major Tom"), String::concat);
Комбинирование нескольких задач
Теперь нам надо запустить несколько задач параллельно и подождать пока они все завершатся: будем использовать allOf(). Этот метод возвращает CompletableFuture<Void>, чтобы получить результат, мы можем или вызвать get() или воспользоваться CompletableFuture.join():
    CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Ground Control to");
    CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "Major Tom");
    CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(() -> "Take your protein pills and put your helmet on");

    CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future1, future2, future3);

    String combined = Stream.of(future1, future2, future3)
        .map(CompletableFuture::join)
        .collect(Collectors.joining(" "));   // Ground Control to Major Tom Take your protein pill and put your helmet on
Если нужен результат только первой закончившей фьючи, то подходит метод anyOf(). Единственное, этот метод всегда возвращает CompletableFuture<Object>:
        CompletableFuture<String> future1 = CompletableFuture
                .supplyAsync(() -> "Boris Volynov");
        CompletableFuture<String> future2 = CompletableFuture
                .supplyAsync(() -> "Major Tom");
        CompletableFuture<String> future3 = CompletableFuture
                .supplyAsync(() -> "Valentina Tereshkova");

        CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(future1, future2, future3);
        
        completableFuture.get();
ТЕСТ
Проверь свои знания
на практике и прикрепляй
в репозиторий скриншот с результатами!
Проходи тест только после того, как изучишь все уроки спринта!
Пройти тест
Почему многопоточное программирование стало так востребовано и заняло место последовательного выполнения?
Верно! При вертикальном масштабировании мы неизбежно упираемся в ограничения железа. Горизонтальное же помогает обойти это.
Скорее наоборот, так как в многопоточном программировании необходимо заботиться о доступе к общим ресурсам, о синхронизации и взаимодействии потоков.
Верно! Современные процессоры являются многоядерными. А значит, для полного применения их потенциала, необходимо писать многопоточные программы.
Все так. Действительно, многопоточные программы могут быть гораздо более производительными. Но в некоторых ситуациях, одного потока достаточно, чтобы покрыть все нужды и не превносить излишнюю сложность. Например, большинство движков по отрисовке графического интерфейса (в том числе, в браузере), работают в одном потоке.
Дальше
Проверить
Узнать результат
В чем разница между потоком и процессом?
Неверно. Несколько потоков разделяют одну память с процессом. В то время как разные процессы оперируют с независимыми участками.
Поток — более легковесная сущность, чем процесс, поэтому его создание и остановка занимают существенно меньше времени, чем создание нового процесса. Потоку не выделяется новая оперативная память, он использует память процесса.
Наоборот.
Процесс — это выполняющаяся программа и все её элементы: адресное пространство, глобальные переменные, регистры, стек, открытые файлы и так далее, в то время как поток — это последовательность команд, обрабатываемых в рамках процесса.
Неверно. Несколько потоков разделяют одну память с процессом. В то время как разные процессы оперируют с независимыми участками.
Дальше
Проверить
Узнать результат
В чем разница между synchronized и concurrent коллекциями?
Верно!
Верно! Конкурентный итератор либо гарантирует, что объект не будет подвергнут изменениям (CopyOnWriteArrayList), либо же допускает, что некоторые изменения не будут отражены (ConcurrentHashMap).
Наоборот.
Наоборот, потому что concurrent коллекции, в отличие от synchronized, не выдают экслюзивный lock одному потоку для работы с ней.
Дальше
Проверить
Узнать результат
Предположим, что поток А вызвал метод BlockingQueue.put, а после поток B прервал его (Thread.interrupt). Что в этом случае произойдет с потоком А?
Неверно. Метод BlockingQueue.put в этом случае бросит InterruptedException.
Вызов метода Thread.interrupt не прерывает поток сам по себе, а лишь выставляет флаг, который может быть проверен во время выполнения определенных операций.
Верно!
Дальше
Проверить
Узнать результат
Можно ли создавать новый поток на каждый запрос, и почему?
Верно!
Неверно. Создание потоков – дорогая операция. Так что при большой нагрузке не стоит их инстанцировать на каждый запрос.
Верно! Но тем не менее, даже при небольших нагрузках это не является рекомендованным подходом.
Дальше
Проверить
Узнать результат
Есть ли проблемы в этом коде, если есть, то какие?*
Неверно. Пул здесь мы как раз используем.
Верно! Блокирующий get() без таймаута может привести к deadlock.
Верно! Сначала мы ждем результат imageFuture и только потом запускам задачу downloadTags. Правильнее было бы сначала отправить их обе в пул, а затем ждать выполнения каждой.
Дальше
Проверить
Узнать результат
Предположим, что мы разрабатываем сервис для местной газеты, в котором клиенты могут подписываться на уведомления о новых публикациях. Какую коллекцию эффективнее всего будет использовать для хранения подписчиков, если рассылка происходит гораздо чаще, чем запросы на подписку? *
Верно! Так как рассылка (чтение) происходит гораздо чаще, чем подписка (запись), то копирование всех элементо внутри коллекции будет происходить не часто.
Неверно. Если много потоков будут читать из списка одновременно, то операции будут выполняться последовательно.
Верно! Так как рассылка (чтение) происходит гораздо чаще, чем подписка (запись), то копирование всех элементо внутри коллекции будет происходить не часто.
Неверно. Если много потоков будут читать из списка одновременно, то операции будут выполняться последовательно.
Дальше
Проверить
Узнать результат
Сколько раз в консоль выведется строка "Here we are"?
Операции filter и map являются промежуточными. Из-за "ленивой" природы стримов, промежуточные операции начинают выполняться только после вызова терминальной. Например, collect.
Операции filter и map являются промежуточными. Из-за "ленивой" природы стримов, промежуточные операции начинают выполняться только после вызова терминальной. Например, collect.
Операции filter и map являются промежуточными. Из-за "ленивой" природы стримов, промежуточные операции начинают выполняться только после вызова терминальной. Например, collect.
Операции filter и map являются промежуточными. Из-за "ленивой" природы стримов, промежуточные операции начинают выполняться только после вызова терминальной. Например, collect.
Дальше
Проверить
Узнать результат
В каких ситуациях параллельные стримы наиболее эффективны?
Верно! Тогда можно добиться максимального параллелизма.
При маленьком количестве входных данных затраты на "распараллеливание" будут больше, чем итоговая польза.
Операции не смогут выполняться параллельно. Поэтому смысла в использовании параллельных стримов также нет.
Верно!
System.out.println является операцией, которая требует синхронизации.
Дальше
Проверить
Узнать результат
Что делает данный коллектор, если его элементы — это новостные статьи?
На каждом этапе работы accumulator в итоговый список добавляются только русские статьи. На этапе же работы combiner полученные списки сливаются воедино.
На каждом этапе работы accumulator в итоговый список добавляются только русские статьи. На этапе же работы combiner полученные списки сливаются воедино.
На каждом этапе работы accumulator в итоговый список добавляются только русские статьи. На этапе же работы combiner полученные списки сливаются воедино.
На каждом этапе работы accumulator в итоговый список добавляются только русские статьи. На этапе же работы combiner полученные списки сливаются воедино.
Дальше
Проверить
Узнать результат
Практическое задание
Выполняй его после прохождения всех уроков спринта
Вам необходимо использовать полученные знания для реализации прототипа системы по обогащению пользовательских сообщений.
Дедлайн — 21 октября
Концепция
Пользователь отправляет DTO в следующем формате:
public class Message {
    private String content;
    private EnrichmentType enrichmentType;

    public enum EnrichmentType {
        MSISDN;
    }
}
DTO содержит сообщение content и тип обогащения enrichmentType.

MSISDN — это обогащение по номеру телефона.
В качесте результата обогащения добавляются firstName и lastName в поле enrichment.
Пример
Входное сообщение:
{
    "action": "button_click",
    "page": "book_card",
    "msisdn": "88005553535"
}
Обогащенное сообщение:
{
    "action": "button_click",
    "page": "book_card",
    "msisdn": "88005553535",
    "enrichment": {
        "firstName": "Vasya",
        "lastName": "Ivanov"
    }
}
Условия обогащения по MSISDN
  1. Сообщение должно быть в формате JSON.
  2. В JSON должно быть поле msisdn со строковым значением. Остальные поля произвольны.
  3. По указанному MSISDN подставляется соответствующая информация.
  4. Если поле enrichment уже есть в сообщении, оно перезаписывается.
  5. Если одно из условий не соблюдается (сообщение не формате JSON, поле msisdn отсутствует, или равно null, или в нем информация в некорректном формате), сообщение возвращается в том же виде, в котором пришло.
Точка входа
Точкой входа в приложение должен являться класс EnrichmentService с методом enrich.
public class EnrichmentService {
    // возвращается обогащенный (или необогащенный content сообщения)
    public String enrich(Message message) {...}
}
Важно: Никакие другие API добавлять не нужно (CLI, GUI, REST). Работоспособность проверяем с помощью Unit-тестов.
Требования к реализации
  1. Система должна работать корректно в многопоточной среде. Предполагаем, что метод enrich может вызываться паралельно из разных потоков.
  2. Информация о пользователях хранится в памяти. Предполагаем, что она может периодически меняться в другом потоке.
  3. Каждое обработанное сообщение дополнительно должно сохраняться в одной из двух структур данных в зависимости от того, было ли сообщение успешно обогащено, или нет. Например, список обогащенных и необогащенных сообщений.
  4. Должен быть написан хотя бы один End-to-End тест, который проверяет, что система работает корректно в многопоточном режиме. Как вариант, можно использовать ExecutorService и CountDownLatch или Phaser для запуска нескольких задач одновременно.
  5. Несмотря на то что в системе всего один тип обогащения (MSISDN), код необходимо написать так, чтобы он был открыт для дальнейшего расширения (вспомните урок по GoF-паттернам).
End-to-End тест тестирует всю систему целиком, а не каждый компонент по отдельности.
Советы
  • Не нарушайте принцип инверсии зависимостей, так как это сильно усложнит и тестирование, и разработку в целом.
public class EnrichmentService {
    // принцип DI нарушен
    private final MessageValidator validator = new MessageValidatorImpl();

    public String enrich(Message message) {...}
}
public class EnrichmentService {
    // принцип DI НЕ нарушен
    private final MessageValidator validator;

    public EnrichmentService(MessageValidator validator) {
        this.validator = validator;
    }

    public String enrich(Message message) {...}
}
  • Не пытайтесь решить проблему с "наскока". Разбейте систему на компоненты, протестируйте каждый из них отдельно, а потом совместите по кусочкам.
  • Для валидации JSON можно использовать библиотеку JSONAssert.
Полученные резултаты оформите в Pull Request и отправьте на ревью ментору.