Dec 11 Keith | C#, Dev, Rx

Rx Event Aggregator

Ah the Event Aggregator, you either love it or you hate it. I tend to hate it 95% of the time and like it on very rare occasions. One such time was during a port of some old Prism code, another was for use building a status bar. At least RX can spice it up.

The interface I use:

using System;

namespace GeoTagger.Core
{
    public interface IEventAggregator
    {
        IObservable<T> GetEventStream<T>();
        void Publish<T>(T payload);
    }
}

The implementation:

using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;

namespace GeoTagger.Core
{
    public class EventAggregator : IEventAggregator
    {
        private readonly object _observablesByTypeKeyLock = new object();

        private readonly Dictionary<string, Tuple<object, object>> _observablesByTypeKey = new Dictionary<string, Tuple<object, object>>();

        public IObservable<T> GetEventStream<T>()
        {
            IObservable<T> stream;
            var key = typeof(T).ToString();

            lock (_observablesByTypeKeyLock)
            {
                if (_observablesByTypeKey.ContainsKey(key))
                {
                    Tuple<object, object> tuple = _observablesByTypeKey[key];
                    stream = (IObservable<T>)tuple.Item2;
                }
                else
                {
                    Type specificSubjectType = typeof(Subject<>).MakeGenericType(new[] { typeof(T) });
                    var subject = (Subject<T>)Activator.CreateInstance(specificSubjectType, new object[] { });

                    var removeEventStreamFromCache = Disposable.Create(
                        () =>
                        {
                            lock (_observablesByTypeKeyLock)
                            {
                                _observablesByTypeKey.Remove(key);
                            }
                        }
                    );

                    stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

                    var tuple = new Tuple<object, object>(subject, stream);
                    _observablesByTypeKey.Add(key, tuple);
                }
            }
            return stream;
        }

        public void Publish<T>(T payload)
        {
            var key = typeof(T).ToString();
            
            Tuple<object, object> tuple;
            
            lock (_observablesByTypeKeyLock)
                _observablesByTypeKey.TryGetValue(key, out tuple);
            
            if (tuple != null)
            {
                ((Subject<T>)tuple.Item1).OnNext(payload);
            }
        }
    }
}

Tests I hear you say, well I did write couple, handy so help you see how it’s used.

using System;
using GeoTagger.Core;
using NUnit.Framework;

namespace GeoTagger.Tests.Core
{
    [TestFixture]
    public class EventAggregatorTests
    {
        [Test]
        public void Publish_PublishesWhenThereAreSubscribers()
        {
            var expected = "Keith";
            string actual = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<string>()
                .Subscribe(
                    payload =>
                    {
                        actual = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual);
        }

        [Test]
        public void Publish_DoesNotThrowWithNoSubscribers()
        {
            var eventAggregator = new EventAggregator();
            eventAggregator.Publish("Boo");
        }

        [Test]
        public void Publish_WithMutipleSubscribersPayloadDeliveredToAll()
        {
            var expected = new object();
            object actual1 = null, actual2 = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );


            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);
        }

        [Test]
        public void PublishAgainstDisposedSubscriber_ExpectNoPayloadDelevered()
        {
            int expected = 5;
            int actual1 = 0, actual2 = 0;

            var eventAggregator = new EventAggregator();

            var disposable1 = eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            // note we won't dispose this one 
            eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);

            disposable1.Dispose();

            eventAggregator.Publish(6);

            Assert.AreEqual(expected, actual1);

            // the second subscription should remain
            Assert.AreEqual(6, actual2);
        }
    }
}

Under the covers the EventAggregator manages observable streams. Steams are keyed by payload type and callers of GetEventStream() will receive the same underlying stream. Publishing and ref counting these streams ensures that there will only be one underlying. Once all callers dispose their streams the implementation will remove the stream from its cache. If a caller call Publish() but there are no subscribers, nothing happens.

2 blog posts in 1 day, personal record lol.

Dec 11 Keith | C#, Dev, Rx

Observing a property

I noticed this rated high on the WPF feature suggestion list. For a while now RX is a fantastic choice.

You need a utility class (ExpressionHelper which I’ve mentioned before by the name PropertyHelper) to extract property names from a lambda and turn them into strings. A simple base class to raise the INotifyPropertyChanged events (NotifyingBase). Some extension methods to INotifyPropertyChanged using RX (ObservableExtensions) so you can observe the properties. And finally some code to hook it altogether, be it in a view model or in my example, a controller. Hopefully this’ll help some people out on that forum.

Download this code (185.24 kb) (WPF 4, rx v1.0.2787.0 (included)).

First the extension to INotifyPropertyChanged.

using System;
using System.ComponentModel;
using System.Linq;
using System.Linq.Expressions;

namespace PropertyObservation
{
    public static class ObservableExtensions
    {
        public static IObservable<TValue> ObserveProperty<T, TValue>(
            this T source,
             Expression<Func<T, TValue>> propertyExpression
        )
            where T : INotifyPropertyChanged
        {
            return source.ObserveProperty(propertyExpression, false);
        }

        public static IObservable<TValue> ObserveProperty<T, TValue>(
            this T source,
            Expression<Func<T, TValue>> propertyExpression,
            bool observeInitialValue
        )
            where T : INotifyPropertyChanged
        {
            var memberExpression = (MemberExpression)propertyExpression.Body;

            var getter = propertyExpression.Compile();

            var observable = from evt in Observable
                    .FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>
                    (h => (s, e) => h(s, e),
                    h => source.PropertyChanged += h,
                    h => source.PropertyChanged -= h)
                             where evt.EventArgs.PropertyName == memberExpression.Member.Name
                             select getter(source);

            if (observeInitialValue)
                return observable.Merge(Observable.Return(getter(source)));

            return observable;
        }
    }
}

Then the base class for anything that wants to support INotifyPropertyChanged.

using System;
using System.ComponentModel;
using System.Linq.Expressions;

namespace PropertyObservation
{
    public class NotifyingBase : INotifyPropertyChanged
    {
        public event PropertyChangedEventHandler PropertyChanged;

        protected void OnPropertyChanged(string propertyName)
        {
            var handler = PropertyChanged;
            if (handler != null)
            {
                var e = new PropertyChangedEventArgs(propertyName);
                handler(this, e);
            }
        }

        // some like this better as its easier to generate
        protected void RaisePropertyChanged<T>(Expression<Func<T>> expression)
        {
            var propertyName = ExpressionHelper.GetPropertyName(expression);
            OnPropertyChanged(propertyName);
        }

        // some prefer this as its more typed 
        protected void RaisePropertyChanged<T>(Expression<Func<T, Object>> expression)
        {
            var propertyName = ExpressionHelper.GetPropertyName(expression);
            OnPropertyChanged(propertyName);
        }
    }
}

The above uses ExpressionHelper, a class that I use in nearly everything these days (similar version mentioned by me previously as the PropertyHelper class). Also has a handy GetMethodName(). Here it is:

using System;
using System.Linq.Expressions;

namespace PropertyObservation
{
    public class ExpressionHelper
    {
        public static string GetPropertyName<T>(Expression<Func<T>> expression)
        {
            return GetNameFromLambda(expression);
        }

        public static string GetPropertyName<T>(Expression<Func<T, Object>> expression)
        {
            return GetNameFromLambda(expression);
        }

        public static string GetMethodName<T>(Expression<Action<T>> expression)
        {
            return GetNameFromLambda(expression);
        }

        private static string GetNameFromLambda(LambdaExpression lambda)
        {
            MemberExpression memberExpression = null;
            if (lambda.Body is UnaryExpression)
            {
                var unaryExpression = lambda.Body as UnaryExpression;
                memberExpression = unaryExpression.Operand as MemberExpression;
            }
            else if (lambda.Body is MemberExpression)
                memberExpression = lambda.Body as MemberExpression;
           
            if (memberExpression != null)
                    return memberExpression.Member.Name;

            if (lambda.Body is MethodCallExpression)
            {
                var methodCallExpression = lambda.Body as MethodCallExpression;
                return methodCallExpression.Method.Name;
            }

            throw new ArgumentException(String.Format("Expression '{0}' did not provide a property or method name.", lambda));
        }
    }
}

A view model example

namespace PropertyObservation
{
    public class MainWindowViewModel : NotifyingBase
    {
        private string _userName;
        public string UserName
        {
            get { return _userName; }
            set 
            {
                _userName = value;
                // more strongly typed
                RaisePropertyChanged<MainWindowViewModel>(vm => vm.UserName);
            }
        }

        private string _password;
        public string Password
        {
            get { return _password; }
            set 
            {
                _password = value;
                // bit looser, more room for setting an incorrect property  
                RaisePropertyChanged(() => Password);
            }
        }

        private string _results;
        public string Results
        {
            get { return _results; }
            set 
            {
                _results = value;
                RaisePropertyChanged(() => Results);
            }
        }
    }
}

And finally some code in a controller (or where ever you'd normally put this type of controlling code, some say the view model).

using System;
using System.Linq;

namespace PropertyObservation
{
    public class MainWindowController
    {
        private readonly MainWindowViewModel _mainWindowViewModel;

        public MainWindowController(MainWindowViewModel mainWindowViewModel)
        {
            _mainWindowViewModel = mainWindowViewModel;
        }

        public void Start()
        {
            var userNameChanged = _mainWindowViewModel.ObserveProperty(vm => vm.UserName, true);
            var passwordChanged = _mainWindowViewModel.ObserveProperty(vm => vm.Password, true);

            var bothChanged = userNameChanged.CombineLatest(passwordChanged, (username, password) => String.Format("You typed {0} {1}", username, password));

            var disposable = bothChanged.Subscribe(
                message => 
                {
                    _mainWindowViewModel.Results = message;
                }
            );

            // TODO dispose your event wireup at some time if required.
        }
    }
}

The xaml is just standard WPF data binding, check out the download if required. Hope this helps someone out.

About the author

Keith Woods

Keith Woods works for Lab49, a consulting firm that builds advanced solutions for the financial services industry.