Go. Concurrency
Модуль 3
Модуль 3
Каналы (Channels) — «Трубы для общения»
Представьте, что у вас есть два рабочих (две горутины), которые работают в разных комнатах и не могут видеть друг друга. Как одному рабочему передать деталь другому? Он не может просто протянуть ее через стену.

Канал — это типизированная «труба» или «почтовый ящик», прорезанный в стене между этими комнатами. Один рабочий (горутина) может положить в эту трубу что-то (например, число или строку), а другой рабочий (другая горутина) может это что-то с другой стороны забрать.

Ключевое слово здесь — типизированная. Вы должны сразу решить, какой «почтой» вы будете обмениваться. Это может быть канал для чисел (chan int), для строк (chan string) или для любых других данных. Нельзя положить число в канал, предназначенный для строк.

Аналогия
Лента конвейера: Один рабочий кладет на ленту деталь, другой ее забирает. Лента — это канал.
Почтовый ящик: Вы кладете письмо в ящик. Почтальон (другая горутина) приходит и забирает его. Ящик — это канал.
Как создать и использовать канал?
Создать канал очень просто с помощью встроенной функции make:
// Создаем канал для передачи целых чисел
ch := make(chan int)

// Создаем канал для передачи строк
strCh := make(chan string)
После создания канала с ним можно выполнять две основные операции.
Отправка (положить в ящик)
ch <- 42 // Кладем число 42 в канал ch
Стрелка <- показывает направление движения данных: из переменной 42 в канал ch.
Получение (забрать из ящика)
value := <-ch // Ждем, пока в канале что-то появится, и забираем это в переменную value
Здесь стрелка <- показывает, что данные движутся из канала ch в переменную value.
Блокировка — ключевое свойство каналов
Это самая важная и магическая особенность каналов. По умолчанию каналы небуферизованные (unbuffered). Это означает, что в них нет места для хранения данных. Они работают как прямой «передатчик-приемник».

Это приводит к двум золотым правилам:

Отправка в небуферизованный канал блокируется, пока другая горутина не будет готова принять данные из этого же канала.

Получение из канала блокируется, пока в нем не появятся данные (то есть, пока кто-то не начнет отправку).
Аналогия с мячом
Вы не можете бросить мяч человеку, который не готов его поймать. Вы будете стоять с мячом в руке и ждать (отправка блокируется).

Вы не можете поймать мяч, который вам еще не бросили. Вы будете стоять с протянутыми руками и ждать (получение блокируется).

Эта блокировка — не баг, а фича! Именно она обеспечивает идеальную синхронизацию. Две горутины встречаются на канале, обмениваются данными и продолжают свою работу. Ни одна из них не «убегает» вперед.
Producer-Consumer (Производитель-Потребитель)
Это классический паттерн, который идеально демонстрирует работу каналов. Одна горутина будет «производить» числа (Producer), а другая — «потреблять» их (Consumer).
Пример кода
package main

import (
    "fmt"
    "sync"
    "time"
)

// Это наша функция-потребитель (Consumer)
// Она будет бесконечно читать данные из канала jobs и выводить их
func worker(jobs <-chan int, wg *sync.WaitGroup) {
    // В конце сообщаем, что горутина завершилась
    defer wg.Done()

    // Цикл for...range по каналу — это идиоматичный способ читать из канала,
    // пока он не будет закрыт.
    for j := range jobs {
        fmt.Println("Потребитель получил число:", j)
        time.Sleep(500 * time.Millisecond) // Имитируем долгую обработку
    }
    fmt.Println("Канал закрыт, потребитель завершил работу.")
}

func main() {
    // 1. Создаем канал для передачи "заданий" (чисел)
    jobs := make(chan int)

    // 2. Используем WaitGroup, чтобы main дождался завершения worker'а
    var wg sync.WaitGroup
    wg.Add(1) // У нас одна задача - дождаться worker'а

    // 3. Запускаем горутину-потребителя
    // Она сразу же заблокируется на строке `for j := range jobs`,
    // так как канал jobs пока пуст.
    go worker(jobs, &wg)

    // 4. Теперь главная горутина (main) выступает в роли производителя (Producer)
    fmt.Println("Производитель начинает отправлять числа...")
    for i := 1; i <= 5; i++ {
        fmt.Printf("Производитель отправляет число: %d\n", i)
        jobs <- i // Отправляем число в канал.
        // Эта операция заблокирует main, пока worker не заберет число.
    }

    // 5. ВАЖНО: Когда все задания отправлены, нужно закрыть канал.
    // Это сигнал для потребителя, что больше данных не будет.
    // Цикл `for...range` в worker'е увидит это и завершится.
    close(jobs)

    fmt.Println("Производитель отправил все числа и закрыл канал.")

    // 6. Ждем, пока потребитель обработает все задания и завершится.
    wg.Wait()
    fmt.Println("Программа завершена.")
}
Что происходит в этом коде (пошагово)
Создается канал jobs. Запускается горутина worker. Она доходит до for j := range jobs и "засыпает", ожидая данные из канала. Главная горутина main начинает цикл. На первой итерации она пытается отправить 1 в канал (jobs <- 1). Поскольку worker уже ждет на прием, передача происходит мгновенно. main отправляет 1 и "просыпается" для следующей итерации. Worker получает 1, печатает его и засыпает снова в ожидании следующего числа. Этот «пинг-понг» продолжается для всех чисел от 1 до 5. После отправки всех чисел, main вызывает close (jobs). Это говорит: «Больше в этот ящик ничего класть не буду». Worker получает последнее число (5), обрабатывает его. На следующей итерации цикла for… range он видит, что канал закрыт, и корректно завершает свою работу. Worker вызывает wg. Done (). main, которая ждала на wg. Wait (), разблокируется и программа завершается.

Этот пример демонстрирует, как каналы обеспечивают идеальную синхронизацию между горутинами без явных блокировок.
ватта
Буферизованные каналы (Buffered Channels)
Что это? Канал с ёмкостью как конвейер с несколькими слотами
Мы уже знаем, что обычный (небуферизованный) канал — это как прямой «передатчик-приемник» или почтовый ящик на одно письмо. Отправитель и получатель должны встретиться в один и тот же момент, чтобы произошла передача.

Буферизованный канал — это канал с «памятью» или «ёмкостью». Это как почтовый ящик не на одно письмо, а целый почтовый отсек с несколькими ячейками, или, что еще лучше, лента конвейера с несколькими слотами.
Аналогия: Лента конвейера
Представьте себе ленту конвейера на заводе. У этой ленты есть определенная длина, на которой можно разместить, скажем, 5 коробок.

Рабочий 1 (Producer) может класть коробки на ленту одну за другой. Он не обязан ждать, пока Рабочий 2 заберет первую коробку, чтобы положить вторую. Он может быстро заполнить все 5 мест на ленте.

Рабочий 2 (Consumer) подходит к другому концу ленты и забирает коробки по одной.

Ключевое отличие: Производитель и потребитель больше не обязаны быть синхронизированы в каждый момент времени. Производитель может «опередить» потребителя, заполнив буфер.
Правила блокировки (самое важное!)
Благодаря буферу, правила блокировки меняются и становятся более гибкими.

Отправка блокируется, только если канал ПОЛОН. Пока в буфере есть свободное место, операция ch <- value не будет блокировать отправителя. Он просто положит значение в буфер и пойдет дальше. Блокировка произойдет только тогда, когда вы попытаетесь отправить значение в полностью заполненный канал.

Получение блокируется, только если канал ПУСТ. Пока в буфере есть хотя бы одно значение, операция value := <-ch не будет блокировать получателя. Он сразу заберет значение. Блокировка произойдет, когда вы попытаетесь получить значение из пустого канала.
Буферизованный канал: разъединение скоростей
Давайте возьмем наш предыдущий код и изменим его так, чтобы производитель мог положить в канал несколько чисел, не дожидаясь потребителя. Это и есть decoupling (разъединение) скоростей.
Пример кода
package main

import (
    "fmt"
    "sync"
    "time"
)

// Функция-потребитель (Consumer) остается без изменений
// Она все так же медленно обрабатывает задания
func worker(jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("  Потребитель получил число: %d\n", j)
        time.Sleep(500 * time.Millisecond) // Имитируем долгую обработку
    }
    fmt.Println("  Потребитель завершил работу.")
}

func main() {
    // 1. Создаем БУФЕРИЗИРОВАННЫЙ канал с емкостью 5
    jobs := make(chan int, 5)

    var wg sync.WaitGroup
    wg.Add(1)
    go worker(jobs, &wg)

    // 2. Главная горутина (Producer) теперь может отправлять числа,
    // не дожидаясь, пока их заберет потребитель.
    fmt.Println("Производитель начинает отправлять числа...")
    for i := 1; i <= 5; i++ {
        fmt.Printf("-> Производитель отправил число: %d\n", i)
        jobs <- i // <- ЭТОТ ВЫЗОВ НЕ БЛОКИРУЕТСЯ!
        // Main не ждет consumer'а, а сразу продолжает цикл.
    }

    // 3. После того как все числа отправлены в буфер, мы закрываем канал.
    close(jobs)
    fmt.Println("Производитель отправил все числа в буфер и закрыл канал.")

    // 4. Ждем, пока потребитель обработает все задания из буфера.
    wg.Wait()
    fmt.Println("Программа завершена.")
}
Что изменилось и что мы увидим в консоли?
Производитель начинает отправлять числа...
-> Производитель отправил число: 1
-> Производитель отправил число: 2
-> Производитель отправил число: 3
-> Производитель отправил число: 4
-> Производитель отправил число: 5
Производитель отправил все числа в буфер и закрыл канал.
  Потребитель получил число: 1
  Потребитель получил число: 2
  Потребитель получил число: 3
  Потребитель получил число: 4
  Потребитель получил число: 5
  Потребитель завершил работу.
Программа завершена.
Обратите внимание на порядок
Сначала производитель полностью заполнил буфер канала, не дожидаясь потребителя. Только после этого потребитель начал «выгребать» эти числа из канала и обрабатывать их.

Это наглядно демонстрирует, что буферизованный канал «разъединяет (decouples)» горутины. Производитель и потребитель работают с разной скоростью и не блокируют друг друга на каждой операции, а только при переполнении или опустошении буфера.
Когда использовать буферизованные каналы?
Когда вы знаете, что у вас будут «всплески» данных, и вы хотите, чтобы отправитель не ждал получателя.

Когда вы хотите ограничить количество одновременных задач (паттерн «Semaphore»). Например, вы можете создать канал с емкостью 10 и перед отправкой каждой задачи класть в него «пустое» значение. Когда канал заполнится, это значит, что уже 10 задач выполняются, и новые будут ждать.

Когда вам нужно собрать несколько результатов от разных горутин перед тем, как их обработать.
Закрытие каналов
close (ch) как сигнал завершения
Представьте, что вы подписались на ежемесячный журнал. Каждый месяц издатель (отправитель) кладет свежий номер в ваш почтовый ящик (канал). Вы (получатель) каждый месяц его забираете.

А что произойдет, если журнал перестанет издаваться? Вы будете продолжать каждый месяц выходить к пустому ящику и вечно ждать. Ваша программа «зависнет» в состоянии ожидания.

Закрытие канала — это способ для отправителя сказать получателю: «Больше в этот ящик ничего не будет. Можешь перестать проверять и идти заниматься своими делами».
Это «сигнал о завершении». Он позволяет принимающей горутине корректно завершить свой цикл обработки данных, а не ждать вечно.
Правило: закрывает только отправитель
Это золотое правило. Почему? Потому что попытка отправить данные в уже закрытый канал вызовет «панику (panic)» — фатальную ошибку, которая «уронит» вашу программу.

Отправитель знает, когда он отправил последнее значение. После этого он может безопасно закрыть канал и гарантировать, что больше не будет попыток отправки.
Получатель не должен закрывать канал. Он не знает, собирается ли отправитель прислать еще что-то. Если получатель закроет канал, а отправитель попробует отправить данные — паника.

Важно: Закрытие канала — это не то же самое, что установка в него nil. Закрытый канал — это валидный объект, из которого можно читать все оставшиеся значения. nil-канал — это «сломанный» канал, любая операция с ним заблокируется навсегда.
Как определить, что канал закрыт?
Есть два способа.
Ручной способ при получении: value, ok := <-ch
Когда вы читаете из канала, вы можете получить два значения: само данные и булев флаг ok.

ok будет true, если значение было успешно прочитано из открытого канала (то есть, его прислал живой отправитель).
ok будет false, если вы читаете из канала, который уже был закрыт и из которого все данные были уже прочитаны.
value, ok := <-ch
if !ok {
    fmt.Println("Канал закрыт, больше данных не будет.")
    // здесь нужно выйти из цикла
}
Идиоматичный способ: for v := range ch
Это самый простой и правильный способ читать данные из канала до его закрытия. Цикл for… range по каналу автоматически делает всю работу за вас.

Он читает значения из канала на каждой итерации. Когда канал закрывается и все данные из него прочитаны, цикл автоматически и корректно завершается.

Это и есть «Go-way» — писать меньше кода, который при этом более читаем и безопасен.
Несколько воркеров читают из общего канала заданий
Это классический паттерн «Worker Pool», который идеально демонстрирует, почему закрытие каналов так важно. У нас есть один «диспетчер» (main) и несколько «воркеров». Диспетчер раздает задания, а воркеры их выполняют.
Пример кода
package main

import (
    "fmt"
    "sync"
    "time"
)

// worker - это наша горутина-исполнитель.
// Она будет читать задания из канала jobs до тех пор, пока он не закроется.
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
    // Сообщаем, что горутина завершена, когда выйдем из функции
    defer wg.Done()
    fmt.Printf("Воркер #%d запущен и ждет заданий...\n", id)

    // Идиоматичный цикл для чтения из канала.
    // Он автоматически остановится, когда канал jobs будет закрыт.
    for j := range jobs {
        fmt.Printf("Воркер #%d взял в работу задание: %d\n", id, j)
        time.Sleep(1 * time.Second) // Имитируем выполнение работы
        fmt.Printf("Воркер #%d завершил задание: %d\n", id, j)
    }

    fmt.Printf("Воркер #%d завершил работу, так как канал заданий закрыт.\n", id)
}

func main() {
    // 1. Создаем канал для передачи заданий
    jobs := make(chan int)

    // 2. Используем WaitGroup, чтобы main дождался завершения всех воркеров
    var wg sync.WaitGroup

    // 3. Запускаем 3 воркеров, которые будут конкурировать за задания из одного канала
    for w := 1; w <= 3; w++ {
        wg.Add(1) // Увеличиваем счетчик для каждого воркера
        go worker(w, jobs, &wg)
    }

    // 4. Главная горутина (main) выступает в роли "диспетчера"
    // и отправляет 5 заданий в канал
    fmt.Println("Диспетчер начинает раздавать задания...")
    for j := 1; j <= 5; j++ {
        fmt.Printf("-> Диспетчер отправил задание: %d\n", j)
        jobs <- j
    }

    // 5. ВАЖНО: После того как все задания отправлены, мы закрываем канал.
    // Это сигнал для ВСЕХ воркеров, что новых заданий больше не будет,
    // и они могут завершиться, как только закончат текущую работу.
    close(jobs)
    fmt.Println("Диспетчер отправил все задания и закрыл канал.")

    // 6. Ждем, пока все воркеры завершат свою работу
    wg.Wait()
    fmt.Println("Все воркеры завершили работу. Программа завершена.")
}
Что мы увидим в консоли
Диспетчер начинает раздавать задания...
-> Диспетчер отправил задание: 1
Воркер #1 запущен и ждет заданий...
Воркер #1 взял в работу задание: 1
-> Диспетчер отправил задание: 2
Воркер #2 запущен и ждет заданий...
Воркер #2 взял в работу задание: 2
-> Диспетчер отправил задание: 3
Воркер #3 запущен и ждет заданий...
Воркер #3 взял в работу задание: 3
-> Диспетчер отправил задание: 4
Воркер #1 взял в работу задание: 4
-> Диспетчер отправил задание: 5
Воркер #2 взял в работу задание: 5
Диспетчер отправил все задания и закрыл канал.
Воркер #1 завершил задание: 1
Воркер #1 завершил работу, так как канал заданий закрыт.
Воркер #3 завершил задание: 3
Воркер #3 завершил работу, так как канал заданий закрыт.
Воркер #2 завершил задание: 2
Воркер #2 завершил задание: 5
Воркер #2 завершил работу, так как канал заданий закрыт.
Все воркеры завершили работу. Программа завершена.
Обратите внимание: после close (jobs) все воркеры благополучно завершили свою последнюю задачу и вышли из цикла for… range. Без закрытия канала они бы «зависли» на строке for j := range jobs в ожидании новых заданий навсегда.
Паттерны: Worker Pool и Fan-out/Fan-in
Паттерны — это следующая ступень после изучения примитивов. Они показывают, как правильно комбинировать горутины и каналы для решения реальных задач.
Паттерн 1: Worker Pool (Пул воркеров)
Что это?
Worker Pool — это паттерн, при котором вы создаете фиксированное количество «воркеров» (горутин), которые получают задачи из общей очереди (канала) и выполняют их.
Аналогия: Почтовое отделение
Очередь с письмами — это канал с заданиями (jobs chan).

Несколько сортировщиков — это воркеры (горутины).

Задача — это письмо, которое нужно отсортировать и доставить.
Вместо того чтобы нанимать нового сортировщика для каждого нового письма (что было бы очень дорого и неэффективно), у вас есть постоянная команда из, скажем, 5 сортировщиков. Они просто берут письма из общей очереди, как только освобождаются. Это позволяет контролировать нагрузку и эффективно использовать ресурсы (5 сортировщиков вместо 1000).
Когда это нужно?
Когда вам нужно обработать большое количество однотипных задач.

Когда вы хотите ограничить количество одновременно выполняемых задач, чтобы не перегрузить CPU, память или внешние API (например, делать не 1000 запросов в секунду, а 10).

Когда задачи независимы друг от друга.
Пул воркеров через канал заданий
package main

import (
    "fmt"
    "sync"
    "time"
)

// worker - это наша функция-воркер.
// Она будет в бесконечном цикле читать задания из канала jobs
// и отправлять результаты в канал results.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done() // Сообщаем, что этот воркер завершился
    fmt.Printf("Воркер #%d запущен\n", id)
    for j := range jobs {
        fmt.Printf("Воркер #%d взял задание %d\n", id, j)
        // Имитируем долгую работу
        time.Sleep(time.Second)
        // Отправляем результат (например, квадрат числа)
        results <- j * j
        fmt.Printf("Воркер #%d завершил задание %d\n", id, j)
    }
}

func main() {
    // 1. Создаем каналы для заданий и результатов
    jobs := make(chan int, 10)    // Буферизованный канал, чтобы не блокировать main
    results := make(chan int, 10) // Буферизованный канал для результатов

    // 2. Запускаем пул из 3 воркеров
    var wg sync.WaitGroup
    numWorkers := 3
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 3. Отправляем 5 заданий в канал jobs
    numJobs := 5
    for j := 1; j <= numJobs; j++ {
        fmt.Printf("-> Отправляем задание %d\n", j)
        jobs <- j
    }
    close(jobs) // ВАЖНО: закрываем канал, чтобы воркеры знали, что заданий больше не будет
    fmt.Println("Все задания отправлены, канал jobs закрыт.")

    // 4. Собираем результаты
    // Мы знаем, что будет ровно numJobs результатов
    for r := 1; r <= numJobs; r++ {
        result := <-results
        fmt.Printf("<- Получен результат: %d\n", result)
    }

    // 5. Ждем, пока все воркеры завершат свою работу (важно, если бы у них была логика после цикла)
    wg.Wait()
    fmt.Println("Все воркеры завершили работу.")
}
Паттерн 2: Fan-out / Fan-in
Это не столько отдельный паттерн, сколько комбинация двух идей, которые часто используются вместе.
Fan-out (Распределение)
Fan-out — это процесс распределения задач из одного источника (одного канала) на несколько обработчиков (несколько горутин).

Что это? У вас есть один канал с входными данными. Вы запускаете несколько горутин, и каждая из них читает из этого одного канала.

Аналогия: Один менеджер (main) выдает задачи, и вся команда воркеров слушает его и берет задачи.
По сути, «паттерн Worker Pool, который мы только что рассмотрели, и есть реализация Fan-out». main «распыляет» (fans out) задания по нескольким воркерам.
Fan-in (Сбор)
Fan-in — это процесс сбора результатов с нескольких обработчиков (нескольких горутин) в один канал.

Что это? У вас есть несколько горутин, каждая из которых производит результат в свой собственный канал. Вам нужно собрать все эти результаты в один общий канал для дальнейшей обработки.

Аналогия: Все воркеры кладут готовые детали на одну общую ленту конвейера (results chan).
Объединение результатов из нескольких каналов
Давайте напишем функцию, которая реализует fan-in. Она будет принимать несколько каналов и возвращать один общий канал.
package main

import (
    "fmt"
    "sync"
    "time"
)

// Функция-воркер, которая просто отправляет числа в свой канал
func worker(id int, out chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(out) // Важно закрыть свой выходной канал, когда работа закончена
    for i := 0; i < 3; i++ {
        out <- id*10 + i // Воркер 1: 10, 11, 12. Воркер 2: 20, 21, 22.
    }
}

// Функция, реализующая fan-in
func fanIn(inputChannels ...<-chan int) <-chan int {
    var wg sync.WaitGroup

    // Создаем общий выходной канал
    output := make(chan int)

    // Внутренняя функция для чтения из одного канала и отправки в общий
    outputFunc := func(ch <-chan int) {
        defer wg.Done()
        for n := range ch {
            output <- n
        }
    }

    // Запускаем по горутине для каждого входного канала
    wg.Add(len(inputChannels))
    for _, ch := range inputChannels {
        go outputFunc(ch)
    }

    // Отдельная горутина, которая ждет, пока все читающие горутины завершатся,
    // и затем закрывает общий выходной канал.
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

func main() {
    // 1. Fan-out: Запускаем двух воркеров
    ch1 := make(chan int)
    ch2 := make(chan int)
    var wg sync.WaitGroup
    wg.Add(2)
    go worker(1, ch1, &wg)
    go worker(2, ch2, &wg)

    // 2. Fan-in: Собираем результаты из обоих каналов в один
    combinedOutput := fanIn(ch1, ch2)

    // 3. Читаем из общего канала, пока он не закроется
    for n := range combinedOutput {
        fmt.Println("Получено:", n)
    }

    // Ждем, пока воркеры не закроют свои каналы (хотя fan-in уже с этим справился)
    wg.Wait()
    fmt.Println("Работа завершена.")
}
Итог: Полный конвейер Fan-out -> Fan-in
В реальных задачах эти два паттерна почти всегда идут вместе.

Fan-out: main читает данные из источника (файл, сеть) и распределяет их по каналам N воркерам.

Process: Каждый воркер выполняет свою сложную задачу.

Fan-in: Специальная функция (fanIn) собирает результаты от всех N воркеров в один канал.

main (или другая горутина) читает из финального канала и делает что-то с собранными результатами (например, записывает в файл или базу данных).
Это создает мощный, масштабируемый и контролируемый конвейер обработки данных.

Задача 3: «Генератор задач и пул воркеров»
Реализуй систему:

Есть генератор задач (main), который создаёт N «заданий» (например, числа от 1 до N). Есть канал jobs для заданий. Есть M воркеров (горутин), каждая читает из jobs, обрабатывает (например, возводит число в квадрат с time. Sleep), пишет результат в канал results. Условия: Используй буферизованный канал jobs (ёмкость выбрать разумно). После отправки всех задач jobs закрывается. Отдельная горутина читает из results и печатает результаты. WaitGroup используется, чтобы дождаться окончания всех воркеров и закрыть results. Расширения: Сделай версию, где results не буферизован, и сравни поведение (блокировки/порядок вывода). Добавь ограничение на одновременное количество «тяжёлых» задач через дополнительный семафор‑канал.