[Rx] Using the ObserveOn and SubscribeOn operators

By jay at July 24, 2011 20:04 Tags: , , , , ,

TLDR: This post talks about how the Reactive Extensions ObserveOn operator changes the execution context (the thread) of the IObservable OnNext/OnComplete/OnError methods, whereas the SubscribeOn operator changes the execution context of the implementation of the Subscribe method in the chain of observers. Both methods can be useful to improve the performance of an application's UI by putting work on background threads.

 

When developing asynchronous code, or consuming asynchronous APIs, you find yourself forced to use specific methods on a specific thread.

The most prominent examples being WPF/Silverlight and Winforms, where you cannot use UI bound types outside of the UI Thread. In the context of WPF, you'll find yourself forced to use the Dispatcher.Invoke method to manipulate the UI in the proper context.

However, you don't want to execute everything on the UI Thread, because the UI performance relies on it. Doing too much on the UI thread can lead to a very bad percieved performance, and angry users...

 

Rx framework's ObserveOn operator

I've discussed a few times the use of ObserveOn in the context of WP7, where it is critical to leave the UI thread alone and avoid choppy animations, for instance.

The ObserveOn operator changes the context of execution (scheduler) of a chain of operators until an other operator changes it.

To be able to demonstrate this, let's write our own scheduler :

public class MyScheduler : IScheduler
{
    // Warning: ThreadStatic is not supported on Windows Phone 7.0 and 7.1
    // This code will not work properly on this platform.
    [ThreadStatic]
    public static int? SchedulerId;

    private int _schedulerId;
    private IScheduler _source;
        
    public MyScheduler(IScheduler source, int schedulerId)
    {
        _source = source;
        _schedulerId = schedulerId;
    }

    public DateTimeOffset Now { get { return _source.Now; } }

    public IDisposable Schedule<TState>(
              TState state, 
              Func<IScheduler, TState, IDisposable> action
           )
    {
        return _source.Schedule(state, WrapAction(action));
    }

    private Func<IScheduler, TState, IDisposable> WrapAction<TState>(
              Func<IScheduler, TState, IDisposable> action)
    {
        return (scheduler, state) => {

            // Set the TLS with the proper ID
            SchedulerId = _schedulerId;

            return action(_source, state);
        };
    }
}

This scheduler's purpose is to intercept calls to the ISchedule methods (You'll fill the missing Schedule methods by yourself) and flag them with a custom thread ID. That way, we'll know which scheduler is executing our code.

Note that this code will not work properly on Windows Phone 7, since ThreadStaticAttribute is not supported. And it's still not supported on 7.1... Seems like not enough people are using ThreadStatic to make its way to the WP7 CLR...

Anyway, now if we write the following Rx expression :

Observable.Timer(TimeSpan.FromSeconds(1), new MyScheduler(Scheduler.ThreadPool, 42))
          .Do(_ => Console.WriteLine(MyScheduler.SchedulerId))
          .First();

We force the timer to raise OnNext on the ThreadPool through our scheduler, and we'll get the following :

42

Which means that the lambda passed as a parameter to the Do operator got executed in the context of the Scheduler used when declaring the Timer operator.

If we go a bit farther :

Observable.Timer(TimeSpan.FromSeconds(1), new MyScheduler(Scheduler.ThreadPool, 42))
          .Do(_ => Console.WriteLine("Do(1): " + MyScheduler.SchedulerId))
          .ObserveOn(new MyScheduler(Scheduler.ThreadPool, 43))
          .Do(_ => Console.WriteLine("Do(2): " + MyScheduler.SchedulerId))
          .ObserveOn(new MyScheduler(Scheduler.ThreadPool, 44))
          .Do(_ => Console.WriteLine("Do(3): " + MyScheduler.SchedulerId))
          .Do(_ => Console.WriteLine("Do(4): " + MyScheduler.SchedulerId))
          .First();

We'll get the following :

Do(1): 42
Do(2): 43
Do(3): 44
Do(4): 44

Each time a scheduler was specified, the following operators OnNext delegates were executed on that scheduler.

In this case, we're using the Do operator which does not take a scheduler as a parameter. There some operators though, like Delay, that implicitly use a scheduler that changes the context.

Using this operator is particularly useful when the OnNext delegate is performing a context sensitive operation, like manipulating the UI, or when the source scheduler is the UI and the OnNext delegate is not related to the UI and can be executed on an other thread.

You'll find that operator handy with the WebClient or GeoCoordinateWatcher classes, which both execute their handlers on the UI thread. Watchout for Windows Phone 7.1 (mango) though, this may have changed a bit.

 

An Rx Expression's life cycle

Using an Rx expression is performed in a least 5 stages :

  • The construction of the expression,
  • The subscription to the expression,
  • The optional execution of the OnNext delegates passed as parameters (whether it be observers or explicit OnNext delegates),
  • The observer chain gets disposed either explicitly or implicitly,
  • The observers can optionally get collected by the GC.

The third part's execution context is covered by ObserveOn. But for the first two, this is different.

The expression is constructed like this : 

var o = Observable.Timer(TimeSpan.FromSeconds(1));

Almost nothing's been executed here, just the creation of the observers for the entire expression, in a similar way IEnumerable expressions work. Until you call the IEnumerator.MoveNext, nothing is performed. In Rx expressions, until the Subscribe method is called, nothing is happening.

Then you can subscribe to the expression :

var d = o.Subscribe(_ => Console.WriteLine(_));

At this point, the whole chain of operators get their Subscribe method called, meaning they can start sending OnNext/OnError/OnComplete messages.

 

The case of Observable.Return and SubscribeOn

Then you meet that kind of expressions :

Observable
   .Return(42L)
   // Merge both enumerables into one, whichever the order of appearance
   .Merge(
      Observable.Timer(
         TimeSpan.FromSeconds(1), 
         new MyScheduler(Scheduler.ThreadPool, 42)
      )
   )
   .Subscribe(_ => Console.WriteLine("Do(1): " + MyScheduler.SchedulerId));

Console.WriteLine("Subscribed !");

This expression will merge the two observables into one that will provide two values, one from Return and one from the timer.

And this is the output :

Do(1):
Subscribed !
Do(1): 42

The Observable.Return OnNext was executed during the call to Subscribe, and has that thread has no SchedulerId, meaning that a whole lot of code has been executed in the context of the caller of Subscribe. You can imagine that if that expression is complex, and that the caller is the UI Thread, that can become a performance issue.

This is where the SubscribeOn operator becomes handy :

Observable
   .Return(42L)
   // Merge both enumerables into one, whichever the order of appearance
   .Merge(
      Observable.Timer(
         TimeSpan.FromSeconds(1), 
         new MyScheduler(Scheduler.ThreadPool, 42)
      )
   )
   .SubscribeOn(new MyScheduler(Scheduler.ThreadPool, 43))
   .Subscribe(_ => Console.WriteLine("Do(1): " + MyScheduler.SchedulerId));

Console.WriteLine("Subscribed !");

You then get this :

Subscribed !
Do(1): 43
Do(1): 42

The first OnNext is now executed under of a different scheduler, making subscribe a whole lot faster from the caller's point of view.

 

Why not always Subscribe on an other thread ?

That might come in handy, but you may not want that as an opt-out because of this scenario :

Observable.FromEventPattern(textBox, "TextChanged")
          .SubscribeOn(new MyScheduler(Scheduler.ThreadPool, 43))
          .Subscribe(_ => { });

Console.WriteLine("Subscribed !");

You'd get an "Invalid cross-thread access." System.UnauthorizedAccessException, because yo would try to add an event handler to a UI element from a different thread. 

Interestingly though, this code does not work on WP7 but does on WPF 4.

An other scenario may be one where delaying the subscription may loose messages, so you need to make sure you're completely subscribed before raising events.

 

So there you have it :) I hope this helps you understand a bit better those two operators.

blog comments powered by Disqus

About me

My name is Jerome Laban, I am a Software Architect, C# MVP and .NET enthustiast from Montréal, QC. You will find my blog on this site, where I'm adding my thoughts on current events, or the things I'm working on, such as the Remote Control for Windows Phone.