[WP7Dev][Reactive] Safer Reactive Extensions

By jay at September 06, 2010 20:26 Tags: , , , ,

Cet article est disponible en français.

When developing .NET applications, unhandled exception in threads have the undesirable effect of terminating the current process.

In the following example :

    static void Main(string[] args)
    {
        var t = new Thread(ThreadMethod);
        t.Start();

        Console.ReadLine();
    }

    private static void ThreadMethod()
    {
        Thread.Sleep(1000); throw new Exception();
    }

The basic exception will invariably terminate the process, and to prevent this, the exception needs to be handled properly :

    private static void ThreadMethod()
    {
        try
        {
            Thread.Sleep(1000); throw new Exception();
        }
        catch (Exception e)
        {
            // TODO: Log and report the exception
        }
    }

This makes classes like System.Threading.Thread, System.Threading.Timer or System.Threading.ThreadPool very dangerous to use if one wants to have an always running application. It is then required that no unhandled exception gets out of the custom handlers for these classes.

Even if it is possible to be notified when an exception has been raised and not handled properly, using the AppDomain.UnhandledException event, most the time this leads to the application being terminated. This termination behavior has been introduced in .NET 2.0, to prevent unhandled exception to be silently ignored.

While this is a very appropriate default behavior, in an enterprise environment, I’m usually enforcing custom static analysis or NDepend rules to prevent the use of these classes directly. This forces new code to use wrappers that provide a very wide exception handler and logs and reports the exception, but does not terminate the process. That also implies that there is still a very valid bug to be investigated, because exceptions should not be handled that late.

 

The case of the Reactive Framework

In Silverlight for Windows Phone 7, and in any other .NET 3.5 or .NET 4.0 application that uses the Reactive Extensions, it is very easy to switch between threads.

Reactive operators like Timer, BufferWithTime, ObserveOn or SubscribeOn allow for specific Schedulers like ThreadPool, TaskPool or NewThread to be used, and if a subscriber does not handle exceptions properly, it ends up with a terminated application.

The same exemple here also terminates the application :


    static void Main(string[] args)
    {
     Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10))
                   .Subscribe(_=> ThreadMethod());

            Console.ReadLine();
    }

    private static void ThreadMethod()
    {
            throw new Exception();
    }

The Observable.Timer operator uses the System.Threading.Timer class and that makes it vulnerable to the same termination problems. Every subscriber needs to handle exceptions thrown in the OnNext delegate, or the application will terminate.

Also, do not think that the OnError delegate passed to Observable.Subscribe will handle exceptions thrown during the execution of OnNext code. OnError only notifies of errors generated by previous Reactive operators, not the current.

 

The IScheduler.AsSafe() extension method

Unfortunately, it is not possible for now to override the default schedulers used internally by the Reactive operators. The only way to handle all unhandled exceptions properly is to use the ObserveOn operator and intercept calls to IScheduler.Schedule methods. Calls can then be decorated with appropriate exception handlers to log and report the exception without terminating the process.

So, to be able to generalize this logging and reporting behavior, I created the AsSafe() extension that I place at the very top of a Reactive expression :

    Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10))
              .ObserveOn(Scheduler.ThreadPool.AsSafe())
              .Subscribe(_=> ThreadMethod());


And here is the code of this very simple extension method :


public static class SafeSchedulerExtensions
{
    public static IScheduler AsSafe(this IScheduler scheduler)
    {
        return new SafeScheduler(scheduler);
    }

    private class SafeScheduler : IScheduler
    {
        private IScheduler _source;

        public SafeScheduler(IScheduler scheduler) {
            this._source = scheduler;
        }

        public DateTimeOffset Now { get { return _source.Now; } }

        public IDisposable Schedule(Action action, TimeSpan dueTime)
        {
            return _source.Schedule(Wrap(action), dueTime);
        }

        public IDisposable Schedule(Action action)
        {
            return _source.Schedule(Wrap(action));
        }

        private Action Wrap(Action action)
        {
            return () => {
                try  {
                    action();
                }
                catch (Exception e) {
                    // Log and report the exception.
                }
            };

        }
    }
}

blog comments powered by Disqus

About me

My name is Jerome Laban, I am a Software Architect, C# MVP and .NET enthustiast from Montréal, QC. You will find my blog on this site, where I'm adding my thoughts on current events, or the things I'm working on, such as the Remote Control for Windows Phone.