Fanout Messaging with ServiceStack.Redis


I’ve been using the Pub/Sub messaging facilities in ServiceStack for a while, with Redis as the messaging system.  It’s ace: easy to set up and very reliable.

I’ve now started to migrate the app to a CQRS model because data queries are taking too long.  With the other benefits that adopting CQRS provides, such as event sourcing and easier systems integration, it seems like the logical place to go.

So to make it all work I need to be able to broadcast events reliably.  This means the plain Redis pub/sub model wouldn’t work (I think!), because a subscriber that is offline when a message is published never receives it.  I could implement RabbitMQ, which provides the queue model I currently use with Redis and ServiceStack but also has a fan-out feature via an exchange.  The trouble is, this would mean putting another app into the infrastructure, and there are a fair number of negative reviews about the speed of RabbitMQ, enough to make me think I don’t really want to go there.

So, I thought, surely it can’t be that tricky to implement an exchange feature in ServiceStack by building on the great stuff that’s already there.  And it wasn’t!  I wanted to change as little as possible; basically I didn’t want to fork ServiceStack.  This principle led me down a particular path.  It may not be the best one, but it works.

ServiceStack MQ uses Redis lists to create a message queue.  The list name is based on the type of object being published into the queue.  One queue can have multiple apps publishing to it, but only one registered handler within an app.  There can be multiple servers monitoring the list; the first server to query the queue pops the top item, so each message goes to just one of them.
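To make the "first server to pop wins" behaviour concrete, here is a minimal in-memory sketch.  It does not touch Redis or ServiceStack at all: a plain Queue<string> stands in for the Redis list, and the class name, server names and round-robin polling order are all assumptions made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

public static class SharedQueueDemo
{
    // Simulates several servers taking turns to poll one shared list.
    // Returns, for each message, the single server that received it.
    public static Dictionary<string, string> Drain(Queue<string> queue, string[] servers)
    {
        var receivedBy = new Dictionary<string, string>();
        var turn = 0;
        while (queue.Count > 0)
        {
            // Dequeue removes the item for everyone, like a pop on a Redis list
            var message = queue.Dequeue();
            receivedBy[message] = servers[turn % servers.Length];
            turn++;
        }
        return receivedBy;
    }

    public static void Main()
    {
        var queue = new Queue<string>(new[] { "event-1", "event-2", "event-3" });
        var result = Drain(queue, new[] { "server-A", "server-B" });
        foreach (var pair in result)
        {
            Console.WriteLine(pair.Key + " -> " + pair.Value);
        }
    }
}
```

Each message ends up with exactly one server, which is what you want for work distribution but not for broadcasting an event to several interested apps.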

So the exchange needs to monitor one queue and create a queue for each app that wants to receive the message, and each of those queues needs its own object type.  My solution therefore requires the app handling the message to tell the exchange two things: the type of object it wants to know about AND the type of object it will handle.  To do this, the app creates a new object type that inherits from the object it wishes to know about.  The base class needs to be a POCO.

public class ReadDBCourseInformationUpdated:CourseInformationUpdated
{
}

MessageExchange.registerSubscription<CourseInformationUpdated, ReadDBCourseInformationUpdated>();

mqHost.RegisterHandler<ReadDBCourseInformationUpdated>(m =>
{
    //Do Something
    return null; // the handler delegate must return a response object (or null)
});

These lines put in place a new class (ReadDBCourseInformationUpdated). The message exchange is then instructed to relay all messages of type CourseInformationUpdated as ReadDBCourseInformationUpdated messages. Finally, a handler is registered to process the ReadDBCourseInformationUpdated messages.
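If it helps to see the relay idea in isolation, here is a rough in-memory sketch of it with no ServiceStack or Redis involved.  Everything beyond the two class names from the snippet above is an assumption for illustration: InMemoryExchange, Subscribe, Publish, the CourseName property and the second subscriber type SearchIndexCourseInformationUpdated are all hypothetical, and a Queue<object> per type stands in for a Redis list per subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// CourseName is a hypothetical property, added only so there is something to copy
public class CourseInformationUpdated { public string CourseName { get; set; } }
public class ReadDBCourseInformationUpdated : CourseInformationUpdated { }
// Hypothetical second subscriber, to show the fan-out
public class SearchIndexCourseInformationUpdated : CourseInformationUpdated { }

public class InMemoryExchange
{
    // forwarded type -> source type it listens for
    private readonly Dictionary<Type, Type> subscriptions = new Dictionary<Type, Type>();

    // one queue per forwarded type, standing in for one Redis list per subscriber
    public readonly Dictionary<Type, Queue<object>> Queues = new Dictionary<Type, Queue<object>>();

    public void Subscribe<TSource, TForward>() where TForward : TSource, new()
    {
        subscriptions[typeof(TForward)] = typeof(TSource);
        Queues[typeof(TForward)] = new Queue<object>();
    }

    public void Publish(object source)
    {
        var targets = subscriptions
            .Where(s => s.Value == source.GetType())
            .Select(s => s.Key);

        foreach (var forwardType in targets)
        {
            // Create the subscriber-specific message and copy the POCO's properties across
            var copy = Activator.CreateInstance(forwardType);
            foreach (var property in source.GetType().GetProperties())
            {
                if (property.CanRead && property.CanWrite)
                {
                    property.SetValue(copy, property.GetValue(source, null), null);
                }
            }
            Queues[forwardType].Enqueue(copy);
        }
    }
}

public static class ExchangeDemo
{
    public static void Main()
    {
        var exchange = new InMemoryExchange();
        exchange.Subscribe<CourseInformationUpdated, ReadDBCourseInformationUpdated>();
        exchange.Subscribe<CourseInformationUpdated, SearchIndexCourseInformationUpdated>();

        exchange.Publish(new CourseInformationUpdated { CourseName = "Maths" });

        foreach (var queue in exchange.Queues)
        {
            Console.WriteLine(queue.Key.Name + ": " + queue.Value.Count + " message(s)");
        }
    }
}
```

One published event lands as a separate copy in each subscriber's queue, which is why each subscriber needs its own derived type: the type is what names the queue.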

The Message Exchange
The message exchange is actually really small:


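// Note: the ServiceStack namespaces below differ between ServiceStack versions; adjust to suit yours
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using ServiceStack;
using ServiceStack.Messaging;

public class MessageExchange : CORE.IMessageExchange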
{
    private readonly Dictionary<Type, Type> handlerMap = new Dictionary<Type, Type>();
    public IMessageService mqHost {get;set;}
    public static IMessageProducer messageProducer { get; set; }
    public MessageExchange(){}

    public virtual object distributeMessage(IMessage message) 
    { 
         var match = handlerMap.Where(i => i.Value == message.Body.GetType()); 
         foreach (var client in match) 
         { 
             var newMessage = Activator.CreateInstance(client.Key); 
             newMessage.PopulateWith(message.Body); 
             typeof(MessageExchange) 
               .GetMethod("createMessage", BindingFlags.Static | BindingFlags.NonPublic) 
               .MakeGenericMethod(newMessage.GetType()) 
               .Invoke(null, new object[] { newMessage }); 
         } 
         return null; 
     }

     // Invoked via reflection from distributeMessage, so T is the subscriber's message type
     private static void createMessage<T>(T x) 
     {
        var SSMessage = new Message<T>(x); 
        messageProducer.Publish(SSMessage); 
     }

     /// <summary>
     /// Create a subscription to events of Type1 and forward them on as Type2 
     /// </summary>
     /// <typeparam name="T1">The type of Message that exchange is to listen for</typeparam>
     /// <typeparam name="T2">The type of Message that exchange is to forward</typeparam>
     public virtual void registerSubscription<T1,T2>() 
     { 
         if (handlerMap.ContainsKey(typeof(T2))) 
         { 
             throw new ArgumentException("Message handler has already been registered for type: " +typeof(T2).Name); 
         } 
         if (!handlerMap.ContainsValue(typeof(T1))) 
         { 
             if(!handlerMap.ContainsKey(typeof(T1))) 
             { 
                 mqHost.RegisterHandler<T1>(m => distributeMessage(m)); 
             } 
         } 

         Type value; 

         if (handlerMap.TryGetValue(typeof(T1), out value)) 
         { 
             handlerMap[typeof(T2)] = value; 
         } 
         else 
         { 
             handlerMap[typeof(T2)] = typeof(T1); 
         } 
     } 
}

Thanks for the help on Stack Overflow.
And thanks for the awesome ServiceStack.
