Introducing NetMQ.WebSockets and JSMQ

NetMQ version 3.3.0.10 introduced the Stream socket type, which adds the ability to read raw data from a TCP socket.
Today I want to show what you can do with the Stream socket type, and what I think could be a great development for NetMQ.
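To give a feel for what speaking WebSocket over raw TCP involves, here is a minimal sketch of the very first thing a WebSocket server has to do once it can read raw bytes: answer the browser's opening handshake as defined in RFC 6455, by hashing the client's Sec-WebSocket-Key together with a fixed GUID. It uses only the .NET base class library; the class and method names are illustrative, and this is not taken from NetMQ.WebSockets.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class WebSocketHandshake
{
    // Fixed GUID defined by RFC 6455 for computing the accept key.
    private const string MagicGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Value the server returns in the Sec-WebSocket-Accept header:
    // base64(SHA-1(key + GUID)).
    public static string ComputeAcceptKey(string secWebSocketKey)
    {
        using (SHA1 sha1 = SHA1.Create())
        {
            byte[] hash = sha1.ComputeHash(Encoding.ASCII.GetBytes(secWebSocketKey.Trim() + MagicGuid));
            return Convert.ToBase64String(hash);
        }
    }

    // HTTP 101 response that completes the opening handshake;
    // after it, both sides exchange WebSocket frames over the same TCP connection.
    public static string BuildResponse(string secWebSocketKey)
    {
        return "HTTP/1.1 101 Switching Protocols\r\n" +
               "Upgrade: websocket\r\n" +
               "Connection: Upgrade\r\n" +
               "Sec-WebSocket-Accept: " + ComputeAcceptKey(secWebSocketKey) + "\r\n" +
               "\r\n";
    }
}
```

For the sample key from RFC 6455, `WebSocketHandshake.ComputeAcceptKey("dGhlIHNhbXBsZSBub25jZQ==")` returns `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`, the value given in the RFC.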

Let’s start with NetMQ.WebSockets, an extension to NetMQ that adds a WebSocket transport.

Because NetMQ doesn’t have a pluggable transport feature, NetMQ.WebSockets wraps NetMQ and provides a new socket object whose interface is very similar to that of the NetMQ socket.
NetMQ.WebSockets currently implements only the Router and Publisher socket types.
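After the handshake, everything travels in RFC 6455 frames, and a wrapper like NetMQ.WebSockets has to translate between those frames and NetMQ messages. The sketch below shows the two smallest pieces of that job: building an unmasked text frame for server-to-client messages, and unmasking a text frame sent by a browser (clients must mask every frame they send). It is an illustration under simplifying assumptions, not NetMQ.WebSockets code: it handles only single-frame text messages with payloads up to 125 bytes, and WebSocketFrames is a hypothetical name.

```csharp
using System;
using System.Text;

public static class WebSocketFrames
{
    // Server-to-client frame: FIN bit set, opcode 0x1 (text), no mask.
    public static byte[] EncodeTextFrame(string text)
    {
        byte[] payload = Encoding.UTF8.GetBytes(text);
        if (payload.Length > 125)
            throw new ArgumentException("This sketch only supports payloads up to 125 bytes.");

        byte[] frame = new byte[2 + payload.Length];
        frame[0] = 0x81;                 // FIN + text opcode
        frame[1] = (byte)payload.Length; // MASK bit clear, 7-bit payload length
        Buffer.BlockCopy(payload, 0, frame, 2, payload.Length);
        return frame;
    }

    // Client-to-server frame: MASK bit set, 4-byte masking key, payload XORed with the key.
    public static string DecodeMaskedTextFrame(byte[] frame)
    {
        if ((frame[0] & 0x0F) != 0x1)
            throw new ArgumentException("Not a text frame.");
        if ((frame[1] & 0x80) == 0)
            throw new ArgumentException("Client frames must be masked.");

        int length = frame[1] & 0x7F;
        if (length > 125)
            throw new ArgumentException("Extended payload lengths are not handled in this sketch.");
        if (frame.Length < 6 + length)
            throw new ArgumentException("Frame is truncated.");

        // bytes 2..5 hold the masking key, the payload starts at byte 6
        byte[] payload = new byte[length];
        for (int i = 0; i < length; i++)
            payload[i] = (byte)(frame[6 + i] ^ frame[2 + (i % 4)]);

        return Encoding.UTF8.GetString(payload);
    }
}
```

Extended lengths (126/127), fragmented messages, and control frames such as ping and close would all need handling in a real transport.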

So who can communicate with NetMQ.WebSockets? Time to introduce JSMQ.

JSMQ is a NetMQ/ZeroMQ client in JavaScript. Its API is very similar to other ZeroMQ bindings, and it can communicate with NetMQ.WebSockets.

Now some of the C/C++ or Java gurus need to take up the gauntlet and implement a WebSocket extension for ZeroMQ/JeroMQ; then we will have a JavaScript library that can talk to all ZeroMQ implementations.

You can find the projects on GitHub:
https://github.com/somdoron/NetMQ.WebSockets
https://github.com/somdoron/JSMQ

You can download both JSMQ and NetMQ.WebSockets from NuGet (make sure to include prerelease packages) or visit their pages:
https://www.nuget.org/packages/NetMQ.WebSockets
https://www.nuget.org/packages/JSMQ

And now let’s see some examples:

NetMQ.WebSockets example

static void Main(string[] args)
{
  using (NetMQContext context = NetMQContext.Create())
  {
    using (WSRouter router = context.CreateWSRouter())
    using (WSPublisher publisher = context.CreateWSPublisher())
    {
      router.Bind("ws://localhost:80");
      publisher.Bind("ws://localhost:81");

      router.ReceiveReady += (sender, eventArgs) =>
      {
        // a router message starts with the identity of the sender, followed by the content
        string identity = eventArgs.WSSocket.Receive();
        string message = eventArgs.WSSocket.Receive();

        // reply to the sender only
        eventArgs.WSSocket.SendMore(identity).Send("OK");

        // broadcast the message to every subscriber of the "chat" topic
        publisher.SendMore("chat").Send(message);
      };

      Poller poller = new Poller();
      poller.AddWSSocket(router);

      // We must add the publisher to the poller even though we don't handle any of its events.
      // The protocol processing happens on the user thread: without the poller, the publisher
      // would only accept new connections or process subscriptions when Send is called.
      // Once the socket is added to the poller, that processing happens whenever data is ready.
      poller.AddWSSocket(publisher);

      // blocks the calling thread and dispatches events until the poller is stopped
      poller.Start();
    }
  }
}

JSMQ example

Javascript File

var dealer = new Dealer();
dealer.connect("ws://localhost");
 
// we must wait for the dealer to be connected before we can send messages;
// any message we try to send while the dealer is not connected will be dropped
dealer.sendReady = function() {
    document.getElementById("sendButton").disabled = false;
};
 
var subscriber = new Subscriber();
subscriber.connect("ws://localhost:81");
subscriber.subscribe("chat");
 
subscriber.onMessage = function (message) {
    // message is an array of all the message parts
    // we ignore the first frame because it's the topic
 
    document.getElementById("chatTextArea").value =
        document.getElementById("chatTextArea").value +
        message[1]  + "\n";
};
 
dealer.onMessage = function (message) {
    // the response from the server
    alert(message[0]);
};
 
function send() {
    dealer.send(document.getElementById("messageTextBox").value);
}

HTML File

<textarea id="chatTextArea" readonly="readonly"></textarea>
<label>Message:</label><input id="messageTextBox" type="text" value="" />
<button id="sendButton" onclick="send();" disabled="disabled">Send</button>

NetMQ 3.3.0.10 Released

A new NetMQ version was released to NuGet today. You can find it at https://www.nuget.org/packages/NetMQ/, or just search for NetMQ on NuGet.

Features:

  • Stream socket – ported from ZeroMQ; lets you use NetMQ to talk to peers using any TCP-based protocol.
  • ReceiveString with a timeout parameter.
  • NetMQMessage.ToString now returns a more informative result.
  • BindRandomPort – binds the socket to a random port.
  • The assembly is now signed.
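For readers wondering what binding to a random port usually means at the OS level, here is a plain .NET sketch of the common technique: bind to port 0, let the operating system pick a free port, then read the chosen port back. This is not necessarily how NetMQ's BindRandomPort is implemented; RandomPort is a hypothetical helper.

```csharp
using System.Net;
using System.Net.Sockets;

public static class RandomPort
{
    // Port 0 asks the operating system to assign a free ephemeral port;
    // the assigned port is available from the listener's local endpoint after Start.
    public static int BindToRandomPort(out TcpListener listener)
    {
        listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        return ((IPEndPoint)listener.LocalEndpoint).Port;
    }
}
```

The caller owns the returned listener and should call Stop on it when done.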

Bug Fixes:

  • NetMQMonitor – a NullReferenceException was thrown when ConnectDelayed was used.
  • NetMQScheduler – a socket was created for each thread that used the scheduler.
  • Binding and connecting to local addresses was a big mess; now it works as it should.
  • IPv6 – tested; in addition, a socket using IPv6 now also accepts IPv4 connections if the operating system supports it (Windows Vista and later).

If you wonder what you can do with Stream, how about talking WebSocket to a web browser? Stay tuned for the next post…

Enjoy.

.Net Async Programming – Part 2

In the first article in the series I covered programming with a single thread; in this article I will cover how to call a blocking method from our single thread without blocking it.

In order to write completely async code with a single thread, we need to make sure that our one thread is always doing work and never blocks on an IO operation (disk, network, database, etc.).
Some IO classes in .NET implement the async pattern; in the next article we will see how to use them as well.

Let’s say that when a deposit is made we want to save it to the DB. We have a repository with an Insert method, and when the Deposit method is called we insert the deposit into the DB:

public Task Deposit(double amount)
{
  Task task = new Task(() =>
    {
      m_balance += amount;
 
      m_depositRepository.Insert(AccountId, amount);
    });
  task.Start(m_taskScheduler);
 
  return task;
}

We took the example from the last post and just added the Insert call. The problem with this code is that our single thread blocks on the Insert call until the insert completes, preventing other tasks from being processed. To solve this, we are going to run the Insert call on a thread-pool thread:

public Task Deposit(double amount)
{
  Task task = new Task(() =>
    {
      m_balance += amount;
 
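      // pass TaskScheduler.Default explicitly: inside a task running on m_taskScheduler,
      // Task.Factory.StartNew would otherwise use TaskScheduler.Current, i.e. our single thread
      Task.Factory.StartNew(() => m_depositRepository.Insert(AccountId, amount),
        CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);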
    });
  task.Start(m_taskScheduler);
 
  return task;
}
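One caveat is worth checking when applying this: Task.Factory.StartNew without a scheduler argument uses TaskScheduler.Current, and inside a task running on our single-thread scheduler, Current is that scheduler, so the insert can end up queued right back onto our one thread. Passing TaskScheduler.Default explicitly, or using Task.Run in .NET 4.5, sends the work to the thread pool. The sketch below demonstrates the difference; it uses the BCL's ConcurrentExclusiveSchedulerPair.ExclusiveScheduler as a stand-in for a custom scheduler (an assumption for the demo, not the SingleThreadTaskScheduler from the first article).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerCaveat
{
    // Starts a task on 'scheduler'; from inside it, 'startInner' launches some work.
    // Returns true if that inner work also ran on 'scheduler'.
    private static bool InnerWorkRunsOn(TaskScheduler scheduler, Func<Func<bool>, Task<bool>> startInner)
    {
        Task<Task<bool>> outer = Task.Factory.StartNew(
            () => startInner(() => TaskScheduler.Current == scheduler),
            CancellationToken.None,
            TaskCreationOptions.None,
            scheduler);

        // Block from the calling thread (never from inside the scheduler) until the inner work is done.
        return outer.Unwrap().Result;
    }

    // No scheduler argument: StartNew uses TaskScheduler.Current, the caller's scheduler.
    public static bool WithPlainStartNew(TaskScheduler scheduler)
    {
        return InnerWorkRunsOn(scheduler, work => Task.Factory.StartNew(work));
    }

    // Explicit TaskScheduler.Default: the work goes to the thread pool.
    public static bool WithDefaultScheduler(TaskScheduler scheduler)
    {
        return InnerWorkRunsOn(scheduler, work => Task.Factory.StartNew(
            work, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default));
    }

    // Task.Run (.NET 4.5) always targets the thread pool.
    public static bool WithTaskRun(TaskScheduler scheduler)
    {
        return InnerWorkRunsOn(scheduler, work => Task.Run(work));
    }
}
```

With `var exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;`, `WithPlainStartNew(exclusive)` returns true, while `WithDefaultScheduler(exclusive)` and `WithTaskRun(exclusive)` return false.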

Now the single thread won’t be blocked, but there is still a problem with this code: the task returned by Deposit can complete before the insert has actually completed, which compromises the integrity of our code and software. Let’s try to solve this using .NET 4.0:

public Task Deposit(double amount)
{
  TaskCompletionSource<int> taskCompletionSource = new TaskCompletionSource<int>();
 
  Task task = new Task(() =>
    {
      m_balance += amount;
 
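      // run the blocking insert on the thread pool (TaskScheduler.Default, not the current scheduler)
      // and complete the returned task only when the insert has finished
      Task.Factory.StartNew(() => m_depositRepository.Insert(AccountId, amount),
          CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default)
        .ContinueWith(t =>
        {
          if (t.IsFaulted)
            taskCompletionSource.SetException(t.Exception.InnerExceptions);
          else
            taskCompletionSource.SetResult(1);
        });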
    });
  task.Start(m_taskScheduler);
 
  return taskCompletionSource.Task;
}
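The TaskCompletionSource technique can be packaged as a small reusable helper. The sketch below (Offload.RunThenOffload is a hypothetical name) runs the state update on the owning scheduler, runs the blocking call on the thread pool, and completes the returned task only when the blocking call has finished. It also propagates exceptions, so a failed insert shows up as a faulted task rather than a silent success.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Offload
{
    // Runs 'onOwnerThread' on 'owner' (e.g. the single-thread scheduler), then runs the
    // blocking 'blockingWork' on the thread pool. The returned task completes only when
    // the blocking work has finished, and faults if either part throws.
    public static Task RunThenOffload(TaskScheduler owner, Action onOwnerThread, Action blockingWork)
    {
        // int is only a placeholder: .NET 4.x has no non-generic TaskCompletionSource
        var completion = new TaskCompletionSource<int>();

        Task.Factory.StartNew(() =>
        {
            onOwnerThread();

            Task.Factory.StartNew(blockingWork, CancellationToken.None,
                    TaskCreationOptions.None, TaskScheduler.Default)
                .ContinueWith(t => CopyOutcome(t, completion), TaskScheduler.Default);
        }, CancellationToken.None, TaskCreationOptions.None, owner)
            // if onOwnerThread throws, the blocking work never starts: fault the result instead
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    completion.TrySetException(t.Exception.InnerExceptions);
            }, TaskScheduler.Default);

        return completion.Task;
    }

    private static void CopyOutcome(Task finished, TaskCompletionSource<int> completion)
    {
        if (finished.IsFaulted)
            completion.TrySetException(finished.Exception.InnerExceptions);
        else
            completion.TrySetResult(1);
    }
}
```

For example, `Offload.RunThenOffload(scheduler, () => balance += amount, () => repository.Insert(accountId, amount))` would mirror the Deposit method above, where `repository` is whatever blocking data-access object you use.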

We use a TaskCompletionSource to signal when the insert has actually completed (we use int because there is no non-generic TaskCompletionSource; yes, it is ugly).
The problem with this code is that it’s not very elegant and contains too much boilerplate; with .NET 4.5 this becomes much easier:

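public async Task Deposit(double amount)
{
  // The lambda is async, so we create a Task<Task>: the outer task runs the lambda on our
  // single thread, and its result is the task representing the whole lambda, including the await.
  // (An async lambda passed to new Task(...) would become async void and could not be awaited.)
  Task<Task> task = new Task<Task>(async () =>
  {
    m_balance += amount;

    // Task.Run always uses the thread pool; ConfigureAwait(true) resumes on our single thread
    await Task.Run(() => m_depositRepository.Insert(AccountId, amount)).ConfigureAwait(true);
  });
  task.Start(m_taskScheduler);

  // complete Deposit only when the inner task (including the insert) completes
  await task.Unwrap();
}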

I’m not going to get into the details of async and await (you can read more about them on MSDN), but a few important things need mentioning. We use ConfigureAwait(true) to make sure the continuation runs on our single thread (this is also the default: when there is no synchronization context, await resumes on the current task scheduler). And because of the async keyword we cannot return the task as we did in the previous example; since we still want Deposit to complete only when the inner task completes, we await the inner task.
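To check where the code after await actually runs, here is a small self-contained sketch. It starts an async lambda on a given scheduler (again using ConcurrentExclusiveSchedulerPair.ExclusiveScheduler as a stand-in for our single-thread scheduler, an assumption for the demo), awaits work on the thread pool, and reports which scheduler the rest of the lambda resumed on. AwaitResumption is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitResumption
{
    // Runs an async lambda on 'scheduler', awaits thread-pool work inside it, and reports
    // whether the code after the await resumed on that same scheduler.
    public static bool ResumesOnScheduler(TaskScheduler scheduler, bool continueOnCapturedContext)
    {
        // The outer task runs the lambda up to its first await;
        // its result is the task that represents the whole lambda.
        Task<Task<bool>> outer = new Task<Task<bool>>(async () =>
        {
            await Task.Run(() => Thread.Sleep(20)).ConfigureAwait(continueOnCapturedContext);
            return TaskScheduler.Current == scheduler;
        });
        outer.Start(scheduler);

        return outer.Unwrap().Result;
    }
}
```

`AwaitResumption.ResumesOnScheduler(exclusive, true)` returns true. With false, the continuation generally runs on a thread-pool thread instead, so the result is usually false.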

Summary

Using the thread pool is fine for the example, but if our system handles a lot of requests, the blocking operations can exhaust the thread pool. The better approach is to use the async pattern of .NET; we will see an example in the next article in the series.

Another thing worth mentioning: when writing single-threaded code, the gate (the place where we transition into our thread) will not actually be in our business objects as in this example; I’m only putting it there for the sake of the example. It will more likely be in higher layers (the application layer, the infrastructure, or a facade, depending on your system architecture). I will discuss this in a future article in the series.

.Net Async Programming – Part 1

Introduction

When working on high-performance systems (thousands of operations per second), you very quickly understand that you cannot use locks (to synchronize access to state) in your code, nor do any blocking operation (like writing to a file, a socket or a database).

The remaining question is: how can you write your code without a single lock and without any blocking operation?
In this part I will cover how to write code with zero locks (blocking operations will be covered in the next articles).

Single Thread Programming

I want to take you back to the years when we had only one processor with a single core, before threads and multithreading even existed.
What if I told you that all of your code could run on a single thread? If everything runs on a single thread you don’t need locks, because you are not actually using multithreading.

You are probably thinking that one thread won’t be fast enough; well, you are wrong (a later article in the series will cover multiple threads as well).
When you really do need more than one thread (I doubt it), the trick is to not share state between the threads: each piece of state belongs to one thread, and only that thread can read and write it. More about this in the next articles.

If you are building a system that doesn’t need to support more than 1,000 operations per second (and actually even much higher numbers), assuming each operation is short and does no blocking IO, you should use one thread. Of course there are more variables, but this is a rule of thumb.
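To make the single-thread idea concrete before building a TaskScheduler, here is a minimal sketch of the underlying pattern: one dedicated thread drains a BlockingCollection of actions. Any thread may post work, but only the loop thread ever executes it, so state touched only by posted actions needs no lock. SingleThreadLoop is an illustrative name, not a .NET type, and the sketch has no error handling (an exception thrown by a posted action is unhandled on the loop thread).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class SingleThreadLoop : IDisposable
{
    private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();
    private readonly Thread m_thread;

    public SingleThreadLoop()
    {
        m_thread = new Thread(() =>
        {
            // Executes posted actions one at a time, in order, always on this thread.
            foreach (Action action in m_queue.GetConsumingEnumerable())
                action();
        });
        m_thread.IsBackground = true;
        m_thread.Start();
    }

    // Safe to call from any thread; the action itself runs on the loop thread.
    public void Post(Action action)
    {
        m_queue.Add(action);
    }

    // Stops accepting work, lets already queued actions finish, then waits for the thread.
    public void Dispose()
    {
        m_queue.CompleteAdding();
        m_thread.Join();
        m_queue.Dispose();
    }
}
```

For example, posting `() => count++` a thousand times from `Parallel.For` and then disposing the loop leaves `count` at exactly 1000, without a single lock.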

SingleThreadTaskScheduler

So, after the long introduction, how do we use only one thread in C#? Everything we need already exists in .NET; we just take three components and combine them: BlockingCollection, TaskScheduler and a dedicated thread. Here is a simple implementation:

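  using System;
  using System.Collections.Concurrent;
  using System.Collections.Generic;
  using System.Threading;
  using System.Threading.Tasks;

  public class SingleThreadTaskScheduler : TaskScheduler, IDisposable
  {
    private readonly BlockingCollection<Task> m_queue = new BlockingCollection<Task>();
    private readonly ThreadLocal<bool> m_isSchedulerThread = new ThreadLocal<bool>(() => false);
    private readonly Task m_workerTask;
    private bool m_disposed = false;

    public SingleThreadTaskScheduler()
    {
      // LongRunning gives the worker a dedicated thread instead of a thread-pool thread
      m_workerTask = Task.Factory.StartNew(() => Worker(), CancellationToken.None,
        TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // only one task runs at a time
    public override int MaximumConcurrencyLevel
    {
      get { return 1; }
    }

    private void Worker()
    {
      m_isSchedulerThread.Value = true;
      try
      {
        // blocks until a task is queued; ends after CompleteAdding once the queue is drained
        foreach (Task task in m_queue.GetConsumingEnumerable())
        {
          TryExecuteTask(task);
        }
      }
      finally
      {
        m_isSchedulerThread.Value = false;
      }
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
      return m_queue.ToArray();
    }

    protected override void QueueTask(Task task)
    {
      m_queue.Add(task);
    }

    // inline only when we are already on the scheduler thread
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
      return m_isSchedulerThread.Value && TryExecuteTask(task);
    }

    // must not be called from the scheduler thread itself, or Wait would deadlock
    public void Dispose()
    {
      if (!m_disposed)
      {
        m_queue.CompleteAdding();
        m_workerTask.Wait();
        m_queue.Dispose();
        m_isSchedulerThread.Dispose();
        m_disposed = true;
      }
    }
  }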

Async Programming

Now our application receives input from somewhere (for the example, let’s assume a WCF service). Our example is a brokerage service that provides the client with the current balance. Before using the task scheduler, the code would probably look something like this:

public double GetBalance(int accountId)
{
  Account account = m_accountRepository.GetAccountById(accountId);
 
  return account.GetBalance();
}

The simplest thing to do now is to wrap this code in a Task and run it on our new task scheduler:

public double GetBalance(int accountId)
{
  Task<double> task =
    new Task<double>(() =>
      {
        Account account = m_accountRepository.GetAccountById(accountId);
        return account.GetBalance();
      });
 
  task.Start(m_singleThreadTaskScheduler);
  task.Wait();
 
  return task.Result;
}

But this code is not encapsulated: somebody can call the account code without wrapping it in a task and run it on the wrong task scheduler. We can move the wrapping into the Account class:

class Account
{
  private readonly SingleThreadTaskScheduler m_taskScheduler;
  private double m_balance = 0;
 
  public Account(SingleThreadTaskScheduler taskScheduler)
  {
    m_taskScheduler = taskScheduler;
  }
 
  public double GetBalance()
  {
    Task<double> task = new Task<double>(() => m_balance);
    task.Start(m_taskScheduler);
    task.Wait();
 
    return task.Result;
  }
}

Now the Account class is encapsulated and makes sure nobody can access its state from another thread. The SingleThreadTaskScheduler will be injected into all of our classes, or it can be static (we already said we will have only one thread).
We can improve our Account class a little more and return a task instead of the double; this will come in very handy in .NET 4.5 with the await keyword, but more about this in the next articles in the series. For the example we also add a Deposit method, to show that both reading and writing the state go through the scheduler.

class Account
{
  private readonly SingleThreadTaskScheduler m_taskScheduler;
  private double m_balance = 0;
 
  public Account(SingleThreadTaskScheduler taskScheduler)
  {
    m_taskScheduler = taskScheduler;
  }
 
  public Task<double> GetBalance()
  {
    Task<double> task = new Task<double>(() => m_balance);
    task.Start(m_taskScheduler);

    // no Wait here: the caller decides whether to block, continue or await
    return task;
  }

  public Task Deposit(double amount)
  {
    Task task = new Task(() =>
      {
        m_balance += amount;
      });
    task.Start(m_taskScheduler);

    return task;
  }
}

Finally, we can implement the WCF service with the asynchronous programming model (Begin/End methods), using the task returned by Account.GetBalance, and our system is completely async:

public IAsyncResult BeginGetBalance(int accountId, AsyncCallback callback, object state)
{
  Account account = m_accountRepository.GetAccountById(accountId);
 
  Task<double> task = account.GetBalance();
 
  // using Task extension method from http://blogs.msdn.com/b/pfxteam/archive/2011/06/27/10179452.aspx
  return task.ToAPM(callback, state);
}
 
public double EndGetBalance(IAsyncResult asyncResult)
{
  return ((Task<double>) asyncResult).Result;
}
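The linked post describes a Task-to-APM adapter. Here is a minimal sketch in the same spirit, written from scratch rather than copied from the post, and named ToAPM only to match the call above. The essential details are that the returned IAsyncResult must carry the caller's state object (hence a TaskCompletionSource created with that state) and that the callback receives that same IAsyncResult, which EndGetBalance then casts back to Task<double>.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ApmExtensions
{
    // Adapts a Task<TResult> to the APM pattern: returns an IAsyncResult whose AsyncState
    // is 'state' and invokes 'callback' once the original task has completed.
    public static Task<TResult> ToAPM<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
    {
        var completion = new TaskCompletionSource<TResult>(state);

        task.ContinueWith(t =>
        {
            // copy the outcome (result, exception or cancellation) into the state-carrying task
            if (t.IsFaulted)
                completion.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled)
                completion.TrySetCanceled();
            else
                completion.TrySetResult(t.Result);

            // APM callers expect the callback to receive the IAsyncResult they pass to End*
            if (callback != null)
                callback(completion.Task);
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);

        return completion.Task;
    }
}
```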

In .NET 4.5, WCF contracts can have methods that return tasks, so the service can return the task from Account.GetBalance directly, like so:

public Task<double> GetBalanceAsync(int accountId)
{
  Account account = m_accountRepository.GetAccountById(accountId);
 
  return account.GetBalance();
}

In the next article I will cover calling blocking IO methods without blocking our one and only thread.

NetMQ Scheduler – NetMQ and Task Library (TPL)

.NET 4.0’s Task Parallel Library (TPL) is cool; it makes it easier to develop concurrent applications.

ZeroMQ/NetMQ is very cool as well, but at the moment NetMQ and the TPL don’t work together.

NetMQScheduler is a TPL task scheduler that lets you run tasks on a NetMQ socket’s thread.

Let’s say you are developing a client-server application using NetMQ, and you want to send messages to the server from multiple threads in the application.
NetMQ sockets are not thread safe, so you create a client class with two sockets: one connected to the server, and one inproc socket that listens for requests from other threads in the application; once a message arrives on the inproc socket, you immediately forward it to the server. You also have a poller that listens for messages on both sockets (or only on the inproc socket if the communication is one way).

Instead of waiting for messages on the inproc socket, you create a NetMQScheduler, which takes a NetMQ poller in its constructor. Any task you run on the scheduler will then run on the poller’s thread (the thread that owns the client socket).

Let’s see some code:

 
  public class Client : IDisposable
  {
    private readonly NetMQContext m_context;
    private readonly string m_address;
    private Poller m_poller;
    private NetMQScheduler m_scheduler;
    private NetMQSocket m_clientSocket;
 
    public Client(NetMQContext context, string address)
    {
      m_context = context;
      m_address = address;
    }
 
    public void Start()
    {
      m_poller = new Poller();
 
      m_clientSocket = m_context.CreateDealerSocket();
 
      // the client connects to the server's address
      m_clientSocket.Connect(m_address);
 
      m_scheduler = new NetMQScheduler(m_context, m_poller);
 
      Task.Factory.StartNew(m_poller.Start, TaskCreationOptions.LongRunning);
    }
 
    public void SendMessage(byte[] message)
    {
      // instead of creating an inproc socket that listens for messages and forwards them to the server,
      // we create a task and run the code on the poller thread, which is the thread of m_clientSocket
      Task task = new Task(() => m_clientSocket.Send(message));
      task.Start(m_scheduler);
 
      // this is optional, we can also return the task to make the method async, can be used with async keyword in C# 5.0
      task.Wait();
    }
 
    public void Dispose()
    {
      m_scheduler.Dispose();
      m_clientSocket.Dispose();
 
      m_poller.Stop();
    }
  }

Another example is a publisher on the server side: multiple threads in the server want to publish messages, but you have only one publisher socket publishing to the clients. Of course you can use an inproc subscriber that listens for messages and publishes them to the clients, but the scheduler is easier and you don’t need to develop an inproc protocol.
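The same idea can be tried with plain TPL types. The sketch below is not NetMQScheduler: it uses the BCL's ConcurrentExclusiveSchedulerPair, whose ExclusiveScheduler runs one task at a time (though not always on the same thread), so many threads can publish through a single object that is not thread safe. A List<string> stands in for the publisher socket, and SerializedPublisher is a hypothetical name. For a real NetMQ socket you would still want NetMQScheduler or a single dedicated thread.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public class SerializedPublisher
{
    private readonly TaskScheduler m_scheduler;

    // Stand-in for a publisher socket: List<T> is not thread safe,
    // so only tasks running on m_scheduler may touch it.
    private readonly List<string> m_sent = new List<string>();

    public SerializedPublisher(TaskScheduler scheduler)
    {
        m_scheduler = scheduler;
    }

    // Callable from any thread; the actual "send" runs on the scheduler.
    public Task Publish(string topic, string message)
    {
        Task task = new Task(() => m_sent.Add(topic + ":" + message));
        task.Start(m_scheduler);
        return task;
    }

    // Reads also go through the scheduler, like GetClientList below.
    public Task<int> GetSentCount()
    {
        Task<int> task = new Task<int>(() => m_sent.Count);
        task.Start(m_scheduler);
        return task;
    }
}
```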

It can also make it easier to retrieve information held by the NetMQ socket’s thread. Let’s say we have a Server class that holds all the clients connected so far, and we want to retrieve that information from another thread. We can develop an inproc protocol that retrieves the information, or we can just use NetMQScheduler; the possibilities are endless.

 
    public IEnumerable<string> GetClientList()
    {
      // copy the list on the scheduler (poller) thread, which owns m_clients
      Task<IEnumerable<string>> task = new Task<IEnumerable<string>>(() => new List<string>(m_clients));
      task.Start(m_scheduler);

      task.Wait();
 
      return task.Result;      
    }

To summarize, NetMQScheduler makes it easier for the threads that own NetMQ sockets to communicate with other threads, and brings NetMQ to the world of the Task Parallel Library.

Securing NetMQ

Inspired by the upcoming ZMTP v3.0 and CurveZMQ, I started developing a security layer for NetMQ as well.

Like CurveZMQ, the library currently works on top of NetMQ sockets; in the future, with ZMTP v3.0, it will probably become part of NetMQ itself.

NetMQ Secure Channel is based on TLS 1.2 and DTLS 1.2; after spending a lot of time with both RFCs (TLS and DTLS), I’m happy to say that the library covers most of the features.

Before diving into some code I need to say that the library is not yet ready for production use.

Client

using (var socket = context.CreateDealerSocket())
{
  socket.Connect("tcp://127.0.0.1:5556");
 
  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Client);
 
  // the server's certificate is not signed by a trusted authority, so we accept it explicitly;
  // by default the secure channel checks that the certificate chains to a root certificate authority
  // (accepting every certificate like this is only acceptable for testing)
  secureChannel.SetVerifyCertificate(c => true);

  IList<NetMQMessage> outgoingMessages = new List<NetMQMessage>();
 
  // call the process message with null as the incoming message 
  // because the client is initiating the connection
  secureChannel.ProcessMessage(null, outgoingMessages);
 
  // the process message method fill the outgoing messages list with 
  // messages to send over the socket
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();
 
  // waiting for a message from the server
  NetMQMessage incomingMessage = socket.ReceiveMessage();
 
  // calling ProcessMessage until ProcessMessage return true and the SecureChannel is ready
  // to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();
 
    incomingMessage = socket.ReceiveMessage();  
  }
 
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();
 
  // you can now use the secure channel to encrypt messages
  NetMQMessage plainMessage = new NetMQMessage();
  plainMessage.Append("Hello");
 
  // encrypting the message and sending it over the socket
  socket.SendMessage(secureChannel.EncryptApplicationMessage(plainMessage));
}

Server

// we are using a dealer here, but we can use a router as well; we just have to manage
// a SecureChannel for each identity
using (var socket = context.CreateDealerSocket())
{
  socket.Bind("tcp://*:5556");
 
  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Server);
 
  // we need to set X509Certificate with a private key for the server
  X509Certificate2 certificate = new X509Certificate2("NetMQ.Testing.pfx", "1");
  secureChannel.Certificate = certificate;
 
  IList<NetMQMessage> outgoingMessages = new List<NetMQMessage>();
 
  // waiting for message from client
  NetMQMessage incomingMessage = socket.ReceiveMessage();
 
  // calling ProcessMessage until ProcessMessage return true and the SecureChannel is ready
  // to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();
 
    incomingMessage = socket.ReceiveMessage();
  }
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();
 
  // this message is now encrypted
  NetMQMessage cipherMessage = socket.ReceiveMessage();
 
  // decrypting the message
  NetMQMessage plainMessage = secureChannel.DecryptApplicationMessage(cipherMessage);
  Console.WriteLine(plainMessage.First.ConvertToString());
}

Code

Both client and server should call ProcessMessage until the method returns true.

The client’s first call to ProcessMessage should pass null as the incomingMessage, because the client is the one initiating the connection.

Calling ProcessMessage fills the outgoingMessages list with messages that need to be sent to the other peer. Make sure to send them, including after the final call that returns true.
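Those three rules fit in a small driver loop. The following is a transport-agnostic sketch: the delegates stand in for SecureChannel.ProcessMessage, socket.SendMessage and socket.ReceiveMessage, and HandshakeDriver is a hypothetical name, not part of the library. Note the final send after ProcessMessage returns true, which both examples above also perform.

```csharp
using System;
using System.Collections.Generic;

public static class HandshakeDriver
{
    // Drives a handshake to completion:
    //  - process(incoming, outgoing) returns true once the channel is ready,
    //  - every message it adds to 'outgoing' is sent, including after the final call.
    public static void Run<TMessage>(
        bool isClient,
        Func<TMessage, IList<TMessage>, bool> process,
        Action<TMessage> send,
        Func<TMessage> receive) where TMessage : class
    {
        var outgoing = new List<TMessage>();

        // The client initiates, so its first call gets null; the server waits for the client.
        TMessage incoming = isClient ? null : receive();

        while (true)
        {
            bool ready = process(incoming, outgoing);

            foreach (TMessage message in outgoing)
                send(message);
            outgoing.Clear();

            if (ready)
                return;

            incoming = receive();
        }
    }
}
```

With the client example above, the call might look like `HandshakeDriver.Run<NetMQMessage>(true, (i, o) => secureChannel.ProcessMessage(i, o), m => socket.SendMessage(m), () => socket.ReceiveMessage())`, assuming ProcessMessage accepts an `IList<NetMQMessage>` (not verified against the library).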

Summary

The code is not yet merged into NetMQ; you can find the source code on my GitHub.

Learning from past mistakes, the SecureChannel protocol is versioned, so any breaking changes will come in a new version.
The server will be able to support clients of multiple versions.

Some notes about the library:

  • Only combinations of RSA, AES (128/256) and SHA (1/256) are supported.
  • Renegotiation is not supported inside the library; however, it’s very simple to implement on top of the library, and I might blog about that.
  • The TLS alert layer is missing completely; it might be implemented in future versions.
  • Only block ciphers are supported, and compression is not supported.
  • The client cannot authenticate with a certificate; this is intentional, because the next library will be SASL.
  • You can send multipart messages.
  • Messages don’t have to arrive in order (as in DTLS).
  • Make sure to catch and handle NetMQSecurityException.

Array Length in C#

As you know, in C# you can query the length of an array with the Length property. That is not as obvious as it seems: in C++, for example, you allocate an array of the size you want, but the length is not available after the allocation.

My question is: how can C# tell you the length of an array? Or, to ask it another way, where does C# store the array length? My first guess was that the first item in the array is the array length, so I decided to go and look for it. First I looked in the first item:

    using System;
    using System.Runtime.InteropServices;

    static void Main(string[] args)
    {
      int[] array = new int[6];
 
      array[0] = 0;
      array[1] = 1;
      array[2] = 2;
      array[3] = 3;
      array[4] = 4;
 
      var handle = GCHandle.Alloc(array, GCHandleType.Pinned);
 
      IntPtr address = handle.AddrOfPinnedObject();
 
      int value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      handle.Free();
 
      Console.ReadLine();
    }

And the output:

0
1
2
3
4

So my guess was wrong: the first item really is the first item.

Then I thought about the 7th item (last + 1), which makes no sense: the length has to be at a fixed position relative to the start of the array, otherwise the runtime wouldn’t know where to look for it.

Finally I looked just before the first item, which proved right:

    using System;
    using System.Runtime.InteropServices;

    static void Main(string[] args)
    {
      int[] array = new int[6];
 
      array[0] = 0;
      array[1] = 1;
      array[2] = 2;
      array[3] = 3;
      array[4] = 4;
 
      var handle = GCHandle.Alloc(array, GCHandleType.Pinned);
 
      IntPtr address = handle.AddrOfPinnedObject();
 
      // read the 4 bytes just before the first element
      int value = Marshal.ReadInt32(address - 4);
      Console.WriteLine(value);
 
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      address += 4;
      value = Marshal.ReadInt32(address);
      Console.WriteLine(value);
 
      handle.Free();
 
      Console.ReadLine();
    }

And the output (from a 32-bit process) was:
6
0
1
2
3
4

To summarize: C# (more precisely, the CLR) stores the length of the array just before the first item of the array (4 bytes before the beginning of the array data, in the 32-bit process used here).
This makes the size of the array num_of_items * size_of_item + 4 (the size of the integer holding the array length), plus the usual object header that every .NET object has.
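Here is a variant of the experiment that also works in a 64-bit process. It relies on an undocumented CLR implementation detail, so treat it as a curiosity rather than something to depend on: in a 64-bit process the 4-byte length is followed by 4 bytes of padding, so it sits 8 bytes (IntPtr.Size) before the first element instead of 4. ArrayHeader is an illustrative name.

```csharp
using System;
using System.Runtime.InteropServices;

public static class ArrayHeader
{
    // Reads the length stored in the array header, one pointer-sized slot before the first element.
    // Undocumented layout: 4 bytes in a 32-bit process, 4 bytes + 4 bytes of padding in a 64-bit one.
    public static int ReadLengthFromHeader(int[] array)
    {
        GCHandle handle = GCHandle.Alloc(array, GCHandleType.Pinned);
        try
        {
            IntPtr firstElement = handle.AddrOfPinnedObject();
            return Marshal.ReadInt32(firstElement, -IntPtr.Size);
        }
        finally
        {
            // always unpin, even if the read fails
            handle.Free();
        }
    }
}
```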