Fanout Messaging with ServiceStack.REDIS

Standard

I’ve been using the Pub/Sub messaging facilities in Servicestack for a while – using REDIS as the messaging system.  It’s ace – it’s easy to setup and very reliable.

I’ve now started to migrate the app to a CQRS model.  I’ve done this as I’m finding that data queries are taking too long.  With the other benefits adopting CQRS provides – such as event sourcing, easier systems integration, it seems like the logical place to go.

So to make it all work I need to be able to broadcast events – reliably, this means the REDIS pub/sub model wouldn’t work (I Think!).  I could implement RabbitMQ which provides the Q model as I currently use with REDIS and Servicestack, but has a fan out feature via an exchange.  The trouble is, this would mean putting another app into the infrastructure and there’s a fair amount of negative reviews about the speed of RabbitMQ – enough to make me think I don’t really want to go there.

So, I thought, surely it can’t be that tricky to implement an exchange feature in service stack by building on the great stuff that’s already there.  And it wasn’t!  I wanted to change as little as possible – basically I didn’t want to fork servicestack.  This principle lead me down a particular path, it may not be the best one, but it works.

Servicestack MQ uses REDIS lists to create a message Q.  The list name is based upon the  type of object being inserted/published into the Q.  One Q can have multiple apps publishing to it, but only one registered Handler (within an app – there can be multiple servers monitoring the list, the first server to query the Q will pop the top item).

Therefore the exchange will need to monitor a Q and create a Q for each app that wants to receive a message and this will need to be based upon an object type.  So my solution requires the app handling the message to tell the exchange the type of object it wants to know about AND an object that it handles.  So the app creates a new object type that inherits from the object it wishes to know about.  The base class needs to be a POCO.

public class ReadDBCourseInformationUpdated:CourseInformationUpdated
{
}

MessageExchange.registerSubscription<CourseInformationUpdated, ReadDBCourseInformationUpdated>();

mqHost.RegisterHandler<ReadDBCourseInformationUpdated>(m =>
{
//Do Something
});

This lines put in place a new class(ReadDBCourseInformationUpdated). The message exchange is then instructed to relay all messages of type CourseInformationUpdated to ReadDBCourseInformationUpdated. Finally a handler is registered to process the ReadDBCourseInformationUpdated messages.

The Message Exchange
The message exchange is actually really small:


public class MessageExchange : CORE.IMessageExchange
{
    private readonly Dictionary<Type, Type> handlerMap = new Dictionary<Type, Type>();
    public IMessageService mqHost {get;set;}
    public static IMessageProducer messageProducer { get; set; }
    public MessageExchange(){}

    public virtual object distributeMessage(IMessage message) 
    { 
         var match = handlerMap.Where(i => i.Value == message.Body.GetType()); 
         foreach (var client in match) 
         { 
             var newMessage = Activator.CreateInstance(client.Key); 
             newMessage.PopulateWith(message.Body); 
             typeof(MessageExchange) 
               .GetMethod("createMessage", BindingFlags.Static | BindingFlags.NonPublic) 
               .MakeGenericMethod(newMessage.GetType()) 
               .Invoke(null, new object[] { newMessage }); 
         } 
         return null; 
     }

     private static void createMessage(T x) 
     {
        var SSMessage = new Message(x); 
        messageProducer.Publish(SSMessage); 
     }

     ///
     /// Create a subscription to events of Type1 and forward them on as Type2 
     /// 
     /// The type of Message that exchange is to listen for 
     /// The type of Message that exchange is to forward 
     public virtual void registerSubscription<T1,T2>() 
     { 
         if (handlerMap.ContainsKey(typeof(T2))) 
         { 
             throw new ArgumentException("Message handler has already been registered for type: " +typeof(T2).Name); 
         } 
         if (!handlerMap.ContainsValue(typeof(T1))) 
         { 
             if(!handlerMap.ContainsKey(typeof(T1))) 
             { 
                 mqHost.RegisterHandler (m => distributeMessage(m) ); 
             } 
         } 

         Type value; 

         if (handlerMap.TryGetValue(typeof(T1), out value)) 
         { 
             handlerMap[typeof(T2)] = value; 
         } 
         else 
         { 
             handlerMap[typeof(T2)] = typeof(T1); 
         } 
     } 
}

Thanks for the help on stackoverflow.
And thanks for the awesome servicestack.

Beware base.RequestContext.ToOptimizedResultUsingCache

Standard

Loving learning Servicestack, it’s a fantastic framework.  I’ve come across a gotcha this morning (and last night (and the day before!)).

The caching is really simple and can backend onto Memcached, Redis amongst others.  This is where my story starts; On the dev boxes the app caches to memached, but the production servers use Redis.  There is a subtle difference in how objects are serialised for storig in memcached versus Redis – the Redis Cache client uses a serialisation to string.

This doesn’t work for complex objects, to make complex objects work with Redis then the RedisNativeClient needs to be used.  This is fine, once you realise what is happening and our system as a RedisNativeClient available for when it is needed.

Where I got caught out was the pattern for caching web requests is (from here):

public class OrdersService : Service
{
    public object Get(CachedOrders request)
    {
        var cacheKey = "unique_key_for_this_request";
        return base.RequestContext.ToOptimizedResultUsingCache(base.Cache,cacheKey,()=> 
            {
                //Delegate is executed if item doesn't exist in cache 
                //Any response DTO returned here will be cached automatically
            });
    }
}

The bit that caught me out was the link to the requestcontext seems to control how the request is serialised rather than the response.

The work around is just to use the cache manually.

Simple HTTP Listener example

Link

Simple HTTP Listener example

I’ve been having problems with service stack only listening on one port.  Our servicestack has been moved to https but our network load balancer would rather hit an http port, therefore not attracting any SSL overhead.  Servicestack only supports one endpoint, therefore I’ve added a simple HTTP Listener to respond to http://host/ping otherwise redirect to the https service.

Servicestack, Self Hosting and SSL

Standard

I’ve been exploring http://servicestack.net recently and redesigning our systems architecture around REST principles.  It’s looking good, but a lot to learn!

Additionally the service that’s being developed will be self-hosted .NET (no IIS!) to help making  testing and automated deployment easier.

So gotchas I found already:

When setting up the apphost don’t forget the trailing slash – http://localhost:2013 won’t work http://localhost:2013/ will work!

If you want the service to listen on any host name then set the service up as http://*/ or you could add a port in http://*:2013/

If you want to do SSL then you need to read this article  http://blogs.msdn.com/b/jpsanders/archive/2009/09/29/walkthrough-using-httplistener-as-an-ssl-simple-server.aspx – I’m still working on it!