[Rx] Using the ObserveOn and SubscribeOn operators

By jay at July 24, 2011 20:04 Tags: , , , , ,

TLDR: This post talks about how the Reactive Extensions ObserveOn operator changes the execution context (the thread) of the IObservable OnNext/OnComplete/OnError methods, whereas the SubscribeOn operator changes the execution context of the implementation of the Subscribe method in the chain of observers. Both methods can be useful to improve the performance of an application's UI by putting work on background threads.

 

When developing asynchronous code, or consuming asynchronous APIs, you find yourself forced to use specific methods on a specific thread.

The most prominent examples being WPF/Silverlight and Winforms, where you cannot use UI bound types outside of the UI Thread. In the context of WPF, you'll find yourself forced to use the Dispatcher.Invoke method to manipulate the UI in the proper context.

However, you don't want to execute everything on the UI Thread, because the UI performance relies on it. Doing too much on the UI thread can lead to a very bad percieved performance, and angry users...

 

Rx framework's ObserveOn operator

I've discussed a few times the use of ObserveOn in the context of WP7, where it is critical to leave the UI thread alone and avoid choppy animations, for instance.

The ObserveOn operator changes the context of execution (scheduler) of a chain of operators until an other operator changes it.

To be able to demonstrate this, let's write our own scheduler :

public class MyScheduler : IScheduler
{
    // Warning: ThreadStatic is not supported on Windows Phone 7.0 and 7.1
    // This code will not work properly on this platform.
    [ThreadStatic]
    public static int? SchedulerId;

    private int _schedulerId;
    private IScheduler _source;
        
    public MyScheduler(IScheduler source, int schedulerId)
    {
        _source = source;
        _schedulerId = schedulerId;
    }

    public DateTimeOffset Now { get { return _source.Now; } }

    public IDisposable Schedule<TState>(
              TState state, 
              Func<IScheduler, TState, IDisposable> action
           )
    {
        return _source.Schedule(state, WrapAction(action));
    }

    private Func<IScheduler, TState, IDisposable> WrapAction<TState>(
              Func<IScheduler, TState, IDisposable> action)
    {
        return (scheduler, state) => {

            // Set the TLS with the proper ID
            SchedulerId = _schedulerId;

            return action(_source, state);
        };
    }
}

This scheduler's purpose is to intercept calls to the ISchedule methods (You'll fill the missing Schedule methods by yourself) and flag them with a custom thread ID. That way, we'll know which scheduler is executing our code.

Note that this code will not work properly on Windows Phone 7, since ThreadStaticAttribute is not supported. And it's still not supported on 7.1... Seems like not enough people are using ThreadStatic to make its way to the WP7 CLR...

Anyway, now if we write the following Rx expression :

Observable.Timer(TimeSpan.FromSeconds(1), new MyScheduler(Scheduler.ThreadPool, 42))
          .Do(_ => Console.WriteLine(MyScheduler.SchedulerId))
          .First();

We force the timer to raise OnNext on the ThreadPool through our scheduler, and we'll get the following :

42

Which means that the lambda passed as a parameter to the Do operator got executed in the context of the Scheduler used when declaring the Timer operator.

If we go a bit farther :

Observable.Timer(TimeSpan.FromSeconds(1), new MyScheduler(Scheduler.ThreadPool, 42))
          .Do(_ => Console.WriteLine("Do(1): " + MyScheduler.SchedulerId))
          .ObserveOn(new MyScheduler(Scheduler.ThreadPool, 43))
          .Do(_ => Console.WriteLine("Do(2): " + MyScheduler.SchedulerId))
          .ObserveOn(new MyScheduler(Scheduler.ThreadPool, 44))
          .Do(_ => Console.WriteLine("Do(3): " + MyScheduler.SchedulerId))
          .Do(_ => Console.WriteLine("Do(4): " + MyScheduler.SchedulerId))
          .First();

We'll get the following :

Do(1): 42
Do(2): 43
Do(3): 44
Do(4): 44

Each time a scheduler was specified, the following operators OnNext delegates were executed on that scheduler.

In this case, we're using the Do operator which does not take a scheduler as a parameter. There some operators though, like Delay, that implicitly use a scheduler that changes the context.

Using this operator is particularly useful when the OnNext delegate is performing a context sensitive operation, like manipulating the UI, or when the source scheduler is the UI and the OnNext delegate is not related to the UI and can be executed on an other thread.

You'll find that operator handy with the WebClient or GeoCoordinateWatcher classes, which both execute their handlers on the UI thread. Watchout for Windows Phone 7.1 (mango) though, this may have changed a bit.

 

An Rx Expression's life cycle

Using an Rx expression is performed in a least 5 stages :

  • The construction of the expression,
  • The subscription to the expression,
  • The optional execution of the OnNext delegates passed as parameters (whether it be observers or explicit OnNext delegates),
  • The observer chain gets disposed either explicitly or implicitly,
  • The observers can optionally get collected by the GC.

The third part's execution context is covered by ObserveOn. But for the first two, this is different.

The expression is constructed like this : 

var o = Observable.Timer(TimeSpan.FromSeconds(1));

Almost nothing's been executed here, just the creation of the observers for the entire expression, in a similar way IEnumerable expressions work. Until you call the IEnumerator.MoveNext, nothing is performed. In Rx expressions, until the Subscribe method is called, nothing is happening.

Then you can subscribe to the expression :

var d = o.Subscribe(_ => Console.WriteLine(_));

At this point, the whole chain of operators get their Subscribe method called, meaning they can start sending OnNext/OnError/OnComplete messages.

 

The case of Observable.Return and SubscribeOn

Then you meet that kind of expressions :

Observable
   .Return(42L)
   // Merge both enumerables into one, whichever the order of appearance
   .Merge(
      Observable.Timer(
         TimeSpan.FromSeconds(1), 
         new MyScheduler(Scheduler.ThreadPool, 42)
      )
   )
   .Subscribe(_ => Console.WriteLine("Do(1): " + MyScheduler.SchedulerId));

Console.WriteLine("Subscribed !");

This expression will merge the two observables into one that will provide two values, one from Return and one from the timer.

And this is the output :

Do(1):
Subscribed !
Do(1): 42

The Observable.Return OnNext was executed during the call to Subscribe, and has that thread has no SchedulerId, meaning that a whole lot of code has been executed in the context of the caller of Subscribe. You can imagine that if that expression is complex, and that the caller is the UI Thread, that can become a performance issue.

This is where the SubscribeOn operator becomes handy :

Observable
   .Return(42L)
   // Merge both enumerables into one, whichever the order of appearance
   .Merge(
      Observable.Timer(
         TimeSpan.FromSeconds(1), 
         new MyScheduler(Scheduler.ThreadPool, 42)
      )
   )
   .SubscribeOn(new MyScheduler(Scheduler.ThreadPool, 43))
   .Subscribe(_ => Console.WriteLine("Do(1): " + MyScheduler.SchedulerId));

Console.WriteLine("Subscribed !");

You then get this :

Subscribed !
Do(1): 43
Do(1): 42

The first OnNext is now executed under of a different scheduler, making subscribe a whole lot faster from the caller's point of view.

 

Why not always Subscribe on an other thread ?

That might come in handy, but you may not want that as an opt-out because of this scenario :

Observable.FromEventPattern(textBox, "TextChanged")
          .SubscribeOn(new MyScheduler(Scheduler.ThreadPool, 43))
          .Subscribe(_ => { });

Console.WriteLine("Subscribed !");

You'd get an "Invalid cross-thread access." System.UnauthorizedAccessException, because yo would try to add an event handler to a UI element from a different thread. 

Interestingly though, this code does not work on WP7 but does on WPF 4.

An other scenario may be one where delaying the subscription may loose messages, so you need to make sure you're completely subscribed before raising events.

 

So there you have it :) I hope this helps you understand a bit better those two operators.

[WP7] HttpWebRequest and the Flickr app "Black Screen" issue

By jay at April 22, 2011 14:54 Tags: , , , , ,

TL;DR: While trying to fix the "Black Screen" issue of the Windows Phone 7 flickr app 1.3, I found out that HttpWebRequest is internally making a synchronous call to the UI Thread, making a network call negatively impact the UI. The entire building of an asynchronous web query is performed on the UI thread, and you can't do anything about it.

Edit: This post was formerly named "About the UI Thread performance and HttpWebRequest", but was in fact about Yahoo's Flickr application and was enhanced accordingly.

When programming on Windows Phone 7, you'll hear often that to improve the perceived performance, you'll need to get off of the UI Thread (i.e. the dispatcher) to perform non UI related operations. By good perceived performance, I mean having the UI respond immediately, not stall when some background processing is done.

To acheive this, you'll need to use the common asynchrony techniques like queueing in the ThreadPool, create a new thread, or use the Begin/End pattern.

All of this is very true, and one very good example of bad UI Thread use is the processing of the body of a web request, particularly when using the WebClient where the raised events are in the context of the dispatcher. From a beginner's perspective, not having to care about changing contexts when developing a simple app that updates the UI, provides a particularly good and simple experience.

But that has the annoying effect of degrading the perceived performance of the application, because many parts of the application tend to run on the UI thread.

 

HttpWebRequest to the rescue ?

You'll find that the HttpWebRequest is a better choice in that regard. It uses the Begin/End pattern and the execution of the AsyncCallback is performed in the context of ThreadPool. This performs the execution of the code in that callback in a way that does not impact the perceived performance of the application.

Using the Reactive Extensions, this can be written like this :

var request = WebRequest.Create("http://www.google.com");

var queryBuilder = Observable.FromAsyncPattern(
                                (h, o) => request.BeginGetResponse(h, o),
                                ar => request.EndGetResponse(ar));

queryBuilder()
                /* Perform the expensive work in the context of the AsyncCall back */
                /* from the WebRequest. This will be the ThreadPool. */
                .Select(response => DoSomeExpensiveWork(response))

                /* Go back to the UI Thread to execute the OnNext method on the subscriber */
                .ObserveOnDispatcher()
                .Subscribe(result => DisplayResult(result));

That way, you'll get most of your code to execute out of the UI thread, where that does not impact the perceived performance of the application.

 

Why would it not be to the rescue then ?

Actually, it will always be (as of Windows Phone NoDo), but there's a catch. And that's a big deal, from a performance perspective.

Consider this code :

 public App()
 {
  /* some application initialization code */


  ManualResetEvent ev = new ManualResetEvent(false);

     ThreadPool.QueueUserWorkItem(
  d =>
  {
      var r = WebRequest.Create("http://www.google.com");
      r.BeginGetResponse((r2) => { }, null);

      ev.Set();
  }
     );

     ev.WaitOne();
 }

This code is basically beginning a request on the thread pool, while blocking the UI thread in the App.xaml.cs file. This makes the construction (but not the actual call on the network) of the WebRequest synchronous, and makes the application wait for the request to begin before showing any page to the user.

While this code is definitely not a best practice, there was a code path in the Flickr 1.3 application that was doing something remotely similar, in a more convoluted way. And if you try it for yourself, you'll find that the application hangs in a deadlock during the startup of the application, meaning that our event is never set.

 

What's happening ?

If you dig a bit, you'll find that the stack trace for a thread in the thread pool is the following :

  mscorlib.dll!System.PInvoke.PAL.Threading_Event_Wait() 
  mscorlib.dll!System.Threading.EventWaitHandle.WaitOne() 
  System.Windows.dll!System.Windows.Threading.Dispatcher.FastInvoke(...) 
  System.Windows.dll!System.Net.Browser.AsyncHelper.BeginOnUI(...)
  System.Windows.dll!System.Net.Browser.ClientHttpWebRequest.BeginGetResponse(...) 
  WindowsPhoneApplication2.dll!WindowsPhoneApplication2.App..ctor.AnonymousMethod__0(...)

The BeginGetResponse method is trying to execute something on the UI thread. And in our example, since the UI thread is blocked by the manual reset event, the application hangs in a deadlock between a resource in the dispatcher and our manual reset event.

This is also the case for the EndGetResponse method.

But if you dig even deeper, you'll find in the version of the System.Windows.dll assembly in the WP7 emulator (the one in the SDK is a stub for all public types), that the BeginGetResponse method is doing all the work of actually building the web query on the UI thread !

That is particularly disturbing. I'm still wondering why that network-only code would need to be executed to UI Thread.

 

What's the impact then ?

The impact is fairly simple : The more web requests you make, the less your UI will be responsive, both for processing the beginning and the end of a web request. Each call to the methods BeginGetResponse and EndGetResponse implicitly goes to the UI thread.

In the case of Remote Control applications like mine that are trying to have remote mouse control, all are affected by the same lagging behavior of the mouse. That's partially because the UI thread is particularly busy processing Manipulation events, this explains a lot about the performance issues of the web requests performed at the same time, even by using HttpWebRequest instead of WebClient. This also explains why until the user stops touching the screen, the web requests will be strongly slowed down.

 

The Flickr 1.3 "Black Screen" issue

In the Flickr application for which I've been able to work on, a lot of people were reporting a "black screen" issue, where the application stopped working after a few days.

The application was actually trying to update a resource from the application startup in an asynchronous fashion using the HttpWebRequest. Because of a race condition with an other lock in the application and UI Thread that was waiting in the app's initialization, this resulted in an infinite "Black Screen" that could only be bypassed by reinstalling the application.

Interestingly enough, at this point in the application's initialization, in the App's class constructor, the application is not killed after 10 seconds if it is not showing a page to the user. However, if the application stalls in the constructor of the first page, the application is automatically killed by the OS after something like 10 seconds.

Regarding the use of the UI Thread inside the HttpWebRequest code, applications that are network intensive to get a lot of small web resources like images, this is has a negative impact on the performance. The UI thread is constantly interrupted to process network resources query and responses.

 

Can I do something about it ?

During the analysis of the emulator version of the System.Windows.dll assembly, I noticed that the BeginGetResponse is checking whether the current context is the UI Thread, and does not push the execution on the dispacther.

This means that if you can group the calls to BeginGetResponse calls in the UI thread, you'll spend less time switching between contexts. That's not the panacea, but at the very least you can gain on this side.

 

What about future versions of Windows Phone ?

On the good news side, Scott Gu annouced at the Mix 11 that the manipulation events will be moved out the the UI thread, making the UI "buttery smooth" to take his words. This will a lot of applications benefit from this change.

Anyway, let's wait for Mango, I'm guessing that will this will change is a very positive way, and allow us to have high performance apps on the Windows Phone platform.

[Reactive] Being fluent with CompositeDisposable and DisposeWith

By jay at April 04, 2011 13:48 Tags: , , ,

When you're writing a few queries with the reactive extensions, you'll probably end up doing a lot of this kind of code :

moveSubscription = Observable.FromEvent<MouseEventArgs>(this, "MouseMove")
                             .Subscribe(_ => { });

clickSubscription = Observable.FromEvent<MouseEventArgs>(this, "MouseClick")
                              .Subscribe(_ => { }); 

You'll probably call the dispose method on both subscriptions in some dispose method in the class that creates the subscriptions. But this is a bit too manual for my taste.

Rx provides the CompositeDisposable class, which is basically a list of IDisposable instances that all get disposed when CompositeDisposable.Dispose() is called. So we can write it like this :

var cd = new CompositeDisposable();

var moveSubscription = Observable.FromEvent<MouseEventArgs>(this, "MouseMove")
                                 .Subscribe(_ => { });
cd.Add(moveSubscription);

var clickSubscription = Observable.FromEvent<MouseEventArgs>(this, "MouseClick")
                                  .Subscribe(_ => { });
cd.Add(clickSubscription);

That's better in a sense that it is not needed to get my subscriprions out, but this is still too "manual". There are two lines for each subscriptions, and the need for temporary variables.

That's where a simple DisposeWith extension comes in handy :

public static class DisposableExtensions
{
 public static void DisposeWith(this IDisposable observable, CompositeDisposable disposables)
 {
     disposables.Add(observable);
 }
}

Very simple extension, a bit more fluent, and that allows to avoid creating a temporary variable just to unsubscribe on a subscription:

var cd = new CompositeDisposable();

Observable.FromEvent<MouseEventArgs>(this, "MouseMove")
          .Subscribe(_ => { })
          .DisposeWith(cd);

Observable.FromEvent<MouseEventArgs>(this, "MouseClick")
          .Subscribe(_ => { })
          .DisposeWith(cd);

This is a method with a side effect, but that's acceptable in this case, and it always ends the declaration of a observable query.

[WP7Dev][Reactive] Safer Reactive Extensions

By jay at September 06, 2010 20:26 Tags: , , , ,

Cet article est disponible en français.

When developing .NET applications, unhandled exception in threads have the undesirable effect of terminating the current process.

In the following example :

    static void Main(string[] args)
    {
        var t = new Thread(ThreadMethod);
        t.Start();

        Console.ReadLine();
    }

    private static void ThreadMethod()
    {
        Thread.Sleep(1000); throw new Exception();
    }

The basic exception will invariably terminate the process, and to prevent this, the exception needs to be handled properly :

    private static void ThreadMethod()
    {
        try
        {
            Thread.Sleep(1000); throw new Exception();
        }
        catch (Exception e)
        {
            // TODO: Log and report the exception
        }
    }

This makes classes like System.Threading.Thread, System.Threading.Timer or System.Threading.ThreadPool very dangerous to use if one wants to have an always running application. It is then required that no unhandled exception gets out of the custom handlers for these classes.

Even if it is possible to be notified when an exception has been raised and not handled properly, using the AppDomain.UnhandledException event, most the time this leads to the application being terminated. This termination behavior has been introduced in .NET 2.0, to prevent unhandled exception to be silently ignored.

While this is a very appropriate default behavior, in an enterprise environment, I’m usually enforcing custom static analysis or NDepend rules to prevent the use of these classes directly. This forces new code to use wrappers that provide a very wide exception handler and logs and reports the exception, but does not terminate the process. That also implies that there is still a very valid bug to be investigated, because exceptions should not be handled that late.

 

The case of the Reactive Framework

In Silverlight for Windows Phone 7, and in any other .NET 3.5 or .NET 4.0 application that uses the Reactive Extensions, it is very easy to switch between threads.

Reactive operators like Timer, BufferWithTime, ObserveOn or SubscribeOn allow for specific Schedulers like ThreadPool, TaskPool or NewThread to be used, and if a subscriber does not handle exceptions properly, it ends up with a terminated application.

The same exemple here also terminates the application :


    static void Main(string[] args)
    {
     Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10))
                   .Subscribe(_=> ThreadMethod());

            Console.ReadLine();
    }

    private static void ThreadMethod()
    {
            throw new Exception();
    }

The Observable.Timer operator uses the System.Threading.Timer class and that makes it vulnerable to the same termination problems. Every subscriber needs to handle exceptions thrown in the OnNext delegate, or the application will terminate.

Also, do not think that the OnError delegate passed to Observable.Subscribe will handle exceptions thrown during the execution of OnNext code. OnError only notifies of errors generated by previous Reactive operators, not the current.

 

The IScheduler.AsSafe() extension method

Unfortunately, it is not possible for now to override the default schedulers used internally by the Reactive operators. The only way to handle all unhandled exceptions properly is to use the ObserveOn operator and intercept calls to IScheduler.Schedule methods. Calls can then be decorated with appropriate exception handlers to log and report the exception without terminating the process.

So, to be able to generalize this logging and reporting behavior, I created the AsSafe() extension that I place at the very top of a Reactive expression :

    Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10))
              .ObserveOn(Scheduler.ThreadPool.AsSafe())
              .Subscribe(_=> ThreadMethod());


And here is the code of this very simple extension method :


public static class SafeSchedulerExtensions
{
    public static IScheduler AsSafe(this IScheduler scheduler)
    {
        return new SafeScheduler(scheduler);
    }

    private class SafeScheduler : IScheduler
    {
        private IScheduler _source;

        public SafeScheduler(IScheduler scheduler) {
            this._source = scheduler;
        }

        public DateTimeOffset Now { get { return _source.Now; } }

        public IDisposable Schedule(Action action, TimeSpan dueTime)
        {
            return _source.Schedule(Wrap(action), dueTime);
        }

        public IDisposable Schedule(Action action)
        {
            return _source.Schedule(Wrap(action));
        }

        private Action Wrap(Action action)
        {
            return () => {
                try  {
                    action();
                }
                catch (Exception e) {
                    // Log and report the exception.
                }
            };

        }
    }
}

About me

My name is Jerome Laban, I am a Software Architect, C# MVP and .NET enthustiast from Montréal, QC. You will find my blog on this site, where I'm adding my thoughts on current events, or the things I'm working on, such as the Remote Control for Windows Phone.