Introducing NetMQ.WebSockets and JSMQ

NetMQ version 3.3.10.0 introduced Stream socket type, which added the ability to read raw data from a TCP socket.
Today I want to introduce what you can do with Stream socket type and what I think could be a great development for NetMQ.

Let’s start with NetMQ.WebSockets, NetMQ.WebSockets is an extension to NetMQ which adds WebSocket transport as an extension.

Because NetMQ doesn’t have a pluggable transport feature, NetMQ.WebSockets actually wraps NetMQ and provides a new socket object which has a very similar interface as the NetMQ socket.
NetMQ.WebSockets currently implements only Router and Publisher patterns.

So who can communicate with NetMQ.WebSockets? Time to introduce JSMQ.

JSMQ is a NetMQ/ZeroMQ client in javascript whose API is very similar to other zeromq bindings and can communicate with NetMQ.WebSockets.

Now some of the C/C++ or even Java gurus need to throw down the gauntlet and implement WebSocket extension for zeromq/JeroMQ and then we will have a javascript library that can talk to all zeromq implementations.

You can find the projects on github:
https://github.com/somdoron/NetMQ.WebSockets
https://github.com/somdoron/JSMQ

You can download both JSMQ and NetMQ.WebSockets from nuget (make sure to choose prerelease) or visit there pages:
https://www.nuget.org/packages/NetMQ.WebSockets
https://www.nuget.org/packages/JSMQ

And now let’s see some examples:

NetMQ.WebSockets example

static void Main(string[] args)  
{
  using (NetMQContext context = NetMQContext.Create())
  {
    using (WSRouter router = context.CreateWSRouter())
    using (WSPublisher publisher = context.CreateWSPublisher())
    {
      router.Bind("ws://localhost:80");
      publisher.Bind("ws://localhost:81");

      router.ReceiveReady += (sender, eventArgs) =>
      {
        byte[] identity = eventArgs.WSSocket.Receive();
        string message = eventArgs.WSSocket.ReceiveString();

        eventArgs.WSSocket.SendMore(identity).Send("OK");

        publisher.SendMore("chat").Send(message);
      };

      Poller poller = new Poller();
      poller.AddSocket(router);
    }
  }
}

JSMQ example

Javascript File

var dealer = new JSMQ.Dealer();  
dealer.connect("ws://localhost");

// we must wait for the dealer to be connected before we can send messages, 
// any messages we are trying to send while the dealer is not connected will be dropped
dealer.sendReady = function() {  
    document.getElementById("sendButton").disabled = "";
};

var subscriber = new JSMQ.Subscriber();  
subscriber.connect("ws://localhost:81");  
subscriber.subscribe("chat");

subscriber.onMessage = function (message) {  
   // we ignore the first frame because it's topic
   message.popString();

   document.getElementById("chatTextArea").value =
     document.getElementById("chatTextArea").value +
     message.popString()  + "\n";
};

dealer.onMessage = function (message) {  
    // the response from the server
    alert(message.popString());
};

function send() {  
   var message = new JSMQ.Message();
   message.addString(document.getElementById("messageTextBox").value);

   dealer.send(message);
}

HTML File

<textarea id="chatTextArea" readonly="readonly"></textarea>  
<label>Message:</label>  
<input id="messageTextBox" type="text" value="" /> 

<button id="sendButton" onclick="javascript:send();" disabled="disabled"> Send </button>  

Securing NetMQ

Inspired by the coming ZMTP v3.0 and CurveZMQ I started to develop security  layer for NetMQ as well.

As CurveZMQ the library is currently working over NetMQ sockets, in the future with ZMTP v3.0 it will probably be part of the library.

NetMQ Secure Channel is based on TLS 1.2 and DTLS 1.2, after spending a lot of time with both RFC (TLS & DTLS) I’m happy to say that the library covered most of the the features.

Before diving into some code I need to say that the library is not yet ready for production use.

Client

using (var socket = context.CreateDealerSocket())  
{
  socket.Connect("tcp://127.0.0.1:5556");

  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Client);

  // we are not using signed certificate so we need to validate 
  // the certificate of the server, by default the secure channel 
  // is checking that the source of the 
  // certitiface is a root certificate authority
  secureChannel.SetVerifyCertificate(c =&gt;  true);

  IList outgoingMessages = new List();

  // call the process message with null as the incoming message 
  // because the client is initiating the connection
  secureChannel.ProcessMessage(null, outgoingMessages);

  // the process message method fill the outgoing messages list with 
  // messages to send over the socket
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // waiting for a message from the server
  NetMQMessage incomingMessage= socket.ReceiveMessage();

  // calling ProcessMessage until ProcessMessage return true 
  // and the SecureChannel is ready to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();

    incomingMessage = socket.ReceiveMessage();  
  }

  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // you can now use the secure channel to encrypt messages
  NetMQMessage plainMessage = new NetMQMessage();
  plainMessage.Append("Hello");

  // encrypting the message and sending it over the socket
  socket.SendMessage(secureChannel.EncryptApplicationMessage(plainMessage));
}

Server

// we are using dealer here, but we can use router as well, we just have to manager
// SecureChannel for each identity
using (var socket = context.CreateDealerSocket())  
{
  socket.Bind("tcp://*:5556");

  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Server);

  // we need to set X509Certificate with a private key for the server
  X509Certificate2 certificate = new X509Certificate2("NetMQ.Testing.pfx", "1");
  secureChannel.Certificate = certificate;

  IList outgoingMessages = new List();

  // waiting for message from client
  NetMQMessage incomingMessage = socket.ReceiveMessage();

  // calling ProcessMessage until ProcessMessage return true 
  // and the SecureChannel is ready to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();

    incomingMessage = socket.ReceiveMessage();
  }
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // this message is now encrypted
  NetMQMessage cipherMessage = socket.ReceiveMessage();

  // decrypting the message
  NetMQMessage plainMessage = secureChannel.DecryptApplicationMessage(cipherMessage);
  Console.WriteLine(plainMessage.First.ConvertToString());
}

Code

Both client and server should call ProcessMessage until the method return true.

First call to client ProcessMessage should be with null as the incomingMessage because client is the one initiating the connection.

Calling ProcessMessage fill the outgoingMessages list with messages that need to be send to the other peer. Make sure to send those.

Summary

The code is not yet merge into NetMQ, you can find the source code at my github.

Learning from past mistakes the SecureChannel protocol is versioned, so any breaking changes will happen in new a version.
Server will be able to support multiple versions of clients.

Some notes about the library:

  • Only RSA, AES (128/256) and SHA (1/256) combination exist.
  • Renegotiation is not supported inside the library, however it’s very simple to implement over the library.
  • Alert layer from TLS is missing completely from the layer, might be implement in future versions.
  • Only block ciphers are supported, compression is not supported.
  • Client cannot authenticate with a certificate.
  • You can send multipart messages.
  • The messages doesn’t have to be ordered (like DTLS)
  • Make sure to catch and handle NetMQSecurityException.

NetMQ Lesson #1 – Basics

So you probably already know that NetMQ is port of ZeroMQ (also spelled ØMQ, 0MQ or ZMQ) to .Net.

If you are not familiar with ZeroMQ or NetMQ please read my previous post about introducing NetMQ.

Transport

In NetMQ you can choose from different transport, in this post I will focus on TCP.

  • TCP – communication over the network.
  • INPROC – in process communication, or inter-thread communication.
  • PGM – reliable multicast implemented with Microsoft implementation of PGM.
  • IPC – inter process communication, on NetMQ it’s exactly the same as TCP.

Patterns

As with transports NetMQ comes with different communication patterns, in this post I will cover request-response pattern.

  • Request/Response – client send request and server answer with response.
  • Pub/Sub – client subscribe for messages and server distribute message to all subscribed clients.
  • Dealer/Router – in simple words its client-server communication, dealer is the client and the router is the server, more about this in the future.
  • Push/Pull – one peer push messages and one or more workers peek those messages, good to build a pipeline.
  • Pair – one to one connection, for use with INPROC transport.
  • Dealer/Dealer, Router/Router – more advanced patterns for some cases.

Multi-threading

When you are entering the world of high performance systems (like trading platforms) you soon find out that everything you learned about multi-threading is wrong.

When you are writing high performance systems you don’t use locks, not reader writer lock, not mutex, not monitor and not any other type of lock. If you are writing very good code you even not using .net concurrent collection (except blocking collection, which is one pattern you can use) or interlocked. Now there is very easy way to do it, just don’t share data between threads, but this is another topic worth posting about.

The reason I’m telling you this is that NetMQ (and ZeroMQ) is coming from that world, you are not sharing the socket (which I explain later what is) between threads, the socket is belonging to the thread that create it. If you want to pass data to another thread you can use NetMQ with INPROC as transport (or blocking collection or ring buffer or any other inter-thread communication library). The topic of multi-threading is covered very good in the ZeroMQ guide.

Socket like API

So the basic of NetMQ is the socket object, no matter how many clients you have connected you have one socket, and not matter to how may servers you are connected to you have one socket. When you want to send a message you just call the Send method and when you want to receive a message you just call the Receive message.

Code

So let’s start with simple request response client server example (you need to add a reference to NetMQ, you can find the library on nuget):

  class Program
  {
    static void Main(string[] args)
    {
      using (NetMQContext context = NetMQContext.Create())
      {
        Task serverTask = Task.Factory.StartNew(() => Server(context));
        Task clientTask = Task.Factory.StartNew(() => Client(context));
        Task.WaitAll(serverTask, clientTask);
      }
    }

    static void Server(NetMQContext context)
    {
      using (NetMQSocket serverSocket = context.CreateResponseSocket())
      {
        serverSocket.Bind("tcp://*:5555");

        while (true)
        {
          string message = serverSocket.ReceiveString();

          Console.WriteLine("Receive message {0}", message);

          serverSocket.Send("World");

          if (message == "exit")
          {
            break;
          }
        }
      }
    }

    static void Client(NetMQContext context)
    {
      using (NetMQSocket clientSocket = context.CreateRequestSocket())
      {
        clientSocket.Connect("tcp://127.0.0.1:5555");

        while (true)
        {
          Console.WriteLine("Please enter your message:");
          string message = Console.ReadLine();
          clientSocket.Send(message);

          string answer = clientSocket.ReceiveString();

          Console.WriteLine("Answer from server: {0}", answer);

          if (message == "exit")
          {
            break;
          }
        }
      }
    }
  }

Remember what I told you about not sharing anything between threads? So you are actually allowed to share the NetMQContext between threads. It’s the only NetMQ object that you allowed to share between threads.

So as you can see using NetMQ is pretty easy. One important thing to note, string is not the only thing you can send, NetMQ is actually about delivering binary data, the Send method that receives string is just another overload of the receive method, more usually you will pass byte array and do the object serialization your self or with another library.

You can create different application for the client and for the server and launch multiple clients, in the request response pattern you don’t have to specify which client you sending the response to, the socket just know. In more advanced scenario (dealer/router pattern) you can specify to which client you are sending the message.