Dec 18 Keith | C#, Dev

MEF and Prism

I recently trod down the Managed Extensibility Framework (MEF) road with Prism. It became clear it was a long road and I couldn't see the end, so I turned back.

There were some things I really liked. Dynamic registration was cool, for example: I could just mark up my dependencies with attributes, and I could add additional metadata with custom attributes conforming to an interface. Using Lazy<T> and some logic at resolution time, I could query dependencies through this metadata interface and make intelligent decisions about whether to resolve them.
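
To make the metadata idea concrete, here is a minimal sketch of the pattern. The IGreeter contract, the IGreeterMetadata view and the greeter classes are all hypothetical names invented for illustration, and it uses the built-in ExportMetadata attribute rather than a custom metadata attribute to keep things short. It assumes the desktop System.ComponentModel.Composition assembly is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;
using System.Linq;

// Hypothetical contract and metadata view, invented for this sketch.
public interface IGreeter { string Greet(); }
public interface IGreeterMetadata { string Language { get; } }

[Export(typeof(IGreeter))]
[ExportMetadata("Language", "en")]
public class EnglishGreeter : IGreeter
{
    public string Greet() { return "Hello"; }
}

[Export(typeof(IGreeter))]
[ExportMetadata("Language", "fr")]
public class FrenchGreeter : IGreeter
{
    public string Greet() { return "Bonjour"; }
}

public class GreeterHost
{
    // Lazy<T, TMetadata> exposes the metadata without instantiating the part.
    [ImportMany]
    public IEnumerable<Lazy<IGreeter, IGreeterMetadata>> Greeters { get; set; }

    public string GreetIn(string language)
    {
        // Decide from the metadata alone; only the chosen part gets created.
        var match = Greeters.First(g => g.Metadata.Language == language);
        return match.Value.Greet();
    }
}

public static class MefMetadataSketch
{
    public static GreeterHost Compose()
    {
        var catalog = new TypeCatalog(typeof(EnglishGreeter), typeof(FrenchGreeter));
        var container = new CompositionContainer(catalog);
        var host = new GreeterHost();
        container.ComposeParts(host); // satisfies the [ImportMany] on the host
        return host;
    }

    public static void Main()
    {
        Console.WriteLine(Compose().GreetIn("fr"));
    }
}
```

The container is never disposed here, which is fine for a throwaway sketch but not for a real application.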

Life was good. Then I got some typical dependency mismatches on first launch (in my spike app). No problem, it's to be expected with any container-based approach. The exception data wasn't really up to it, so I turned to mefx to help me out. Then I wanted to do some dynamic loading of assemblies in conjunction with the System.AddIn pipeline, which meant MEF needed to load assemblies from different directories. Things start to get complicated when you have a large application and missing dependencies in this scenario. Mefx doesn't really help, as you need to have all the DLLs in one place (more scripting), and its results were rather baffling! Then I wanted to use simple factories in my constructors, something like 'Func<ISomething> somethingFactory'. You can do it using an ExportFactory, but it wasn't that clean. That was pretty much the point where I stopped.
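
For reference, a rough sketch of what the ExportFactory route can look like. This is not the code from my spike: Something and SomethingConsumer are hypothetical types, and it assumes a MEF version where ExportFactory<T> is available on the desktop (.NET Framework 4.5 or later).

```csharp
using System;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;

public interface ISomething { Guid Id { get; } }

[Export(typeof(ISomething))]
[PartCreationPolicy(CreationPolicy.NonShared)]
public class Something : ISomething
{
    private readonly Guid _id = Guid.NewGuid();
    public Guid Id { get { return _id; } }
}

[Export]
public class SomethingConsumer
{
    [ImportingConstructor]
    public SomethingConsumer(ExportFactory<ISomething> somethingFactory)
    {
        // Adapt the MEF-specific factory to the plain Func<ISomething> I actually wanted.
        // Note: the ExportLifetimeContext returned by CreateExport() is never disposed here.
        SomethingFactory = () => somethingFactory.CreateExport().Value;
    }

    public Func<ISomething> SomethingFactory { get; private set; }
}

public static class ExportFactorySketch
{
    public static Func<ISomething> ResolveFactory()
    {
        var catalog = new TypeCatalog(typeof(Something), typeof(SomethingConsumer));
        var container = new CompositionContainer(catalog);
        return container.GetExportedValue<SomethingConsumer>().SomethingFactory;
    }

    public static void Main()
    {
        var factory = ResolveFactory();
        // Each call should hand back a fresh instance.
        Console.WriteLine(factory().Id != factory().Id);
    }
}
```

The adapter lambda and the undisposed lifetime context are the sort of thing I mean by "not that clean": the constructor has to know about MEF instead of just taking a Func<ISomething>.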

To cut a long story short, stick to a fully fledged IoC container for typical line of business (LOB) applications; they're built for the job. MEF has its use cases: it's handy for discovering plugins for the current process, or perhaps in smaller LOB apps that don't have hundreds or thousands of container registrations. I wouldn't recommend using it with Prism. If you have a working container approach, stick to it.

Dec 12 Keith | C#, Dev

Another version of the EventAggregator

Here is another version of the code from yesterday's post (which would work in Silverlight, where this one won't), this time using a ConcurrentDictionary. I saw Jose's blog post do something similar with the ConcurrentDictionary, so I thought I'd give it a try. I also added some comments on why I call Publish and RefCount. Observable.Defer was added so that any modifications to the cache don't actually happen until someone subscribes.

using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;
using System.Collections.Concurrent;

namespace GeoTagger.Core
{
    public class EventAggregator : IEventAggregator
    {
        private readonly ConcurrentDictionary<Type, Tuple<object, object>> _observablesByPayloadType = new ConcurrentDictionary<Type, Tuple<object, object>>();

        public IObservable<T> GetEventStream<T>()
        {
           var defered = Observable.Defer(
                () => 
                {
                    var tuple =
                        _observablesByPayloadType.GetOrAdd(typeof(T),
                            t =>
                            {
                                var subject = new Subject<T>();

                                var removeFromCache = Disposable.Create(
                                    () =>
                                    {
                                        Tuple<object, object> _; // throw away object!
                                        _observablesByPayloadType.TryRemove(typeof(T), out _);
                                    }
                                );
                                
                                // because we are removing items from the cache to clean up on dispose, 
                                // we need to publish then refcount the subject to ensure we only 
                                // clean up when there are no subscribers left
                                var observable =
                                    subject
                                        .AddDisposable(removeFromCache)
                                        .Publish()
                                        .RefCount()
                                        .AsObservable();

                                return new Tuple<object, object>(subject, observable);
                            }
                        );

                    return (IObservable<T>)tuple.Item2;
                }
            );

           return defered;
        }

        public void Publish<T>(T payload)
        {
            Tuple<object, object> cachedItem;
            if (_observablesByPayloadType.TryGetValue(typeof(T), out cachedItem))
            {
                ((ISubject<T>)cachedItem.Item1).OnNext(payload);
            }
        }
    }
}
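
To show what Observable.Defer buys in the code above, here is a small standalone sketch. It assumes the current System.Reactive package (namespaces differ from the Rx build used in this post), and DeferSketch and its members are names made up for the example. The factory passed to Defer runs once per subscription, and not at all until someone subscribes.

```csharp
using System;
using System.Reactive.Linq;

public static class DeferSketch
{
    // Counts how many times the deferred factory has actually run.
    public static int FactoryCalls;

    public static IObservable<int> Build()
    {
        return Observable.Defer(() =>
        {
            FactoryCalls++; // stands in for the cache modification in GetEventStream
            return Observable.Return(42);
        });
    }

    public static void Main()
    {
        var stream = Build();
        Console.WriteLine(FactoryCalls); // still 0: nothing has subscribed yet

        using (stream.Subscribe(value => { }))
        {
            Console.WriteLine(FactoryCalls); // 1: the factory ran on subscribe
        }
    }
}
```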

One more test was added just to ensure things are getting removed from the cache when the final caller disposes. It's a pretty nasty test!

[Test]
public void CacheRefCountsForDifferentPayloadTypes_DisposeRemovesObservablesFromCache()
{
    var eventAggregator = new EventAggregator();
    // HACK!
    var field = eventAggregator.GetType().GetField("_observablesByPayloadType", BindingFlags.NonPublic | BindingFlags.Instance);
    var cache = (ConcurrentDictionary<Type, Tuple<object, object>>)field.GetValue(eventAggregator);

    // pre-condition 
    Assert.IsTrue(cache.IsEmpty);

    var disposable1 = eventAggregator.GetEventStream<int>().Subscribe( payload =>  { } );

    var disposable2 = eventAggregator.GetEventStream<int>().Subscribe(payload => { });

    // with the cached observables being ref counted there should only be one underlying
    Assert.AreEqual(1, cache.Count);

    var disposable3 = eventAggregator.GetEventStream<string>().Subscribe(payload => { });
   
    Assert.AreEqual(2, cache.Count);

    disposable1.Dispose();
    disposable2.Dispose();
    disposable3.Dispose();

    Assert.IsTrue(cache.IsEmpty);
}

Edit: the above used this extension

public static class ObservableEx
{
    public static IObservable<T> AddDisposable<T>(this IObservable<T> source, IDisposable disposable)
    {
        return Observable.CreateWithDisposable<T>(
            o =>
            {
                // dispose the subscription to the source along with the extra disposable
                var subscription = source.Subscribe(o);
                return new CompositeDisposable(subscription, disposable);
            }
        );
    }
}

Dec 11 Keith | C#, Dev, Rx

Rx Event Aggregator

Ah, the Event Aggregator: you either love it or you hate it. I tend to hate it 95% of the time and like it on very rare occasions. One such time was during a port of some old Prism code, another was when building a status bar. At least Rx can spice it up.

The interface I use:

using System;

namespace GeoTagger.Core
{
    public interface IEventAggregator
    {
        IObservable<T> GetEventStream<T>();
        void Publish<T>(T payload);
    }
}

The implementation:

using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;

namespace GeoTagger.Core
{
    public class EventAggregator : IEventAggregator
    {
        private readonly object _observablesByTypeKeyLock = new object();

        private readonly Dictionary<string, Tuple<object, object>> _observablesByTypeKey = new Dictionary<string, Tuple<object, object>>();

        public IObservable<T> GetEventStream<T>()
        {
            IObservable<T> stream;
            var key = typeof(T).ToString();

            lock (_observablesByTypeKeyLock)
            {
                if (_observablesByTypeKey.ContainsKey(key))
                {
                    Tuple<object, object> tuple = _observablesByTypeKey[key];
                    stream = (IObservable<T>)tuple.Item2;
                }
                else
                {
                    var subject = new Subject<T>();

                    var removeEventStreamFromCache = Disposable.Create(
                        () =>
                        {
                            lock (_observablesByTypeKeyLock)
                            {
                                _observablesByTypeKey.Remove(key);
                            }
                        }
                    );

                    stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

                    var tuple = new Tuple<object, object>(subject, stream);
                    _observablesByTypeKey.Add(key, tuple);
                }
            }
            return stream;
        }

        public void Publish<T>(T payload)
        {
            var key = typeof(T).ToString();
            
            Tuple<object, object> tuple;
            
            lock (_observablesByTypeKeyLock)
                _observablesByTypeKey.TryGetValue(key, out tuple);
            
            if (tuple != null)
            {
                ((Subject<T>)tuple.Item1).OnNext(payload);
            }
        }
    }
}

Tests, I hear you say? Well, I did write a couple, which are handy to help you see how it's used.

using System;
using GeoTagger.Core;
using NUnit.Framework;

namespace GeoTagger.Tests.Core
{
    [TestFixture]
    public class EventAggregatorTests
    {
        [Test]
        public void Publish_PublishesWhenThereAreSubscribers()
        {
            var expected = "Keith";
            string actual = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<string>()
                .Subscribe(
                    payload =>
                    {
                        actual = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual);
        }

        [Test]
        public void Publish_DoesNotThrowWithNoSubscribers()
        {
            var eventAggregator = new EventAggregator();
            eventAggregator.Publish("Boo");
        }

        [Test]
        public void Publish_WithMultipleSubscribersPayloadDeliveredToAll()
        {
            var expected = new object();
            object actual1 = null, actual2 = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );


            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);
        }

        [Test]
        public void PublishAgainstDisposedSubscriber_ExpectNoPayloadDelivered()
        {
            int expected = 5;
            int actual1 = 0, actual2 = 0;

            var eventAggregator = new EventAggregator();

            var disposable1 = eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            // note we won't dispose this one 
            eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);

            disposable1.Dispose();

            eventAggregator.Publish(6);

            Assert.AreEqual(expected, actual1);

            // the second subscription should remain
            Assert.AreEqual(6, actual2);
        }
    }
}

Under the covers the EventAggregator manages observable streams. Streams are keyed by payload type, and callers of GetEventStream() will receive the same underlying stream. Publishing and ref counting these streams ensures that there will only ever be one underlying subject per payload type. Once all callers dispose their subscriptions, the implementation removes the stream from its cache. If a caller calls Publish() but there are no subscribers, nothing happens.
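
The Publish/RefCount behaviour can be seen in isolation with the sketch below. It is not part of the aggregator; it assumes the current System.Reactive package rather than the Rx build used in this post, and RefCountSketch is a made-up name. Two subscribers share one upstream subscription, and the clean-up action only runs when the last of them disposes.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RefCountSketch
{
    // Returns (upstream subscription count, cleaned up after first dispose, cleaned up after second dispose).
    public static Tuple<int, bool, bool> Run()
    {
        var subject = new Subject<int>();
        var upstreamSubscriptions = 0;
        var cleanedUp = false;

        var shared = Observable.Create<int>(observer =>
            {
                upstreamSubscriptions++;
                var subscription = subject.Subscribe(observer);
                // The second disposable stands in for "remove from cache".
                return new CompositeDisposable(subscription, Disposable.Create(() => cleanedUp = true));
            })
            .Publish()
            .RefCount();

        var first = shared.Subscribe(_ => { });
        var second = shared.Subscribe(_ => { });
        var subscriptions = upstreamSubscriptions;

        first.Dispose();
        var afterFirst = cleanedUp;   // one subscriber is still attached

        second.Dispose();
        var afterSecond = cleanedUp;  // last subscriber gone, clean-up has run

        return Tuple.Create(subscriptions, afterFirst, afterSecond);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("{0} {1} {2}", result.Item1, result.Item2, result.Item3);
    }
}
```

Without Publish().RefCount(), each subscriber would get its own upstream subscription, and the first one to dispose would remove the stream from the cache while others were still listening.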

2 blog posts in 1 day, personal record lol.

Dec 11 Keith | C#, Dev, Rx

Observing a property

I noticed this rated high on the WPF feature suggestion list. Rx has been a fantastic choice for this for a while now.

You need a utility class (ExpressionHelper, which I've mentioned before by the name PropertyHelper) to extract property names from a lambda and turn them into strings, a simple base class to raise the INotifyPropertyChanged events (NotifyingBase), and some extension methods on INotifyPropertyChanged using Rx (ObservableExtensions) so you can observe the properties. Finally, you need some code to hook it all together, be it in a view model or, in my example, a controller. Hopefully this'll help some people out on that forum.

Download this code (185.24 kb) (WPF 4, rx v1.0.2787.0 (included)).

First the extension to INotifyPropertyChanged.

using System;
using System.ComponentModel;
using System.Linq;
using System.Linq.Expressions;

namespace PropertyObservation
{
    public static class ObservableExtensions
    {
        public static IObservable<TValue> ObserveProperty<T, TValue>(
            this T source,
             Expression<Func<T, TValue>> propertyExpression
        )
            where T : INotifyPropertyChanged
        {
            return source.ObserveProperty(propertyExpression, false);
        }

        public static IObservable<TValue> ObserveProperty<T, TValue>(
            this T source,
            Expression<Func<T, TValue>> propertyExpression,
            bool observeInitialValue
        )
            where T : INotifyPropertyChanged
        {
            var memberExpression = (MemberExpression)propertyExpression.Body;

            var getter = propertyExpression.Compile();

            var observable = from evt in Observable
                    .FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>
                    (h => (s, e) => h(s, e),
                    h => source.PropertyChanged += h,
                    h => source.PropertyChanged -= h)
                             where evt.EventArgs.PropertyName == memberExpression.Member.Name
                             select getter(source);

            if (observeInitialValue)
                return observable.Merge(Observable.Return(getter(source)));

            return observable;
        }
    }
}

Then the base class for anything that wants to support INotifyPropertyChanged.

using System;
using System.ComponentModel;
using System.Linq.Expressions;

namespace PropertyObservation
{
    public class NotifyingBase : INotifyPropertyChanged
    {
        public event PropertyChangedEventHandler PropertyChanged;

        protected void OnPropertyChanged(string propertyName)
        {
            var handler = PropertyChanged;
            if (handler != null)
            {
                var e = new PropertyChangedEventArgs(propertyName);
                handler(this, e);
            }
        }

        // some like this better as it's easier to generate
        protected void RaisePropertyChanged<T>(Expression<Func<T>> expression)
        {
            var propertyName = ExpressionHelper.GetPropertyName(expression);
            OnPropertyChanged(propertyName);
        }

        // some prefer this as it's more strongly typed
        protected void RaisePropertyChanged<T>(Expression<Func<T, Object>> expression)
        {
            var propertyName = ExpressionHelper.GetPropertyName(expression);
            OnPropertyChanged(propertyName);
        }
    }
}

The above uses ExpressionHelper, a class that I use in nearly everything these days (I've previously mentioned a similar version as the PropertyHelper class). It also has a handy GetMethodName(). Here it is:

using System;
using System.Linq.Expressions;

namespace PropertyObservation
{
    public class ExpressionHelper
    {
        public static string GetPropertyName<T>(Expression<Func<T>> expression)
        {
            return GetNameFromLambda(expression);
        }

        public static string GetPropertyName<T>(Expression<Func<T, Object>> expression)
        {
            return GetNameFromLambda(expression);
        }

        public static string GetMethodName<T>(Expression<Action<T>> expression)
        {
            return GetNameFromLambda(expression);
        }

        private static string GetNameFromLambda(LambdaExpression lambda)
        {
            MemberExpression memberExpression = null;
            if (lambda.Body is UnaryExpression)
            {
                var unaryExpression = lambda.Body as UnaryExpression;
                memberExpression = unaryExpression.Operand as MemberExpression;
            }
            else if (lambda.Body is MemberExpression)
                memberExpression = lambda.Body as MemberExpression;
           
            if (memberExpression != null)
                    return memberExpression.Member.Name;

            if (lambda.Body is MethodCallExpression)
            {
                var methodCallExpression = lambda.Body as MethodCallExpression;
                return methodCallExpression.Method.Name;
            }

            throw new ArgumentException(String.Format("Expression '{0}' did not provide a property or method name.", lambda));
        }
    }
}

A view model example

namespace PropertyObservation
{
    public class MainWindowViewModel : NotifyingBase
    {
        private string _userName;
        public string UserName
        {
            get { return _userName; }
            set 
            {
                _userName = value;
                // more strongly typed
                RaisePropertyChanged<MainWindowViewModel>(vm => vm.UserName);
            }
        }

        private string _password;
        public string Password
        {
            get { return _password; }
            set 
            {
                _password = value;
                // bit looser, more room for setting an incorrect property  
                RaisePropertyChanged(() => Password);
            }
        }

        private string _results;
        public string Results
        {
            get { return _results; }
            set 
            {
                _results = value;
                RaisePropertyChanged(() => Results);
            }
        }
    }
}

And finally some code in a controller (or wherever you'd normally put this type of controlling code; some say the view model).

using System;
using System.Linq;

namespace PropertyObservation
{
    public class MainWindowController
    {
        private readonly MainWindowViewModel _mainWindowViewModel;

        public MainWindowController(MainWindowViewModel mainWindowViewModel)
        {
            _mainWindowViewModel = mainWindowViewModel;
        }

        public void Start()
        {
            var userNameChanged = _mainWindowViewModel.ObserveProperty(vm => vm.UserName, true);
            var passwordChanged = _mainWindowViewModel.ObserveProperty(vm => vm.Password, true);

            var bothChanged = userNameChanged.CombineLatest(passwordChanged, (username, password) => String.Format("You typed {0} {1}", username, password));

            var disposable = bothChanged.Subscribe(
                message => 
                {
                    _mainWindowViewModel.Results = message;
                }
            );

            // TODO dispose your event wireup at some time if required.
        }
    }
}
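
A note on why Start() passes true for observeInitialValue: CombineLatest produces nothing until every source has yielded at least one value. The sketch below illustrates this with plain subjects. It assumes the current System.Reactive package, uses StartWith as a stand-in for the initial-value handling in ObserveProperty, and CombineLatestSketch is a name invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestSketch
{
    public static List<string> Run(bool seedInitialValues)
    {
        var userName = new Subject<string>();
        var password = new Subject<string>();

        IObservable<string> users = userName;
        IObservable<string> passwords = password;
        if (seedInitialValues)
        {
            // Stand-in for observeInitialValue: emit a starting value on subscribe.
            users = users.StartWith("");
            passwords = passwords.StartWith("");
        }

        var messages = new List<string>();
        var combined = users.CombineLatest(
            passwords,
            (u, p) => String.Format("You typed {0} {1}", u, p));

        using (combined.Subscribe(m => messages.Add(m)))
        {
            userName.OnNext("keith");   // unseeded: nothing yet, password has no value
            password.OnNext("secret");
        }

        return messages;
    }

    public static void Main()
    {
        Console.WriteLine(Run(false).Count); // one message, only after both have a value
        Console.WriteLine(Run(true).Count);  // a message on subscribe and after every change
    }
}
```

Without the initial values, the results text would stay empty until the user had typed into both boxes.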

The XAML is just standard WPF data binding; check out the download if required. Hope this helps someone out.

About the author

Keith Woods

Keith Woods works for Lab49, a consulting firm that builds advanced solutions for the financial services industry.