@MatthieuMEZIL is a fellow MVP and friend of mine, and back in february at the MVP summit we shared our mutual interests with Rx on my side, and Roslyn on his side.

I’ve yet to blog about Roslyn, (even though a blog post is in preparation) but he started using Rx and is a bit blogging about it.

Now, Rx is a tricky beast and there are many ways to do the same thing. I’ve promised Matthieu I’d give him a few tricks on how to improve his code. Now that I do have a bit of time, here it is.

[more]

Here is the original code :

    public class FileWatcher
    {
        public FileWatcher(string path, string filter, TimeSpan throttle)
        {
            Path = path;
            Filter = filter;
            Throttle = throttle;
        }

        public string Path { get; private set; }
        public string Filter { get; private set; }
        public TimeSpan Throttle { get; private set; }

        public IObservable<FileChangedEvent> GetObservable()
        {
            return Observable.Create<FileChangedEvent>(observer =>
            {
                FileSystemWatcher fileSystemWatcher = new FileSystemWatcher(Path, Filter) { EnableRaisingEvents = true };
                FileSystemEventHandler created = (_, __) => observer.OnNext(new FileChangedEvent());
                FileSystemEventHandler changed = (_, __) => observer.OnNext(new FileChangedEvent());
                RenamedEventHandler renamed = (_, __) => observer.OnNext(new FileChangedEvent());
                FileSystemEventHandler deleted = (_, __) => observer.OnNext(new FileChangedEvent());
                ErrorEventHandler error = (_, errorArg) => observer.OnError(errorArg.GetException());

                fileSystemWatcher.Created += created;
                fileSystemWatcher.Changed += changed;
                fileSystemWatcher.Renamed += renamed;
                fileSystemWatcher.Deleted += deleted;
                fileSystemWatcher.Error += error;

                return () =>
                {
                    fileSystemWatcher.Created -= created;
                    fileSystemWatcher.Changed -= changed;
                    fileSystemWatcher.Renamed -= renamed;
                    fileSystemWatcher.Deleted -= deleted;
                    fileSystemWatcher.Error -= error;
                    fileSystemWatcher.Dispose();
                };

            }).Throttle(Throttle);
        }
    }

    static void Main(string[] args)
    {
        var fileWatcher = new FileWatcher(".", "*.*", TimeSpan.FromSeconds(5)); 
        var fileObserver = fileWatcher.GetObservable().Subscribe(fce => { /*TODO*/ });
    }

 

There are a few things that can be improved, or be more readable such as :

  • The use of Observable.FromEventPattern, to avoid the use of the += and –= C# event syntax, make the use of a C# event located at a single place and avoid the use of observer.OnNext,
  • The use of Merge, to subscribe to all events at once, and propagate all notifications down the stream, without the use of multiple calls to OnNext on the observer. You’ll notice that Merge accepts IEnumerable<IObservable<T>> which makes for a very readable merge source.
  • The use of Finally, to dispose the only visible state of the expression, the file system watcher itself,
  • The use of Subscribe(IObserver), to effectively pass all the notifications from the merged events to the created observable,
  • The removal of the instance of “FileWatcher”, for which the configuration stored can be in the closure used to create the observable. This allows for the creation of a single self-contained method “ObserveFolderChanges”, without an additional instance whose variables are used only at the creation of the observable.
  • The use of Observable.Throw to generate an error notification using the exception provided by the Error event from FileSystemWatcher.
  • The use of the OnError delegate in Subscribe, to handle errors coming from the ObserveFolderChanges observable.

 

Here is another version, among others :

    public class FileWatcher
    {
        public static IObservable<FileChangedEvent> ObserveFolderChanges(string path, string filter, TimeSpan throttle)
        {
            return Observable.Create<FileChangedEvent>(
                observer =>
                {
                    var fileSystemWatcher = new FileSystemWatcher(path, filter) { EnableRaisingEvents = true };

                    var sources = new[] 
                    { 
                        Observable.FromEventPattern<FileSystemEventArgs>(fileSystemWatcher, "Created")
                                  .Select(ev => new FileChangedEvent()),

                        Observable.FromEventPattern<FileSystemEventArgs>(fileSystemWatcher, "Changed")
                                  .Select(ev => new FileChangedEvent()),

                        Observable.FromEventPattern<RenamedEventArgs>(fileSystemWatcher, "Renamed")
                                  .Select(ev => new FileChangedEvent()),

                        Observable.FromEventPattern<FileSystemEventArgs>(fileSystemWatcher, "Deleted")
                                  .Select(ev => new FileChangedEvent()),

                        Observable.FromEventPattern<ErrorEventArgs>(fileSystemWatcher, "Error")
                                  .SelectMany(ev => Observable.Throw<FileChangedEvent>(ev.EventArgs.GetException()))
                    };

                    return sources.Merge()
                                  .Throttle(throttle)
                                  .Finally(() => fileSystemWatcher.Dispose())
                                  .Subscribe(observer);
                }
            );
        }
    }

    static void Main(string[] args)
    {
        var fileWatcher =
            FileWatcher.ObserveFolderChanges(".", "*.*", TimeSpan.FromSeconds(5))
                        .Subscribe(fce => { Console.WriteLine("Changed"); }, e => Debug.WriteLine(e));

        Console.ReadLine();
    }

 

As a side note, Throttle has the effect of discarding events that have been generated in excess, which can be acceptable if the observer only needs to be notified that something changed. But if the observer is interested in what has changed, then Observable.Buffer() would be more appropriate because it would give all events in an List<T> every five seconds.

Finally, the outer Subscribe is probably not going to be the last one that goes back to synchronous code. That subscribe is probably going to be replaced by another observable that reacts to the change of the folder, the propagates that to other observables...

All this to have (eventually) a single subscribe that runs the whole app :)