[WP7Dev] Using the WebClient with Reactive Extensions for Effective Asynchronous Downloads

By jay at June 22, 2010 21:07 Tags: , , , , ,

There’s a very cool framework that has slipped into the Windows Phone SDK : The Reactive Extensions.

It's actually a quite misunderstood framework, mainly because it is a bit hard to harness, but when you get a handle on it, it is very handy ! I particularly like the MemoizeAll extension, a tricky one, but very powerfull.

But I digress.

 

A Non-Reactive String Download Sample

On Windows Phone 7, the WebClient class only has a DownloadStringAsync method and a corresponding DownloadStringCompleted event. That means that you're forced to be asynchronous, to be nice to the UI and not make the application freeze on the user, because of the bad coding habit of being synchronous on remote calls.

In a world without the reactive extensions, you would use it like this :

public void StartDownload()
{
    var wc = new WebClient();
    wc.DownloadStringCompleted += 
      (e, args) => DownloadCompleted(args.Result);
                  
    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public void DownloadCompleted(string value)
{
    myLabel.Text = value;
}

Pretty easy. But you soon find out that the execution of the DownloadStringCompleted event is performed on the UI Thread. That means that if, for some reason you need to perform some expensive calculation after you’ve received the string, you’ll freeze the UI for the duration of your calculation, and since the Windows Phone 7 is all about fluidity and you don't want to be the bad guy, you then have to queue it in the ThreadPool.

But you also have to update the UI in the dispatcher, so you have to come back from the thread pool.

You then have :

 public void StartDownload()
 {
     WebClient wc = new WebClient();
     wc.DownloadStringCompleted += 
        (e, args) => ThreadPool.QueueUserWorkItem(d => DownloadCompleted(args.Result));

     // Start the download
     wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
  }

 public void DownloadCompleted(string value)
 {
     // Some expensive calculation
     Thread.Sleep(1000);

     Dispatcher.BeginInvoke(() => myLabel.Text = value);
 }

That’s a bit more complex. And then you notice that you also have to handle exceptions because, well, it’s the Web. It’s unreliable.

So, let’s add the exception handling :

public void StartDownload()
{
    WebClient wc = new WebClient();

    wc.DownloadStringCompleted += (e, args) => {
        try {
            ThreadPool.QueueUserWorkItem(d => DownloadCompleted(args.Result));
        }
        catch (WebException e) {
            myLabel.Text = "Error !";
        }
    };
   
    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public void DownloadCompleted(string  value)
{
    // Some expensive calculation
    Thread.Sleep(1000);
    Dispatcher.BeginInvoke(() => myLabel.Text = value);
}

That’s starting to be a bit complex. But then you have to wait for an other call from an other WebClient to end its call and show both results.

Oh well. Fine, I'll spare you that one.

 

The Same Using the Reactive Extensions

The reactive extensions treats asynchronous events like a stream of events. You subscribe to the stream of events and leave, and you let the reactive framework do the heavy lifting for you.

I’ll spare you the explanation of the duality between IObservable and IEnumerable, because Erik Meijer explains it very well.

So, I’ll start again with the simple example, and after adding the System.Observable and System.Reactive references, I can downloading a string :

public void StartDownload()
{
    WebClient wc = new WebClient();

    var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted")

                      // When the event fires, just select the string and make
                      // an IObservable<string> instead
                      .Select(newString => newString.EventArgs.Result);

    // Subscribe to the observable, and set the label text
    o.Subscribe(s => myLabel.Text = s);


    // Start the download
    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

This does the same thing the very first example did. You’ll notice the use of Observable.FromEvent to transform the event into a string from the DownloadStringCompleted event args. For this exemple, the event stream will only contain one event, since the download only occurs once. Each of these ocurrence of the event is then “projected”, using the Select statement, to a string that represents the result of the web request.

It’s a bit more complex for the simple case, because of the additional plumbing.

But now we want to handle the threads context changes. The Reactive Extensions has the concept of scheduler, to observe an IObservable in a specific context.

So, we use the scheduler like this :

public void StartDownload()
{
    WebClient wc = new WebClient();

    var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted")

                      // Let's make sure that we’re on the thread pool
                      .ObserveOn(Scheduler.ThreadPool)

                      // When the event fires, just select the string and make
                      // an IObservable<string> instead
                      .Select(newString => ProcessString(newString.EventArgs.Result))

                      // Now go back to the UI Thread
                      .ObserveOn(Scheduler.Dispatcher)

                      // Subscribe to the observable, and set the label text
                      .Subscribe(s => myLabel.Text = s);

    wc.DownloadStringAsync(new Uri("http://www.data.com/service"));
}

public string ProcessString(string s)
{
    // A very very very long computation
    return s + "1";
}
 

In this example, we’ve changed contexts twice to suit our needs, and now, it’s getting a bit less complex than the original sample.

And if we want to handle exceptions, well, easy :

    .Subscribe(s => myLabel.Text = s, e => myLabel.Text = "Error ! " + e.Message);

And you have it !

 

Combining the Results of Two Downloads

Combining two or more asynchronous operations can be very tricky, and you have to handle exceptions, rendez-vous and complex states. That make a very complex piece of code that I won’t write here, I promised, but instead I’ll give you a sample using Reactive Extensions :

public IObservable<string> StartDownload(string uri)
{
    WebClient wc = new WebClient();

    var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted")

                      // Let's make sure that we're not on the UI Thread
                      .ObserveOn(Scheduler.ThreadPool)

                      // When the event fires, just select the string and make
                      // an IObservable<string> instead
                      .Select(newString => ProcessString(newString.EventArgs.Result));

    wc.DownloadStringAsync(new Uri(uri));

    return o;
}

public string ProcessString(string s)
{
    // A very very very long computation
    return s + "<!-- Processing End -->";
}

public void DisplayMyString()
{
    var asyncDownload = StartDownload("http://bing.com");
    var asyncDownload2 = StartDownload("http://google.com");

    // Take both results and combine them when they'll be available
    var zipped = asyncDownload.Zip(asyncDownload2, (left, right) => left + " - " + right);

    // Now go back to the UI Thread
    zipped.ObserveOn(Scheduler.Dispatcher)

          // Subscribe to the observable, and set the label text
          .Subscribe(s => myLabel.Text = s);
}

You’ll get a very interesting combination of google and bing :)

Reactive Framework: MemoizeAll

By jay at February 04, 2010 18:21 Tags: , , ,

Cet article est disponible en francais.

For some time now, with the release of the Rx Framework and the reactive/interactive programming, some new features were highlighted through a very good article of Bart De Smet dealing with System.Interactive and the “lazy-caching”.

When using LINQ, one can find two sorts of operators: The “lazy” operators that take elements one by one and forward them when they a requested (Select, Where, SelectMany, First, …), and the operators that I would call “Rendez-vous” for which the entirety of the elements of the enumerators need to be enumerated (Count, ToArray/ToList, OrderBy, …) to produce a result.

 

“Lazy” Operators

Lazy operators are pretty useful as they offer a good performance when it is not required to enumerate all the elements of an enumerable. This can also be useful when it may take a very long time to enumerate each element of an enumerator, and that we only want to get the first few elements.

For instance this :
         

static IEnumerable<int> GetItems()
{
    for (int i = 0; i < 5; i++)
    {
        Console.WriteLine(i);
        yield return i + 1;
    }
}

static void Main()
{
   Console.WriteLine(GetItems().First());
   Console.WriteLine(GetItems().First());
}

Will output :


0
1
0
1

Only the first element of the enumerator will be enumerated from GetItems().

However, these operators expose a behavior that is important to know about: Each time they are enumerated, they also enumerate their source again. That could either be a advantage (Enumerating multiple times a changing source) or a problem (enumerating multiple times a resource intensive source).

 

“Rendez-vous” Operators

These operators are also interesting because they force the enumeration of all the elements of the enumerable, and in the particular case of ToArray, this allows the creation of an immutable version of the content of the enumerable. These are useful in conjunction with lazy operators to prevent them to enumerate their source again, when enumerated multiple times.

If we the previous sample, and update it a bit:


static void Main()
{
   var items = GetItems().ToArray();

   Console.WriteLine(items.Count());
   Console.WriteLine(items.Count());
}

We get this result :


0
1
2
3
4
5
5

Because Count() needs to know all the elements of the source enumerator to determine the count.

These operators also enumerate their source with each use, but using ToArray/ToList prevents their result to enumerate the source again.

The case of multiple enumerations

A concrete example of the problem posed by the multiple enumerations is the creation of an enumerable partitionning operator. In this example, we can see that the enumerable passed as the source is used by to "Where" different operators, which implies that the source enumerable will be enumerated twice. Storing the whole content of the source enumerable by means of a ToArray/ToList is possible, but that would be a possible waste of resource, mainly because we can't know if the output enumerable will be enumerated completely (If that is possible, as in the case of an infinite enumerable, ToArray is not applicable).

An intermediate operator between "Lazy" and "Rendez-vous" would be useful.

EnumerableEx.MemoizeAll

The EnumerableEx class brings us an extension, MemoizeAll (built from the Memoization concept), that is just the middle ground we're looking for, and will cache elements from the source enumerator when they are requested. A sort of "lazy" ToArray.

If we take the example of Mark Needham, we would modify it like this :


var evensAndOdds = Enumerable.Range(1, 10)
                             .MemoizeAll()
                             .Partition(x => x % 2 == 0);

In this example, the MemoizeAll does not have a real benefit on the performance side, since Enumerable.Range is not a very expensive operator. But in the case where the source of the "Partition" operator would be a most expensive enumerable, like a Linq2Sql query, the lazy caching could be very effective.

One of the comments suggests that a GroupBy based implementation could be written, but this operator also evaluates the source operator when a group is enumerated. The MemoizeAll is then again appropriate for better performance, but as always, this is a tradeoff between processing and memory.

By the way, Bart de Smeth discusses the part of the elimination of side effects linked the multiple enumeration of enumerables by using Memoize and MemoizeAll, which is not really an issue in the previous example, but is nonetheless a very interesting subject.

 

.NET 4.5 ?

On a side note, I find regrettable that the EnumerableEx extensions did not make their way in .NET 4.0... They are very useful, and not very complex. They may have arrived too late in the development cycle of .NET 4.0... Maybe in .NET 4.5 :)

About me

My name is Jerome Laban, I am a Software Architect, C# MVP and .NET enthustiast from Montréal, QC. You will find my blog on this site, where I'm adding my thoughts on current events, or the things I'm working on, such as the Remote Control for Windows Phone.