Concurrency, Orderings, and Condition Variables

Go is famous for its easy-to-use concurrency primitives. However, concurrency can be challenging. It's quite straightforward to run many tasks concurrently and wait for all of them with a simple `sync.WaitGroup`, but things fall apart quickly as soon as we add other constraints to that problem. Preserving ordering? Sounds okay. Stopping on the first error? Sure. Making sure to clean up all the goroutines spawned before returning a response? Yes (and people don't do this enough in the wild; it is truly the mark of a sophisticated developer).

Doing it all at the same time? That's what we call a day job. Those are the kinds of requirements we need to think of and answer whenever we are dealing with concurrency.

Today, however, I ran into a specific set of requirements for my concurrent problem, and it was easily solved by the outcast child of the `sync` package: `sync.Cond`. The prodigal child returns!

Here is the set of conditions:

  1. All tasks must run concurrently.
  2. Ordering of tasks must be preserved in the final result.
  3. Results must be streamed back to the user as they come in, as soon as they can be displayed.

Running tasks concurrently and returning the ordered results is usually done in one of two ways. The most common solution is the following: you run all the tasks, wait for them to complete, and sort the results before returning them to the user. The next most common is to create the slice of results with the known length of the tasks and keep track of the index of the task as it runs so that when the result arrives, you can insert it into the proper memory location within the slice.
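
The second approach described above (a pre-sized slice indexed by task) can be sketched as follows. This is a minimal illustration, not the test runner's actual code: the `Result` fields, `runSingleTest`, and `runAllOrdered` are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// Result is a hypothetical stand-in for whatever a task produces.
type Result struct {
	File   string
	Passed bool
}

// runSingleTest is a hypothetical placeholder for the real work.
func runSingleTest(file string) Result {
	return Result{File: file, Passed: true}
}

// runAllOrdered runs every task concurrently and returns the results
// in the same order as the input, but only after all tasks have finished.
func runAllOrdered(files []string) []Result {
	results := make([]Result, len(files))

	var wg sync.WaitGroup
	for i, file := range files {
		wg.Add(1)
		go func(i int, file string) {
			defer wg.Done()
			// Each goroutine writes only to its own index, so no lock is needed.
			results[i] = runSingleTest(file)
		}(i, file)
	}

	// Nothing is returned until every task is done.
	wg.Wait()
	return results
}

func main() {
	for _, r := range runAllOrdered([]string{"a_test.go", "b_test.go", "c_test.go"}) {
		fmt.Println(r.File, r.Passed)
	}
}
```

Note the `wg.Wait()` before the return: the ordering is free, but the caller sees nothing until the slowest task completes.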

However, we have a problem. In both solutions, we depend on collecting the results before we return them. How can we return them as they happen, yet ordered?

Returning them as they happen is simple; we just need to signal that we have a result, and we can achieve this with a channel. The trick to preserving ordering is to keep track of each task's index and send its result on the channel only if that index matches the one we are expecting, then increment the expected index. If it does not match, the task must wait. A condition variable lets a goroutine broadcast to the others once it has sent its result, giving those goroutines a chance to contend for the next spot.

I came upon these requirements when building a test runner, and the following code will reflect that to a degree.


import "sync"

// RunTests will run multiple test files concurrently. It returns a channel of results.
// Results are received in the same order as the specified test files.
func RunTests(files []string) <-chan Result {
	results := make(chan Result)

	// With no files, no goroutine would ever reach close(results), so a reader would block forever.
	if len(files) == 0 {
		close(results)
		return results
	}

	var (
		counter int                           // The counter will keep track of the index we are waiting on.
		cond    = sync.NewCond(&sync.Mutex{}) // The condition variable we will synchronize via waits and broadcasts.
	)

	// Create a loop to spawn a goroutine for each test file.
	for i, file := range files {
		// I understand that in Go 1.22, capturing the loop vars will no longer be necessary, but I do it now because I am scared of change.
		go func(i int, file string) {
			// Immediately run the test. All tests are running concurrently.
			result := RunSingleTest(file)

			// Grab the condition lock!!! We will want to check the condition. If we don't hold the lock, then race-city, here we come.
			cond.L.Lock()

			// It's only polite.
			defer cond.L.Unlock()

			// Here is our condition. We want our counter to equal the index of the goroutine.
			// If it's not, we wait.
			//
			// Make sure to wait in a loop, because when the routine is signaled again, we have no guarantees that the condition will be true.
			// This is one of those rare times when 'for' > 'if' when expressing conditions.
			for counter != i {
				cond.Wait() // When we enter wait, we release the lock. When wait exits, we have the lock. Easy-breezy.
			}

			// The index is correct, our time has come! Push the result!
			results <- result

			// This was the last index. Let's close up shop!
			if i == len(files)-1 {
				close(results)
				return
			}

			// Increment the counter; we want the next index to come through!
			// Notice that we are not using atomics. We still have the lock at this point.
			// Counter is not under threat!
			counter++

			// We let all the other goroutines know that they can wake up and fight for the lock again.
			cond.Broadcast()

			// Remember that defer of the unlock? It allows those goroutines that have been woken up via the broadcast
			// a chance at getting that lock. If not, they will be deadlocked. And that sounds bad.
		}(i, file)
	}

	return results
}
    

In conclusion, `sync.Cond` offers a solution to this set of requirements: tasks run concurrently, order is preserved, and results are streamed back as soon as they can be displayed. This approach shows off one of Go's lesser-known concurrency primitives and gives developers another tool for tackling challenging concurrent problems.

Update Nov 17 2023

sync.Cond will never find love

This example was great for exploring a condition and learning how to use `sync.Cond`. However, `sync.Cond` will never find love. After I shared this solution, many people gave good arguments as to why channels may be more appropriate for this problem space. The issue with the above solution is that the goroutines hang around and contend for the lock needlessly. With a channel approach, we can simply give each goroutine its own channel to store its result in, and merge those channels serially into a resulting channel.

Let's look at it in action!


func RunTests(testDirs []string) <-chan Result {
	// Create the results, one channel for every test, and in the darkness bind them.
	results := make([]chan Result, len(testDirs))
	for i := range testDirs {
		// Let's give them a buffer size of 1.
		// This allows the worker routines to exit immediately instead of waiting for
		// others to finish before its result can be read.
		results[i] = make(chan Result, 1)
	}

	// At the end of the day we want one serially merged channel.
	merged := make(chan Result)
	go func() {
		for _, ch := range results {
			merged <- <-ch // pull from each channel in order!
		}
		close(merged)
	}()

	// This is much simpler!
	for i, dir := range testDirs {
		go func(i int, dir string) {
			results[i] <- RunSingleTest(dir)
		}(i, dir)
	}

	return merged
}
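
To see the ordering guarantee in action, here is a small self-contained sketch of the same channel-per-task merge, with tasks that deliberately finish out of order. The names `streamInOrder` and `sleepThenReturn` are hypothetical helpers invented for this demo, and the tasks return plain strings rather than the `Result` type used above.

```go
package main

import (
	"fmt"
	"time"
)

// streamInOrder runs every task concurrently and streams the results
// back in task order, using one buffered channel per task.
func streamInOrder(tasks []func() string) <-chan string {
	slots := make([]chan string, len(tasks))
	for i := range slots {
		// Buffer of 1 so each worker can exit as soon as it has a result.
		slots[i] = make(chan string, 1)
	}

	merged := make(chan string)
	go func() {
		defer close(merged)
		// Drain the per-task channels strictly in index order.
		for _, ch := range slots {
			merged <- <-ch
		}
	}()

	for i, task := range tasks {
		go func(i int, task func() string) {
			slots[i] <- task()
		}(i, task)
	}

	return merged
}

// sleepThenReturn builds a task that sleeps for ms milliseconds and then yields name.
func sleepThenReturn(name string, ms int) func() string {
	return func() string {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		return name
	}
}

func main() {
	tasks := []func() string{
		sleepThenReturn("first", 30),  // slowest task, but first in order
		sleepThenReturn("second", 10), // finishes first, must wait its turn
		sleepThenReturn("third", 20),
	}
	for name := range streamInOrder(tasks) {
		fmt.Println(name)
	}
}
```

Even though the second task finishes before the others, the loop in `main` prints `first`, `second`, `third`, because the merging goroutine only reads from a task's channel once every earlier channel has been drained.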
  

Sorry, `sync.Cond`!