Asynchronous Programming with the Reactive Extensions (while waiting for async/await)

By jay at November 25, 2011 20:43 Tags: , , , , , ,

This article was originally published on the MVP Award Blog in December 2011.

Nowadays, with applications that use more and more services that are in the cloud, or simply perform actions that take a user noticeable time to execute, it has become vital to program in an asynchronous way.

But we, as developers, feel at home when thinking sequentially. We like to send a request or execute a method, wait for the response, and then process it.

Unfortunately for us, an application just cannot wait synchronously for a call to end anymore. Reasons can be that the user expects the application to continue responding, or because the application joins the results of multiple operations, and it is necessary to perform all these operations simultaneously for good performance.

Frameworks that are heavily UI dependent (like Silverlight or Silverlight for Windows Phone) are trying the force the developer's hand into programming asynchronously by removing all synchronous APIs. This leaves the developer alone with either the Begin/End pattern, or the plain old C# events. Both patterns are not flexible, not easily composable, often lead to memory leaks, and are just plain difficult to use or worse, to read.

C# 5.0 async/await

Taking a quick look at the not so distant future, Microsoft has taken the bold approach to augment its new .NET 4.5 to include asynchronous APIs and in the case of the Windows Runtime (WinRT), restrict some APIs to be asynchronous only. These are based on the Task class, and are backed by languages to ease asynchronous programming.

In the upcoming C# 5.0 implementation, the async/await pattern is trying to handle this asynchrony problem by making asynchronous code look synchronous. It makes asynchronous programming more "familiar" to developers.

If we take this example:

    static void Main(string[] args)
    {
        // Some initialization of the DB...
        Task<int> t = GetContentFromDatabase();

        // Execute some other code when the task is done
        t.ContinueWith(r => Console.WriteLine(r.Result));

        Console.ReadLine();
    }

    public static async Task<int> GetContentFromDatabase()
    {
        int source = 22;

        // Run starts the execution on another thread
        var result = (int) await Task.Run(
            () => { 
                // Simulate DB access
                Thread.Sleep(1000);
                return 10; 
            }
        );

        return source + result * 2;
    }

The code in GetContentFromDatabaselooks synchronous, but under the hood, it's actually split in half (or more) where the await keyword is used.

The compiler is applying a technique used many times in the C# language, known as syntactic sugar. The code is expanded to a form that is less readable, but is more of a plumbing code that is painful to write – and get right – each time. The using statement, iterators and more recently LINQ are very good examples of that syntactic sugar.

Using a plain old thread pool call, the code actually looks a lot more like this, once the compiler is done:

    public static void Main()
    {
        MySyncMethod(result => Console.WriteLine(result));
        Console.ReadLine();
    }

    public static void GetContentFromDatabase (Action<int> continueWith)
    {
        // The first half of the async method (with QueueUserWorkItem)
        int source = 22;

        // The second half of the async method
        Action<int> onResult = result => {
            continueWith(source + result * 2);
        };

        ThreadPool.QueueUserWorkItem(
            _ => {
                // Simulate DB access
                Thread.Sleep(1000);

                onResult(10);
            }
        );
    }

This sample somewhat more complex, and does not properly handle exceptions. But you probably get the idea.

Asynchronous Development now

Nonetheless, you may not want or will be able to use C# 5.0 soon enough. A lot of people are still using .NET 3.5 or even .NET 2.0, and new features like async will take a while to be deployed in the field. Even when the framework has been offering it for a long time, the awesome LINQ (a C# 3.0 feature) is still being adopted and is not that widely used.

The Reactive Extensions (Rx for friends) offer a framework that is available from .NET 3.5 and functionality similar to C# 5.0, but provide a different approach to asynchronous programming, more functional. More functional is meaning fewer variables to maintain states, and a more declarative approach to programming.

But don't be scared. Functional does not mean abstract concepts that are not useful for the mainstream developer. It just means (veryroughly) that you're going to be more inclined to separate your concerns using functions instead of classes.

But let's dive into some code that is similar to the two previous examples:

    static void Main(string[] args)
    {
        IObservable<int> query = GetContentFromDatabase();

        // Subscribe to the result and display it
        query.Subscribe(r => Console.WriteLine(r));

        Console.ReadLine();
    }

    public static IObservable<int> GetContentFromDatabase()
    {
        int source = 22;

        // Start the work on another thread (using the ThreadPool)
        return Observable.Start<int>(
                   () => {
                      Thread.Sleep(1000);
                      return 10;
                   }
               )

               // Project the result when we get it
               .Select(result => source + result * 2);
    }

From the caller's perspective (the main), the GetContentFromDatabase method behaves almost the same way a .NET 4.5 Task would, and the Subscribe pretty much replaces the ContinueWith method.

But this simplistic approach works well for an example. At this point, you could still choose to use the basic ThreadPoolexample shown earlier in this article.

A word on IObservable

An IObservable is generally considered as a stream of data that can push to its subscribers zero or more values, and either an error or completion message. This “Push” based model that allows the observation of a data source without blocking a thread. This is opposed to the Pull model provided by IEnumerable, which performs a blocking observation of a data source. A very good video with Erik Meijer explains these concepts on Channel 9.

To match the .NET 4.5 Task model, an IObservable needs to provide at most one value, or an error, which is what the Observable.Start method is doing.

A more realistic example

Most of the time, scenarios include calls to multiple asynchronous methods. And if they're not called at the same time and joined, they're called one after the other.

Here is an example that does task chaining:

    public static void Main()
    {
        // Use the observable we've defined before
        var query = GetContentFromDatabase();

              // Once we get the token from the database, transform it first
        query.Select(r => "Token_" + r)

             // When we have the token, we can initiate the call to the web service
             .SelectMany(token => GetFromWebService(token))

             // Once we have the result from the web service, print it.
             .Subscribe(_ => Console.WriteLine(_));
    }

    public static IObservable<string> GetFromWebService(string token)
    {
        return Observable.Start(
            () => new WebClient().DownloadString("http://example.com/" + token)
        )
        .Select(s => Decrypt(s));
    }

The SelectMany operator is a bit strange when it comes to the semantics of an IObservable that behaves like a Task. It can then be thought of a ContinueWith operator. The GetContentFromDatabase only pushes one value, meaning that the provided lambda for the SelectMany is only called once.

Taking the Async route

A peek at WinRT and the Build conference showed a very interesting rule used by Microsoft when moving to asynchronous API throughout the framework. If an API call nominally takes more than 50ms to execute, then it's an asynchronous API call.

This rule is easily applicable to existing .NET 3.5 and later frameworks by exposing IObservable instances that provide at most one value, as a way to simulate a .NET 4.5 Task.

Architecturally speaking, this is a way to enforce that the consumers of a service layer API will be less tempted to synchronously call methods and negatively impact the perceived or actual performance of an application.

For instance, a "favorites" service implemented in an application could look like this, using Rx:

    public interface IFavoritesService
    {
        IObservable<Unit> AddFavorite(string name, string value);
        IObservable<bool> RemoveFavorite(string name);
        IObservable<string[]> GetAllFavorites();
    }

All the operations, including ones that alter content, are executed asynchronously. It is always tempting to think a select operation will take time, but we easily forget that an Addoperation could easily take the same amount of time.

A word on unit: The name comes from functional languages, and represents the void keyword, literally. A deep .NET CLR limitation prevents the use of System.Void as a generic type parameter, and to be able to provide a void return value, Unit has been introduced.

Wrap up

Much more can be achieved with Rx but for starters, using it as a way to perform asynchronous single method call seems to be a good way to learn it.

Also, a note to Rx experts, shortcuts have been taken to explain this in the most simple form, and sure there are many tips and tricks to know to use Rx effectively, particularly when it is used all across the board. The omission of the Completed event is one of them.

Finally, explaining the richness of the Reactive Extensions is a tricky task. Even the smart guys of the Rx team have a hard time doing so... I hope this quick start will help you dive into it!

blog comments powered by Disqus

About me

My name is Jerome Laban, I am a Software Architect, C# MVP and .NET enthustiast from Montréal, QC. You will find my blog on this site, where I'm adding my thoughts on current events, or the things I'm working on, such as the Remote Control for Windows Phone.