Ah, the Event Aggregator: you either love it or you hate it. I tend to hate it 95% of the time and like it only on very rare occasions. One such occasion was a port of some old Prism code; another was building a status bar. At least Rx can spice it up.
The interface I use:
using System;
namespace GeoTagger.Core
{
/// <summary>
/// Contract for a publish/subscribe hub in which events are identified by their payload type.
/// </summary>
public interface IEventAggregator
{
/// <summary>
/// Gets the observable stream of payloads of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The payload type that identifies the event stream.</typeparam>
/// <returns>An observable that callers subscribe to in order to receive published payloads.</returns>
IObservable<T> GetEventStream<T>();
/// <summary>
/// Publishes a payload of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The payload type; normally inferred from <paramref name="payload"/>.</typeparam>
/// <param name="payload">The value to publish.</param>
void Publish<T>(T payload);
}
}
The implementation:
using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;
namespace GeoTagger.Core
{
/// <summary>
/// <see cref="IEventAggregator"/> implementation that keeps one shared observable stream
/// per payload type and pushes published payloads into it.
/// </summary>
/// <remarks>
/// Each cache entry pairs the <c>Subject&lt;T&gt;</c> that <see cref="Publish{T}"/> writes to with
/// the published, ref-counted stream handed out by <see cref="GetEventStream{T}"/>.
/// All access to the cache is serialized through a private lock.
/// </remarks>
public class EventAggregator : IEventAggregator
{
    private readonly object _observablesByTypeKeyLock = new object();

    // Keyed by the payload Type itself rather than Type.ToString(): the string form does not
    // include the assembly, so two distinct types sharing a full name would land on one entry.
    // Item1 is the Subject<T> used for publishing; Item2 is the shared IObservable<T>.
    private readonly Dictionary<Type, Tuple<object, object>> _observablesByTypeKey =
        new Dictionary<Type, Tuple<object, object>>();

    /// <summary>
    /// Returns the shared event stream for payloads of type <typeparamref name="T"/>,
    /// creating and caching it on first request.
    /// </summary>
    /// <typeparam name="T">The payload type that identifies the stream.</typeparam>
    /// <returns>
    /// The cached stream for <typeparamref name="T"/> if one exists; otherwise a newly created
    /// stream built as <c>subject.AddDisposable(...).Publish().RefCount()</c>.
    /// </returns>
    public IObservable<T> GetEventStream<T>()
    {
        var key = typeof(T);
        lock (_observablesByTypeKeyLock)
        {
            Tuple<object, object> cached;
            if (_observablesByTypeKey.TryGetValue(key, out cached))
            {
                return (IObservable<T>)cached.Item2;
            }

            // T is known at compile time, so no reflection is needed to build the subject.
            var subject = new Subject<T>();

            var removeEventStreamFromCache = Disposable.Create(
                () =>
                {
                    lock (_observablesByTypeKeyLock)
                    {
                        // Only evict the entry this call created; if the key has since been
                        // re-populated with a newer subject, that entry must stay reachable
                        // for Publish.
                        Tuple<object, object> current;
                        if (_observablesByTypeKey.TryGetValue(key, out current)
                            && ReferenceEquals(current.Item1, subject))
                        {
                            _observablesByTypeKey.Remove(key);
                        }
                    }
                });

            // NOTE(review): AddDisposable is a project extension not visible here; presumably it
            // disposes removeEventStreamFromCache when the subscription made by RefCount is
            // released, i.e. once the last subscriber has gone - confirm against its implementation.
            IObservable<T> stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();
            _observablesByTypeKey.Add(key, new Tuple<object, object>(subject, stream));
            return stream;
        }
    }

    /// <summary>
    /// Pushes <paramref name="payload"/> into the stream for <typeparamref name="T"/>.
    /// Does nothing when no stream for <typeparamref name="T"/> is currently cached.
    /// </summary>
    /// <typeparam name="T">The payload type; selects which stream receives the value.</typeparam>
    /// <param name="payload">The value passed to the subject's <c>OnNext</c>.</param>
    public void Publish<T>(T payload)
    {
        Tuple<object, object> entry;
        lock (_observablesByTypeKeyLock)
        {
            if (!_observablesByTypeKey.TryGetValue(typeof(T), out entry))
            {
                return;
            }
        }

        // Raised after the lock is released so subscriber callbacks never run while the
        // cache lock is held.
        ((Subject<T>)entry.Item1).OnNext(payload);
    }
}
}
Tests, I hear you say? Well, I did write a couple, which are handy for seeing how it’s used.
using System;
using GeoTagger.Core;
using NUnit.Framework;
namespace GeoTagger.Tests.Core
{
/// <summary>
/// Usage-style tests for <see cref="EventAggregator"/>: delivery to one and to many subscribers,
/// publishing with nobody listening, and publishing after one subscriber has been disposed.
/// </summary>
[TestFixture]
public class EventAggregatorTests
{
    /// <summary>A single subscriber receives the published payload.</summary>
    [Test]
    public void Publish_PublishesWhenThereAreSubscribers()
    {
        string expected = "Keith";
        string received = null;
        var aggregator = new EventAggregator();
        IObservable<string> stream = aggregator.GetEventStream<string>();
        stream.Subscribe(payload => received = payload);

        aggregator.Publish(expected);

        Assert.AreEqual(expected, received);
    }

    /// <summary>Publishing with no stream requested must complete without an exception.</summary>
    [Test]
    public void Publish_DoesNotThrowWithNoSubscribers()
    {
        var aggregator = new EventAggregator();

        // The test passes simply by not throwing.
        aggregator.Publish("Boo");
    }

    /// <summary>Two subscribers on the same payload type both receive the payload.</summary>
    [Test]
    public void Publish_WithMutipleSubscribersPayloadDeliveredToAll()
    {
        var expected = new object();
        object firstReceived = null;
        object secondReceived = null;
        var aggregator = new EventAggregator();
        aggregator.GetEventStream<object>().Subscribe(payload => firstReceived = payload);
        aggregator.GetEventStream<object>().Subscribe(payload => secondReceived = payload);

        aggregator.Publish(expected);

        Assert.AreEqual(expected, firstReceived);
        Assert.AreEqual(expected, secondReceived);
    }

    /// <summary>
    /// After one of two subscriptions is disposed, only the remaining one sees later payloads.
    /// </summary>
    [Test]
    public void PublishAgainstDisposedSubscriber_ExpectNoPayloadDelevered()
    {
        int expected = 5;
        int firstReceived = 0;
        int secondReceived = 0;
        var aggregator = new EventAggregator();
        IDisposable firstSubscription = aggregator.GetEventStream<int>()
            .Subscribe(payload => firstReceived = payload);
        // This second subscription is deliberately left undisposed.
        aggregator.GetEventStream<int>()
            .Subscribe(payload => secondReceived = payload);

        aggregator.Publish(expected);
        Assert.AreEqual(expected, firstReceived);
        Assert.AreEqual(expected, secondReceived);

        firstSubscription.Dispose();
        aggregator.Publish(6);

        // The disposed subscriber keeps its old value...
        Assert.AreEqual(expected, firstReceived);
        // ...while the live one observes the new payload.
        Assert.AreEqual(6, secondReceived);
    }
}
}
Under the covers the EventAggregator manages observable streams. Streams are keyed by payload type, and callers of GetEventStream() for the same type receive the same underlying stream. Publishing and ref counting these streams ensures that there is only one underlying subscription per payload type. Once all callers dispose their subscriptions, the implementation removes the stream from its cache. If a caller calls Publish() but there are no subscribers, nothing happens.
2 blog posts in 1 day, personal record lol.