Fanout Messaging with ServiceStack.REDIS

Standard

I’ve been using the Pub/Sub messaging facilities in Servicestack for a while – using REDIS as the messaging system.  It’s ace – it’s easy to setup and very reliable.

I’ve now started to migrate the app to a CQRS model.  I’ve done this as I’m finding that data queries are taking too long.  With the other benefits adopting CQRS provides – such as event sourcing, easier systems integration, it seems like the logical place to go.

So to make it all work I need to be able to broadcast events – reliably, this means the REDIS pub/sub model wouldn’t work (I Think!).  I could implement RabbitMQ which provides the Q model as I currently use with REDIS and Servicestack, but has a fan out feature via an exchange.  The trouble is, this would mean putting another app into the infrastructure and there’s a fair amount of negative reviews about the speed of RabbitMQ – enough to make me think I don’t really want to go there.

So, I thought, surely it can’t be that tricky to implement an exchange feature in service stack by building on the great stuff that’s already there.  And it wasn’t!  I wanted to change as little as possible – basically I didn’t want to fork servicestack.  This principle lead me down a particular path, it may not be the best one, but it works.

Servicestack MQ uses REDIS lists to create a message Q.  The list name is based upon the  type of object being inserted/published into the Q.  One Q can have multiple apps publishing to it, but only one registered Handler (within an app – there can be multiple servers monitoring the list, the first server to query the Q will pop the top item).

Therefore the exchange will need to monitor a Q and create a Q for each app that wants to receive a message and this will need to be based upon an object type.  So my solution requires the app handling the message to tell the exchange the type of object it wants to know about AND an object that it handles.  So the app creates a new object type that inherits from the object it wishes to know about.  The base class needs to be a POCO.

public class ReadDBCourseInformationUpdated:CourseInformationUpdated
{
}

MessageExchange.registerSubscription<CourseInformationUpdated, ReadDBCourseInformationUpdated>();

mqHost.RegisterHandler<ReadDBCourseInformationUpdated>(m =>
{
//Do Something
});

This lines put in place a new class(ReadDBCourseInformationUpdated). The message exchange is then instructed to relay all messages of type CourseInformationUpdated to ReadDBCourseInformationUpdated. Finally a handler is registered to process the ReadDBCourseInformationUpdated messages.

The Message Exchange
The message exchange is actually really small:


public class MessageExchange : CORE.IMessageExchange
{
    private readonly Dictionary<Type, Type> handlerMap = new Dictionary<Type, Type>();
    public IMessageService mqHost {get;set;}
    public static IMessageProducer messageProducer { get; set; }
    public MessageExchange(){}

    public virtual object distributeMessage(IMessage message) 
    { 
         var match = handlerMap.Where(i => i.Value == message.Body.GetType()); 
         foreach (var client in match) 
         { 
             var newMessage = Activator.CreateInstance(client.Key); 
             newMessage.PopulateWith(message.Body); 
             typeof(MessageExchange) 
               .GetMethod("createMessage", BindingFlags.Static | BindingFlags.NonPublic) 
               .MakeGenericMethod(newMessage.GetType()) 
               .Invoke(null, new object[] { newMessage }); 
         } 
         return null; 
     }

     private static void createMessage(T x) 
     {
        var SSMessage = new Message(x); 
        messageProducer.Publish(SSMessage); 
     }

     ///
     /// Create a subscription to events of Type1 and forward them on as Type2 
     /// 
     /// The type of Message that exchange is to listen for 
     /// The type of Message that exchange is to forward 
     public virtual void registerSubscription<T1,T2>() 
     { 
         if (handlerMap.ContainsKey(typeof(T2))) 
         { 
             throw new ArgumentException("Message handler has already been registered for type: " +typeof(T2).Name); 
         } 
         if (!handlerMap.ContainsValue(typeof(T1))) 
         { 
             if(!handlerMap.ContainsKey(typeof(T1))) 
             { 
                 mqHost.RegisterHandler (m => distributeMessage(m) ); 
             } 
         } 

         Type value; 

         if (handlerMap.TryGetValue(typeof(T1), out value)) 
         { 
             handlerMap[typeof(T2)] = value; 
         } 
         else 
         { 
             handlerMap[typeof(T2)] = typeof(T1); 
         } 
     } 
}

Thanks for the help on stackoverflow.
And thanks for the awesome servicestack.

Advertisements

5 thoughts on “Fanout Messaging with ServiceStack.REDIS

    • I’ve updated my answer to the question with a link to a GIST. It’s close but not quite there, as I say in the answer the code works with an STS server I have written but the server stack OpenId clients still don’t work. The error I am getting now is complaining about configuration, I’ve had to park it for the moment but hope come back to it and work it out soon.

  1. Mike,
    In your code above, all possible fan-outs have to be listed in this same service, right? I am thinking if you had a 2nd service subscribed and wanting the message through your exchange, it would not happen as they would have to be all subscribed from the same server instance. I suppose you could have ‘dedicated exchange servers’, but any new exchanges would have to be put into those, recompiled and such. Btw, would you consider posting all your code (around this) in a Gist?

    • Hi Wayne

      Sorry of the delay in responding.

      All possible fan outs would need to be listed in the same App – I think. If I had multiple Apps wanting to consume the messages I might refactor the exchange into a service so that each app would register with the service saying which queue they wanted to listen to and which queue they were going to listen to? However, this would probably require a dedicated exchange server at which point it might be worth considering one of the messaging systems that supports this model such as RabbitMQ

      The model I use for our app is that we have the same code base deployed across multiple services so we have scaled out, rather than up. So each server is running a complete add and fanning out the messages based on which ever server gets there first.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s