Reliable PubSub

In the last post I described two new features I recently pushed to both ZeroMQ and NetMQ. That post covered the manual subscription feature. In this post I will build a reliable pubsub using the new Welcome Message feature.

I will use NetMQ throughout the post, but everything applies to ZeroMQ as well.

The ZeroMQ guide has a great chapter on pubsub, including reliable pubsub. However, I will tackle a different problem: how to make sure the subscriber is always connected to a publisher (and to the closest one).

Welcome Message

The welcome message is pretty simple: when a welcome message is set on a publisher (which must be of type XPub), the publisher sends it to each new subscriber, so the first message a subscriber receives is the welcome message. The subscriber must subscribe to the welcome message before connecting.

To set the welcome message with ZeroMQ you need to call zmq_setsockopt with ZMQ_XPUB_WELCOME_MSG and the welcome message.

The following is a small example of setting the welcome message with NetMQ:

using (var context = NetMQContext.Create())  
{ 
  using (var publisher = context.CreateXPublisherSocket()) 
  { 
    // Set the welcome message, this will be sent to any new subscriber
    publisher.SetWelcomeMessage("WM"); 
    publisher.Bind("tcp://*:5555"); 

    while (true) 
    { 
      // we have to handle all subscription 
      // requests in order for welcome message to work, 
      // we can just drop them afterwards
      publisher.Receive(); 
    } 
  } 
}

Client example:

using (var context = NetMQContext.Create())  
{ 
  using (var subscriber = context.CreateSubscriberSocket()) 
  { 
    subscriber.Subscribe("WM"); 
    subscriber.Connect("tcp://localhost:5555"); 
    var welcomeMessage = subscriber.ReceiveString(); 
    Console.WriteLine(welcomeMessage); 
  } 
}

Connect attempt

So you have probably been asking yourself why the welcome message is useful. The main problem it solves is knowing whether a connect attempt was successful. Before this feature, we had to send a heartbeat message from the server periodically and wait for it on the client, which could take up to a full heartbeat interval.

Our reliable pubsub will have a list of available publishers and will try to connect to each until one of them returns the welcome message within an acceptable time.

Connect to closest publisher

As I mentioned, we have a list of publishers. But what if, instead of trying them one after the other until a connection succeeds, I connect to all of them, take the first one to reply and close the rest? On a LAN I will probably connect to the least busy publisher (or a random one), and on a WAN I will probably connect to the closest publisher. We get a geo load balancer without expensive hardware or a DNS service.

Recognize connection drop

The welcome message has another useful behavior. ZeroMQ automatically tries to reconnect a dropped connection, and when a welcome message is set, the publisher sends it again every time a reconnect happens. We can use that to detect that we may have missed messages. With the Clone pattern, we can use the welcome message as a signal to request a full snapshot from the server again.

We will use that behavior to create our reliable pubsub.

Heartbeat

The welcome message solves the connect attempt problem, but we still need to recognize when the publisher goes down after the connection has been made. So if no message has been received for 5 seconds, we try to reconnect. But what if the publisher simply had nothing to send for 5 seconds? To solve this, the server sends a heartbeat message every 2 seconds. Now, if we didn't receive any message for 5 seconds, we can safely assume the publisher is down or unreachable and reconnect.
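The timing rule can be tried out without any sockets. The following is a minimal sketch, assuming a hypothetical LivenessTracker class (it is not part of NetMQ or of the example repository) that is told about every received message and is asked whether the 5 second timeout has passed. Time is passed in explicitly so the behavior is easy to follow.

```csharp
using System;

// Hypothetical helper, not part of NetMQ: remembers when the last
// message (data, heartbeat or welcome message) was seen.
public class LivenessTracker
{
  private readonly TimeSpan m_timeout;
  private DateTime m_lastSeen;

  public LivenessTracker(TimeSpan timeout, DateTime now)
  {
    m_timeout = timeout;
    m_lastSeen = now;
  }

  // call for every received message, whatever its topic
  public void OnMessage(DateTime now)
  {
    m_lastSeen = now;
  }

  // true when nothing has arrived for longer than the timeout
  public bool IsExpired(DateTime now)
  {
    return now - m_lastSeen > m_timeout;
  }
}

public static class HeartbeatDemo
{
  public static void Main()
  {
    DateTime start = new DateTime(2015, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    var tracker = new LivenessTracker(TimeSpan.FromSeconds(5), start);

    // a heartbeat arrives at second 2
    tracker.OnMessage(start.AddSeconds(2));

    // second 6: only 4 seconds of silence, publisher considered alive
    Console.WriteLine(tracker.IsExpired(start.AddSeconds(6))); // False

    // second 8: 6 seconds of silence, time to reconnect
    Console.WriteLine(tracker.IsExpired(start.AddSeconds(8))); // True
  }
}
```

With a 2 second heartbeat and a 5 second timeout, the client tolerates two lost heartbeats in a row before it gives up on the publisher. The code in this post gets the same effect by restarting a NetMQTimer on every message.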

Code

The following code is simplified for the example in the post. You can view the full source code at https://github.com/somdoron/ReliablePubSub.

Server

using (NetMQContext context = NetMQContext.Create())  
{
  using (var publisherSocket = context.CreateXPublisherSocket())
  {
    publisherSocket.SetWelcomeMessage("WM");
    publisherSocket.Bind("tcp://*:6669");

    // we just drop subscriptions                     
    publisherSocket.ReceiveReady += (sender, eventArgs) => 
      publisherSocket.SkipMultipartMessage();

    Poller poller = new Poller(publisherSocket);

    // send a message every second
    NetMQTimer sendMessageTimer = new NetMQTimer(1000);
    poller.AddTimer(sendMessageTimer);
    sendMessageTimer.Elapsed += (sender, eventArgs) => 
      publisherSocket.
        SendMoreFrame("A").
        SendFrame(new Random().Next().ToString());

    // send heartbeat every two seconds
    NetMQTimer heartbeatTimer = new NetMQTimer(2000);
    poller.AddTimer(heartbeatTimer);
    heartbeatTimer.Elapsed += 
      (sender, eventArgs) => publisherSocket.SendFrame("HB");

    poller.PollTillCancelled();
  }
}

Client

The client code is a little more complicated and is split into multiple methods. Let's start with the Connect method, which connects to all the addresses and keeps the first socket that replies with the welcome message.

Connect

private SubscriberSocket Connect(string[] addresses)  
{
  List<SubscriberSocket> sockets = new List<SubscriberSocket>();
  Poller poller = new Poller();

  SubscriberSocket connectedSocket = null;

  // event handler to handle message from socket
  EventHandler<NetMQSocketEventArgs> handleMessage = (sender, args) =>
  {
    if (connectedSocket == null)
    {
      connectedSocket = (SubscriberSocket)args.Socket;
      poller.Cancel();
    }
  };

  // If timeout elapsed just cancel the
  // poller without setting the connected socket
  NetMQTimer timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
  timeoutTimer.Elapsed += (sender, args) => poller.Cancel();
  poller.AddTimer(timeoutTimer);

  foreach (var address in addresses)
  {
    var socket = m_context.CreateSubscriberSocket();
    sockets.Add(socket);

    socket.ReceiveReady += handleMessage;
    poller.AddSocket(socket);

    // Subscribe to welcome message
    socket.Subscribe("WM");
    socket.Connect(address);
  }

  poller.PollTillCancelled();

  // if we have a connected socket the connection attempt succeeded
  if (connectedSocket != null)
  {
    // remove the connected socket from the list
    sockets.Remove(connectedSocket);

    // close all existing sockets
    CloseSockets(sockets);

    // drop the welcome message
    connectedSocket.SkipMultipartMessage();

    // subscribe to heartbeat
    connectedSocket.Subscribe("HB");

    // subscribe to our only topic
    connectedSocket.Subscribe("A");

    connectedSocket.ReceiveReady -= handleMessage;
    connectedSocket.ReceiveReady += OnSubscriberMessage;

    return connectedSocket;
  }
  else
  {
    // close all existing sockets
    CloseSockets(sockets);

    return null;
  }
}

What we are doing:

  1. Create a timeout timer which cancels the poller when the timeout elapses.
  2. Create a handler for the welcome messages; the first time the handler is called it cancels the poller.
  3. Create all the sockets, subscribe to the welcome message and connect.
  4. Poll until cancelled, either by the timeout or by the arrival of a welcome message.
  5. Close all the sockets except the connected one, subscribe to the heartbeat and the topics, and register the handler that handles messages from now on.

Run

The Run method orchestrates the entire process.

public void Run(params string[] addresses)  
{
  using (m_context = NetMQContext.Create())
  {
    var subscriber = Connect(addresses);

    if (subscriber == null)
      throw new Exception("cannot connect to any of the endpoints");

    // timeout timer, fires when no message has arrived for 5 seconds
    m_timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
    m_timeoutTimer.Elapsed += (sender, args) =>
    {
      // timeout happened, first remove the existing subscriber
      // from the poller and then dispose it
      m_poller.RemoveSocket(subscriber);
      subscriber.Dispose();

      // connect again
      subscriber = Connect(addresses);

      if (subscriber == null)
        throw new Exception("cannot connect to any of the endpoints");

      m_poller.AddSocket(subscriber);
    };

    m_poller = new Poller(subscriber);
    m_poller.AddTimer(m_timeoutTimer);

    m_poller.PollTillCancelled();
  }
}

What we are doing:

  1. Try to connect to a server.
  2. Create a timeout timer in order to recognize connection drops and reconnect.
  3. Create the poller and start polling.

OnSubscriberMessage

The only piece missing from the puzzle is the handler for subscriber messages, which we registered in the Connect method.

private void OnSubscriberMessage(object sender, NetMQSocketEventArgs e)  
{
  var topic = e.Socket.ReceiveFrameString();

  switch (topic)
  {
    case "WM":
      // welcome message, print and reset timeout timer
      Console.WriteLine("Connection drop and reconnect monitored");
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    case "HB":
      // heartbeat, we reset timeout timer
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    default:
      // it's a message, reset timeout timer, notify the client, 
      // for the example we just print it
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      string message = e.Socket.ReceiveFrameString();
      Console.WriteLine("Message received. Topic: {0}, Message: {1}", 
        topic, message);
      break;
  }
}

What we are doing:

  1. Get the topic of the message.
  2. Check which topic it is:
    • When it is a welcome message, we reset the timeout timer and notify the user about the connection drop.
    • When it is a heartbeat, we only reset the timer.
    • When it is a regular message, we reset the timeout timer and notify the user.

Summary

So what does Reliable PubSub really do?

  • Tries to connect to multiple servers and picks the first one that replies with the welcome message. This can be used as a geo load balancer or to connect to the least busy server.
  • Recognizes a temporary connection drop (when a welcome message arrives while the connection is up). The user can use that to request a full snapshot; take a look at the Clone pattern in the ZeroMQ guide.
  • Reconnects when the server is dead, and will probably connect to the next closest server.

The Reliable PubSub is a good fit for:

  • Financial Market Data (Forex/Stock Quotes)
  • Social Stream
  • Publishing changes to a client

If you implement the pattern in another language please send it to me and I will add a link to the post.

You can find the complete example and another more complete implementation at:
https://github.com/somdoron/ReliablePubSub.

Token-Based PubSub

I recently pushed two new features to both ZeroMQ and NetMQ: manual subscriptions and welcome message, both for the XPub socket. In this post, I will explore what we can do with the manual subscriptions feature. The next post will cover the welcome message.

Neither feature is part of any release of ZeroMQ or NetMQ yet, so you will have to compile from the source code to use the new features.

I will use NetMQ throughout the post, but you can implement the examples using any ZeroMQ binding.

Manual Subscriptions

When the Manual Subscriptions feature is enabled, subscription (or unsubscribe) requests are not added to (or removed from) the internal XPub subscriptions store. They will, however, be available to read as messages. So, after reading a subscription request from the XPub socket, we can decide how to handle it. If we do nothing (and drop the subscription), the subscriber will not receive anything, including messages that match the subscription. If we want to confirm the subscription as is, we need to call the Subscribe method on the XPub socket with the subscription (with ZeroMQ, we need to call zmq_setsockopt with ZMQ_SUBSCRIBE), the same way we subscribe on a Sub socket. Following is an example that confirms each subscription:

using (var context = NetMQContext.Create())  
{
  string[] topics = new string[] { "A", "B", "C" };
  Random random = new Random();

  using (var publisher = context.CreateXPublisherSocket())
  {
    // Set publisher to manual subscriptions mode
    publisher.Options.ManualPublisher = true;
    publisher.Bind("tcp://*:5556");
    publisher.ReceiveReady += (sender, eventArgs) =>
    {
      var message = publisher.ReceiveString();

      // we only handle subscription requests, unsubscription and any
      // other type of messages will be dropped
      if (message[0] == (char) 1)
      {
        // calling Subscribe to add subscription to the last subscriber
        publisher.Subscribe(message.Substring(1));
      }
    };

    NetMQTimer sendTimer = new NetMQTimer(1000);
    sendTimer.Elapsed += (sender, eventArgs) =>
    {
      // sends a message every second with random topic and current time
      publisher.
        SendMore(topics[random.Next(3)]).
        Send(DateTime.Now.ToString());
    };

    Poller poller = new Poller();
    poller.AddSocket(publisher);
    poller.AddTimer(sendTimer);
    poller.PollTillCancelled();
  }
}

As noted in the example, we confirm all subscription requests, but we drop any other type of request. The subscriber will not be able to unsubscribe.

Permission-Based PubSub

With permission-based PubSub, we first check that the subscriber has permission to subscribe to the topic and only then call the subscribe method.

Permission-based subscriptions have one main issue: because we can only use the Subscribe method on the message we are currently handling, the permission check has to be synchronous. If all permissions are stored in memory, that is not a problem. However, usually we need to check with the database or another service, and blocking the publisher thread is not an option. The next pattern solves this issue.

Permission-based subscriptions are only relevant to ZeroMQ and not NetMQ, because NetMQ does not currently support ZMTPv3 and ZAP (authentication).

Token-Based PubSub

With token-based subscriptions, the subscriber uses tokens instead of subscriptions. The server then decodes those tokens and decides to which subscriptions to subscribe the subscriber. To acquire the tokens, the subscriber needs to make a request to an authorization service that generates them. Following is the typical workflow:

  1. The client sends a request with username and password to the authorization service.
  2. The authorization service processes the request and replies to the client with a token.
  3. The client connects to the publisher and subscribes with the token.
  4. The publisher decodes the token, confirming its validity, and adds the relevant subscriptions.

XSub Instead of Sub

For a token-based subscription, we cannot use a Sub socket and must use an XSub socket. A Sub socket filters incoming messages according to its subscriptions. Because we subscribe with tokens instead of topics, no published message would ever match, and the Sub socket would filter out all messages. An XSub socket does not filter messages, so we will use that one.
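With an XSub socket, the subscription is sent as an ordinary message whose first byte is 1 (subscribe) or 0 (unsubscribe), followed by the payload, which in our case is the token. The following is a minimal sketch of building and parsing such a frame. SubscriptionFrame is a hypothetical helper name and is not part of NetMQ; it only manipulates bytes and leaves the socket calls to the client and publisher code later in the post.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of NetMQ.
public static class SubscriptionFrame
{
  // Build the frame an XSub socket sends: 1 = subscribe, 0 = unsubscribe,
  // followed by the payload (here the serialized token).
  public static byte[] Build(bool subscribe, string payload)
  {
    byte[] payloadBytes = Encoding.ASCII.GetBytes(payload);
    byte[] frame = new byte[payloadBytes.Length + 1];
    frame[0] = subscribe ? (byte)1 : (byte)0;
    Buffer.BlockCopy(payloadBytes, 0, frame, 1, payloadBytes.Length);
    return frame;
  }

  // Parse a frame on the publisher side; returns false for frames that
  // are neither subscription nor unsubscription requests.
  public static bool TryParse(byte[] frame, out bool subscribe, out string payload)
  {
    subscribe = false;
    payload = null;

    if (frame == null || frame.Length == 0 || frame[0] > 1)
      return false;

    subscribe = frame[0] == 1;
    payload = Encoding.ASCII.GetString(frame, 1, frame.Length - 1);
    return true;
  }
}

public static class SubscriptionFrameDemo
{
  public static void Main()
  {
    byte[] frame = SubscriptionFrame.Build(true, "abc");
    Console.WriteLine(frame.Length); // 4

    bool subscribe;
    string payload;
    if (SubscriptionFrame.TryParse(frame, out subscribe, out payload))
      Console.WriteLine("{0} {1}", subscribe, payload); // True abc
  }
}
```

Checking the frame length before reading the first byte also protects the publisher from an empty message, which would otherwise throw when indexing the array.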

Generating Tokens

Generating tokens is beyond the scope of this post. I'm certain the web is full of examples. For our demo, I will use HMACSHA1, where the publisher and the authorization service have to share the same key.

Full Source Code

You can find the full source code of the example at: https://github.com/somdoron/TokenPubSub.

Authorization Service

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  using (var context = NetMQContext.Create())
  {
    using (var response = context.CreateResponseSocket())
    {
      response.Bind("tcp://*:5557");

      while (true)
      {
        var requestMessage = response.ReceiveMessage();

        string command = requestMessage.Pop().ConvertToString();

        if (command == AuthorizationProtocol.GetTokenCommand &&
            requestMessage.FrameCount == 3)
        {
          string username = requestMessage.Pop().ConvertToString();
          string password = requestMessage.Pop().ConvertToString();
          string subscription = requestMessage.Pop().ConvertToString();

          // TODO: validating the username and password is not part
          // of the example
          // TODO: validating that the user has permission to
          // the subscription is not part of the example

          Console.WriteLine("Received GetTokenCommand {0} {1} {2}",
              username, password, subscription);

          // Create a token
          Token token = new Token(subscription, Key);

          // send token to the client
          response.
              SendMore(AuthorizationProtocol.SuccessReply).
              Send(token.Serialize());
        }
        else
        {
          // unsupported command
          response.Send(AuthorizationProtocol.ErrorReply);
        }
      }
    }
  }
}

Publisher

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  string[] symbols = new[] {"EURUSD", "GBPUSD", "EURJPY",
    "USDJPY", "EURGBP", "GBPJPY"};

  Random random = new Random();

  using (var context = NetMQContext.Create())
  {
    using (var publisher = context.CreateXPublisherSocket())
    {
      publisher.Options.ManualPublisher = true;
      publisher.Bind("tcp://*:5558");
      publisher.ReceiveReady += (sender, eventArgs) =>
      {
        byte[] subscriptionBytes = publisher.Receive();

        // first byte indicates if it is a subscription or unsubscription
        if (subscriptionBytes[0] == 1 || subscriptionBytes[0] == 0)
        {
          // the rest of the bytes is the token, convert them to string
          string serializedToken = Encoding.ASCII.GetString(
              subscriptionBytes, 1, subscriptionBytes.Length - 1);

          // deserialize the token
          Token token;

          if (Token.TryDeserialize(serializedToken, out token))
          {
            // Check if the token is valid
            if (token.Validate(Key))
            {                                                        
              if (subscriptionBytes[0] == 1)
              {
                Console.WriteLine("Subscription request {0}",
                    token.Subscription);
                publisher.Subscribe(token.Subscription);
              }
              else
              {
                publisher.Unsubscribe(token.Subscription);
              }
            }
            else
            {
              Console.WriteLine("Invalid token {0}",
                  serializedToken);
            }
          }
        }
      };

      // Some fake publishing
      NetMQTimer publishTimer = new NetMQTimer(100);
      publishTimer.Elapsed += (sender, eventArgs) =>
      {
        publisher.
            SendMore(symbols[random.Next(symbols.Length)]).
            Send(random.Next().ToString());
      };

      Poller poller = new Poller();
      poller.AddSocket(publisher);
      poller.AddTimer(publishTimer);
      poller.PollTillCancelled();
    }
  }
}

Client

static void Main(string[] args)  
{
  string username = args[0];
  string password = args[1];
  string subscription = args[2].ToUpper();

  using (var context = NetMQContext.Create())
  {
    string token;

    // first we try to get a token
    using (var request = context.CreateRequestSocket())
    {
      request.Connect("tcp://localhost:" + AuthorizationProtocol.Port);

      // send token request
      request.
          SendMore(AuthorizationProtocol.GetTokenCommand).
          SendMore(username).
          SendMore(password).
          Send(subscription);

      string result = request.ReceiveString();

      if (result == AuthorizationProtocol.SuccessReply)
      {
        token = request.ReceiveString();
      }
      else
      {
        throw new Exception("Invalid username or password");
      }
    }

    // we must use XSUB because a SUB socket would filter out every
    // message (we subscribe with a token, not with a topic)
    using (var subscriber = context.CreateXSubscriberSocket())
    {
      subscriber.Connect("tcp://localhost:" + StreamingProtocol.Port);

      // create the subscription message
      byte[] subscriptionMessage = new byte[token.Length + 1];
      subscriptionMessage[0] = 1;
      Encoding.ASCII.GetBytes(token, 0, token.Length, subscriptionMessage, 1);
      subscriber.Send(subscriptionMessage);

      while (true)
      {
        string symbol = subscriber.ReceiveString();
        string price = subscriber.ReceiveString();

        Console.WriteLine("{0} {1}", symbol, price);
      }
    }
  }
}

Token

public class Token  
{
  public Token(string subscription, string key)
  {
    Subscription = subscription;
    MAC = GenerateMAC(subscription, key);
  }

  public Token()
  {

  }

  public string Subscription { get; set; }
  public string MAC { get; set; }

  private static string GenerateMAC(string subscription, string key)
  {
    HMACSHA1 hmac = new HMACSHA1(Encoding.ASCII.GetBytes(key));
    byte[] hmacBytes = hmac.ComputeHash(Encoding.ASCII.GetBytes(subscription));
    return Convert.ToBase64String(hmacBytes);
  }

  public string Serialize()
  {
    return JsonConvert.SerializeObject(this);
  }

  public bool Validate(string key)
  {
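    // guard against tokens that were deserialized with missing fields
    return Subscription != null && MAC != null &&
      MAC.Equals(GenerateMAC(Subscription, key));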
  }

  public static bool TryDeserialize(string json, out Token token)
  {
    try
    {
      token = JsonConvert.DeserializeObject<Token>(json);
      // DeserializeObject returns null for input such as "null"
      return token != null;
    }
    catch (Exception)
    {
      token = null;
      return false;
    }
  }
}

Token Never Expires

The token in the example never expires. In reality, this is a big concern. Tokens should always have an expiration date. If we use an expiration date on a token, the client should continuously request a new token when the old token expires and subscribe with the replacement token. If the client does not request a new token, they will stop receiving any messages on the next connection drop (ZeroMQ automatically reconnects and sends subscriptions on reconnect. As a result, the publisher would reject the expired token and the client won't receive any messages).
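As a rough illustration of how an expiration could be added, here is a minimal sketch. ExpiringToken is a hypothetical variant of the Token class and is not the code from the repository. It assumes that the expiry time is included in the signed data and that the publisher passes in the current UTC time when validating. JSON serialization is left out to keep the sketch short.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical variant of the Token class that carries an expiry time.
public class ExpiringToken
{
  public ExpiringToken(string subscription, DateTime expiresUtc, string key)
  {
    Subscription = subscription;
    ExpiresUtc = expiresUtc;
    MAC = GenerateMAC(subscription, expiresUtc, key);
  }

  public string Subscription { get; set; }
  public DateTime ExpiresUtc { get; set; }
  public string MAC { get; set; }

  private static string GenerateMAC(string subscription, DateTime expiresUtc, string key)
  {
    // the expiry is part of the signed data, so a client cannot extend it
    string data = subscription + "|" +
      expiresUtc.Ticks.ToString(CultureInfo.InvariantCulture);

    using (var hmac = new HMACSHA1(Encoding.ASCII.GetBytes(key)))
    {
      return Convert.ToBase64String(
        hmac.ComputeHash(Encoding.ASCII.GetBytes(data)));
    }
  }

  // valid only when not yet expired and the MAC matches
  public bool Validate(string key, DateTime nowUtc)
  {
    return nowUtc < ExpiresUtc &&
      MAC == GenerateMAC(Subscription, ExpiresUtc, key);
  }
}

public static class ExpiringTokenDemo
{
  public static void Main()
  {
    DateTime now = new DateTime(2015, 1, 1, 12, 0, 0, DateTimeKind.Utc);
    var token = new ExpiringToken("EURUSD", now.AddMinutes(10), "SecretKey");

    Console.WriteLine(token.Validate("SecretKey", now.AddMinutes(5)));  // True
    Console.WriteLine(token.Validate("SecretKey", now.AddMinutes(15))); // False, expired

    // tampering with the expiry breaks the MAC
    token.ExpiresUtc = now.AddHours(1);
    Console.WriteLine(token.Validate("SecretKey", now.AddMinutes(15))); // False
  }
}
```

In this sketch the client would ask the authorization service for a fresh token shortly before ExpiresUtc and subscribe again with the replacement.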

Summary

Before the advent of manual subscriptions, it was impossible to create a secure pub-sub using the Pub and Sub sockets. You could create it using dealer-router, but you had to manage the subscription store yourself. The manual subscription feature makes this possible.