Go: Building Concurrent Data Processing Pipelines

Now that we understand how to use goroutines and channels, let's explore how to combine them into concurrent pipelines for efficient data processing.
Leaked Goroutines
Consider a function that sends numbers within a specified range to a channel:
func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        for i := start; i < stop; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}
It appears to work correctly:
func main() {
    generated := rangeGen(41, 46)
    for val := range generated {
        fmt.Println(val)
    }
}
41
42
43
44
45
However, let's examine what happens if we exit the loop prematurely:
func main() {
    generated := rangeGen(41, 46)
    for val := range generated {
        fmt.Println(val)
        if val == 42 {
            break
        }
    }
}
41
42
At first glance, it still works. But there's a problem — the rangeGen() goroutine becomes stuck:
func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        for i := start; i < stop; i++ { // (1)
            out <- i // (2)
        }
        close(out)
    }()
    return out
}
Since main() breaks its loop at 42 and stops reading from the generated channel, the loop inside rangeGen() ➊ never completes. The goroutine is permanently blocked trying to send the number 43 to the out channel ➋. And because the goroutine is stuck, the out channel never closes, so any other goroutines depending on it would get stuck as well.
In this case, it's not critical: when main() exits, the runtime will terminate all other goroutines. But if main() continued to run and called rangeGen() repeatedly, the leaked goroutines would accumulate. This is problematic: goroutines are lightweight but not completely "free". Eventually, you might run out of memory (the garbage collector doesn't collect goroutines).
We need a mechanism to terminate a goroutine early.
Cancel Channel
First, we'll create a separate cancel channel through which main() will signal rangeGen() to exit:
func main() {
    cancel := make(chan struct{}) // (1)
    defer close(cancel)           // (2)
    generated := rangeGen(cancel, 41, 46) // (3)
    for val := range generated {
        fmt.Println(val)
        if val == 42 {
            break
        }
    }
}
We create a cancel channel ➊ and immediately set up a deferred close(cancel) ➋. This is a common practice to avoid tracking every place in the code where the channel needs to be closed. defer ensures that the channel is closed when the function exits, so you don't have to worry about it.
Next, we pass the cancel channel to the goroutine ➌. Now, when the channel closes, the goroutine needs to detect this and exit. A first attempt might be to add a check like this:
func rangeGen(cancel <-chan struct{}, start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            out <- i
            if <-cancel == struct{}{} { // (1)
                return
            }
        }
    }()
    return out
}
fatal error: all goroutines are asleep - deadlock!
If cancel is closed, the check ➊ will pass (a closed channel always returns a zero value, remember?), and the goroutine will exit. However, if cancel isn't closed, the goroutine would block and not continue to the next loop iteration.
We need a different, non-blocking approach:
- If cancel is closed, exit the goroutine.
- Otherwise, send the next value to out.
Go has a select statement for this:
func rangeGen(cancel <-chan struct{}, start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            select {
            case out <- i: // (1)
            case <-cancel: // (2)
                return
            }
        }
    }()
    return out
}
41
42
41
42
select is somewhat like switch, but specifically designed for channels. Here's what it does:
- Checks which cases are not blocked.
- If multiple cases are ready, randomly selects one to execute.
- If all cases are blocked, waits until one is ready.
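To see these rules outside the pipeline, here is a minimal standalone sketch (the channel names are mine): with two ready cases select picks one at random, and a default clause makes it non-blocking instead of waiting.

```go
package main

import "fmt"

func main() {
    a := make(chan string, 1)
    b := make(chan string, 1)
    a <- "from a"
    b <- "from b"

    // Both cases are ready: select picks one at random.
    select {
    case msg := <-a:
        fmt.Println(msg)
    case msg := <-b:
        fmt.Println(msg)
    }

    // No case is ready here. Without default, select would block;
    // with default, it runs the default branch immediately.
    empty := make(chan string)
    select {
    case msg := <-empty:
        fmt.Println(msg)
    default:
        fmt.Println("nothing ready")
    }
}
```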
In our case, while cancel is open, its case ➋ is blocked (you can't read from a channel if no one is writing to it). However, the out <- i case ➊ is unblocked because main() is reading from out. So, select will execute out <- i in each loop iteration.
Then main() will reach number 42 and stop reading from out. After that, both select cases will block, and the goroutine will (temporarily) hang.
Finally, main() will execute the deferred close(cancel), which will unblock the second select case ➋, and the goroutine will exit. The out channel will close too, thanks to defer.
If main() decides not to stop at 42 and continues to read all values, the cancel channel approach will still work correctly:
func main() {
    cancel := make(chan struct{})
    defer close(cancel)
    generated := rangeGen(cancel, 41, 46)
    for val := range generated {
        fmt.Println(val)
    }
}
41
42
43
44
45
Here, rangeGen() will finish before main() calls close(cancel). Which is perfectly fine.
So thanks to the cancel channel and the select statement, the rangeGen() goroutine will exit correctly regardless of what happens in main(). No more leaked goroutines!
Cancel vs. Done
The cancel channel is similar to the done channel that we covered in previous chapters.
Done channel:
// Goroutine B receives a channel to signal
// when it has finished its work.
func b(done chan<- struct{}) {
    // do work...
    done <- struct{}{}
}

func a() {
    done := make(chan struct{})
    go b(done)
    // Goroutine A waits for B to finish its work.
    <-done
}
Cancel channel:
// Goroutine B receives a channel
// to get a cancel signal.
func b(cancel <-chan struct{}) {
    // do work...
    select {
    case <-cancel:
        return
    }
}

func a() {
    cancel := make(chan struct{})
    go b(cancel)
    // Goroutine A signals to B
    // that it is time to stop.
    close(cancel)
}
In practice, both cancel and done channels are often named "done", so don't be surprised. To avoid confusion, we'll use "cancel" for cancellation and "done" for completion.
Merging Channels
Sometimes several independent functions send results to their own channels. But it's more convenient to work with a single result channel. So you need to merge the output channels of these functions into a single channel.
Sequential Merging
The simplest approach is to merge channels sequentially:
func merge(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for val := range ch1 {
            out <- val
        }
        for val := range ch2 {
            out <- val
        }
    }()
    return out
}
This works, but it drains the channels strictly one after another: nothing is read from ch2 until ch1 is closed. If the producers run concurrently, the second one stays blocked the whole time, which defeats the purpose.
Concurrent Merging
A better approach is to merge channels concurrently:
func merge(ch1, ch2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            for val := range ch1 {
                out <- val
            }
        }()
        go func() {
            defer wg.Done()
            for val := range ch2 {
                out <- val
            }
        }()
        wg.Wait()
    }()
    return out
}
This approach processes both channels simultaneously, which is more efficient.
Merging Multiple Channels
To merge any number of channels, accept them as a variadic parameter and start a reader goroutine per channel:
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        var wg sync.WaitGroup
        wg.Add(len(channels))
        for _, ch := range channels {
            go func(c <-chan int) {
                defer wg.Done()
                for val := range c {
                    out <- val
                }
            }(ch)
        }
        wg.Wait()
    }()
    return out
}
Pipelines
A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines:
- Receive values from upstream via inbound channels
- Perform some function on that data, usually producing new values
- Send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.
Here's a simple pipeline example:
The "generate" stage:
// generate sends numbers from 1 to stop inclusive.
func generate(stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= stop; i++ {
            out <- i
        }
    }()
    return out
}
The "calculate" stage (fetchAnswer queries a remote API; we'll get to it in the Error Handling section):
// Answer represents the result of a calculation.
type Answer struct {
    x, y int
}

// calculate produces answers for the given numbers.
func calculate(in <-chan int) <-chan Answer {
    out := make(chan Answer)
    go func() {
        defer close(out)
        for n := range in {
            out <- fetchAnswer(n)
        }
    }()
    return out
}
The "print" stage (in the main goroutine for simplicity):
func main() {
    inputs := generate(5)
    answers := calculate(inputs)
    for ans := range answers {
        fmt.Printf("%d -> %d\n", ans.x, ans.y)
    }
}
1 -> 1
2 -> 4
3 -> 9
4 -> 16
5 -> 25
Preventing Goroutine Leaks
To prevent goroutine leaks in pipelines, we need to ensure that:
- All channels are properly closed when no longer needed.
- Goroutines can be cancelled when the pipeline is no longer needed.
- We use defer close() to ensure cleanup happens.
Here's an improved version with cancellation support:
func generate(cancel <-chan struct{}, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= stop; i++ {
            select {
            case out <- i:
            case <-cancel:
                return
            }
        }
    }()
    return out
}
func calculate(cancel <-chan struct{}, in <-chan int) <-chan Answer {
    out := make(chan Answer)
    go func() {
        defer close(out)
        for {
            select {
            case n, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- fetchAnswer(n):
                case <-cancel:
                    return
                }
            case <-cancel:
                return
            }
        }
    }()
    return out
}
Error Handling
The fetchAnswer function is responsible for retrieving an answer for a given number from a remote API:
func fetchAnswer(n int) Answer {
    // ...
}
Hoping that a remote service will always work properly is a bit unrealistic. We have to account for errors:
func fetchAnswer(n int) (Answer, error) {
    // ...
}
But what should we do with these errors (if any)? There is no place for them in calculate.
As it turns out, we have three options.
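The body of fetchAnswer is left out above. To experiment with the three options below, one hypothetical stand-in consistent with the sample outputs (it squares the number but fails on even inputs with "bad number") could be:

```go
package main

import (
    "errors"
    "fmt"
)

// Answer represents the result of a calculation.
type Answer struct {
    x, y int
}

// fetchAnswer is a hypothetical local stand-in for the remote API:
// it squares n, but fails on even inputs.
func fetchAnswer(n int) (Answer, error) {
    if n%2 == 0 {
        return Answer{x: n}, errors.New("bad number")
    }
    return Answer{x: n, y: n * n}, nil
}

func main() {
    for n := 1; n <= 3; n++ {
        ans, err := fetchAnswer(n)
        if err != nil {
            fmt.Printf("%d -> error: %s\n", n, err)
            continue
        }
        fmt.Printf("%d -> %d\n", ans.x, ans.y)
    }
}
```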
Return on First Error
If we don't tolerate errors, the easiest thing to do is to return from calculate as soon as fetchAnswer encounters an error. Since the out channel only accepts Answers, let's add a separate errc channel with a place for a single error:
// calculate produces answers for the given numbers.
func calculate(in <-chan int) (<-chan Answer, <-chan error) {
out := make(chan Answer)
errc := make(chan error, 1) // (1)
go func() {
defer close(out)
for n := range in {
ans, err := fetchAnswer(n)
if err != nil {
errc <- err // (2)
return
}
out <- ans
}
errc <- nil // (3)
}()
return out, errc
}
The error channel is buffered with a capacity of one ➊. As a result of calculate execution, it will contain either an actual error ➋ or nil ➌, depending on the results of the remote call in fetchAnswer.
Since errc is guaranteed to contain a value (an error or nil), we can read from it in the next pipeline step without using select:
func main() {
    inputs := generate(5)
    answers, errs := calculate(inputs)
    for ans := range answers {
        fmt.Printf("%d -> %d\n", ans.x, ans.y)
    }
    if err := <-errs; err != nil {
        fmt.Println("error:", err)
    }
}
1 -> 1
error: bad number
But what if we don't want to stop the whole pipeline because of a single error? Enter the next option.
Composite Result Type
Let's get rid of the error channel and return an error along with the answer. To do this, we will introduce a separate result type:
// Result contains an answer or an error.
type Result struct {
    answer Answer
    err    error
}
Now calculate can send result values to the output channel:
// calculate produces answers for the given numbers.
func calculate(in <-chan int) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for n := range in {
            ans, err := fetchAnswer(n)
            out <- Result{ans, err}
        }
    }()
    return out
}
And the next pipeline step can process these results however it wants:
func main() {
    inputs := generate(5)
    results := calculate(inputs)
    for res := range results {
        if res.err == nil {
            fmt.Printf("%d -> %d\n", res.answer.x, res.answer.y)
        } else {
            fmt.Printf("%d -> error: %s\n", res.answer.x, res.err)
        }
    }
}
1 -> 1
2 -> error: bad number
3 -> 9
4 -> error: bad number
5 -> 25
We don't have to introduce a separate result type for each possible pipeline step in our program. A single generic Result will suffice:
// Result contains a value or an error.
type Result[T any] struct {
    val T
    err error
}

func (r Result[T]) OK() bool {
    return r.err == nil
}

func (r Result[T]) Val() T {
    return r.val
}

func (r Result[T]) Err() error {
    return r.err
}

// calculate produces answers for the given numbers.
func calculate(in <-chan int) <-chan Result[Answer] {
    // ...
}
func main() {
    inputs := generate(5)
    results := calculate(inputs)
    for res := range results {
        if res.OK() {
            fmt.Printf("✓ %v\n", res.Val())
        } else {
            fmt.Printf("✗ %v\n", res.Err())
        }
    }
}
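Put together, a runnable sketch of the generic-Result pipeline might look like this (fetchAnswer is again the hypothetical squares-but-fails-on-even stub, an assumption rather than the book's remote call):

```go
package main

import (
    "errors"
    "fmt"
)

// Answer represents the result of a calculation.
type Answer struct {
    x, y int
}

// Result contains a value or an error.
type Result[T any] struct {
    val T
    err error
}

func (r Result[T]) OK() bool   { return r.err == nil }
func (r Result[T]) Val() T     { return r.val }
func (r Result[T]) Err() error { return r.err }

// fetchAnswer is a hypothetical stand-in: squares n, fails on even numbers.
func fetchAnswer(n int) (Answer, error) {
    if n%2 == 0 {
        return Answer{x: n}, errors.New("bad number")
    }
    return Answer{x: n, y: n * n}, nil
}

// generate sends numbers from 1 to stop inclusive.
func generate(stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= stop; i++ {
            out <- i
        }
    }()
    return out
}

// calculate wraps each answer or error in a Result.
func calculate(in <-chan int) <-chan Result[Answer] {
    out := make(chan Result[Answer])
    go func() {
        defer close(out)
        for n := range in {
            ans, err := fetchAnswer(n)
            out <- Result[Answer]{ans, err}
        }
    }()
    return out
}

func main() {
    for res := range calculate(generate(5)) {
        if res.OK() {
            fmt.Printf("✓ %v\n", res.Val())
        } else {
            fmt.Printf("✗ %v\n", res.Err())
        }
    }
}
```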
Collect Errors Separately
Let's say we don't want to bother handling errors from individual pipeline stages. What we want is a single error collector for the entire pipeline. For simplicity, it'll just log all errors:
// collectErrors prints all incoming errors.
func collectErrors(in <-chan error) <-chan struct{} {
    done := make(chan struct{})
    go func() {
        defer close(done)
        for err := range in {
            fmt.Printf("error: %s\n", err)
        }
    }()
    return done
}
Since there will be a single error channel for all pipeline stages, we'll create it in main and pass it to each of the pipeline steps:
func main() {
    errc := make(chan error)
    done := collectErrors(errc)

    inputs := generate(5, errc)
    answers := calculate(inputs, errc)
    for ans := range answers {
        fmt.Printf("%d -> %d\n", ans.x, ans.y)
    }

    close(errc)
    <-done
}
At each stage of the pipeline, we'll send any errors we encounter to the error channel:
// calculate produces answers for the given numbers.
func calculate(in <-chan int, errc chan<- error) <-chan Answer {
out := make(chan Answer)
go func() {
defer close(out)
for n := range in {
ans, err := fetchAnswer(n)
if err == nil {
out <- ans
} else {
errc <- err
}
}
}()
return out
}
1 -> 1
3 -> 9
error: bad number
error: bad number
5 -> 25
Works like a charm. But there is one caveat: since errors are no longer tied to answers, we do not know which numbers caused the remote API to fail. Of course, we can add the necessary information to the error text, or even create a richer error type, so it's probably not a big deal.
Also, the error collector should be reasonably fast, so that it does not slow down (or even block) the normal pipeline flow in case of occasional errors. We can add a buffer to the error channel and use select, just to be sure:
func main() {
    // A buffered channel to queue up to 100 errors.
    errc := make(chan error, 100)
    // ...
}
// calculate produces answers for the given numbers.
func calculate(in <-chan int, errc chan<- error) <-chan Answer {
out := make(chan Answer)
go func() {
defer close(out)
for n := range in {
ans, err := fetchAnswer(n)
if err == nil {
out <- ans
} else {
select {
case errc <- err:
default:
// If errc is full, drop the error.
}
}
}
}()
return out
}
This approach is quite rare. Using a result type is more common in practice.
Summary
Pipelines are one of the most common uses of concurrency in real-world programs. Now you know how to:
- Combine pipelines of independent blocks.
- Split and merge data streams.
- Cancel pipeline stages.
- Prevent goroutine leaks.
- Handle pipeline step errors.
Pipelines provide a powerful way to process data concurrently while maintaining clear structure and error handling capabilities.