There’s a very cool framework that has slipped into the Windows Phone SDK: the Reactive Extensions.

It's actually quite a misunderstood framework, mainly because it is a bit hard to harness, but once you get a handle on it, it is very handy! I particularly like the MemoizeAll extension, a tricky one, but very powerful.

But I digress.

 

A Non-Reactive String Download Sample

On Windows Phone 7, the WebClient class only has a DownloadStringAsync method and a corresponding DownloadStringCompleted event. That forces you to be asynchronous, which keeps the UI responsive and rules out the bad coding habit of making synchronous remote calls that freeze the application on the user.

In a world without the Reactive Extensions, you would use it like this:

public void StartDownload()
{
    var wc = new WebClient();
    wc.DownloadStringCompleted +=
      (sender, args) => DownloadCompleted(args.Result);
                  
    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public void DownloadCompleted(string value)
{
    myLabel.Text = value;
}

Pretty easy. But you soon find out that the DownloadStringCompleted handler runs on the UI thread. If, for some reason, you need to perform some expensive calculation after you’ve received the string, you’ll freeze the UI for the duration of that calculation. Windows Phone 7 is all about fluidity and you don't want to be the bad guy, so you have to queue the work on the ThreadPool.

But the UI can only be updated from the UI thread, so you then have to come back from the thread pool through the Dispatcher.

You then have:

public void StartDownload()
{
    WebClient wc = new WebClient();
    wc.DownloadStringCompleted +=
        (sender, args) => ThreadPool.QueueUserWorkItem(d => DownloadCompleted(args.Result));

    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public void DownloadCompleted(string value)
{
    // Some expensive calculation
    Thread.Sleep(1000);

    // Back to the UI thread to update the label
    Dispatcher.BeginInvoke(() => myLabel.Text = value);
}

That’s a bit more complex. And then you notice that you also have to handle exceptions because, well, it’s the Web. It’s unreliable.

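So, let’s add the error handling. Reading args.Result throws when the download failed, so check args.Error before handing the result to the thread pool:

public void StartDownload()
{
    WebClient wc = new WebClient();

    wc.DownloadStringCompleted += (sender, args) => {
        // We are still on the UI thread here, so the label can be set directly
        if (args.Error != null) {
            myLabel.Text = "Error !";
            return;
        }

        // Read the result now, then process it on the thread pool
        string result = args.Result;
        ThreadPool.QueueUserWorkItem(d => DownloadCompleted(result));
    };

    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public void DownloadCompleted(string value)
{
    // Some expensive calculation
    Thread.Sleep(1000);
    Dispatcher.BeginInvoke(() => myLabel.Text = value);
}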

That’s starting to be a bit complex. But then you have to wait for another call from another WebClient to finish, and show both results.

Oh well. Fine, I'll spare you that one.

 

The Same Using the Reactive Extensions

The Reactive Extensions treat asynchronous events as a stream of events. You subscribe to the stream and leave, and you let the framework do the heavy lifting for you.

I’ll spare you the explanation of the duality between IObservable and IEnumerable, because Erik Meijer explains it very well.
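To give an idea of what "subscribe and leave" means, here is a minimal sketch built only on the IObservable<T> and IObserver<T> interfaces. SimpleStream and ActionObserver are names I made up for this illustration; they are not part of the Reactive Extensions, which ship much richer equivalents.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: turns two delegates into an IObserver<T>.
sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;

    public ActionObserver(Action<T> onNext, Action<Exception> onError)
    {
        _onNext = onNext;
        _onError = onError;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { }
}

// Hypothetical hand-rolled stream: values are pushed in, subscribers get them pushed out.
sealed class SimpleStream<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Publish(T value)
    {
        // Copy the list so an observer may unsubscribe while being notified
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose() { _dispose(); }
    }
}

static class StreamDemo
{
    static void Main()
    {
        var downloads = new SimpleStream<string>();
        var received = new List<string>();

        // Subscribe, then leave: nothing runs until a value is pushed
        IDisposable subscription = downloads.Subscribe(
            new ActionObserver<string>(received.Add, e => received.Add("Error ! " + e.Message)));

        downloads.Publish("first");   // plays the role of DownloadStringCompleted firing
        subscription.Dispose();       // stop listening
        downloads.Publish("second");  // nobody is subscribed anymore

        Console.WriteLine(string.Join(",", received.ToArray())); // prints "first"
    }
}
```

The subscriber never polls and never blocks; it only reacts when the source pushes something, and it stops listening by disposing the subscription.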

So, I’ll start again with the simple example. After adding the System.Observable and System.Reactive references, I can download a string:

public void StartDownload()
{
    WebClient wc = new WebClient();

    var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted")

                      // When the event fires, just select the string and make
                      // an IObservable<string> instead
                      .Select(newString => newString.EventArgs.Result);

    // Subscribe to the observable, and set the label text
    o.Subscribe(s => myLabel.Text = s);


    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

This does the same thing as the very first example. You’ll notice the use of Observable.FromEvent to turn the DownloadStringCompleted event into an observable stream of events. For this example, the stream will only contain one event, since the download only occurs once. Each occurrence of the event is then “projected”, using the Select operator, to a string that represents the result of the web request.

It’s a bit more complex for the simple case, because of the additional plumbing.

But now we want to handle the thread context changes. The Reactive Extensions have the concept of a scheduler, which lets you observe an IObservable in a specific context.

So, we use the schedulers like this:

public void StartDownload()
{
    WebClient wc = new WebClient();

    var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted")

                      // Let's make sure that we’re on the thread pool
                      .ObserveOn(Scheduler.ThreadPool)

                      // When the event fires, just select the string and make
                      // an IObservable<string> instead
                      .Select(newString => ProcessString(newString.EventArgs.Result))

                      // Now go back to the UI Thread
                      .ObserveOn(Scheduler.Dispatcher)

                      // Subscribe to the observable, and set the label text
                      .Subscribe(s => myLabel.Text = s);

    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public string ProcessString(string s)
{
    // A very very very long computation
    return s + "1";
}
 

In this example, we’ve changed contexts twice to suit our needs, and now, it’s getting a bit less complex than the original sample.
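If you wonder what ObserveOn roughly does, here is a small sketch of the idea, assuming a scheduler is nothing more than "something that runs an action somewhere". The ObserveOn helper below is hypothetical and works on plain delegates; it is not the Rx operator, which also takes care of ordering, errors and completion.

```csharp
using System;
using System.Threading;

static class ObserveOnSketch
{
    // Hypothetical mini-ObserveOn: wraps a callback so that every value
    // is handed to `schedule` instead of being processed on the caller's thread.
    public static Action<T> ObserveOn<T>(Action<Action> schedule, Action<T> next)
    {
        return value => schedule(() => next(value));
    }

    static void Main()
    {
        // A "scheduler" that runs the work on the thread pool
        Action<Action> threadPool = work => ThreadPool.QueueUserWorkItem(_ => work());

        bool ranOnPool = false;
        using (var done = new ManualResetEvent(false))
        {
            Action<string> handler = ObserveOn<string>(threadPool, s =>
            {
                ranOnPool = Thread.CurrentThread.IsThreadPoolThread;
                done.Set();
            });

            handler("downloaded text"); // returns immediately, the work is queued
            done.WaitOne();             // only the demo waits, to read the flag
        }

        Console.WriteLine(ranOnPool); // prints "True"
    }
}
```

Swapping the thread pool delegate for one that calls Dispatcher.BeginInvoke would give the "go back to the UI thread" hop of the sample above.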

And if we want to handle exceptions, well, easy. Just give Subscribe a second handler for errors:

    .Subscribe(s => myLabel.Text = s, e => myLabel.Text = "Error ! " + e.Message);

And there you have it!
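Why is one extra lambda enough? Reading Result on a failed download throws, and when a projection throws, the error is routed to the error handler instead of blowing up in the caller. Here is a rough sketch of that routing with plain delegates; ProjectionSketch is a name invented for the illustration, not an Rx type.

```csharp
using System;

static class ProjectionSketch
{
    // Hypothetical mini-Select: applies the selector to each input and
    // sends the result to onNext, or the exception to onError if it throws.
    public static Action<TIn> Select<TIn, TOut>(
        Func<TIn, TOut> selector, Action<TOut> onNext, Action<Exception> onError)
    {
        return input =>
        {
            TOut projected;
            try { projected = selector(input); }
            catch (Exception ex) { onError(ex); return; }
            onNext(projected);
        };
    }

    static void Main()
    {
        string label = "";

        Action<string> pipeline = Select<string, string>(
            // Stand-in for reading args.Result: throws when there is no result
            s => { if (s == null) throw new InvalidOperationException("download failed"); return s + "1"; },
            s => label = s,
            e => label = "Error ! " + e.Message);

        pipeline("data");
        Console.WriteLine(label); // prints "data1"

        pipeline(null);
        Console.WriteLine(label); // prints "Error ! download failed"
    }
}
```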

 

Combining the Results of Two Downloads

Combining two or more asynchronous operations can be very tricky: you have to handle exceptions, rendez-vous and complex states. That makes for a very complex piece of code that I won’t write here, as promised. Instead, I’ll give you a sample using the Reactive Extensions:

public IObservable<string> StartDownload(string uri)
{
    WebClient wc = new WebClient();

    var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted")

                      // Let's make sure that we're not on the UI Thread
                      .ObserveOn(Scheduler.ThreadPool)

                      // When the event fires, just select the string and make
                      // an IObservable<string> instead
                      .Select(newString => ProcessString(newString.EventArgs.Result));

    wc.DownloadStringAsync(new Uri(uri));

    return o;
}

public string ProcessString(string s)
{
    // A very very very long computation
    return s + "<!-- Processing End -->";
}

public void DisplayMyString()
{
    var asyncDownload = StartDownload("http://bing.com");
    var asyncDownload2 = StartDownload("http://google.com");

    // Take both results and combine them when they'll be available
    var zipped = asyncDownload.Zip(asyncDownload2, (left, right) => left + " - " + right);

    // Now go back to the UI Thread
    zipped.ObserveOn(Scheduler.Dispatcher)

          // Subscribe to the observable, and set the label text
          .Subscribe(s => myLabel.Text = s);
}

You’ll get a very interesting combination of Google and Bing :)
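For the curious, the rendez-vous that Zip performs can be pictured as two queues: whichever side arrives first waits for its partner. The ZipSketch class below is a hypothetical illustration of that pairing, assuming plain callbacks instead of observables; it is not how Rx implements Zip.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mini-Zip: pairs the n-th left value with the n-th right value.
sealed class ZipSketch<TLeft, TRight, TResult>
{
    private readonly object _gate = new object();
    private readonly Queue<TLeft> _lefts = new Queue<TLeft>();
    private readonly Queue<TRight> _rights = new Queue<TRight>();
    private readonly Func<TLeft, TRight, TResult> _combine;
    private readonly Action<TResult> _onNext;

    public ZipSketch(Func<TLeft, TRight, TResult> combine, Action<TResult> onNext)
    {
        _combine = combine;
        _onNext = onNext;
    }

    // Called when the left source produces a value (possibly from another thread)
    public void OnLeft(TLeft value)
    {
        lock (_gate) { _lefts.Enqueue(value); Drain(); }
    }

    // Called when the right source produces a value
    public void OnRight(TRight value)
    {
        lock (_gate) { _rights.Enqueue(value); Drain(); }
    }

    // Emit one combined result for every complete (left, right) pair
    private void Drain()
    {
        while (_lefts.Count > 0 && _rights.Count > 0)
            _onNext(_combine(_lefts.Dequeue(), _rights.Dequeue()));
    }
}

static class ZipDemo
{
    static void Main()
    {
        var results = new List<string>();
        var zip = new ZipSketch<string, string, string>(
            (left, right) => left + " - " + right,
            results.Add);

        zip.OnRight("google"); // arrives first and has to wait
        zip.OnLeft("bing");    // completes the pair

        Console.WriteLine(results[0]); // prints "bing - google"
    }
}
```

Whatever the arrival order, the left value always ends up on the left of the combined string, which is the same guarantee the Zip call in DisplayMyString relies on.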