Reliable PubSub

In the last post I told you about two new features I recently pushed to both ZeroMQ and NetMQ. Last post was about the manual subscription feature. In this post I will create a reliable pubsub using the new Welcome Message feature.

I will use NetMQ through out the post but everything apply to ZeroMQ as well.

ZeroMQ guide has a great chapter on pubsub including reliable pubsub, however I will tackle a different problem, how to make sure the subscriber is always connected to a publisher (and also to the closest one).

Welcome Message

Welcome message is pretty simple, when welcome message is set on a publisher (must be of type XPub) the publisher will send a welcome message to each new subscriber. So the first message subscriber receive will be the welcome message. Subscriber must subscribe to the welcome message before trying to connect.

To set the welcome message with ZeroMQ you need to call zmq_setsockopt with ZMQ_XPUB_WELCOME_MSG and the welcome message.

Following is small example of setting welcome message with NetMQ:

using (var context = NetMQContext.Create())  
{ 
  using (var publisher = context.CreateXPublisherSocket()) 
  { 
    // Set the welcome message, this will be sent to any new subscriber
    publisher.SetWelcomeMessage("WM"); 
    publisher.Bind("tcp://*:5555"); 

    while (true) 
    { 
      // we have to handle all subscription 
      // requests in order for welcome message to work, 
      // we can just drop them afterwards
      publisher.Receive(); 
    } 
  } 
}

Client example:

using (var context = NetMQContext.Create())  
{ 
  using (var subscriber = context.CreateSubscriberSocket()) 
  { 
    subscriber.Subscribe("WM"); 
    subscriber.Connect("tcp://localhost:5555"); 
    var welcomeMessage = subscriber.ReceiveString(); 
    Console.WriteLine(welcomeMessage); 
  } 
}

Connect attempt

So you probably been asking yourself why welcome message is useful? the main problem welcome message is solving is weather a connect attempt was successful, before this feature we have to use a heartbeat message sent from the server periodically and on the client wait for the heartbeat message, which can take up to the time of the keep alive interval.

Our reliable pubsub will have a list of available publishers and will try to connect to each until one of them return welcome message in acceptable time.

Connect to closest publisher

As I mentioned we would have a list of publishers, but what if instead of trying to connect one after the other until successful connection I will connect to all of them, take the first one to reply and close the rest? In the LAN I will probably will connect to the less busy publisher or randomly and in the WAN I will probably connect to closest publisher. We achieved geo-load-balancer without expensive hardware or DNS service.

Recognize connection drop

Welcome message has another useful behavior, ZeroMQ automatically try to reconnect a dropped connection. When welcome message is set every time a reconnect happen the publisher will send a welcome message. We can use that to check if we missed messages. With the clone pattern we can use the welcome message to request full snapshot again from the server.

We would use that behavior to create our reliable pub sub.

Heartbeat

Welcome message solves the connect attempt problem, but we still need to recognize if publisher is down after the connection has been made. So if for 5 seconds no message has been received we would try to reconnect. But what if there was no message sent for 5 seconds? To solve this problem we would send a heartbeat message from the server every 2 seconds, and now if for 5 seconds we didn’t receive any message we can know for sure the publisher is down and we should reconnect.

Code

The following code is simplified for the example in the post. You can view full source code at https://github.com/somdoron/ReliablePubSub.

Server

using (NetMQContext context = NetMQContext.Create())  
{
  using (var publisherSocket = context.CreateXPublisherSocket())
  {
    publisherSocket.SetWelcomeMessage("WM");
    publisherSocket.Bind("tcp://*:6669");

    // we just drop subscriptions                     
    publisherSocket.ReceiveReady += (sender, eventArgs) => 
      publisherSocket.SkipMultipartMessage();

    Poller poller = new Poller(publisherSocket);

    // send a message every second
    NetMQTimer sendMessageTimer = new NetMQTimer(1000);
    poller.AddTimer(sendMessageTimer);
    sendMessageTimer.Elapsed += (sender, eventArgs) => 
      publisherSocket.
        SendMoreFrame("A").
        SendFrame(new Random().Next().ToString());

    // send heartbeat every two seconds
    NetMQTimer heartbeatTimer = new NetMQTimer(2000);
    poller.AddTimer(heartbeatTimer);
    heartbeatTimer.Elapsed += 
      (sender, eventArgs) => publisherSocket.SendFrame("HB");

    poller.PollTillCancelled();
  }
}

Client

Client code is a little more complicated and is split into multiple methods. Let's start with the Connect method which connect to the first socket that reply with Welcome Message.

Connect

private SubscriberSocket Connect(string[] addresses)  
{
  List<SubscriberSocket> sockets = new List<SubscriberSocket>();
  Poller poller = new Poller();

  SubscriberSocket connectedSocket = null;

  // event handler to handle message from socket
  EventHandler<NetMQSocketEventArgs> handleMessage = (sender, args) =>
  {
    if (connectedSocket == null)
    {
      connectedSocket = (SubscriberSocket)args.Socket;
      poller.Cancel();
    }
  };

  // If timeout elapsed just cancel the
  // poller without setting the connected socket
  NetMQTimer timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
  timeoutTimer.Elapsed += (sender, args) => poller.Cancel();
  poller.AddTimer(timeoutTimer);

  foreach (var address in addresses)
  {
    var socket = m_context.CreateSubscriberSocket();
    sockets.Add(socket);

    socket.ReceiveReady += handleMessage;
    poller.AddSocket(socket);

    // Subscribe to welcome message
    socket.Subscribe("WM");
    socket.Connect(address);
  }

  poller.PollTillCancelled();

  // if we have a connected socket the connection attempt succeed
  if (connectedSocket != null)
  {
    // remove the connected socket from the list
    sockets.Remove(connectedSocket);

    // close all existing sockets
    CloseSockets(sockets);

    // drop the welcome message
    connectedSocket.SkipMultipartMessage();

    // subscribe to heartbeat
    connectedSocket.Subscribe("HB");

    // subscribe to our only topic
    connectedSocket.Subscribe("A");

    connectedSocket.ReceiveReady -= handleMessage;
    connectedSocket.ReceiveReady += OnSubscriberMessage;

    return connectedSocket;
  }
  else
  {
    // close all existing sockets
    CloseSockets(sockets);

    return null;
  }
}

What we are doing?

  1. Create a timeout which will cancel the poller when timeout is elapsed.
  2. Create a handler to handle the welcome messages, first time handler will be called it will cancel the poller.
  3. Create all sockets and connect.
  4. Poll until cancelled, which will either be timeout or welcome message arrived.
  5. Close all existing connections except the one connected, subscribe to heartbeat and topics and register the handler to handle messages from now on.

Run

The run method orchestrate the entire process.

public void Run(params string[] addresses)  
{
  using (m_context = NetMQContext.Create())
  {
    var subscriber = Connect(addresses);

    if (subscriber == null)
      throw new Exception("cannot connect to eny of the endpoints");

    // timeout timer, when heartbeat was not arrived for 5 seconds
    m_timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
    m_timeoutTimer.Elapsed += (sender, args) =>
    {
      // timeout happend, first dispose existing subscriber
      subscriber.Dispose();
      m_poller.RemoveSocket(subscriber);

      // connect again
      subscriber = Connect(addresses);

      if (subscriber == null)
        throw new Exception("cannot connect to any of the endpoints");

      m_poller.AddSocket(subscriber);
    };

    m_poller = new Poller(subscriber);
    m_poller.AddTimer(m_timeoutTimer);

    m_poller.PollTillCancelled();
  }
}

What we are doing:

  1. Trying to connect to a server
  2. Create a timeout timer in order to recognize connection drops and try to reconnect
  3. Creating the poller and polling

OnSubscriberMessage

The only thing missing in the puzzle is the handle of subscriber messages, which we registered in the Connect method.

private void OnSubscriberMessage(object sender, NetMQSocketEventArgs e)  
{
  var topic = e.Socket.ReceiveFrameString();

  switch (topic)
  {
    case "WM":
      // welcome message, print and reset timeout timer
      Console.WriteLine("Connection drop and reconnect monitoed");
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    case "HB":
      // heartbeat, we reset timeout timer
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    default:
      // its a message, reset timeout timer, notify the client, 
      // for the example we just print it
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      string message = e.Socket.ReceiveFrameString();
      Console.WriteLine("Message received. Topic: {0}, Message: {1}", 
        topic, message);
      break;
  }
}

What we are doing:

  1. Get the topic of the message
  2. Check which topic it is
    • When it is welcome message we reset the timeout timer and notify the user on the connection drop
    • When it is an heart beat we only reset the timer
    • When it is a message we reset the timeout timer and notify the user

Summary

So what Reliable PubSub really does:

  • Trying to connect to multiple servers and pick the first one that reply with Welcome Message. Can be used as geo load balancer or to connect to less busy server.
  • Recognize temporary connection drop (when Welcome Message arrived while the connection is up), the user can use that to request full snapshot, take a look at the Clone pattern from zeromq guide.
  • Reconnect when the server is dead, will probably connect to the next closest server.

The Reliable PubSub is a good fit for:

  • Financial Market Data (Forex/Stock Quotes)
  • Social Stream
  • Publishing changes to a client

If you implement the pattern in another language please send it to me and I will add a link to the post.

You can find the complete example and another more complete implementation at:
https://github.com/somdoron/ReliablePubSub.

Token-Based PubSub

I recently pushed two new features to both ZeroMQ and NetMQ: manual subscriptions and welcome message, both for the XPub socket. In this post, I will explore what we can do with the manual subscriptions feature. The next post will cover the welcome message.

Neither feature is part of any release of ZeroMQ or NetMQ yet, so you will have to compile from the source code to use the new features.

I will use NetMQ throughout the post, but you can implement the examples using any ZeroMQ binding.

Manual Subscriptions

When the Manual Subscriptions feature is enabled, subscription (or unsubscribe) requests are not added to (or removed from) the internal XPub subscriptions store. They will, however, be available to read as messages. So, after reading a subscription request from XPub socket, we can decide how to handle the socket. If we do nothing (and drop the subscription), the subscriber will not receive anything, including messages that match the subscription. If we want to confirm the subscription as is, we need to call the subscribe method on the XPub socket with the subscription (with ZeroMQ, we need to call the setsockopt with ZMQ_SUBSCRIBE) the same as we subscribe on the Sub socket. Following is an example that confirms each subscription:

using (var context = NetMQContext.Create())  
{
  string[] topics = new string[] { "A", "B", "C" };
  Random random = new Random();

  using (var publisher = context.CreateXPublisherSocket())
    {
    // Set publisher to manual subscriptions mode
    publisher.Options.ManualPublisher = true;
    publisher.Bind("tcp://*:5556");
    publisher.ReceiveReady += (sender, eventArgs) =>
    {
      var message = publisher.ReceiveString();

      // we only handle subscription requests, unsubscription and any
      // other type of messages will be dropped
      if (message[0] == (char) 1)
      {
        // calling Subscribe to add subscription to the last subscriber
        publisher.Subscribe(message.Substring(1));
      }
    };

    NetMQTimer sendTimer = new NetMQTimer(1000);
    sendTimer.Elapsed += (sender, eventArgs) =>
    {
      // sends a message every second with random topic and current time
      publisher.
        SendMore(topics[random.Next(3)]).
        Send(DateTime.Now.ToString());
    };

    Poller poller = new Poller();
    poller.AddSocket(publisher);
    poller.AddTimer(sendTimer);
    poller.PollTillCancelled();
  }
}

As noted in the example, we confirm all subscription requests, but we drop any other type of request. The subscriber will not be able to unsubscribe.

Permission-Based PubSub

With permission-based PubSub, we first check that the subscriber has permission to subscribe to the topic and only then call the subscribe method.

Permission-based subscriptions have one main issue--because we can only use the Subscribe method on the message we are currently handling, the permission check has to be synchronous. If all permissions are stored in memory, that is not a problem. However, usually we need to check with the database or another service and blocking the publisher thread is not an option. The next pattern solves this issue.

Permission-based subscriptions are only relevant to ZeroMQ and not NetMQ, because NetMQ does not currently support ZMTPv3 and ZAP (authentication).

Token-Based PubSub

With token-based subscriptions, the subscriber uses tokens instead of subscriptions. The server then decodes those tokens and decides to which subscriptions to subscribe the subscriber. To acquire the tokens, the subscriber needs to make a request to an authorization service that generates them. Following is the typical workflow:

  1. The client sends a request with username and password to the authorization service.
  2. The authorization service processes the request and replies to the client with a token.
  3. The client connects to the publisher and subscribes with the token.
  4. The publisher decodes the token, confirming its validity, and adds the relevant subscriptions.

XSub Instead of Sub

For a token-based subscription, we cannot use a Sub socket and must use an XSub socket. Sub sockets filter messages according to the subscriptions. Because we use tokens instead of subscriptions, the subscriber will filter all messages. An XSub socket does not filter messages, so we will use that one.

Generating Tokens

Generating tokens is beyond the scope of this post. I'm certain the web is full of examples. For our demo, I will use HMACSHA1, where the publisher and authorization have to share the same key.

Full Source Code

You can find the full source code of the example at: https://github.com/somdoron/TokenPubSub.

Authorization Service

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  using (var context = NetMQContext.Create())
  {
    using (var response = context.CreateResponseSocket())
    {
      response.Bind("tcp://*:5557");

      while (true)
      {
        var requestMessage = response.ReceiveMessage();

        string command = requestMessage.Pop().ConvertToString();

        if (command == AuthorizationProtocol.GetTokenCommand &&
            requestMessage.FrameCount == 3)
        {
          string username = requestMessage.Pop().ConvertToString();
          string password = requestMessage.Pop().ConvertToString();
          string subscription = requestMessage.Pop().ConvertToString();

          // TODO: validating username and password is not part
          // of the example
          // TODO: validate that the user has permission to
          // the subscription is not part of the example

          Console.WriteLine("Received GetTokenCommand {0} {1} {2}",
              username, password, subscription);

          // Create a token
          Token token = new Token(subscription, Key);

          // send token to the client
          response.
              SendMore(AuthorizationProtocol.SuccessReply).
              Send(token.Serialize());
        }
        else
        {
          // unsupported command
          response.Send(AuthorizationProtocol.ErrorReply);
        }
      }
    }
  }
}

Publisher

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  string[] symbols = new[] {"EURUSD", "GBPUSD", "EURJPY",
    "USDJPY", "EURGBP", "GBPJPY"};

  Random random = new Random();

  using (var context = NetMQContext.Create())
  {
    using (var publisher = context.CreateXPublisherSocket())
    {
      publisher.Options.ManualPublisher = true;
      publisher.Bind("tcp://*:5558");
      publisher.ReceiveReady += (sender, eventArgs) =>
      {
        byte[] subscriptionBytes = publisher.Receive();

        // first byte indicate if it a subscription or unsubscription
        if (subscriptionBytes[0] == 1 || subscriptionBytes[0] == 0)
        {
          // the rest of the bytes is the token, convert them to string
          string serializedToken = Encoding.ASCII.GetString(
              subscriptionBytes, 1, subscriptionBytes.Length - 1);

          // deserialize the token
          Token token;

          if (Token.TryDeserialize(serializedToken, out token))
          {
            // Check if the token is valid
            if (token.Validate(Key))
            {                                                        
              if (subscriptionBytes[0] == 1)
              {
                Console.WriteLine("Subscription request {0}",
                    token.Subscription);
                publisher.Subscribe(token.Subscription);
              }
              else
              {
                publisher.Unsubscribe(token.Subscription);
              }
            }
            else
            {
              Console.WriteLine("Invalid token {0}",
                  serializedToken);
            }
          }
        }
      };

      // Some fake publishing
      NetMQTimer publishTimer = new NetMQTimer(100);
      publishTimer.Elapsed += (sender, eventArgs) =>
      {
        publisher.
            SendMore(symbols[random.Next(symbols.Length)]).
            Send(random.Next().ToString());
      };

      Poller poller = new Poller();
      poller.AddSocket(publisher);
      poller.AddTimer(publishTimer);
      poller.PollTillCancelled();
    }
  }
}

Client

static void Main(string[] args)  
{
  string username = args[0];
  string password = args[1];
  string subscription = args[2].ToUpper();

  using (var context = NetMQContext.Create())
  {
    string token;

    // first we try to get a token
    using (var request = context.CreateRequestSocket())
    {
      request.Connect("tcp://localhost:" + AuthorizationProtocol.Port);

      // send token request
      request.
          SendMore(AuthorizationProtocol.GetTokenCommand).
          SendMore(username).
          SendMore(password).
          Send(subscription);

      string result = request.ReceiveString();

      if (result == AuthorizationProtocol.SuccessReply)
      {
        token = request.ReceiveString();
      }
      else
      {
        throw new Exception("Invalid username or password");
      }
    }

    // we must use XSUB because
    using (var subscriber = context.CreateXSubscriberSocket())
    {
      subscriber.Connect("tcp://localhost:" + StreamingProtocol.Port);

      // create the subscription message
      byte[] subscriptionMessage = new byte[token.Length + 1];
      subscriptionMessage[0] = 1;
      Encoding.ASCII.GetBytes(token, 0, token.Length, subscriptionMessage, 1);
      subscriber.Send(subscriptionMessage);

      while (true)
      {
        string symbol = subscriber.ReceiveString();
        string price = subscriber.ReceiveString();

        Console.WriteLine("{0} {1}", symbol, price);
      }
    }
  }
}

Token

public class Token  
{
  public Token(string subscription, string key)
  {
    Subscription = subscription;
    MAC = GenerateMAC(subscription, key);
  }

  public Token()
  {

  }

  public string Subscription { get; set; }
  public string MAC { get; set; }

  private static string GenerateMAC(string subscription, string key)
  {
    HMACSHA1 hmac = new HMACSHA1(Encoding.ASCII.GetBytes(key));
    byte[] hmacBytes = hmac.ComputeHash(Encoding.ASCII.GetBytes(subscription));
    return Convert.ToBase64String(hmacBytes);
  }

  public string Serialize()
  {
    return JsonConvert.SerializeObject(this);
  }

  public bool Validate(string key)
  {
    return MAC.Equals(GenerateMAC(Subscription, key));
  }

  public static bool TryDeserialize(string json, out Token token)
  {
    try
    {
      token = JsonConvert.DeserializeObject<Token>(json);
      return true;
    }
    catch (Exception)
    {
      token = null;
      return false;
    }
  }
}

Token Never Expires

The token in the example never expires. In reality, this is a big concern. Tokens should always have an expiration date. If we use an expiration date on a token, the client should continuously request a new token when the old token expires and subscribe with the replacement token. If the client does not request a new token, they will stop receiving any messages on the next connection drop (ZeroMQ automatically reconnects and sends subscriptions on reconnect. As a result, the publisher would reject the expired token and the client won't receive any messages).

Summary

Before the advent of the manual subscription, it was impossible to create a secure pub-sub using the Pub and Sub sockets. You could create it using a dealer-router, but you had to manage the subscription store yourself. Manual subscription features make this possible.

Introducing NetMQ.WebSockets and JSMQ

NetMQ version 3.3.10.0 introduced Stream socket type, which added the ability to read raw data from a TCP socket.
Today I want to introduce what you can do with Stream socket type and what I think could be a great development for NetMQ.

Let’s start with NetMQ.WebSockets, NetMQ.WebSockets is an extension to NetMQ which adds WebSocket transport as an extension.

Because NetMQ doesn’t have a pluggable transport feature, NetMQ.WebSockets actually wraps NetMQ and provides a new socket object which has a very similar interface as the NetMQ socket.
NetMQ.WebSockets currently implements only Router and Publisher patterns.

So who can communicate with NetMQ.WebSockets? Time to introduce JSMQ.

JSMQ is a NetMQ/ZeroMQ client in javascript whose API is very similar to other zeromq bindings and can communicate with NetMQ.WebSockets.

Now some of the C/C++ or even Java gurus need to throw down the gauntlet and implement WebSocket extension for zeromq/JeroMQ and then we will have a javascript library that can talk to all zeromq implementations.

You can find the projects on github:
https://github.com/somdoron/NetMQ.WebSockets
https://github.com/somdoron/JSMQ

You can download both JSMQ and NetMQ.WebSockets from nuget (make sure to choose prerelease) or visit there pages:
https://www.nuget.org/packages/NetMQ.WebSockets
https://www.nuget.org/packages/JSMQ

And now let’s see some examples:

NetMQ.WebSockets example

static void Main(string[] args)  
{
  using (NetMQContext context = NetMQContext.Create())
  {
    using (WSRouter router = context.CreateWSRouter())
    using (WSPublisher publisher = context.CreateWSPublisher())
    {
      router.Bind("ws://localhost:80");
      publisher.Bind("ws://localhost:81");

      router.ReceiveReady += (sender, eventArgs) =&gt;
      {
        byte[] identity = eventArgs.WSSocket.Receive();
        string message = eventArgs.WSSocket.ReceiveString();

        eventArgs.WSSocket.SendMore(identity).Send("OK");

        publisher.SendMore("chat").Send(message);
      };

      Poller poller = new Poller();
      poller.AddSocket(router);
    }
  }
}

JSMQ example

Javascript File

var dealer = new JSMQ.Dealer();  
dealer.connect("ws://localhost");

// we must wait for the dealer to be connected before we can send messages, 
// any messages we are trying to send while the dealer is not connected will be dropped
dealer.sendReady = function() {  
    document.getElementById("sendButton").disabled = "";
};

var subscriber = new JSMQ.Subscriber();  
subscriber.connect("ws://localhost:81");  
subscriber.subscribe("chat");

subscriber.onMessage = function (message) {  
   // we ignore the first frame because it's topic
   message.popString();

   document.getElementById("chatTextArea").value =
     document.getElementById("chatTextArea").value +
     message.popString()  + "\n";
};

dealer.onMessage = function (message) {  
    // the response from the server
    alert(message.popString());
};

function send() {  
   var message = new JSMQ.Message();
   message.addString(document.getElementById("messageTextBox").value);

   dealer.send(message);
}

HTML File

<textarea id="chatTextArea" readonly="readonly"></textarea>  
<label>Message:</label>  
<input id="messageTextBox" type="text" value="" /> 

<button id="sendButton" onclick="javascript:send();" disabled="disabled"> Send </button>  

Securing NetMQ

Inspired by the coming ZMTP v3.0 and CurveZMQ I started to develop security  layer for NetMQ as well.

As CurveZMQ the library is currently working over NetMQ sockets, in the future with ZMTP v3.0 it will probably be part of the library.

NetMQ Secure Channel is based on TLS 1.2 and DTLS 1.2, after spending a lot of time with both RFC (TLS & DTLS) I’m happy to say that the library covered most of the the features.

Before diving into some code I need to say that the library is not yet ready for production use.

Client

using (var socket = context.CreateDealerSocket())  
{
  socket.Connect("tcp://127.0.0.1:5556");

  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Client);

  // we are not using signed certificate so we need to validate 
  // the certificate of the server, by default the secure channel 
  // is checking that the source of the 
  // certitiface is a root certificate authority
  secureChannel.SetVerifyCertificate(c =&gt;  true);

  IList outgoingMessages = new List();

  // call the process message with null as the incoming message 
  // because the client is initiating the connection
  secureChannel.ProcessMessage(null, outgoingMessages);

  // the process message method fill the outgoing messages list with 
  // messages to send over the socket
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // waiting for a message from the server
  NetMQMessage incomingMessage= socket.ReceiveMessage();

  // calling ProcessMessage until ProcessMessage return true 
  // and the SecureChannel is ready to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();

    incomingMessage = socket.ReceiveMessage();  
  }

  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // you can now use the secure channel to encrypt messages
  NetMQMessage plainMessage = new NetMQMessage();
  plainMessage.Append("Hello");

  // encrypting the message and sending it over the socket
  socket.SendMessage(secureChannel.EncryptApplicationMessage(plainMessage));
}

Server

// we are using dealer here, but we can use router as well, we just have to manager
// SecureChannel for each identity
using (var socket = context.CreateDealerSocket())  
{
  socket.Bind("tcp://*:5556");

  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Server);

  // we need to set X509Certificate with a private key for the server
  X509Certificate2 certificate = new X509Certificate2("NetMQ.Testing.pfx", "1");
  secureChannel.Certificate = certificate;

  IList outgoingMessages = new List();

  // waiting for message from client
  NetMQMessage incomingMessage = socket.ReceiveMessage();

  // calling ProcessMessage until ProcessMessage return true 
  // and the SecureChannel is ready to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();

    incomingMessage = socket.ReceiveMessage();
  }
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // this message is now encrypted
  NetMQMessage cipherMessage = socket.ReceiveMessage();

  // decrypting the message
  NetMQMessage plainMessage = secureChannel.DecryptApplicationMessage(cipherMessage);
  Console.WriteLine(plainMessage.First.ConvertToString());
}

Code

Both client and server should call ProcessMessage until the method return true.

First call to client ProcessMessage should be with null as the incomingMessage because client is the one initiating the connection.

Calling ProcessMessage fill the outgoingMessages list with messages that need to be send to the other peer. Make sure to send those.

Summary

The code is not yet merge into NetMQ, you can find the source code at my github.

Learning from past mistakes the SecureChannel protocol is versioned, so any breaking changes will happen in new a version.
Server will be able to support multiple versions of clients.

Some notes about the library:

  • Only RSA, AES (128/256) and SHA (1/256) combination exist.
  • Renegotiation is not supported inside the library, however it’s very simple to implement over the library.
  • Alert layer from TLS is missing completely from the layer, might be implement in future versions.
  • Only block ciphers are supported, compression is not supported.
  • Client cannot authenticate with a certificate.
  • You can send multipart messages.
  • The messages doesn’t have to be ordered (like DTLS)
  • Make sure to catch and handle NetMQSecurityException.

NetMQ Lesson #1 – Basics

So you probably already know that NetMQ is port of ZeroMQ (also spelled ØMQ, 0MQ or ZMQ) to .Net.

If you are not familiar with ZeroMQ or NetMQ please read my previous post about introducing NetMQ.

Transport

In NetMQ you can choose from different transport, in this post I will focus on TCP.

  • TCP – communication over the network.
  • INPROC – in process communication, or inter-thread communication.
  • PGM – reliable multicast implemented with Microsoft implementation of PGM.
  • IPC – inter process communication, on NetMQ it’s exactly the same as TCP.

Patterns

As with transports NetMQ comes with different communication patterns, in this post I will cover request-response pattern.

  • Request/Response – client send request and server answer with response.
  • Pub/Sub – client subscribe for messages and server distribute message to all subscribed clients.
  • Dealer/Router – in simple words its client-server communication, dealer is the client and the router is the server, more about this in the future.
  • Push/Pull – one peer push messages and one or more workers peek those messages, good to build a pipeline.
  • Pair – one to one connection, for use with INPROC transport.
  • Dealer/Dealer, Router/Router – more advanced patterns for some cases.

Multi-threading

When you are entering the world of high performance systems (like trading platforms) you soon find out that everything you learned about multi-threading is wrong.

When you are writing high performance systems you don’t use locks, not reader writer lock, not mutex, not monitor and not any other type of lock. If you are writing very good code you even not using .net concurrent collection (except blocking collection, which is one pattern you can use) or interlocked. Now there is very easy way to do it, just don’t share data between threads, but this is another topic worth posting about.

The reason I’m telling you this is that NetMQ (and ZeroMQ) is coming from that world, you are not sharing the socket (which I explain later what is) between threads, the socket is belonging to the thread that create it. If you want to pass data to another thread you can use NetMQ with INPROC as transport (or blocking collection or ring buffer or any other inter-thread communication library). The topic of multi-threading is covered very good in the ZeroMQ guide.

Socket like API

So the basic of NetMQ is the socket object, no matter how many clients you have connected you have one socket, and not matter to how may servers you are connected to you have one socket. When you want to send a message you just call the Send method and when you want to receive a message you just call the Receive message.

Code

So let’s start with simple request response client server example (you need to add a reference to NetMQ, you can find the library on nuget):

  class Program
  {
    static void Main(string[] args)
    {
      using (NetMQContext context = NetMQContext.Create())
      {
        Task serverTask = Task.Factory.StartNew(() => Server(context));
        Task clientTask = Task.Factory.StartNew(() => Client(context));
        Task.WaitAll(serverTask, clientTask);
      }
    }

    static void Server(NetMQContext context)
    {
      using (NetMQSocket serverSocket = context.CreateResponseSocket())
      {
        serverSocket.Bind("tcp://*:5555");

        while (true)
        {
          string message = serverSocket.ReceiveString();

          Console.WriteLine("Receive message {0}", message);

          serverSocket.Send("World");

          if (message == "exit")
          {
            break;
          }
        }
      }
    }

    static void Client(NetMQContext context)
    {
      using (NetMQSocket clientSocket = context.CreateRequestSocket())
      {
        clientSocket.Connect("tcp://127.0.0.1:5555");

        while (true)
        {
          Console.WriteLine("Please enter your message:");
          string message = Console.ReadLine();
          clientSocket.Send(message);

          string answer = clientSocket.ReceiveString();

          Console.WriteLine("Answer from server: {0}", answer);

          if (message == "exit")
          {
            break;
          }
        }
      }
    }
  }

Remember what I told you about not sharing anything between threads? So you are actually allowed to share the NetMQContext between threads. It’s the only NetMQ object that you allowed to share between threads.

So as you can see using NetMQ is pretty easy. One important thing to note, string is not the only thing you can send, NetMQ is actually about delivering binary data, the Send method that receives string is just another overload of the receive method, more usually you will pass byte array and do the object serialization your self or with another library.

You can create different application for the client and for the server and launch multiple clients, in the request response pattern you don’t have to specify which client you sending the response to, the socket just know. In more advanced scenario (dealer/router pattern) you can specify to which client you are sending the message.