
30 October 2010

Coroutines with C# 5's await

C# 5's new "await" keyword is not just for orchestrating and controlling concurrency on multiple threads; it can also be used to introduce "exotic" control flow, such as coroutines, to a single-threaded C# program.

I was trying to find a really good sample coroutine problem to solve with "await", but all the ones I found given as standard examples (e.g., odd word problem, same fringe problem) could be easily (and I think more suitably) solved with C# 2.0 iterators (because the data only flows one way between the sub/coroutines needed to solve those problems).

The idea I finally came up with was of merging two ordered sequences of numbers. In C# 2.0, you could write a Merge method that creates an enumerator for each input sequence, tracks whether each sequence still has items, and compares the heads of each sequence to determine which is smaller. This is a lot of code, and the scaffolding obscures the core of the method, which is the comparison of the two items:

    void MergeWithEnumerators(IEnumerable<int> leftSequence, IEnumerable<int> rightSequence)
    {
        // enumerate the sequences
        using (IEnumerator<int> left = leftSequence.GetEnumerator())
        using (IEnumerator<int> right = rightSequence.GetEnumerator())
        {
            // start the enumeration
            bool leftHasItems = left.MoveNext();
            bool rightHasItems = right.MoveNext();

            // continue until out of items
            while (leftHasItems || rightHasItems)
            {
                // determine whether to output from left or right
                bool outputLeft;
                if (leftHasItems && rightHasItems)
                    outputLeft = left.Current < right.Current;
                else
                    outputLeft = leftHasItems;

                // output correct element
                if (outputLeft)
                {
                    Console.WriteLine(left.Current);
                    leftHasItems = left.MoveNext();
                }
                else
                {
                    Console.WriteLine(right.Current);
                    rightHasItems = right.MoveNext();
                }
            }
        }
    }

(I've used Console.WriteLine for simplicity; in the real world, you'd probably want this method to return IEnumerable and yield return the merged elements.)
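As a minimal sketch of that variation, here is the same loop rewritten as an iterator that yields the merged values instead of printing them. The class and method names (EnumeratorMerge, Merge) are hypothetical, and the comparison uses <= rather than < so that ties are taken from the left sequence first; the order of the output values is otherwise the same.

```csharp
using System.Collections.Generic;

public static class EnumeratorMerge
{
    // Lazily merges two ascending sequences into one ascending sequence.
    public static IEnumerable<int> Merge(IEnumerable<int> leftSequence, IEnumerable<int> rightSequence)
    {
        using (IEnumerator<int> left = leftSequence.GetEnumerator())
        using (IEnumerator<int> right = rightSequence.GetEnumerator())
        {
            // start the enumeration
            bool leftHasItems = left.MoveNext();
            bool rightHasItems = right.MoveNext();

            // continue until both sequences are exhausted
            while (leftHasItems || rightHasItems)
            {
                // take from the left if it is the only one with items, or if its head is not larger
                bool outputLeft = leftHasItems && (!rightHasItems || left.Current <= right.Current);

                if (outputLeft)
                {
                    yield return left.Current;
                    leftHasItems = left.MoveNext();
                }
                else
                {
                    yield return right.Current;
                    rightHasItems = right.MoveNext();
                }
            }
        }
    }
}
```

A caller would consume it with something like "foreach (int value in EnumeratorMerge.Merge(a, b)) Console.WriteLine(value);", and nothing is enumerated until that loop starts.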

In C# 5, the merge could be written with coroutines. Imagine two Merge methods running in an interleaved fashion (not simultaneously), each processing one input sequence. If each method knows the smallest current item in the other sequence, then its implementation is trivial: for each of the values in its own sequence, compare it to the smallest value from the other sequence. If it's smaller, print it; if not, switch to the other method and let it process its sequence:

    // The Merge coroutine is run on each of the two input sequences.
    async void Merge(IEnumerable<int> seq, int? smallestInOther)
    {
        // process our entire sequence
        foreach (int value in seq)
        {
            // switch to the other if it has smaller values than we do
            if (value > smallestInOther)
                smallestInOther = await SwitchToOther(value);

            // our value is smaller, print it
            Console.WriteLine(value);
        }
    }

Of course, this code can't stand alone; it needs a scheduler that will start both the methods, and also an implementation of SwitchToOther, which will save this method's "resume" action, and invoke it later when the second method switches back to this one. There's a lot of code here, but much of it is just the boilerplate that's required to support the "await" keyword (GetAwaiter, BeginAwait, EndAwait).

    // continuations (queued up by "await SwitchToOther") that are ready to be resumed
    Queue<SwitchAwaiter> awaiters = new Queue<SwitchAwaiter>();

    // the smallest value in each input sequence; this needs to be passed to the other Merge method
    Queue<int> smallestValues = new Queue<int>();

    // Merges two sequences using coroutines.
    void Merge(IEnumerable<int> left, IEnumerable<int> right)
    {
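        // start each of the methods running; this assumes 'left' is non-empty and that its first
        // value is greater than int.MinValue, so the first call has queued a value by the time it returns
        Merge(left, int.MinValue);
        Merge(right, smallestValues.Dequeue());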

        // keep switching between them as requested
        while (awaiters.Count > 0)
        {
            SwitchAwaiter switchAwaiter = awaiters.Dequeue();

            // the last switch will have no smallest value (because the Merge method finished its
            // enumeration, and exited); we pass 'null' as a sentinel
            switchAwaiter.Resume(smallestValues.Count == 0 ? default(int?) : smallestValues.Dequeue());
        }
    }

    // Allows a method to switch to another method by 'awaiting' the returned value.
    SwitchAwaiter SwitchToOther(int smallest)
    {
        // this method is called by 'await', so we need to return a type that implements 'GetAwaiter'
        SwitchAwaiter switchAwaiter = new SwitchAwaiter();

        // queue this as the method to switch to in the future
        awaiters.Enqueue(switchAwaiter);
        smallestValues.Enqueue(smallest);

        return switchAwaiter;
    }

    class SwitchAwaiter
    {
        public SwitchAwaiter GetAwaiter()
        {
            // 'GetAwaiter' must return an object that implements 'BeginAwait' and 'EndAwait'
            // for simplicity, it can be this same object
            return this;
        }

        // 'BeginAwait' is called to "suspend" the method executing the 'await'.
        public bool BeginAwait(Action resume)
        {
            // save the action that will be used to resume the method
            m_resume = resume;

            // 'true' means that the awaiting method needs to return control to its caller and
            // that it will be resumed later on
            return true;
        }

        public int? EndAwait()
        {
            // once we resume the method, 'EndAwait' supplies a value to it (from the 'await' keyword)
            // in this case, we give it the smallest value from the other Merge method
            return m_smallest;
        }

        // 'Resume' returns control to the method that executed 'await SwitchToOther'. 'smallestInOther'
        // is the smallest value in the sequence being enumerated by the other method.
        public void Resume(int? smallestInOther)
        {
            m_smallest = smallestInOther;
            m_resume();
        }

        Action m_resume;
        int? m_smallest;
    }
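The BeginAwait/EndAwait pair above is the shape that the preview of "await" expected. The C# 5 compiler that eventually shipped looks instead for an IsCompleted property, an OnCompleted method (from INotifyCompletion), and a GetResult method. The following is a rough port of the scheduler to that pattern, not a definitive implementation: the names CoroutineMerger and MergeOne are hypothetical, the output is collected in a list rather than printed so that it's easy to check, and it keeps the original's assumption that the left sequence is non-empty and starts above int.MinValue.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

public sealed class CoroutineMerger
{
    // continuations (queued up by "await SwitchToOther") that are ready to be resumed
    readonly Queue<SwitchAwaiter> awaiters = new Queue<SwitchAwaiter>();
    // the smallest value of each suspended coroutine, to be handed to the other one
    readonly Queue<int> smallestValues = new Queue<int>();
    readonly List<int> output = new List<int>();

    // Merges two ascending sequences using coroutines.
    public List<int> Merge(IEnumerable<int> left, IEnumerable<int> right)
    {
        // start each coroutine; the first one suspends immediately and queues its first value
        MergeOne(left, int.MinValue);
        MergeOne(right, smallestValues.Dequeue());

        // keep switching between them as requested
        while (awaiters.Count > 0)
        {
            SwitchAwaiter next = awaiters.Dequeue();
            // 'null' tells the resumed coroutine that the other one has finished
            next.Resume(smallestValues.Count == 0 ? default(int?) : smallestValues.Dequeue());
        }
        return output;
    }

    async void MergeOne(IEnumerable<int> seq, int? smallestInOther)
    {
        foreach (int value in seq)
        {
            // switch to the other coroutine if it has smaller values than we do
            // (the comparison is false when smallestInOther is null)
            if (value > smallestInOther)
                smallestInOther = await SwitchToOther(value);
            output.Add(value);
        }
    }

    SwitchAwaiter SwitchToOther(int smallest)
    {
        SwitchAwaiter switchAwaiter = new SwitchAwaiter();
        awaiters.Enqueue(switchAwaiter);
        smallestValues.Enqueue(smallest);
        return switchAwaiter;
    }

    sealed class SwitchAwaiter : INotifyCompletion
    {
        Action m_resume;
        int? m_smallest;

        public SwitchAwaiter GetAwaiter() { return this; }

        // 'false' forces the awaiting method to suspend and hand over its continuation
        public bool IsCompleted { get { return false; } }

        // replaces 'BeginAwait': saves the action that resumes the method
        public void OnCompleted(Action continuation) { m_resume = continuation; }

        // replaces 'EndAwait': supplies the value of the 'await' expression
        public int? GetResult() { return m_smallest; }

        public void Resume(int? smallestInOther)
        {
            m_smallest = smallestInOther;
            m_resume();
        }
    }
}
```

Because Resume invokes the saved continuation directly, everything still runs on the calling thread; no task scheduler or synchronization context is involved in the switch.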

In the end, was it worth it? Yes, the Merge method itself is shorter and concisely expresses its purpose, but there's a lot of supporting code required, which may or may not be reusable to solve different problems. This approach also doesn't scale: if we wanted to merge three (or more) sequences, it's clearly wrong to have each coroutine pick which of its peers it's going to switch to; in that case, we'd definitely want a single consumer that's picking the overall next smallest value (perhaps using a min heap for efficiency if there are lots of sequences being merged).
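For comparison, a single-consumer merge over any number of sequences might look like the sketch below. It assumes .NET 6 or later, where PriorityQueue<TElement, TPriority> is available as a min heap; the names HeapMerge and MergeAll are hypothetical, and the order in which equal values from different sequences come out is not guaranteed.

```csharp
using System.Collections.Generic;

public static class HeapMerge
{
    // Lazily merges any number of ascending sequences into one ascending sequence.
    public static IEnumerable<int> MergeAll(IEnumerable<IEnumerable<int>> sequences)
    {
        // every enumerator we create, so that all of them can be disposed at the end
        var enumerators = new List<IEnumerator<int>>();
        // enumerators that still have a current item, ordered by that item
        var heads = new PriorityQueue<IEnumerator<int>, int>();
        try
        {
            foreach (IEnumerable<int> sequence in sequences)
            {
                IEnumerator<int> enumerator = sequence.GetEnumerator();
                enumerators.Add(enumerator);
                if (enumerator.MoveNext())
                    heads.Enqueue(enumerator, enumerator.Current);
            }

            // repeatedly take the enumerator with the smallest head and advance it
            while (heads.TryDequeue(out var smallest, out int value))
            {
                yield return value;
                if (smallest.MoveNext())
                    heads.Enqueue(smallest, smallest.Current);
            }
        }
        finally
        {
            // runs when the merge finishes or the caller stops enumerating early
            foreach (IEnumerator<int> enumerator in enumerators)
                enumerator.Dispose();
        }
    }
}
```

Each output value costs one dequeue and at most one enqueue, so the work per item grows with the logarithm of the number of sequences rather than with the number of sequences itself.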

So while it was an interesting challenge to interleave the execution of two methods on the same thread using "await", it isn't the best solution to this particular problem. (Maybe there is a similar problem for which "await" really is the killer solution; if you know of one, I'd love to hear about it.)

That's not to say that "await" is useless; far from it! There are many other problems (that we've solved more or less well in the Logos code today) that could be solved far more elegantly and concisely with the "await" keyword; I'm definitely looking forward to using it.

P.S. As an exercise for the reader, change "Console.WriteLine(value);" (in the first Merge method) to "await Output(value);" and rewrite the second Merge method to "yield return" the merged values, so that the merge can happen lazily.

Posted by Bradley Grainger at October 30, 2010 10:30 AM