Dec 29 Keith | Photography

Street art

December is always a bit bluesy around the city. I live in central London and the place is deserted during the Christmas break. But there was some new street art just around the corner from my place in Shoreditch.

I have to admit I'm a fan of *cough* graffiti *cough*: not scribbles on walls by kids with nothing to do, but work by those with real artistic talent (admittedly they were perhaps once guilty of some scribbles themselves). These pieces were done on a building that always has a few walls dedicated to art (i.e. legal graffiti).

Dec 18 Keith | C#, Dev

MEF and Prism

I recently trod down the Managed Extensibility Framework (MEF) road with Prism. It soon became clear it was a long road and I couldn't see the end, so I turned back.

There were some things I really liked. Dynamic registration, for example, was cool: I could just mark up my dependencies with attributes, and I could add additional metadata with custom attributes conforming to an interface. Using Lazy<T> and some logic at resolution time, I could query dependencies through this metadata interface and make intelligent decisions about whether I wanted to resolve them.
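
To make that concrete, here is a minimal sketch of the metadata-driven lazy resolution described above. It is not the code from my spike app: IGreeter, IGreeterMetadata, GreeterHost and the "Language" key are hypothetical names made up for illustration, and it uses the built-in ExportMetadata attribute rather than a custom metadata attribute to keep things short. It assumes a reference to System.ComponentModel.Composition.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;
using System.Linq;

// Hypothetical contract and metadata view, for illustration only.
public interface IGreeter { string Greet(); }
public interface IGreeterMetadata { string Language { get; } }

[Export(typeof(IGreeter))]
[ExportMetadata("Language", "en")]
public class EnglishGreeter : IGreeter { public string Greet() { return "Hello"; } }

[Export(typeof(IGreeter))]
[ExportMetadata("Language", "fr")]
public class FrenchGreeter : IGreeter { public string Greet() { return "Bonjour"; } }

public class GreeterHost
{
    // Each export arrives as a Lazy<T, TMetadata>: the metadata can be
    // inspected without instantiating the exported part.
    [ImportMany]
    public IEnumerable<Lazy<IGreeter, IGreeterMetadata>> Greeters { get; set; }

    public string GreetIn(string language)
    {
        // Decide based on metadata; only the chosen part is created via .Value.
        var match = Greeters.First(g => g.Metadata.Language == language);
        return match.Value.Greet();
    }
}

public static class MefDemo
{
    public static string Run()
    {
        var catalog = new TypeCatalog(typeof(EnglishGreeter), typeof(FrenchGreeter));
        using (var container = new CompositionContainer(catalog))
        {
            var host = new GreeterHost();
            container.ComposeParts(host); // satisfies the [ImportMany] above
            return host.GreetIn("fr");
        }
    }
}
```

Calling MefDemo.Run() should pick the French greeter purely from its metadata, without ever constructing the English one.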

Life was good. Then I got some typical dependency mismatches on first launch (in my spike app). No problem, it's to be expected with any container-based approach. The exception data wasn’t really up to it, so I turned to mefx to help me out. Then I wanted to do some dynamic loading of assemblies in conjunction with the System.AddIn pipeline, which meant MEF needed to load assemblies from different directories. Things start to get complicated when you have a large application and missing dependencies in this scenario. Mefx doesn't really help, as you need to have all the DLLs in one place (more scripting), and its results were rather baffling! Then I wanted to use simple factories in my constructors, something like 'Func<ISomething> somethingFactory'. You can do it using ExportFactory, but it wasn’t that clean. At that point I'd pretty much had enough and stopped.

To cut a long story short, stick to a fully fledged IoC container for typical line-of-business (LOB) applications; they're built for the job. MEF has its use cases: it's handy for discovering plugins for the current process, or perhaps in smaller LOB apps that don’t have hundreds or thousands of container registrations. I wouldn’t recommend using it with Prism. If you have a working container approach, stick to it.

Dec 12 Keith | C#, Dev

Another version of the EventAggregator

Here is another version of the code from yesterday's post (which would work in Silverlight, where this won’t) using a ConcurrentDictionary. I saw Jose's blog post do something similar with the ConcurrentDictionary, so I thought I’d give it a try. I also added some comments as to why I call Publish and RefCount. Observable.Defer was added so that any modifications to the cache don't actually happen until someone subscribes.

using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;
using System.Collections.Concurrent;

namespace GeoTagger.Core
{
    public class EventAggregator : IEventAggregator
    {
        private readonly ConcurrentDictionary<Type, Tuple<object, object>> _observablesByPayloadType = new ConcurrentDictionary<Type, Tuple<object, object>>();

        public IObservable<T> GetEventStream<T>()
        {
            var defered = Observable.Defer(
                () => 
                {
                    var tuple =
                        _observablesByPayloadType.GetOrAdd(typeof(T),
                            t =>
                            {
                                var subject = new Subject<T>();

                                var removeFromCache = Disposable.Create(
                                    () =>
                                    {
                                        Tuple<object, object> _; // throw away object!
                                        _observablesByPayloadType.TryRemove(typeof(T), out _);
                                    }
                                );
                                
                                // because we are removing items from the cache to clean up on dispose, 
                                // we need to publish then refcount the subject to ensure we only 
                                // clean up when there are no subscribers left
                                var observable =
                                    subject
                                        .AddDisposable(removeFromCache)
                                        .Publish()
                                        .RefCount()
                                        .AsObservable();

                                return new Tuple<object, object>(subject, observable);
                            }
                        );

                    return (IObservable<T>)tuple.Item2;
                }
            );

            return defered;
        }

        public void Publish<T>(T payload)
        {
            Tuple<object, object> cachedItem;
            if (_observablesByPayloadType.TryGetValue(typeof(T), out cachedItem))
            {
                ((ISubject<T>)cachedItem.Item1).OnNext(payload);
            }
        }
    }
}
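
As a small standalone illustration of the Observable.Defer point, here is a minimal sketch showing that the factory passed to Defer does not run until someone subscribes. It is written against the current System.Reactive package, whose namespaces differ from the Rx build used in the listing above, and DeferDemo and factoryCalls are names I made up for the example.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical demo class; not part of the EventAggregator itself.
public static class DeferDemo
{
    public static (int before, int after) Run()
    {
        var factoryCalls = 0;

        // The lambda is only a recipe at this point; nothing runs yet.
        var deferred = Observable.Defer(() =>
        {
            factoryCalls++; // side effect we want to postpone
            return Observable.Return(42);
        });

        var before = factoryCalls; // still untouched: nobody has subscribed

        // Subscribing invokes the factory, then subscribes to its result.
        using (deferred.Subscribe(_ => { }))
        {
        }

        var after = factoryCalls;
        return (before, after);
    }
}
```

Note that the factory runs once per subscription, which is why the aggregator above goes through GetOrAdd inside the deferred lambda rather than creating a new subject every time.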

One more test was added just to ensure things are getting removed from the cache when the final caller disposes. It's a pretty nasty test!

[Test]
public void CacheRefCountsForDifferentPayloadTypes_DisposeRemovesObservablesFromCache()
{
    var eventAggregator = new EventAggregator();
    // HACK!
    var field = eventAggregator.GetType().GetField("_observablesByPayloadType", BindingFlags.NonPublic | BindingFlags.Instance);
    var cache = (ConcurrentDictionary<Type, Tuple<object, object>>)field.GetValue(eventAggregator);

    // pre-condition 
    Assert.IsTrue(cache.IsEmpty);

    var disposable1 = eventAggregator.GetEventStream<int>().Subscribe(payload => { });

    var disposable2 = eventAggregator.GetEventStream<int>().Subscribe(payload => { });

    // with the cached observables being ref counted there should only be one underlying
    Assert.AreEqual(1, cache.Count);

    var disposable3 = eventAggregator.GetEventStream<string>().Subscribe(payload => { });
   
    Assert.AreEqual(2, cache.Count);

    disposable1.Dispose();
    disposable2.Dispose();
    disposable3.Dispose();

    Assert.IsTrue(cache.IsEmpty);
}

Edit: the above used this extension:

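public static class ObservableEx
{
    // Wraps the source so that the given disposable runs when a subscription
    // to the returned observable is disposed.
    public static IObservable<T> AddDisposable<T>(this IObservable<T> source, IDisposable disposable)
    {
        return Observable.CreateWithDisposable<T>(
            o =>
            {
                // note: the subscription to the source is not itself disposed here
                source.Subscribe(o);
                return disposable;
            }
        );
    }
}
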
Dec 11 Keith | C#, Dev, Rx

Rx Event Aggregator

Ah, the Event Aggregator: you either love it or you hate it. I tend to hate it 95% of the time and like it on very rare occasions. One such time was during a port of some old Prism code; another was when building a status bar. At least Rx can spice it up.

The interface I use:

using System;

namespace GeoTagger.Core
{
    public interface IEventAggregator
    {
        IObservable<T> GetEventStream<T>();
        void Publish<T>(T payload);
    }
}

The implementation:

using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;

namespace GeoTagger.Core
{
    public class EventAggregator : IEventAggregator
    {
        private readonly object _observablesByTypeKeyLock = new object();

        private readonly Dictionary<string, Tuple<object, object>> _observablesByTypeKey = new Dictionary<string, Tuple<object, object>>();

        public IObservable<T> GetEventStream<T>()
        {
            IObservable<T> stream;
            var key = typeof(T).ToString();

            lock (_observablesByTypeKeyLock)
            {
                if (_observablesByTypeKey.ContainsKey(key))
                {
                    Tuple<object, object> tuple = _observablesByTypeKey[key];
                    stream = (IObservable<T>)tuple.Item2;
                }
                else
                {
                    // T is known at compile time, so no reflection is needed to create the subject
                    var subject = new Subject<T>();

                    var removeEventStreamFromCache = Disposable.Create(
                        () =>
                        {
                            lock (_observablesByTypeKeyLock)
                            {
                                _observablesByTypeKey.Remove(key);
                            }
                        }
                    );

                    stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

                    var tuple = new Tuple<object, object>(subject, stream);
                    _observablesByTypeKey.Add(key, tuple);
                }
            }
            return stream;
        }

        public void Publish<T>(T payload)
        {
            var key = typeof(T).ToString();
            
            Tuple<object, object> tuple;
            
            lock (_observablesByTypeKeyLock)
                _observablesByTypeKey.TryGetValue(key, out tuple);
            
            if (tuple != null)
            {
                ((Subject<T>)tuple.Item1).OnNext(payload);
            }
        }
    }
}

Tests, I hear you say? Well, I did write a couple, which are handy to help you see how it’s used.

using System;
using GeoTagger.Core;
using NUnit.Framework;

namespace GeoTagger.Tests.Core
{
    [TestFixture]
    public class EventAggregatorTests
    {
        [Test]
        public void Publish_PublishesWhenThereAreSubscribers()
        {
            var expected = "Keith";
            string actual = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<string>()
                .Subscribe(
                    payload =>
                    {
                        actual = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual);
        }

        [Test]
        public void Publish_DoesNotThrowWithNoSubscribers()
        {
            var eventAggregator = new EventAggregator();
            eventAggregator.Publish("Boo");
        }

        [Test]
        public void Publish_WithMultipleSubscribersPayloadDeliveredToAll()
        {
            var expected = new object();
            object actual1 = null, actual2 = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );


            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);
        }

        [Test]
        public void PublishAgainstDisposedSubscriber_ExpectNoPayloadDelivered()
        {
            int expected = 5;
            int actual1 = 0, actual2 = 0;

            var eventAggregator = new EventAggregator();

            var disposable1 = eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            // note we won't dispose this one 
            eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);

            disposable1.Dispose();

            eventAggregator.Publish(6);

            Assert.AreEqual(expected, actual1);

            // the second subscription should remain
            Assert.AreEqual(6, actual2);
        }
    }
}

Under the covers the EventAggregator manages observable streams. Streams are keyed by payload type, and callers of GetEventStream() will receive the same underlying stream. Publishing and ref counting these streams ensures that there will only ever be one underlying stream per payload type. Once all callers dispose their subscriptions, the implementation removes the stream from its cache. If a caller calls Publish() but there are no subscribers, nothing happens.
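
The publish-then-ref-count behaviour can be seen in isolation with a minimal sketch like the one below. It is an illustration rather than the aggregator itself: it assumes the current System.Reactive package (different namespaces from the Rx build used in this post), and RefCountDemo, connects and cleanups are hypothetical names. The Observable.Create wrapper plays roughly the role of the AddDisposable extension, running a clean-up action when the shared subscription is torn down.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, for illustration only.
public static class RefCountDemo
{
    public static (int connects, int cleanupsAfterFirst, int cleanupsAfterLast) Run()
    {
        var connects = 0;
        var cleanups = 0;
        var subject = new Subject<int>();

        var shared = Observable.Create<int>(observer =>
            {
                connects++; // counts subscriptions to the underlying source
                var subscription = subject.Subscribe(observer);
                // Dispose the inner subscription and run the clean-up together.
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => cleanups++));
            })
            .Publish()   // share a single subscription to the source
            .RefCount(); // connect on first subscriber, disconnect on last

        var first = shared.Subscribe(_ => { });
        var second = shared.Subscribe(_ => { });

        first.Dispose();
        var cleanupsAfterFirst = cleanups; // one subscriber remains

        second.Dispose();
        var cleanupsAfterLast = cleanups;  // last subscriber gone

        return (connects, cleanupsAfterFirst, cleanupsAfterLast);
    }
}
```

With two subscribers the source is only connected once, and the clean-up should run only when the last of them disposes, which is the moment the aggregator removes the stream from its cache.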

2 blog posts in 1 day, personal record lol.

About the author

Keith Woods

Keith Woods works for Lab49, a consulting firm that builds advanced solutions for the financial services industry.