Token-Based PubSub

I recently pushed two new features to both ZeroMQ and NetMQ: manual subscriptions and welcome message, both for the XPub socket. In this post, I will explore what we can do with the manual subscriptions feature. The next post will cover the welcome message.

Neither feature is part of any release of ZeroMQ or NetMQ yet, so you will have to compile from the source code to use the new features.

I will use NetMQ throughout the post, but you can implement the examples using any ZeroMQ binding.

Manual Subscriptions

When the Manual Subscriptions feature is enabled, subscribe and unsubscribe requests are no longer applied automatically to the XPub socket's internal subscription store. They are still delivered to the application as messages, so after reading a subscription request from the XPub socket we can decide how to handle it. If we do nothing (and drop the request), the subscriber will not receive anything, not even messages that match the subscription it asked for. To confirm the subscription as is, we call the Subscribe method on the XPub socket with the requested subscription, the same way we would subscribe on a Sub socket (with ZeroMQ, call setsockopt with ZMQ_SUBSCRIBE). The subscription is applied to the subscriber whose request was read last. The following example confirms every subscription:

using (var context = NetMQContext.Create())  
{
  string[] topics = new string[] { "A", "B", "C" };
  Random random = new Random();

  using (var publisher = context.CreateXPublisherSocket())
  {
    // Set publisher to manual subscriptions mode
    publisher.Options.ManualPublisher = true;
    publisher.Bind("tcp://*:5556");
    publisher.ReceiveReady += (sender, eventArgs) =>
    {
      var message = publisher.ReceiveString();

      // we only handle subscription requests, unsubscription and any
      // other type of messages will be dropped
      if (message.Length > 0 && message[0] == (char) 1)
      {
        // calling Subscribe to add subscription to the last subscriber
        publisher.Subscribe(message.Substring(1));
      }
    };

    NetMQTimer sendTimer = new NetMQTimer(1000);
    sendTimer.Elapsed += (sender, eventArgs) =>
    {
      // sends a message every second with random topic and current time
      publisher.
        SendMore(topics[random.Next(3)]).
        Send(DateTime.Now.ToString());
    };

    Poller poller = new Poller();
    poller.AddSocket(publisher);
    poller.AddTimer(sendTimer);
    poller.PollTillCancelled();
  }
}

As noted in the example, we confirm all subscription requests, but we drop any other type of request. The subscriber will not be able to unsubscribe.

Permission-Based PubSub

With permission-based PubSub, we first check that the subscriber has permission to subscribe to the topic and only then call the subscribe method.
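
A minimal sketch of such a synchronous check, assuming the permissions fit in memory. The PermissionCheck class, its sample data, and the userId parameter are hypothetical; how the publisher learns the subscriber's identity is outside this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical in-memory permission store. None of these names are part
// of NetMQ or ZeroMQ; they only illustrate the synchronous check.
public static class PermissionCheck
{
    // userId -> topics that user may subscribe to (assumed sample data)
    private static readonly Dictionary<string, HashSet<string>> Allowed =
        new Dictionary<string, HashSet<string>>
        {
            { "alice", new HashSet<string> { "A", "B" } },
            { "bob", new HashSet<string> { "C" } }
        };

    // Returns true only when the frame is a subscribe request (first byte 1)
    // for a non-empty topic that the user is allowed to receive.
    // Unsubscribe requests, empty topics and unknown users are rejected.
    public static bool ShouldConfirm(string userId, byte[] frame, out string topic)
    {
        topic = null;

        if (userId == null || frame == null || frame.Length < 2 || frame[0] != 1)
        {
            return false;
        }

        // The rest of the frame is the requested topic
        string requested = Encoding.ASCII.GetString(frame, 1, frame.Length - 1);

        HashSet<string> topics;
        if (!Allowed.TryGetValue(userId, out topics) || !topics.Contains(requested))
        {
            return false;
        }

        topic = requested;
        return true;
    }
}
```

Inside the ReceiveReady handler, the publisher would call publisher.Subscribe(topic) only when ShouldConfirm returns true, and drop the request otherwise.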

Permission-based subscriptions have one main issue: because the Subscribe method applies only to the request we are currently handling, the permission check has to be synchronous. If all permissions are stored in memory, that is not a problem. Usually, however, we need to check with a database or another service, and blocking the publisher thread is not an option. The next pattern solves this issue.

Permission-based subscriptions are only relevant to ZeroMQ and not NetMQ, because NetMQ does not currently support ZMTPv3 and ZAP (authentication).

Token-Based PubSub

With token-based subscriptions, the subscriber sends tokens instead of plain subscriptions. The publisher decodes each token and decides which subscriptions to add for that subscriber. To acquire a token, the subscriber makes a request to an authorization service that generates it. The typical workflow is as follows:

  1. The client sends a request with username and password to the authorization service.
  2. The authorization service processes the request and replies to the client with a token.
  3. The client connects to the publisher and subscribes with the token.
  4. The publisher decodes the token, confirming its validity, and adds the relevant subscriptions.

XSub Instead of Sub

For token-based subscriptions, we cannot use a Sub socket and must use an XSub socket instead. A Sub socket filters incoming messages against its own subscriptions. Because we subscribe with a token rather than a real topic, no published message would match, and the Sub socket would filter out every message. An XSub socket does not filter incoming messages, so that is the one we use.
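
The subscription request that an XSub socket sends is a single frame: one byte that is 1 for subscribe or 0 for unsubscribe, followed by the subscription bytes, which here carry the token. The helper below is a sketch of building and parsing that frame; SubscriptionFrame and its methods are illustrative names, not part of NetMQ.

```csharp
using System;
using System.Text;

// Illustrative helper (not a NetMQ API) for the subscription frame layout:
// [1 byte: 1 = subscribe, 0 = unsubscribe][token bytes]
public static class SubscriptionFrame
{
    // Builds the frame a subscriber sends over its XSub socket
    public static byte[] Build(bool subscribe, string token)
    {
        byte[] payload = Encoding.ASCII.GetBytes(token);
        byte[] frame = new byte[payload.Length + 1];
        frame[0] = subscribe ? (byte)1 : (byte)0;
        Buffer.BlockCopy(payload, 0, frame, 1, payload.Length);
        return frame;
    }

    // Parses a frame read from the XPub socket. Returns false for empty
    // frames and for frames whose first byte is neither 0 nor 1.
    public static bool TryParse(byte[] frame, out bool subscribe, out string token)
    {
        subscribe = false;
        token = null;

        if (frame == null || frame.Length == 0 || frame[0] > 1)
        {
            return false;
        }

        subscribe = frame[0] == 1;
        token = Encoding.ASCII.GetString(frame, 1, frame.Length - 1);
        return true;
    }
}
```

The same Build call with subscribe set to false produces the unsubscribe request for a token.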

Generating Tokens

Generating tokens properly is beyond the scope of this post, and the web is full of examples. For our demo, I will use HMACSHA1, which requires the publisher and the authorization service to share the same key.

Full Source Code

You can find the full source code of the example at: https://github.com/somdoron/TokenPubSub.

Authorization Service

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  using (var context = NetMQContext.Create())
  {
    using (var response = context.CreateResponseSocket())
    {
      response.Bind("tcp://*:5557");

      while (true)
      {
        var requestMessage = response.ReceiveMessage();

        string command = requestMessage.Pop().ConvertToString();

        if (command == AuthorizationProtocol.GetTokenCommand &&
            requestMessage.FrameCount == 3)
        {
          string username = requestMessage.Pop().ConvertToString();
          string password = requestMessage.Pop().ConvertToString();
          string subscription = requestMessage.Pop().ConvertToString();

          // TODO: validate the username and password
          // (not part of this example)
          // TODO: validate that the user has permission to
          // the requested subscription (not part of this example)

          Console.WriteLine("Received GetTokenCommand {0} {1} {2}",
              username, password, subscription);

          // Create a token
          Token token = new Token(subscription, Key);

          // send token to the client
          response.
              SendMore(AuthorizationProtocol.SuccessReply).
              Send(token.Serialize());
        }
        else
        {
          // unsupported command
          response.Send(AuthorizationProtocol.ErrorReply);
        }
      }
    }
  }
}

Publisher

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  string[] symbols = new[] {"EURUSD", "GBPUSD", "EURJPY",
    "USDJPY", "EURGBP", "GBPJPY"};

  Random random = new Random();

  using (var context = NetMQContext.Create())
  {
    using (var publisher = context.CreateXPublisherSocket())
    {
      publisher.Options.ManualPublisher = true;
      publisher.Bind("tcp://*:5558");
      publisher.ReceiveReady += (sender, eventArgs) =>
      {
        byte[] subscriptionBytes = publisher.Receive();

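        // the first byte indicates whether this is a subscription (1)
        // or an unsubscription (0); ignore empty frames and anything else
        if (subscriptionBytes.Length > 0 &&
            (subscriptionBytes[0] == 1 || subscriptionBytes[0] == 0))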
        {
          // the rest of the bytes is the token, convert them to string
          string serializedToken = Encoding.ASCII.GetString(
              subscriptionBytes, 1, subscriptionBytes.Length - 1);

          // deserialize the token
          Token token;

          if (Token.TryDeserialize(serializedToken, out token))
          {
            // Check if the token is valid
            if (token.Validate(Key))
            {
              if (subscriptionBytes[0] == 1)
              {
                Console.WriteLine("Subscription request {0}",
                    token.Subscription);
                publisher.Subscribe(token.Subscription);
              }
              else
              {
                publisher.Unsubscribe(token.Subscription);
              }
            }
            else
            {
              Console.WriteLine("Invalid token {0}",
                  serializedToken);
            }
          }
        }
      };

      // Some fake publishing
      NetMQTimer publishTimer = new NetMQTimer(100);
      publishTimer.Elapsed += (sender, eventArgs) =>
      {
        publisher.
            SendMore(symbols[random.Next(symbols.Length)]).
            Send(random.Next().ToString());
      };

      Poller poller = new Poller();
      poller.AddSocket(publisher);
      poller.AddTimer(publishTimer);
      poller.PollTillCancelled();
    }
  }
}

Client

static void Main(string[] args)  
{
  string username = args[0];
  string password = args[1];
  string subscription = args[2].ToUpper();

  using (var context = NetMQContext.Create())
  {
    string token;

    // first we try to get a token
    using (var request = context.CreateRequestSocket())
    {
      request.Connect("tcp://localhost:" + AuthorizationProtocol.Port);

      // send token request
      request.
          SendMore(AuthorizationProtocol.GetTokenCommand).
          SendMore(username).
          SendMore(password).
          Send(subscription);

      string result = request.ReceiveString();

      if (result == AuthorizationProtocol.SuccessReply)
      {
        token = request.ReceiveString();
      }
      else
      {
        throw new Exception("Invalid username or password");
      }
    }

    // we must use XSUB because a SUB socket would filter out every
    // message (no published topic matches the token)
    using (var subscriber = context.CreateXSubscriberSocket())
    {
      subscriber.Connect("tcp://localhost:" + StreamingProtocol.Port);

      // create the subscription message
      byte[] subscriptionMessage = new byte[token.Length + 1];
      subscriptionMessage[0] = 1;
      Encoding.ASCII.GetBytes(token, 0, token.Length, subscriptionMessage, 1);
      subscriber.Send(subscriptionMessage);

      while (true)
      {
        string symbol = subscriber.ReceiveString();
        string price = subscriber.ReceiveString();

        Console.WriteLine("{0} {1}", symbol, price);
      }
    }
  }
}

Token

public class Token  
{
  public Token(string subscription, string key)
  {
    Subscription = subscription;
    MAC = GenerateMAC(subscription, key);
  }

  public Token()
  {

  }

  public string Subscription { get; set; }
  public string MAC { get; set; }

  private static string GenerateMAC(string subscription, string key)
  {
    // HMACSHA1 is IDisposable, so release it when done
    using (HMACSHA1 hmac = new HMACSHA1(Encoding.ASCII.GetBytes(key)))
    {
      byte[] hmacBytes = hmac.ComputeHash(Encoding.ASCII.GetBytes(subscription));
      return Convert.ToBase64String(hmacBytes);
    }
  }

  public string Serialize()
  {
    return JsonConvert.SerializeObject(this);
  }

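  public bool Validate(string key)
  {
    // a token deserialized from untrusted input may be missing fields
    return Subscription != null && MAC != null &&
        MAC.Equals(GenerateMAC(Subscription, key));
  }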

  public static bool TryDeserialize(string json, out Token token)
  {
    try
    {
      token = JsonConvert.DeserializeObject<Token>(json);
      // DeserializeObject returns null for input such as "null"
      return token != null;
    }
    catch (Exception)
    {
      token = null;
      return false;
    }
  }
}

Token Never Expires

The token in the example never expires, which is a real concern in practice: tokens should always carry an expiration date. With expiring tokens, the client must request a new token before the old one expires and subscribe again with the replacement. If it does not, it will stop receiving messages after the next connection drop: ZeroMQ reconnects automatically and resends the subscriptions, so the publisher will see the expired token again and reject it.
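
One way to add an expiration date, sketched under assumptions: the expiry is included in the signed payload, so a client cannot extend it without invalidating the MAC. ExpiringToken and its members are hypothetical names rather than part of the example repository, and the code assumes a runtime that provides CryptographicOperations.FixedTimeEquals (.NET Core 2.1 or later).

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical variant of the Token class that carries an expiry time.
public class ExpiringToken
{
    public string Subscription { get; set; }
    public long ExpiresAtUnixSeconds { get; set; }
    public string MAC { get; set; }

    // "now" is passed in so that callers (and tests) control the clock
    public static ExpiringToken Create(
        string subscription, TimeSpan lifetime, string key, DateTimeOffset now)
    {
        long expires = now.Add(lifetime).ToUnixTimeSeconds();
        return new ExpiringToken
        {
            Subscription = subscription,
            ExpiresAtUnixSeconds = expires,
            MAC = Convert.ToBase64String(ComputeMac(subscription, expires, key))
        };
    }

    private static byte[] ComputeMac(string subscription, long expires, string key)
    {
        // The expiry is part of the signed payload, so changing it
        // changes the expected MAC
        string payload =
            expires.ToString(CultureInfo.InvariantCulture) + "|" + subscription;

        using (var hmac = new HMACSHA1(Encoding.UTF8.GetBytes(key)))
        {
            return hmac.ComputeHash(Encoding.UTF8.GetBytes(payload));
        }
    }

    public bool Validate(string key, DateTimeOffset now)
    {
        if (Subscription == null || MAC == null)
        {
            return false;
        }

        byte[] presented;
        try
        {
            presented = Convert.FromBase64String(MAC);
        }
        catch (FormatException)
        {
            return false;
        }

        byte[] expected = ComputeMac(Subscription, ExpiresAtUnixSeconds, key);

        // Compare in constant time, then reject tokens at or past their expiry
        return CryptographicOperations.FixedTimeEquals(presented, expected)
            && now.ToUnixTimeSeconds() < ExpiresAtUnixSeconds;
    }
}
```

With a token like this, the publisher would pass the current time to Validate, and the client would schedule its next token request some time before ExpiresAtUnixSeconds.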

Summary

Before manual subscriptions, it was impossible to build a secure pub-sub with the Pub and Sub sockets. You could build one with Dealer and Router sockets, but you had to manage the subscription store yourself. The manual subscriptions feature makes it possible with XPub.
