Concurrency в Go: worker pool, fan-out/fan-in и pipeline

Запустить горутину на каждую задачу легко. Положить так прод ещё легче: десятки тысяч соединений к базе, исчерпание памяти, деградация. Горутины и каналы дают примитивы конкурентности, но не говорят, как строить из них рабочие схемы, которые не валят систему под нагрузкой. На практике почти всё сводится к трём паттернам: pipeline (конвейер), fan-out/fan-in и worker pool. В этой статье разберу каждый на рабочем коде. Покажу, когда какой выбирать, как ограничить число одновременных горутин, как корректно останавливать обработку и собирать ошибки. Это хаб кластера про конкурентность: отдельные темы (context, errgroup, WaitGroup) вынесены в свои разборы, ссылки по тексту.

Если вы только начинаете с языком, сначала имеет смысл пройти базовый маршрут из статьи «Go для начинающих: дорожная карта». Здесь я считаю, что вы уже понимаете, что такое горутина и канал.

Три кирпичика, на которых всё держится

Прежде чем собирать паттерны, зафиксируем базу. Все три схемы стоят на одних и тех же свойствах горутин и каналов.

Горутина это дешёвая единица выполнения. Стартовый стек у неё несколько килобайт, и он растёт по мере надобности. Поэтому десятки тысяч горутин это норма, а не повод для тревоги. Но дешёвая не значит бесплатная. Если плодить горутины без счёта, под каждый входящий запрос, дело кончается тем самым исчерпанием памяти и соединений. Отсюда и потребность в паттернах, которые ограничивают параллелизм.

Канал это типизированная труба между горутинами. Небуферизированный канал синхронен: отправка блокируется, пока кто-то не прочитает. Буферизированный канал принимает значения до заполнения буфера, потом отправка тоже блокируется. Это важное свойство. Блокировка на канале это естественный механизм обратного давления (backpressure). Медленный потребитель автоматически притормаживает быстрого производителя.

Закрытие канала это сигнал «данных больше не будет». Закрывает всегда отправитель, а не получатель. Цикл for v := range ch читает значения, пока канал не закрыт, и завершается сам. Два правила, которые стоит запомнить навсегда: отправка в закрытый канал вызывает панику, повторное закрытие тоже паника. Поэтому ответственность за закрытие должна лежать на одной конкретной горутине.

ch := make(chan int)

go func() {
    defer close(ch) // закрываем ровно один раз, в отправителе
    for i := 0; i < 3; i++ {
        ch <- i
    }
}()

for v := range ch { // выйдет сам, когда канал закроется
    fmt.Println(v)
}

Дешевизна горутин, обратное давление на канале и дисциплина закрытия. На этих трёх свойствах держится всё остальное.

Pipeline: конвейер из стадий

Pipeline это цепочка стадий, где каждая стадия получает данные из входного канала, что-то с ними делает и отправляет результат в выходной. Выход одной стадии становится входом следующей. Так строят потоковую обработку: данные текут через конвейер, и каждый этап занят своим делом.

Каноническая форма стадии в Go выглядит так: функция принимает входной канал «только на чтение» и возвращает выходной канал. Внутри она запускает горутину, которая читает вход, пишет в выход и закрывает выход по завершении.

// generator — источник конвейера: превращает аргументы в поток.
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square — стадия: читает числа, возводит в квадрат, отдаёт дальше.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

Сборка конвейера читается линейно, как описание потока данных:

func main() {
    // generator -> square -> square -> вывод
    for n := range square(square(generator(2, 3, 4))) {
        fmt.Println(n)
    }
}
$ go run main.go
16
81
256

Обратите внимание на типы. Стадии возвращают <-chan int, канал «только на чтение». Это не косметика. Тип в сигнатуре документирует направление и не даёт случайно записать в чужой выход или закрыть не свой канал. Закрытие выхода через defer close(out) гарантирует, что следующая стадия корректно завершит свой range, когда предыдущая закончит.

Сильная сторона конвейера в том, что стадии работают одновременно. Пока вторая стадия возводит в квадрат первое число, первая уже готовит второе. Это конкурентность без явного управления горутинами на стороне вызова: вся машинерия спрятана внутри стадий.

Слабое место конвейера в его линейности. Если одна стадия тяжёлая, например делает сетевой запрос на каждый элемент, она становится бутылочным горлышком. Весь конвейер идёт со скоростью самого медленного звена. Эту стадию и нужно распараллелить, что и делает fan-out.

Fan-out/fan-in: распараллелить узкое место

Fan-out это запуск нескольких горутин, читающих из одного и того же входного канала. Несколько обработчиков разбирают задачи из общей очереди, и тяжёлая стадия перестаёт быть последовательной. Fan-in это обратная операция: слить результаты нескольких горутин в один канал.

Fan-out тривиален. Запускаем N горутин на один и тот же входной канал, каждая получает свой выходной:

in := generator(jobs...)

// fan-out: три параллельных экземпляра тяжёлой стадии
c1 := heavyStage(in)
c2 := heavyStage(in)
c3 := heavyStage(in)

Все три горутины читают из in. Рантайм сам распределяет элементы: кто свободен, тот и забирает следующий. Это и есть балансировка нагрузки без единой строчки кода про балансировку.

Fan-in сложнее, потому что надо слить несколько каналов в один и закрыть результат ровно тогда, когда опустеют все источники. Классическое решение опирается на sync.WaitGroup:

// merge сливает несколько каналов в один.
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // На каждый входной канал — горутина, перекладывающая в out.
    forward := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go forward(c)
    }

    // Закрываем out, только когда все источники иссякли.
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Ключевой момент в последней горутине. Закрыть out нельзя из forward: их несколько, а закрытие должно произойти один раз. Поэтому отдельная горутина ждёт wg.Wait() и закрывает выход после того, как все перекладчики завершились. Это типовая идиома «WaitGroup плюс закрывающая горутина», и она повторяется почти везде, где сходятся несколько источников.

Собираем всё вместе:

in := generator(jobs...)

// fan-out на три обработчика
c1 := heavyStage(in)
c2 := heavyStage(in)
c3 := heavyStage(in)

// fan-in: слили обратно в один поток
for result := range merge(c1, c2, c3) {
    fmt.Println(result)
}

Про sync.WaitGroup стоит знать одну свежую деталь. В Go 1.25 у него появился метод wg.Go(), который сам делает Add(1) и Done(), убирая частый источник ошибок. Я разбирал это в статье про sync.WaitGroup в Go 1.25.

Fan-out/fan-in отвечает на вопрос «как ускорить узкое место». Но он не отвечает на вопрос «сколько горутин запускать». В примере выше их жёстко три. А если задач десятки тысяч и каждая хочет своё соединение к базе? Запускать горутину на каждую нельзя. Нужен механизм, который держит число одновременных обработчиков под контролем. Это worker pool.

Worker pool: ограниченное число обработчиков

Worker pool (пул воркеров) это фиксированное число горутин-обработчиков, которые разбирают задачи из общего канала. Идея в том, чтобы развязать количество задач и количество одновременных обработчиков. Задач может быть миллион, а воркеров ровно столько, сколько система выдерживает.

Это самый частый поисковый запрос по теме и самый практичный паттерн в продакшене. Им ограничивают нагрузку на внешние сервисы, базы данных, дисковый ввод-вывод. Везде, где ресурс конечен, нужен пул.

Базовая схема: канал задач, канал результатов, N воркеров и WaitGroup для корректного завершения.

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs { // забирает задачи, пока канал не закрыт
        results <- process(j)
    }
}

func main() {
    const numJobs = 100
    const numWorkers = 4

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // Запускаем фиксированный пул воркеров.
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // Кладём задачи и закрываем канал — это сигнал воркерам.
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // Закрываем results, когда все воркеры закончили.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Читаем результаты, пока канал открыт.
    for r := range results {
        fmt.Println(r)
    }
}

Разберём, как это работает. Четыре воркера висят на for j := range jobs. Пока канал jobs открыт и в нём есть задачи, они их разбирают. Когда главная горутина отправила все задачи и сделала close(jobs), циклы range в воркерах завершаются, каждый воркер делает wg.Done() через defer. Отдельная горутина дожидается wg.Wait() и закрывает results. Это разблокирует финальный range results в main.

Здесь снова та же идиома закрытия: канал результатов закрывает не воркер (их несколько), а отдельная горутина после WaitGroup. Если закрыть results раньше времени, воркер упадёт с паникой «send on closed channel». Если не закрыть вообще, range results зависнет навсегда. Порядок закрытия в worker pool это главный источник дедлоков и паник, поэтому держите его в голове.

Сколько воркеров запускать

Универсального числа нет, оно зависит от характера работы.

Для CPU-bound задач (вычисления, сжатие, парсинг) разумная отправная точка это runtime.GOMAXPROCS(0). По умолчанию это число логических ядер, а с Go 1.25 в контейнере ещё и с учётом cgroup-лимита CPU. Больше воркеров, чем ядер, на чистом счёте не ускорит, а добавит накладные расходы на переключение.

Для IO-bound задач (запросы к базе, HTTP, диск) воркеров обычно нужно больше, чем ядер, потому что горутина большую часть времени ждёт ответа и не грузит процессор. Здесь число воркеров чаще диктуется не ядрами, а ограничением внешнего ресурса. Например, размером пула соединений к базе. Нет смысла держать 200 воркеров, если база отдаёт максимум 20 соединений.

Правильный путь это измерить. Поставьте число воркеров параметром и прогоните бенчмарк на реальной нагрузке. Как это делать аккуратно, со статистикой, а не на глаз, я писал в статье про бенчмарки и оптимизацию в Go.

Отмена и обработка ошибок

Все примеры выше предполагают, что обработка идёт до конца и ничего не ломается. В реальности нужно уметь две вещи: останавливать пул досрочно (клиент отвалился, истёк таймаут) и реагировать на ошибку в одном из воркеров.

Отмена через context

Стандартный механизм отмены в Go это context.Context. Воркер вдобавок к каналу задач слушает ctx.Done() и выходит, как только контекст отменён.

func worker(ctx context.Context, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case j, ok := <-jobs:
            if !ok {
                return // канал задач закрыт, работа кончилась
            }
            select {
            case results <- process(j):
            case <-ctx.Done(): // отмена во время отправки результата
                return
            }
        case <-ctx.Done(): // отмена во время ожидания задачи
            return
        }
    }
}

select слушает сразу несколько каналов и реагирует на тот, что готов первым. Здесь воркер одновременно ждёт новую задачу и сигнал отмены. Вложенный select на отправке результата нужен, чтобы воркер не завис на results <- ..., если потребитель уже ушёл. Подробно про устройство контекста, отмену, таймауты и типичные ошибки я разбирал в отдельной статье про context.Context.

Ошибки и errgroup

Когда воркеры могут возвращать ошибку и нужно остановить всех при первом сбое, ручная связка WaitGroup плюс context плюс канал ошибок становится громоздкой. Для этого есть пакет golang.org/x/sync/errgroup. Он делает ровно это: запускает группу горутин, отменяет общий контекст при первой ошибке и возвращает её из Wait().

Особенно удобен метод SetLimit. Он превращает errgroup.Group в worker pool с ограничением параллелизма буквально одной строкой, без ручного канала задач:

func processAll(ctx context.Context, jobs []Job) error {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(8) // не больше 8 горутин одновременно

    for _, j := range jobs {
        g.Go(func() error {
            return process(ctx, j) // первая ошибка отменит ctx для остальных
        })
    }

    return g.Wait() // вернёт первую ошибку или nil
}

Это самый короткий способ получить ограниченный пул с отменой и сбором ошибок. Когда задачи помещаются в слайс и не нужен потоковый конвейер, я обычно беру именно его. Полный разбор API, включая TryGo и сравнение с каналами, в статье про errgroup.

Если код собирается под Go до 1.22, в цикл нужно добавить строчку j := j перед g.Go, иначе все горутины захватят последнее значение переменной. С Go 1.22 семантика переменной цикла изменилась, и копия больше не нужна.

Когда какой паттерн

Паттерны не конкурируют, они закрывают разные задачи и часто комбинируются. Конвейер с fan-out на тяжёлой стадии это уже гибрид. Ориентир для выбора такой:

ПаттернКогда братьЧто даёт
PipelineМногоэтапная потоковая обработка, данные текут через стадииЭтапы работают одновременно, чистое разделение логики
Fan-out/fan-inОдна стадия конвейера стала узким местомРаспараллеливание тяжёлого этапа, балансировка рантаймом
Worker poolМного однотипных задач, ресурс ограниченКонтроль числа одновременных обработчиков, защита внешних систем

На практике начинают с простого. Один канал и пара горутин решают больше задач, чем кажется. Усложнять до полноценного пула с контекстом и errgroup стоит тогда, когда появляется конкретная причина: нагрузка на базу, потребность в отмене, сбор ошибок. Преждевременное усложнение конкурентного кода обходится дорого, потому что отлаживать гонки и дедлоки тяжелее, чем обычные баги.

Две привычки экономят больше всего нервов. Первая: всегда держать в голове, кто и когда закрывает канал, потому что почти все паники и зависания в этих паттернах растут из неправильного закрытия. Вторая: запускать тесты с флагом -race, детектор гонок ловит ошибки, которые иначе всплывут только в проде под нагрузкой.

Итог

Конкурентность в Go строится из трёх повторяющихся схем. Pipeline соединяет стадии обработки в поток, где этапы работают одновременно. Fan-out/fan-in распараллеливает узкое место конвейера и сливает результаты обратно. Worker pool ограничивает число одновременных обработчиков, чтобы не положить внешний ресурс. Во всех трёх ключевую роль играет дисциплина закрытия каналов: закрывает отправитель, ровно один раз, а слияние требует отдельной горутины после WaitGroup.

Поверх этих схем ложатся два инструмента из стандартного арсенала. context.Context отвечает за отмену и таймауты, errgroup за ограничение параллелизма и сбор ошибок. Начинайте с простого варианта и усложняйте под конкретную потребность, а не на всякий случай.

Дальше по теме стоит посмотреть на errgroup как готовый ограниченный пул с обработкой ошибок и на context.Context как фундамент отмены. Общий маршрут изучения языка собран в дорожной карте для начинающих.


Теги: