Reliable PubSub

In the last post I told you about two new features I recently pushed to both ZeroMQ and NetMQ; that post covered the manual subscription feature. In this post I will build a reliable pubsub using the new Welcome Message feature.

I will use NetMQ throughout the post, but everything applies to ZeroMQ as well.

The ZeroMQ guide has a great chapter on pubsub, including reliable pubsub. However, I will tackle a different problem: how to make sure the subscriber is always connected to a publisher (and preferably to the closest one).

Welcome Message

The welcome message is pretty simple: when a welcome message is set on a publisher (which must be an XPub socket), the publisher sends it to every new subscriber, so the first message a subscriber receives is the welcome message. Because subscriber sockets filter incoming messages by topic, the subscriber must subscribe to the welcome message before connecting.
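To see why the subscription has to be in place first, here is a deliberately simplified C# model of the prefix filtering a subscriber applies to incoming messages. SubscriptionFilter is a hypothetical class written only for illustration, not NetMQ's implementation (which matches on bytes, not strings); it just shows that the welcome message is filtered like any other message.

```csharp
using System;
using System.Collections.Generic;

// Simplified model of subscriber-side topic filtering (not NetMQ's code):
// a message is delivered only if its first frame starts with a subscribed prefix.
public class SubscriptionFilter
{
    private readonly List<string> m_prefixes = new List<string>();

    public void Subscribe(string prefix)
    {
        m_prefixes.Add(prefix);
    }

    public bool Accepts(string firstFrame)
    {
        foreach (var prefix in m_prefixes)
        {
            // ordinal comparison: topics are raw prefixes, not culture-aware text
            if (firstFrame.StartsWith(prefix, StringComparison.Ordinal))
                return true;
        }
        return false; // no matching subscription, the message is dropped
    }
}
```

A welcome message arriving before Subscribe("WM") would simply be dropped. Note that an empty prefix matches everything, as an empty subscription does in ZeroMQ.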

To set the welcome message with ZeroMQ, call zmq_setsockopt with the ZMQ_XPUB_WELCOME_MSG option and the welcome message as the value.

The following is a small example of setting the welcome message with NetMQ:

using (var context = NetMQContext.Create())  
{ 
  using (var publisher = context.CreateXPublisherSocket()) 
  { 
    // Set the welcome message, this will be sent to any new subscriber
    publisher.SetWelcomeMessage("WM"); 
    publisher.Bind("tcp://*:5555"); 

    while (true) 
    { 
      // we have to handle all subscription requests
      // in order for the welcome message to work;
      // we simply read and drop them
      publisher.SkipMultipartMessage(); 
    } 
  } 
}

Client example:

using (var context = NetMQContext.Create())  
{ 
  using (var subscriber = context.CreateSubscriberSocket()) 
  { 
    subscriber.Subscribe("WM"); 
    subscriber.Connect("tcp://localhost:5555"); 
    var welcomeMessage = subscriber.ReceiveFrameString(); 
    Console.WriteLine(welcomeMessage); 
  } 
}

Connect attempt

So you have probably been asking yourself why the welcome message is useful. The main problem it solves is knowing whether a connect attempt was successful. Before this feature we had to send a heartbeat message from the server periodically and have the client wait for it, which could take up to a full heartbeat interval.

Our reliable pubsub will have a list of available publishers and will try to connect to each one until one of them returns the welcome message within an acceptable time.
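The sequential strategy can be sketched without any socket code. In this sketch, tryConnect is a hypothetical delegate standing in for "connect, wait up to the timeout for the welcome message, report whether it arrived"; with NetMQ it could wrap a SubscriberSocket and a poller with a timeout timer.

```csharp
using System;
using System.Collections.Generic;

public static class SequentialConnector
{
    // Tries the addresses in order and returns the first one whose
    // (hypothetical) tryConnect probe reports a welcome message in time,
    // or null if none of them answered.
    public static string ConnectFirstAvailable(
        IEnumerable<string> addresses,
        TimeSpan timeout,
        Func<string, TimeSpan, bool> tryConnect)
    {
        foreach (var address in addresses)
        {
            if (tryConnect(address, timeout))
                return address; // first publisher that welcomed us
        }
        return null; // nobody answered within the timeout
    }
}
```

The downside is latency: every dead publisher ahead of a live one costs a full timeout.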

Connect to closest publisher

As I mentioned, we have a list of publishers. But what if, instead of trying them one after the other until a connection succeeds, we connect to all of them, keep the first one to reply and close the rest? On a LAN this will probably pick the least busy publisher (or effectively a random one), and over a WAN it will probably pick the closest one. That gives us a geo load balancer without expensive hardware or a DNS service.
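"Connect to all, keep the first to reply" is a race. Here is a minimal sketch of it using Task.WhenAny instead of sockets; waitForWelcome is a hypothetical async probe that completes when an endpoint's welcome message arrives, and cancelling the token plays the role of closing the other sockets. The NetMQ Connect method later in the post achieves the same with a single poller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class RaceConnector
{
    // Starts one probe per address and returns the address whose probe
    // completes successfully first, or null if none does within `timeout`.
    public static async Task<string> ConnectFastestAsync(
        IReadOnlyList<string> addresses,
        TimeSpan timeout,
        Func<string, CancellationToken, Task> waitForWelcome)
    {
        using (var cts = new CancellationTokenSource())
        {
            // start all probes at once, remembering which address each belongs to
            var probes = new Dictionary<Task, string>();
            foreach (var address in addresses)
                probes[waitForWelcome(address, cts.Token)] = address;

            Task deadline = Task.Delay(timeout, cts.Token);
            try
            {
                while (probes.Count > 0)
                {
                    var candidates = new List<Task>(probes.Keys) { deadline };
                    Task first = await Task.WhenAny(candidates);

                    if (first == deadline)
                        return null; // nobody answered within the timeout
                    if (first.Status == TaskStatus.RanToCompletion)
                        return probes[first]; // the first welcome message wins

                    probes.Remove(first); // this probe failed, keep racing the rest
                }
                return null; // every probe failed
            }
            finally
            {
                // abandon the losers; with sockets this means "close the rest"
                cts.Cancel();
            }
        }
    }
}
```

Failed probes are simply dropped from the race, so one unreachable publisher cannot prevent a live one from winning.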

Recognize connection drop

The welcome message has another useful behavior. ZeroMQ automatically tries to reconnect a dropped connection, and when a welcome message is set, the publisher sends it again every time a reconnect happens. We can use that to detect that we may have missed messages; with the Clone pattern, for example, the welcome message can trigger a request for a full snapshot from the server.
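Telling a reconnect apart from the initial connection only needs one bit of state. WelcomeTracker is a hypothetical helper sketched for illustration: the first welcome message on a socket marks the initial connection, and every later one means ZeroMQ reconnected behind the scenes, so the caller may want to resynchronize.

```csharp
// Hypothetical helper: distinguishes the first welcome message
// (initial connection) from later ones (ZeroMQ reconnected).
public class WelcomeTracker
{
    private bool m_seenFirstWelcome;

    // Number of reconnects detected so far.
    public int ReconnectCount { get; private set; }

    // Call for every welcome message. Returns true when it signals a
    // reconnect, i.e. messages may have been missed and the caller may
    // request a full snapshot (as in the Clone pattern).
    public bool OnWelcome()
    {
        if (!m_seenFirstWelcome)
        {
            m_seenFirstWelcome = true;
            return false; // initial connection, nothing missed yet
        }

        ReconnectCount++;
        return true;
    }
}
```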

We will use that behavior to build our reliable pubsub.

Heartbeat

The welcome message solves the connect attempt problem, but we still need to detect a publisher that goes down after the connection has been made. The idea is that if no message has been received for 5 seconds, we try to reconnect. But what if the publisher simply had nothing to send for 5 seconds? To solve this, the server sends a heartbeat message every 2 seconds; now, if we don't receive any message for 5 seconds, we can safely assume the publisher is down (or unreachable) and reconnect.
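The timeout rule itself is small enough to test without sockets or real clocks. HeartbeatWatchdog is a hypothetical class that takes the current time as a parameter: every frame (data, heartbeat or welcome) refreshes it, and the publisher counts as dead once more than the timeout has passed since the last frame. Ignoring network delay, any 5 second window contains at least two 2-second heartbeats, so a single lost heartbeat never triggers a reconnect on its own.

```csharp
using System;

// Sketch of the heartbeat/timeout rule: any frame proves the publisher is
// alive; silence longer than the timeout means it is considered dead.
// Time is passed in explicitly so the logic can be tested deterministically.
public class HeartbeatWatchdog
{
    private readonly TimeSpan m_timeout;
    private DateTime m_lastSeen;

    public HeartbeatWatchdog(TimeSpan timeout, DateTime now)
    {
        m_timeout = timeout;
        m_lastSeen = now;
    }

    // Call for every frame received from the publisher.
    public void OnFrame(DateTime now)
    {
        m_lastSeen = now;
    }

    public bool IsPublisherDead(DateTime now)
    {
        return now - m_lastSeen > m_timeout;
    }
}
```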

Code

The following code is simplified for this post. You can view the full source code at https://github.com/somdoron/ReliablePubSub.

Server

using (NetMQContext context = NetMQContext.Create())  
{
  using (var publisherSocket = context.CreateXPublisherSocket())
  {
    publisherSocket.SetWelcomeMessage("WM");
    publisherSocket.Bind("tcp://*:6669");

    // we just drop subscriptions                     
    publisherSocket.ReceiveReady += (sender, eventArgs) => 
      publisherSocket.SkipMultipartMessage();

    Poller poller = new Poller(publisherSocket);

    // create the Random once and reuse it for every message
    Random random = new Random();

    // send a message every second
    NetMQTimer sendMessageTimer = new NetMQTimer(1000);
    poller.AddTimer(sendMessageTimer);
    sendMessageTimer.Elapsed += (sender, eventArgs) => 
      publisherSocket
        .SendMoreFrame("A")
        .SendFrame(random.Next().ToString());

    // send heartbeat every two seconds
    NetMQTimer heartbeatTimer = new NetMQTimer(2000);
    poller.AddTimer(heartbeatTimer);
    heartbeatTimer.Elapsed += 
      (sender, eventArgs) => publisherSocket.SendFrame("HB");

    poller.PollTillCancelled();
  }
}

Client

The client code is a little more complicated and is split into multiple methods. Let's start with the Connect method, which connects to the first publisher that replies with a Welcome Message.

Connect

private SubscriberSocket Connect(string[] addresses)  
{
  List<SubscriberSocket> sockets = new List<SubscriberSocket>();
  Poller poller = new Poller();

  SubscriberSocket connectedSocket = null;

  // event handler to handle message from socket
  EventHandler<NetMQSocketEventArgs> handleMessage = (sender, args) =>
  {
    if (connectedSocket == null)
    {
      connectedSocket = (SubscriberSocket)args.Socket;
      poller.Cancel();
    }
  };

  // If timeout elapsed just cancel the
  // poller without setting the connected socket
  NetMQTimer timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
  timeoutTimer.Elapsed += (sender, args) => poller.Cancel();
  poller.AddTimer(timeoutTimer);

  foreach (var address in addresses)
  {
    var socket = m_context.CreateSubscriberSocket();
    sockets.Add(socket);

    socket.ReceiveReady += handleMessage;
    poller.AddSocket(socket);

    // Subscribe to welcome message
    socket.Subscribe("WM");
    socket.Connect(address);
  }

  poller.PollTillCancelled();

  // if we have a connected socket the connection attempt succeeded
  if (connectedSocket != null)
  {
    // remove the connected socket from the list
    sockets.Remove(connectedSocket);

    // close all the other sockets
    CloseSockets(sockets);

    // drop the welcome message
    connectedSocket.SkipMultipartMessage();

    // subscribe to heartbeat
    connectedSocket.Subscribe("HB");

    // subscribe to our only topic
    connectedSocket.Subscribe("A");

    connectedSocket.ReceiveReady -= handleMessage;
    connectedSocket.ReceiveReady += OnSubscriberMessage;

    return connectedSocket;
  }
  else
  {
    // close all existing sockets
    CloseSockets(sockets);

    return null;
  }
}

What we are doing:

  1. Create a timeout timer that cancels the poller when it elapses.
  2. Create a handler for the welcome messages; the first time it is called it records the socket and cancels the poller.
  3. Create all the sockets, subscribe each one to the welcome message and connect.
  4. Poll until cancelled, which happens either on timeout or when a welcome message arrives.
  5. Close all the sockets except the connected one, drop the welcome message, subscribe to the heartbeat and the topics, and register the handler that processes messages from now on.

Run

The Run method orchestrates the entire process.

public void Run(params string[] addresses)  
{
  using (m_context = NetMQContext.Create())
  {
    var subscriber = Connect(addresses);

    if (subscriber == null)
      throw new Exception("cannot connect to any of the endpoints");

    // timeout timer, fires when no message (including heartbeats)
    // has arrived for 5 seconds
    m_timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
    m_timeoutTimer.Elapsed += (sender, args) =>
    {
      // timeout happened: first remove the dead subscriber
      // from the poller, then dispose it
      m_poller.RemoveSocket(subscriber);
      subscriber.Dispose();

      // connect again
      subscriber = Connect(addresses);

      if (subscriber == null)
        throw new Exception("cannot connect to any of the endpoints");

      m_poller.AddSocket(subscriber);
    };

    m_poller = new Poller(subscriber);
    m_poller.AddTimer(m_timeoutTimer);

    m_poller.PollTillCancelled();
  }
}

What we are doing:

  1. Try to connect to one of the servers.
  2. Create a timeout timer to recognize connection drops and reconnect.
  3. Create the poller and start polling.

OnSubscriberMessage

The only missing piece of the puzzle is the handler for subscriber messages, which we registered in the Connect method.

private void OnSubscriberMessage(object sender, NetMQSocketEventArgs e)  
{
  var topic = e.Socket.ReceiveFrameString();

  switch (topic)
  {
    case "WM":
      // welcome message, print and reset timeout timer
      Console.WriteLine("Connection drop and reconnect detected");
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    case "HB":
      // heartbeat, we reset timeout timer
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    default:
      // it's a message: reset the timeout timer and notify the client;
      // for the example we just print it
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      string message = e.Socket.ReceiveFrameString();
      Console.WriteLine("Message received. Topic: {0}, Message: {1}", 
        topic, message);
      break;
  }
}

What we are doing:

  1. Get the topic of the message.
  2. Check which topic it is:
    • When it is a welcome message we reset the timeout timer and notify the user about the connection drop.
    • When it is a heartbeat we only reset the timer.
    • When it is a message we reset the timeout timer and notify the user.

Summary

So what does Reliable PubSub really do?

  • Tries to connect to multiple servers and picks the first one that replies with a Welcome Message. This can be used as a geo load balancer or to connect to the least busy server.
  • Recognizes temporary connection drops (a Welcome Message arriving while the connection is already up means ZeroMQ reconnected). The user can use that to request a full snapshot; take a look at the Clone pattern in the ZeroMQ guide.
  • Reconnects when the server is dead, probably to the next closest server.

The Reliable PubSub is a good fit for:

  • Financial Market Data (Forex/Stock Quotes)
  • Social Stream
  • Publishing changes to a client

If you implement the pattern in another language please send it to me and I will add a link to the post.

You can find the complete example and another more complete implementation at:
https://github.com/somdoron/ReliablePubSub.