Feature Requests for C# 7.0

C# is an amazing language which get improved with every version and now .Net become portable which is even more amazing. However there are still some features I'm missing or problems I want to see solved.

Scope and Resource

IDisposable interface and using statement are problematic, I really think it is the second biggest design flaw in C# (after events, the cause of all memory leaks).

Some of the issues:

  • You can forget to dispose an object and cause memory leak or exhaust operating system resources.
  • With using keyword you unnecessarily indenting the code and make it harder to read, especially when nesting multiple using.
  • You don't get any errors with you using object that implements IDisposable without using using, this is just calling for bugs.
  • If you are writing a public library and didn't use IDisposable at the beginning and add it later you actually cause same problem as forgetting to dispose an object as in the first bullet.

So the first addition is the resource keyword which you can use at class declaration. When using the resource keyword it is like implementing IDisposable except you get a compile error if you didn't scope the instance. So what exactly is scope? first let see the how to use the resource keyword:

resource class Socket  
{
  void Dispose()
  {
    // Do something, or not...
  }
}

Actually the Dispose method is optional, because usually with resource class all resources will be released automatically because of the scope keyword.

So what is the scope keyword, when create a resource class you must scope (in golang it called defer), when you scope a resource class it will get disposed when you exit the scope. Scope can be at class level or scope level.

Let see some examples:

class resource PersistentList  
{
  // class level scope variable
  private scope AutoResetEvent loadedEvent = new AutoResetEvent();

  public void Load()
  {
     scope FileStream stream = new FileStream(FileName);

     loadedEvent.Set();
  }

  public void Save()
  {
    // If we really want we can still use Using
    using (FileStream stream = new FileStream(FileName))
    {
    }
  }  
}

So as you see we don't even need to implement the Dispose method, because all resources will be disposed automatically when the scope will exit. The important role of the resource keyword is to make sure a compilation error will happen if resource class is used without scope or using. Bottom line this saves bugs, memory leaks, writing dispose method and code is cleaner (no unnecessary indent).

Messaging

It is hard to write high performance or concurrent systems with C# and .Net, I know this from first experience, even async and await doesn't really solve the problem. The real problem is that C# and .Net doesn't encourage the right pattern for high performance systems.

Let me explain, if you read the ZeroMQ guide or familiar with NetMQ and/or ZeroMQ you know the philosophy is to never share data between threads, instead we exchange messages between threads. Sharing data between thread is slow and very complicated, usually an art only few can master.

The problem with C# is that sharing data is in the syntax of the language with lock and volatile keywords. Also the library is full with share data classes like ConcurrentBag, ConcurrentDictionary, ReaderWriterLock, ManualResetEvent and more.

Language that want to support high performance should prefer messaging over locks, golang has channels (which are like queues) and goroutines (which are cheap threads), erlang has actors, and C#? C# has BlockingCollection, only from .net 4.0, which is not enough.

First we need a way to create threads that are not really OS threads, we can do this with the various single thread TaskScheduler implementations (as NetMQ Poller is doing) but remember we want the support in the language.
The other parts is that we need a way to wait on a queue (without really blocking a thread) and on multiple queues.

Following is a suggestion:

void Main()  
{
  Channel<int> channel1 = new Channel<int>();
  Channel<int> channel2 = new Channel<int>();  
  Channel<int> channel3 = new Channel<int>();  

  // Calling a concurrent routine
  SingelQueue(channel1);
  MultipleChannels(channel2, channel3);

  // Sending to a channel
  5 => channel1;
}

concurrent void SingleQueue(Channel<int> channel)  
{
  while (channel.Open)
  {
    // Receiving from channel, this is actually not blocking the
    // thread but release it back to the thread pool, like await.
    var item <= channel; 
  }
}

concurrent void MultipleChannels(  
  Channel<int> channel1, Channel<int> channel2)
{
  while (channel.Open)
  {
    select
    {
      case var m <= channel1:
        // Do some processing
        break;
      case var m <= channel2:
        // Do some processing
        break;
    }
  }
}

Another important point is to make this extendable (which golang didn't do), so if I want to develop my custom channel (NetMQChannel for example) I can, following is a suggestion for receiving interface:

delegate void Callback(IReceiveChannel<T> channel);

interface IReceiveChannel<T>  
{
  void RegisterForCallback(Callback callback);  
  bool TryReceive(out T value);
}

So the custom channel will need to implement RegisterForCallback which will be called when message is ready to be fetched.
TryReceive, which will be called within the callback, will actually retrieve the message if it still available.

For more information take a look at libmill for C or golang channels.

Parameter-less Lambda

Today when you want to create parameter-less lambda you have to use parenthesis, which is just boilerplate and verbose, so instead of this:

() => Console.WriteLine("Hello");

I want to do this:

=> Console.WriteLine("Hello);

Syntax is cleaner and less code to write.

Multiple return value and pre-statement

In my opinion the exception usages in .net library is mostly wrong, if we actually want to write safe code almost every line of code need to be wrapped with try-catch. I much prefer the Try pattern, like with TryGetValue of dictionary.

However using the Try pattern is a little annoying, you need to remember to create a variable before calling the Try method, which is not how my brain works, when I get to the Try method I usually go up one line and declare the out variable. Also the variable is defined in an outer scope while you usually use it inside the if statement. My suggestion is to return multiple values, so instead of this:

string text;  
if (dictionary.TryGetValue("Greeting", out text))  
  Console.WriteLine(text);

We will write this:

if (var text, found = dictionary.TryGetValue("Greeting"); found)  
  Console.WriteLine(text);

The text variable is defined in the correct scope, we don't have to use the out keyword and wrote less code. I like this syntax better, although the pre-statement can look better, I'm leaving this to Microsoft.

Instant enums

Continuing the previous example, it would be much nicer if instead of checking the found boolean value we will do it like this:

if (var text, result = dictionary.TryGetValue("Greeting");  
    result == Found)
  Console.WriteLine(text);

The problem with writing a lot of enums, it is too verbose, you get a class with a lot of internal public enums, you don't know which method use which enum and you also have to think of the names for all those enums. So I would like to do it in method declaration:

// Property
public {NotStarted, Running, Disposed} Status {get; private set;}

// Method
public {Ok, QueueIsFull} Enqueue(string value);

// Method with two return values
public string, {Found, Missing} TryGetValue(string key);  

Summary

Hopefully you get the point, I didn't make it bulletproof features yet, just suggestions and way of thinking. The bottom line is clearer code, less code, more safe, less bugs and built for performance.

Reliable PubSub

In the last post I told you about two new features I recently pushed to both ZeroMQ and NetMQ. Last post was about the manual subscription feature. In this post I will create a reliable pubsub using the new Welcome Message feature.

I will use NetMQ through out the post but everything apply to ZeroMQ as well.

ZeroMQ guide has a great chapter on pubsub including reliable pubsub, however I will tackle a different problem, how to make sure the subscriber is always connected to a publisher (and also to the closest one).

Welcome Message

Welcome message is pretty simple, when welcome message is set on a publisher (must be of type XPub) the publisher will send a welcome message to each new subscriber. So the first message subscriber receive will be the welcome message. Subscriber must subscribe to the welcome message before trying to connect.

To set the welcome message with ZeroMQ you need to call zmq_setsockopt with ZMQ_XPUB_WELCOME_MSG and the welcome message.

Following is small example of setting welcome message with NetMQ:

using (var context = NetMQContext.Create())  
{ 
  using (var publisher = context.CreateXPublisherSocket()) 
  { 
    // Set the welcome message, this will be sent to any new subscriber
    publisher.SetWelcomeMessage("WM"); 
    publisher.Bind("tcp://*:5555"); 

    while (true) 
    { 
      // we have to handle all subscription 
      // requests in order for welcome message to work, 
      // we can just drop them afterwards
      publisher.Receive(); 
    } 
  } 
}

Client example:

using (var context = NetMQContext.Create())  
{ 
  using (var subscriber = context.CreateSubscriberSocket()) 
  { 
    subscriber.Subscribe("WM"); 
    subscriber.Connect("tcp://localhost:5555"); 
    var welcomeMessage = subscriber.ReceiveString(); 
    Console.WriteLine(welcomeMessage); 
  } 
}

Connect attempt

So you probably been asking yourself why welcome message is useful? the main problem welcome message is solving is weather a connect attempt was successful, before this feature we have to use a heartbeat message sent from the server periodically and on the client wait for the heartbeat message, which can take up to the time of the keep alive interval.

Our reliable pubsub will have a list of available publishers and will try to connect to each until one of them return welcome message in acceptable time.

Connect to closest publisher

As I mentioned we would have a list of publishers, but what if instead of trying to connect one after the other until successful connection I will connect to all of them, take the first one to reply and close the rest? In the LAN I will probably will connect to the less busy publisher or randomly and in the WAN I will probably connect to closest publisher. We achieved geo-load-balancer without expensive hardware or DNS service.

Recognize connection drop

Welcome message has another useful behavior, ZeroMQ automatically try to reconnect a dropped connection. When welcome message is set every time a reconnect happen the publisher will send a welcome message. We can use that to check if we missed messages. With the clone pattern we can use the welcome message to request full snapshot again from the server.

We would use that behavior to create our reliable pub sub.

Heartbeat

Welcome message solves the connect attempt problem, but we still need to recognize if publisher is down after the connection has been made. So if for 5 seconds no message has been received we would try to reconnect. But what if there was no message sent for 5 seconds? To solve this problem we would send a heartbeat message from the server every 2 seconds, and now if for 5 seconds we didn’t receive any message we can know for sure the publisher is down and we should reconnect.

Code

The following code is simplified for the example in the post. You can view full source code at https://github.com/somdoron/ReliablePubSub.

Server

using (NetMQContext context = NetMQContext.Create())  
{
  using (var publisherSocket = context.CreateXPublisherSocket())
  {
    publisherSocket.SetWelcomeMessage("WM");
    publisherSocket.Bind("tcp://*:6669");

    // we just drop subscriptions                     
    publisherSocket.ReceiveReady += (sender, eventArgs) => 
      publisherSocket.SkipMultipartMessage();

    Poller poller = new Poller(publisherSocket);

    // send a message every second
    NetMQTimer sendMessageTimer = new NetMQTimer(1000);
    poller.AddTimer(sendMessageTimer);
    sendMessageTimer.Elapsed += (sender, eventArgs) => 
      publisherSocket.
        SendMoreFrame("A").
        SendFrame(new Random().Next().ToString());

    // send heartbeat every two seconds
    NetMQTimer heartbeatTimer = new NetMQTimer(2000);
    poller.AddTimer(heartbeatTimer);
    heartbeatTimer.Elapsed += 
      (sender, eventArgs) => publisherSocket.SendFrame("HB");

    poller.PollTillCancelled();
  }
}

Client

Client code is a little more complicated and is split into multiple methods. Let's start with the Connect method which connect to the first socket that reply with Welcome Message.

Connect

private SubscriberSocket Connect(string[] addresses)  
{
  List<SubscriberSocket> sockets = new List<SubscriberSocket>();
  Poller poller = new Poller();

  SubscriberSocket connectedSocket = null;

  // event handler to handle message from socket
  EventHandler<NetMQSocketEventArgs> handleMessage = (sender, args) =>
  {
    if (connectedSocket == null)
    {
      connectedSocket = (SubscriberSocket)args.Socket;
      poller.Cancel();
    }
  };

  // If timeout elapsed just cancel the
  // poller without setting the connected socket
  NetMQTimer timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
  timeoutTimer.Elapsed += (sender, args) => poller.Cancel();
  poller.AddTimer(timeoutTimer);

  foreach (var address in addresses)
  {
    var socket = m_context.CreateSubscriberSocket();
    sockets.Add(socket);

    socket.ReceiveReady += handleMessage;
    poller.AddSocket(socket);

    // Subscribe to welcome message
    socket.Subscribe("WM");
    socket.Connect(address);
  }

  poller.PollTillCancelled();

  // if we have a connected socket the connection attempt succeed
  if (connectedSocket != null)
  {
    // remove the connected socket from the list
    sockets.Remove(connectedSocket);

    // close all existing sockets
    CloseSockets(sockets);

    // drop the welcome message
    connectedSocket.SkipMultipartMessage();

    // subscribe to heartbeat
    connectedSocket.Subscribe("HB");

    // subscribe to our only topic
    connectedSocket.Subscribe("A");

    connectedSocket.ReceiveReady -= handleMessage;
    connectedSocket.ReceiveReady += OnSubscriberMessage;

    return connectedSocket;
  }
  else
  {
    // close all existing sockets
    CloseSockets(sockets);

    return null;
  }
}

What we are doing?

  1. Create a timeout which will cancel the poller when timeout is elapsed.
  2. Create a handler to handle the welcome messages, first time handler will be called it will cancel the poller.
  3. Create all sockets and connect.
  4. Poll until cancelled, which will either be timeout or welcome message arrived.
  5. Close all existing connections except the one connected, subscribe to heartbeat and topics and register the handler to handle messages from now on.

Run

The run method orchestrate the entire process.

public void Run(params string[] addresses)  
{
  using (m_context = NetMQContext.Create())
  {
    var subscriber = Connect(addresses);

    if (subscriber == null)
      throw new Exception("cannot connect to eny of the endpoints");

    // timeout timer, when heartbeat was not arrived for 5 seconds
    m_timeoutTimer = new NetMQTimer(TimeSpan.FromSeconds(5));
    m_timeoutTimer.Elapsed += (sender, args) =>
    {
      // timeout happend, first dispose existing subscriber
      subscriber.Dispose();
      m_poller.RemoveSocket(subscriber);

      // connect again
      subscriber = Connect(addresses);

      if (subscriber == null)
        throw new Exception("cannot connect to any of the endpoints");

      m_poller.AddSocket(subscriber);
    };

    m_poller = new Poller(subscriber);
    m_poller.AddTimer(m_timeoutTimer);

    m_poller.PollTillCancelled();
  }
}

What we are doing:

  1. Trying to connect to a server
  2. Create a timeout timer in order to recognize connection drops and try to reconnect
  3. Creating the poller and polling

OnSubscriberMessage

The only thing missing in the puzzle is the handle of subscriber messages, which we registered in the Connect method.

private void OnSubscriberMessage(object sender, NetMQSocketEventArgs e)  
{
  var topic = e.Socket.ReceiveFrameString();

  switch (topic)
  {
    case "WM":
      // welcome message, print and reset timeout timer
      Console.WriteLine("Connection drop and reconnect monitoed");
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    case "HB":
      // heartbeat, we reset timeout timer
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      break;
    default:
      // its a message, reset timeout timer, notify the client, 
      // for the example we just print it
      m_timeoutTimer.Enable = false;
      m_timeoutTimer.Enable = true;
      string message = e.Socket.ReceiveFrameString();
      Console.WriteLine("Message received. Topic: {0}, Message: {1}", 
        topic, message);
      break;
  }
}

What we are doing:

  1. Get the topic of the message
  2. Check which topic it is
    • When it is welcome message we reset the timeout timer and notify the user on the connection drop
    • When it is an heart beat we only reset the timer
    • When it is a message we reset the timeout timer and notify the user

Summary

So what Reliable PubSub really does:

  • Trying to connect to multiple servers and pick the first one that reply with Welcome Message. Can be used as geo load balancer or to connect to less busy server.
  • Recognize temporary connection drop (when Welcome Message arrived while the connection is up), the user can use that to request full snapshot, take a look at the Clone pattern from zeromq guide.
  • Reconnect when the server is dead, will probably connect to the next closest server.

The Reliable PubSub is a good fit for:

  • Financial Market Data (Forex/Stock Quotes)
  • Social Stream
  • Publishing changes to a client

If you implement the pattern in another language please send it to me and I will add a link to the post.

You can find the complete example and another more complete implementation at:
https://github.com/somdoron/ReliablePubSub.

Token-Based PubSub

I recently pushed two new features to both ZeroMQ and NetMQ: manual subscriptions and welcome message, both for the XPub socket. In this post, I will explore what we can do with the manual subscriptions feature. The next post will cover the welcome message.

Neither feature is part of any release of ZeroMQ or NetMQ yet, so you will have to compile from the source code to use the new features.

I will use NetMQ throughout the post, but you can implement the examples using any ZeroMQ binding.

Manual Subscriptions

When the Manual Subscriptions feature is enabled, subscription (or unsubscribe) requests are not added to (or removed from) the internal XPub subscriptions store. They will, however, be available to read as messages. So, after reading a subscription request from XPub socket, we can decide how to handle the socket. If we do nothing (and drop the subscription), the subscriber will not receive anything, including messages that match the subscription. If we want to confirm the subscription as is, we need to call the subscribe method on the XPub socket with the subscription (with ZeroMQ, we need to call the setsockopt with ZMQ_SUBSCRIBE) the same as we subscribe on the Sub socket. Following is an example that confirms each subscription:

using (var context = NetMQContext.Create())  
{
  string[] topics = new string[] { "A", "B", "C" };
  Random random = new Random();

  using (var publisher = context.CreateXPublisherSocket())
    {
    // Set publisher to manual subscriptions mode
    publisher.Options.ManualPublisher = true;
    publisher.Bind("tcp://*:5556");
    publisher.ReceiveReady += (sender, eventArgs) =>
    {
      var message = publisher.ReceiveString();

      // we only handle subscription requests, unsubscription and any
      // other type of messages will be dropped
      if (message[0] == (char) 1)
      {
        // calling Subscribe to add subscription to the last subscriber
        publisher.Subscribe(message.Substring(1));
      }
    };

    NetMQTimer sendTimer = new NetMQTimer(1000);
    sendTimer.Elapsed += (sender, eventArgs) =>
    {
      // sends a message every second with random topic and current time
      publisher.
        SendMore(topics[random.Next(3)]).
        Send(DateTime.Now.ToString());
    };

    Poller poller = new Poller();
    poller.AddSocket(publisher);
    poller.AddTimer(sendTimer);
    poller.PollTillCancelled();
  }
}

As noted in the example, we confirm all subscription requests, but we drop any other type of request. The subscriber will not be able to unsubscribe.

Permission-Based PubSub

With permission-based PubSub, we first check that the subscriber has permission to subscribe to the topic and only then call the subscribe method.

Permission-based subscriptions have one main issue--because we can only use the Subscribe method on the message we are currently handling, the permission check has to be synchronous. If all permissions are stored in memory, that is not a problem. However, usually we need to check with the database or another service and blocking the publisher thread is not an option. The next pattern solves this issue.

Permission-based subscriptions are only relevant to ZeroMQ and not NetMQ, because NetMQ does not currently support ZMTPv3 and ZAP (authentication).

Token-Based PubSub

With token-based subscriptions, the subscriber uses tokens instead of subscriptions. The server then decodes those tokens and decides to which subscriptions to subscribe the subscriber. To acquire the tokens, the subscriber needs to make a request to an authorization service that generates them. Following is the typical workflow:

  1. The client sends a request with username and password to the authorization service.
  2. The authorization service processes the request and replies to the client with a token.
  3. The client connects to the publisher and subscribes with the token.
  4. The publisher decodes the token, confirming its validity, and adds the relevant subscriptions.

XSub Instead of Sub

For a token-based subscription, we cannot use a Sub socket and must use an XSub socket. Sub sockets filter messages according to the subscriptions. Because we use tokens instead of subscriptions, the subscriber will filter all messages. An XSub socket does not filter messages, so we will use that one.

Generating Tokens

Generating tokens is beyond the scope of this post. I'm certain the web is full of examples. For our demo, I will use HMACSHA1, where the publisher and authorization have to share the same key.

Full Source Code

You can find the full source code of the example at: https://github.com/somdoron/TokenPubSub.

Authorization Service

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  using (var context = NetMQContext.Create())
  {
    using (var response = context.CreateResponseSocket())
    {
      response.Bind("tcp://*:5557");

      while (true)
      {
        var requestMessage = response.ReceiveMessage();

        string command = requestMessage.Pop().ConvertToString();

        if (command == AuthorizationProtocol.GetTokenCommand &&
            requestMessage.FrameCount == 3)
        {
          string username = requestMessage.Pop().ConvertToString();
          string password = requestMessage.Pop().ConvertToString();
          string subscription = requestMessage.Pop().ConvertToString();

          // TODO: validating username and password is not part
          // of the example
          // TODO: validate that the user has permission to
          // the subscription is not part of the example

          Console.WriteLine("Received GetTokenCommand {0} {1} {2}",
              username, password, subscription);

          // Create a token
          Token token = new Token(subscription, Key);

          // send token to the client
          response.
              SendMore(AuthorizationProtocol.SuccessReply).
              Send(token.Serialize());
        }
        else
        {
          // unsupported command
          response.Send(AuthorizationProtocol.ErrorReply);
        }
      }
    }
  }
}

Publisher

static void Main(string[] args)  
{
  // this key should be shared between authorization server and publisher
  const string Key = "SecretKey";

  string[] symbols = new[] {"EURUSD", "GBPUSD", "EURJPY",
    "USDJPY", "EURGBP", "GBPJPY"};

  Random random = new Random();

  using (var context = NetMQContext.Create())
  {
    using (var publisher = context.CreateXPublisherSocket())
    {
      publisher.Options.ManualPublisher = true;
      publisher.Bind("tcp://*:5558");
      publisher.ReceiveReady += (sender, eventArgs) =>
      {
        byte[] subscriptionBytes = publisher.Receive();

        // first byte indicate if it a subscription or unsubscription
        if (subscriptionBytes[0] == 1 || subscriptionBytes[0] == 0)
        {
          // the rest of the bytes is the token, convert them to string
          string serializedToken = Encoding.ASCII.GetString(
              subscriptionBytes, 1, subscriptionBytes.Length - 1);

          // deserialize the token
          Token token;

          if (Token.TryDeserialize(serializedToken, out token))
          {
            // Check if the token is valid
            if (token.Validate(Key))
            {                                                        
              if (subscriptionBytes[0] == 1)
              {
                Console.WriteLine("Subscription request {0}",
                    token.Subscription);
                publisher.Subscribe(token.Subscription);
              }
              else
              {
                publisher.Unsubscribe(token.Subscription);
              }
            }
            else
            {
              Console.WriteLine("Invalid token {0}",
                  serializedToken);
            }
          }
        }
      };

      // Some fake publishing
      NetMQTimer publishTimer = new NetMQTimer(100);
      publishTimer.Elapsed += (sender, eventArgs) =>
      {
        publisher.
            SendMore(symbols[random.Next(symbols.Length)]).
            Send(random.Next().ToString());
      };

      Poller poller = new Poller();
      poller.AddSocket(publisher);
      poller.AddTimer(publishTimer);
      poller.PollTillCancelled();
    }
  }
}

Client

static void Main(string[] args)  
{
  string username = args[0];
  string password = args[1];
  string subscription = args[2].ToUpper();

  using (var context = NetMQContext.Create())
  {
    string token;

    // first we try to get a token
    using (var request = context.CreateRequestSocket())
    {
      request.Connect("tcp://localhost:" + AuthorizationProtocol.Port);

      // send token request
      request.
          SendMore(AuthorizationProtocol.GetTokenCommand).
          SendMore(username).
          SendMore(password).
          Send(subscription);

      string result = request.ReceiveString();

      if (result == AuthorizationProtocol.SuccessReply)
      {
        token = request.ReceiveString();
      }
      else
      {
        throw new Exception("Invalid username or password");
      }
    }

    // we must use XSUB because
    using (var subscriber = context.CreateXSubscriberSocket())
    {
      subscriber.Connect("tcp://localhost:" + StreamingProtocol.Port);

      // create the subscription message
      byte[] subscriptionMessage = new byte[token.Length + 1];
      subscriptionMessage[0] = 1;
      Encoding.ASCII.GetBytes(token, 0, token.Length, subscriptionMessage, 1);
      subscriber.Send(subscriptionMessage);

      while (true)
      {
        string symbol = subscriber.ReceiveString();
        string price = subscriber.ReceiveString();

        Console.WriteLine("{0} {1}", symbol, price);
      }
    }
  }
}

Token

public class Token  
{
  public Token(string subscription, string key)
  {
    Subscription = subscription;
    MAC = GenerateMAC(subscription, key);
  }

  public Token()
  {

  }

  public string Subscription { get; set; }
  public string MAC { get; set; }

  private static string GenerateMAC(string subscription, string key)
  {
    HMACSHA1 hmac = new HMACSHA1(Encoding.ASCII.GetBytes(key));
    byte[] hmacBytes = hmac.ComputeHash(Encoding.ASCII.GetBytes(subscription));
    return Convert.ToBase64String(hmacBytes);
  }

  public string Serialize()
  {
    return JsonConvert.SerializeObject(this);
  }

  public bool Validate(string key)
  {
    return MAC.Equals(GenerateMAC(Subscription, key));
  }

  public static bool TryDeserialize(string json, out Token token)
  {
    try
    {
      token = JsonConvert.DeserializeObject<Token>(json);
      return true;
    }
    catch (Exception)
    {
      token = null;
      return false;
    }
  }
}

Token Never Expires

The token in the example never expires. In reality, this is a big concern. Tokens should always have an expiration date. If we use an expiration date on a token, the client should continuously request a new token when the old token expires and subscribe with the replacement token. If the client does not request a new token, they will stop receiving any messages on the next connection drop (ZeroMQ automatically reconnects and sends subscriptions on reconnect. As a result, the publisher would reject the expired token and the client won't receive any messages).

Summary

Before the advent of the manual subscription, it was impossible to create a secure pub-sub using the Pub and Sub sockets. You could create it using a dealer-router, but you had to manage the subscription store yourself. Manual subscription features make this possible.

NetMQ and IO Completion Ports

One of the original goals of NetMQ was to use IO Completion ports (a.k.a IOCP) on Windows.
I’m happy to let you know that after two years and multiple attempts NetMQ is now using IOCP.

IO Completion Ports

IO Completion ports is windows answer to C10k (http://www.kegel.com/c10k.html,
Wikipedia) problem, C10K problem is the problem of optimizing sockets to handle large number (10K) of clients at the same time. Linux has epoll, FreeBSD has kqueue and Windows has IO Completion ports.

ZeroMQ

ZeroMQ doesn’t scale well on Windows, on Linux ZeroMQ is using epoll which can scale to thousands of sockets. On windows ZeroMQ is using select which is slow and doesn’t scale well. NetMQ was ported from ZeroMQ and up until now it was using select as well.

In the past multiple attempts were made to integrate IOCP to ZeroMQ but none of them succeeded.

Reactor vs Proactor

The main problem of porting network project from Linux (or any other operating system) to Windows is the different asynchronous network model, Linux is using a pattern called reactor and windows is using proactor.

Both reactor and procator patterns enable multiple asynchronous receive and send operations without blocking the thread.
Both are using an event loop, the different is with the meaning of the event.

With reactor pattern you get an event when the socket is ready for an operation. For example you can register a socket for receive readiness and get an event when the data is available for receiving data from the socket.

Linux has a native support for the reactor pattern with epoll (which ZeroMQ is using) that can scale to thousands of clients. Windows also has support for the reactor pattern with select, but as I mentioned select is slow and doesn’t scale well.

With the proactor pattern you first call the method and get an event when the operation is completed. .Net is using the proactor pattern heavily with Begin/End pattern, tasks and Async pattern (from .net4.5) and that is no surprise because Windows has a native support for the proactor pattern with IO Completion ports.

As you can understand it hard to make same code-base support for both reactor and proactor patterns. This the main reason all the attempts to use IO Completion ports in ZeroMQ failed.
ZeroMQ supports multiple implantation of reactor pattern including epoll on linux, kqueue on FreeBSD and of course select on Windows.

On his new project nanomsg, Martin Sustrik, original developer of ZeroMQ, succeeded in using IO Completion ports and epoll linux on the same code-base. Martin’s approach was to make the epoll behave like proactor. In a nutshell, the send/receive is called in a non-blocking way, if the call failed because the socket was not ready the method will be called again once the ready event is sent and only then the procator completion event is raised.

Mono framework is using same approach as nanomsg when running on Linux.

AsyncIO Library

So as I mentioned earlier, in the past I attempted to make NetMQ use IO Completion ports and failed, the main reason is that .Net support for IOCP is a bit annoying because you don’t have a control over which thread the completion event will be handled on.

Eventually I decided to develop my own library for IO Completion ports with control over the thread and using events instead of callbacks. On windows native IO Completion ports API are used (with pinvoke). When running on other platforms (or when forced) the project is using native .Net Async API (which on Linux with Mono using epoll).

You can find the project on Github and Nuget.

Summary

So to summarize, NetMQ master repository is now using IO Completion ports, which means you can use it with thousands of clients (I don’t have the numbers yet).

So if you only used NetMQ to communicate between your servers you can now use it for client-server communication with multiple clients.

Nuget current version of NetMQ (3.3.0.11) is not using IOCP, to get NetMQ with IOCP you need to compile it from the source code.

Using NetMQ and ASP.NET

From time to time a question regarding how to use NetMQ in ASP.NET application is popping up in the NetMQ mailing list so I have decided to write a post on the subject.

WebAPI

For the examples in the post I will use WebAPI 2.0, but it should work for other asp.net application types. Also I’m changing the way WebAPI is configured, here is WebApiConfig class:

public static class WebApiConfig  
{
  public static void Register(HttpConfiguration config)
  {
    // Web API configuration and services

    // Web API routes
    config.MapHttpAttributeRoutes();

    config.Routes.MapHttpRoute(
        name: "DefaultApi",
        routeTemplate: "api/{controller}/{action}",
        defaults: new { id = RouteParameter.Optional }
    );
  }
}      

The only difference is using /{action} instead of /{id} and this is because I’m not writing a REST service.

I’m using Autofac as dependency injection for the examples in the post.

Calculator

Through out the post I will use a simple calculator example, following is the code of the calculator server application:

class Program  
{
  static void Main(string[] args)
  {
    using (NetMQContext context = NetMQContext.Create())
    {
      using (var responseSocket = context.CreateResponseSocket())
      {
        responseSocket.Bind("tcp://*:10001");

        while (true)
        {
          var requestMessage = responseSocket.ReceiveMessage();
          string a = requestMessage.Pop().ConvertToString();
          string b = requestMessage.Pop().ConvertToString();

          int aNumber = Convert.ToInt32(a);
          int bNumber = Convert.ToInt32(b);

          string result = (aNumber + bNumber).ToString();

          NetMQMessage responseMessage = new NetMQMessage();
          responseMessage.Append(result);

          responseSocket.SendMessage(responseMessage);
        }
      }
    }
  }
}

Simple Pattern

In the simple pattern each controller will create and connect it’s own socket. The NetMQ context will be created once and will be injected into the controllers.

SimpleController.cs

public class SimpleController : ApiController  
{
  private readonly NetMQContext m_context;
  private string m_serviceAddress;

  public SimpleController(NetMQContext context, string serviceAddress)
  {
    m_context = context;
    m_serviceAddress = serviceAddress;
  }

  [HttpGet]
  public int Calc(int a, int b)
  {
    using (var requestSocket = m_context.CreateRequestSocket())
    {
      requestSocket.Connect(m_serviceAddress);            

      NetMQMessage message = new NetMQMessage();

      // converting to string, not most efficient but will do for our example
      message.Append(a.ToString()); 
      message.Append(b.ToString());

      requestSocket.SendMessage(message);

      var replyMessage = requestSocket.ReceiveMessage();
      string result = replyMessage.Pop().ConvertToString();

      return Convert.ToInt32(result);
    }
  }
}

Global.asax.cs:

public class WebApiApplication : System.Web.HttpApplication  
{
  protected void Application_Start()
  {
    string address = "tcp://127.0.0.1:10001";

    var builder = new ContainerBuilder();

    // Register the NetMQ context
    builder.RegisterInstance(NetMQContext.Create()).SingleInstance();
    builder.RegisterType().WithParameter("serviceAddress", address);

    // Build the container.
    var container = builder.Build();

    // Create the dependency resolver.
    var resolver = new AutofacWebApiDependencyResolver(container);

    // Configure Web API with the dependency resolver.
    GlobalConfiguration.Configuration.DependencyResolver = resolver;

    GlobalConfiguration.Configure(WebApiConfig.Register);
  }
}

The advantage of the simple pattern is that it’s very simple.

However we are creating and connecting a TCP socket on each request, this is not efficient and can take time (Because of TCP and ZMTP handshake process).

We can fix this easily with a device in the middle, and this take us to the next solution.

Simple Device Pattern

Device in ZeroMQ/NetMQ is a component that sits in the middle of zeromq applications and forward messages between them, you can learn more about devices at the zeromq guide.

The simple device bind on a inproc address and connect to the calculator service.

Any request coming from the inproc is forward to the service and responses are routing back to the inproc socket.

We don’t change anything in the SimpleController except injecting the inproc address instead of the service address.

Let’s take a look at the Device class:

public class Device : IDisposable, IStartable  
{
  private readonly NetMQContext m_context;
  private readonly string m_backEndAddress;
  private readonly string m_frontEndAddress;
  private Poller m_poller;
  private RouterSocket m_frontendSocket;
  private DealerSocket m_backendSocket;

  public Device(NetMQContext context, string backEndAddress, 
    string frontEndAddress)
  {
    m_context = context;
    m_backEndAddress = backEndAddress;
    m_frontEndAddress = frontEndAddress;
  }

  public void Start()
  {
    Task.Factory.StartNew(() =>
    {
      m_poller = new Poller();

      using (m_frontendSocket = m_context.CreateRouterSocket())
      {
          using (m_backendSocket = m_context.CreateDealerSocket())
        {
          m_backendSocket.Connect(m_backEndAddress);
          m_frontendSocket.Bind(m_frontEndAddress);

          m_backendSocket.ReceiveReady += OnBackEndReady;
          m_frontendSocket.ReceiveReady += OnFrontEndReady;

          m_poller.AddSocket(m_backendSocket);
          m_poller.AddSocket(m_frontendSocket);

          m_poller.Start();
        }
      }
    }, TaskCreationOptions.LongRunning);
  }

  private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_frontendSocket.ReceiveMessage();
    m_backendSocket.SendMessage(message);
  }

  private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_backendSocket.ReceiveMessage();
    m_frontendSocket.SendMessage(message);
  }

  public void Dispose()
  {
      m_poller.Stop(true);
  }
}

The device class will start automatically by the Autofac because it is inherited from IStartable.

Let’s take a look at the global.asax.cs file:

public class WebApiApplication : System.Web.HttpApplication  
{
  protected void Application_Start()
  {
    const string serviceAddress = "tcp://127.0.0.1:10001";
    const string inprocAddress = "inproc://broker";        

    var builder = new ContainerBuilder();

    // Register the NetMQ context
    builder.RegisterInstance(NetMQContext.Create()).SingleInstance();
    builder.RegisterType().
      WithParameter("serviceAddress", inprocAddress).
      InstancePerRequest();
    builder.RegisterType().SingleInstance().As().
      WithParameter("backEndAddress", serviceAddress).
      WithParameter("frontEndAddress", inprocAddress);

    // Build the container.
    var container = builder.Build();

    // Create the dependency resolver.
    var resolver = new AutofacWebApiDependencyResolver(container);

    // Configure Web API with the dependency resolver.
    GlobalConfiguration.Configuration.DependencyResolver = resolver;

    GlobalConfiguration.Configure(WebApiConfig.Register);
  }
}

With the simple device only one place connects to the calculator service and in rest of the web application we send the request to the device using inproc transport, also as you see we didn’t have to change anything in the controller code.

The simple device pattern will usually be enough for most asp.net web application, however we still have some problems that the simple device pattern doesn’t solve:

  1. We are still creating and disposing a lot of NetMQ sockets and sockets are an expensive resource.
  2. The controllers code is blocking, we occupy a thread until the response is arrived
  3. Timeout is not handled, what if the response is gone? or the calculator service is down?

For the third problem I suggest reading the Reliable Request-Reply chapter at the zeromq guide.

For a quick fix we can set the ReceiveTimeout of the socket and catch the AgainException, like this:

[HttpGet]
public IHttpActionResult Calc(int a, int b)  
{
  using (var requestSocket = m_context.CreateRequestSocket())
  {
    requestSocket.Options.ReceiveTimeout = TimeSpan.FromSeconds(10);
    requestSocket.Connect(m_serviceAddress);

    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString()); 
    message.Append(b.ToString());

    requestSocket.SendMessage(message);

    try
    {
      var replyMessage = requestSocket.ReceiveMessage();
      string result = replyMessage.Pop().ConvertToString();

      return Ok(Convert.ToInt32(result));
    }
    catch (AgainException ex)
    {
      return BadRequest();
    }
  }
}

For the blocking problem we need convert our controllers to be asynchronous, imagine we can wrote this:

[HttpGet]
public async Task Calc(int a, int b)  
{
  using (var requestSocket = m_context.CreateRequestSocket())
  {
    requestSocket.Options.ReceiveTimeout = TimeSpan.FromSeconds(10);
    requestSocket.Connect(m_serviceAddress);

    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString()); 
    message.Append(b.ToString());

    requestSocket.SendMessage(message);

    try
    {
      var replyMessage = await requestSocket.ReceiveMessageAsync();
      string result = replyMessage.Pop().ConvertToString();

      return Ok(Convert.ToInt32(result));
    }
    catch (AgainException ex)
    {
      return BadRequest();
    }
  }
}

But sadly we cannot write this code, yet.

AsyncSocket

In the last pattern in the post I will write an asynchronous wrapper for NetMQ socket in order to be able to use the async/await keywords of .Net 4.5.

So the code here is getting a little complicated, we will have to use TaskCompletionSource and NetMQScheduler, take a look:

public class AsyncSocket : IDisposable  
{
  private readonly NetMQContext m_context;
  private readonly string m_serviceAddress;
  private NetMQScheduler m_scheduler;
  private Poller m_poller;
  private NetMQSocket m_requestSocket;
  private TaskCompletionSource<NetMQMessage> m_taskCompletionSource; 

  public AsyncSocket(NetMQContext context, string address)
  {
    m_context = context;
    m_serviceAddress = address;

    m_requestSocket = context.CreateRequestSocket();
    m_requestSocket.ReceiveReady += OnReceiveReady;
    m_requestSocket.Connect(address);

    m_poller = new Poller(m_requestSocket);
    m_scheduler = new NetMQScheduler(m_context, m_poller);

    Task.Factory.StartNew(() => m_poller.Start(), 
      TaskCreationOptions.LongRunning);
  }

  public Task<NetMQMessage> SendAndReceiveAsync(NetMQMessage message)
  {
    var task = new Task<Task<NetMQMessage>>(() =>
    {
      m_taskCompletionSource = 
        new TaskCompletionSource<NetMQMessage>();

      m_requestSocket.SendMessage(message);

      return m_taskCompletionSource.Task;
    });

    // will start the task on the scheduler which 
    /  the same thread as the Poller thread
    task.Start(m_scheduler);
    return task.Result;
  }

  private void OnReceiveReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_requestSocket.ReceiveMessage();
    m_taskCompletionSource.SetResult(message);
    m_taskCompletionSource = null;
  }

  public void Dispose()
  {
    m_scheduler.Dispose();
    m_poller.Stop(true);
    m_requestSocket.Dispose();            
  }
}

And the Controller code:

[HttpGet]
public async Task Calc(int a, int b)  
{
  using (var asyncSocket = 
    new AsyncSocket(m_context, m_serviceAddress))
  {
    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString());
    message.Append(b.ToString());

    var replyMessage = await asyncSocket.SendAndReceiveAsync(message);

    string result = replyMessage.Pop().ConvertToString();

    return Ok(Convert.ToInt32(result));
  }
}

So now we have an async socket with an async controller, but we created a new problem, we now create a background thread for every request coming, how can we fix that?

We can change the AsyncSocket to handle multiple events, the pure zeromq way to write this is complicated and actually not very neat (NetMQ cannot pass objects, we have to pass the TaskCompletionSource between sockets, the only way to that is using there address, which is not very neat).

The way I’m going to implement it will force us to share AsyncSocket between threads, however it will be completely safe and lock-free with the magic of NetMQScheduler.

public class AsyncSocket : IStartable, IDisposable  
{
  private readonly NetMQContext m_context;
  private readonly string m_serviceAddress;
  private NetMQScheduler m_scheduler;
  private Poller m_poller;
  private NetMQSocket m_dealerSocket;
  private Dictionary<int, TaskCompletionSource<NetMQMessage>> m_requests;
  private int m_requestId;

  public AsyncSocket(NetMQContext context, string address)
  {
    m_context = context;
    m_serviceAddress = address;
    m_requests = 
      new Dictionary<int, TaskCompletionSource<NetMQMessage>>();
    m_requestId = 0;       
  }

  public void Start()
  {
    m_dealerSocket = m_context.CreateDealerSocket();
    m_dealerSocket.ReceiveReady += OnReceiveReady;
    m_dealerSocket.Connect(m_serviceAddress);

    m_poller = new Poller(m_dealerSocket);
    m_scheduler = new NetMQScheduler(m_context, m_poller);

    Task.Factory.StartNew(() => m_poller.Start(), 
      TaskCreationOptions.LongRunning);
  }

  public Task<NetMQMessage> SendAndReceiveAsync(NetMQMessage message)
  {
    // duplicate the message because we are not the owner of the message
    NetMQMessage duplicteMessage = new NetMQMessage(message);

    var task = new Task<Task<NetMQMessage>>(() =>
    {
      var taskCompletionSource = 
        new TaskCompletionSource<NetMQMessage>();

      // because we are using a dealer we have to push the delimiter
      duplicteMessage.PushEmptyFrame();

      // sending the request id the request identifier
      duplicteMessage.Push(m_requestId.ToString());

      // add the request to the pending request dictionary
      m_requests.Add(m_requestId, taskCompletionSource);

      // increase the request id for the next request
      m_requestId++;

      m_dealerSocket.SendMessage(duplicteMessage);

      return taskCompletionSource.Task;
    });

    // will start the task on the scheduler which 
    // is the same thread as the Poller thread
    task.Start(m_scheduler);
    return task.Result;
  }

  private void OnReceiveReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_dealerSocket.ReceiveMessage();

    // pop the request id
    string identity = message.Pop().ConvertToString();

    // pop the delimiter
    message.Pop();

    int requestId = Convert.ToInt32(identity);

    TaskCompletionSource<NetMQMessage> taskCompletionSource;

    // getting the task completion source, if we were also try to 
    // handle timeout the request will be 
    // gone and the response will be dropped
    if (m_requests.TryGetValue(requestId, out taskCompletionSource))
    {
      taskCompletionSource.SetResult(message);
      m_requests.Remove(requestId);
    }                
  }

  public void Dispose()
  {
    m_scheduler.Dispose();
    m_poller.Stop(true);
    m_dealerSocket.Dispose();            
  }  
}

Please note that with new implementation of AsyncSocket we don't need the SimpleDevice anymore and the AsyncSocket can connect directly to the service, the reason is that we know only have one socket connecting to the service.

This how our new controller looks:

public class AsyncController : ApiController  
{
  private AsyncSocket m_asyncSocket;

  public AsyncController(AsyncSocket asyncSocket)
  {
    m_asyncSocket = asyncSocket;
  }

  [HttpGet]
  public async Task Calc(int a, int b)
  {
    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString()); 
    message.Append(b.ToString());

    var replyMessage = await m_asyncSocket.SendAndReceiveAsync(message);

    string result = replyMessage.Pop().ConvertToString();

    return Ok(Convert.ToInt32(result));
  }
}

And the autofac magic:

protected void Application_Start()  
{
  const string serviceAddress = "tcp://127.0.0.1:10001";
  const string inprocAddress = "inproc://broker";        

  var builder = new ContainerBuilder();

  // Register the NetMQ context
  builder.RegisterInstance(NetMQContext.Create()).
    SingleInstance();
  builder.RegisterType().
    WithParameter("serviceAddress", inprocAddress).
    InstancePerRequest();        
  builder.RegisterType().SingleInstance().
    As().
    AsSelf().
    WithParameter("address", serviceAddress);

  // Build the container.
  var container = builder.Build();

  // Create the dependency resolver.
  var resolver = new AutofacWebApiDependencyResolver(container);

  // Configure Web API with the dependency resolver.
  GlobalConfiguration.Configuration.DependencyResolver = resolver;

  GlobalConfiguration.Configure(WebApiConfig.Register);
}

As you can see I’m handling timeouts in the code, good handling of timeouts and reliability is out of the scope for the this post, however if you already read the zeromq guide you probably know how to handle reliability. An easy fix here would be to also record the request time of each request and use NetMQTimer to remove timed-out request (we can call SetException on the TaskCompletionSource).

Summary

In the post we explore 3 patterns to use NetMQ inside ASP.NET application, the Simple Pattern,
the Simple Device Pattern and the Async Socket Pattern.

Although the AsyncSocket is not pure NetMQ solution and we have to share the object between threads it is my favorite, using the AsyncSocket we can write very fast lock-free ASP.NET controllers without blocking an ASP.NET thread.

What would be a nicer solution? what about writing the entire web server using NetMQ? without ASP.NET at all? would it be nice to write the following code?

using (NetMQContext context = NetMQContext.Create())  
{
  using (var responseSocket = context.CreateResponseSocket())
  {
    responseSocket.Bind("http://localhost:80/api/Calculator/Calc");

    while (true)
    {
      var requestMessage = responseSocket.ReceiveMessage();

      string a = requestMessage.Pop().ConvertToString();
      string b = requestMessage.Pop().ConvertToString();

      int aNumber = Convert.ToInt32(a);
      int bNumber = Convert.ToInt32(b);

      string result = (aNumber + bNumber).ToString();

      NetMQMessage responseMessage = new NetMQMessage();
      responseMessage.Append(result);

      responseSocket.SendMessage(responseMessage);
    }
  }
}

Maybe one day…

Introducing NetMQ.WebSockets and JSMQ

NetMQ version 3.3.10.0 introduced Stream socket type, which added the ability to read raw data from a TCP socket.
Today I want to introduce what you can do with Stream socket type and what I think could be a great development for NetMQ.

Let’s start with NetMQ.WebSockets, NetMQ.WebSockets is an extension to NetMQ which adds WebSocket transport as an extension.

Because NetMQ doesn’t have a pluggable transport feature, NetMQ.WebSockets actually wraps NetMQ and provides a new socket object which has a very similar interface as the NetMQ socket.
NetMQ.WebSockets currently implements only Router and Publisher patterns.

So who can communicate with NetMQ.WebSockets? Time to introduce JSMQ.

JSMQ is a NetMQ/ZeroMQ client in javascript whose API is very similar to other zeromq bindings and can communicate with NetMQ.WebSockets.

Now some of the C/C++ or even Java gurus need to throw down the gauntlet and implement WebSocket extension for zeromq/JeroMQ and then we will have a javascript library that can talk to all zeromq implementations.

You can find the projects on github:
https://github.com/somdoron/NetMQ.WebSockets
https://github.com/somdoron/JSMQ

You can download both JSMQ and NetMQ.WebSockets from nuget (make sure to choose prerelease) or visit there pages:
https://www.nuget.org/packages/NetMQ.WebSockets
https://www.nuget.org/packages/JSMQ

And now let’s see some examples:

NetMQ.WebSockets example

static void Main(string[] args)  
{
  using (NetMQContext context = NetMQContext.Create())
  {
    using (WSRouter router = context.CreateWSRouter())
    using (WSPublisher publisher = context.CreateWSPublisher())
    {
      router.Bind("ws://localhost:80");
      publisher.Bind("ws://localhost:81");

      router.ReceiveReady += (sender, eventArgs) =&gt;
      {
        byte[] identity = eventArgs.WSSocket.Receive();
        string message = eventArgs.WSSocket.ReceiveString();

        eventArgs.WSSocket.SendMore(identity).Send("OK");

        publisher.SendMore("chat").Send(message);
      };

      Poller poller = new Poller();
      poller.AddSocket(router);
    }
  }
}

JSMQ example

Javascript File

var dealer = new JSMQ.Dealer();  
dealer.connect("ws://localhost");

// we must wait for the dealer to be connected before we can send messages, 
// any messages we are trying to send while the dealer is not connected will be dropped
dealer.sendReady = function() {  
    document.getElementById("sendButton").disabled = "";
};

var subscriber = new JSMQ.Subscriber();  
subscriber.connect("ws://localhost:81");  
subscriber.subscribe("chat");

subscriber.onMessage = function (message) {  
   // we ignore the first frame because it's topic
   message.popString();

   document.getElementById("chatTextArea").value =
     document.getElementById("chatTextArea").value +
     message.popString()  + "\n";
};

dealer.onMessage = function (message) {  
    // the response from the server
    alert(message.popString());
};

function send() {  
   var message = new JSMQ.Message();
   message.addString(document.getElementById("messageTextBox").value);

   dealer.send(message);
}

HTML File

<textarea id="chatTextArea" readonly="readonly"></textarea>  
<label>Message:</label>  
<input id="messageTextBox" type="text" value="" /> 

<button id="sendButton" onclick="javascript:send();" disabled="disabled"> Send </button>  

NetMQ 3.3.0.10 Released

A new NetMQ version was released today to nuget, you can find it at https://www.nuget.org/packages/NetMQ/, or just search NetMQ on nuget.

Features:

  • Stream socket, coming from ZeroMQ, you can use NetMQ to talk to any protocol.
  • ReceiveString with a timeout parameter.
  • NetMQMessage.ToString is now returning a better result.
  • BindRandomPort – which bind the socket to a random port.
  • The assembly is now signed

Bug Fixes:

  • NetMQMonitor – Null reference exception was thrown when used ConnectDelayed.
  • NetMQScheduler – socket was created for each thread that used the scheduler.
  • Bind and Connecting on local was a big mess, now it’s working as should.
  • IPv6 – tested, also when now using IPv6 the socket will also accept IPv4 sockets if the operation system support it (> Windows Vista).

    If you wonder what you can do with Stream, what about talking in WebSocket to web browser? Stay tuned for the next post…

Enjoy.

.Net Async Programming - Part 2

In the first article in the series I covered the programming with a single thread, in this article I will cover how to call blocking method in our one thread without blocking our single thread.

In order to write completely async code with a single thread we need to make sure that the one thread we have will always do work and not block on IO operation (disk, network, database, etc…).
Some IO classes on .net have async pattern implementation, in the next article we will see how to use them as well.

Let’s that when a deposit is made we want to save it to DB, so we have a repository with the insert method and when the Deposit method is being called we will insert it to the DB:

public Task Deposit(double amount)  
{
  Task task = new Task(() =>
    {
      m_balance += amount;

      m_depositRepository.Insert(AccountId, amount);
    });
  task.Start(m_taskScheduler);

  return task;
}

We took the example from the last post and just added the insert call, the problem with this code is that our single thread will block on the Insert call until the insert is complete, deny other tasks from being processed. To solve this we are going to schedule the Insert call on a thread pool thread:

public Task Deposit(double amount)  
{
  Task task = new Task(() =>
    {
      m_balance += amount;

      Task.Factory.StartNew(
        () => m_depositRepository.Insert(AccountId, amount));
    });
  task.Start(m_taskScheduler);

  return task;
}

Now the single thread won’t be blocked, but there is a still a problem with this code because the Deposit method might return before the insert is actually completed, compromise the integrity of our code and software, let’s try to solve this using .Net 4.0:

public Task Deposit(double amount)  
{
  TaskCompletionSource<int> taskCompletionSource = 
    new TaskCompletionSource<int>();

  Task task = new Task(() =>
    {
      m_balance += amount;

      Task.Factory.StartNew(
        () => m_depositRepository.Insert(AccountId, amount)).ContinueWith(
          () => taskCompletionSource.SetResult(1));
    });
  task.Start(m_taskScheduler);

  return taskCompletionSource.Task;
}

We are using a task completion source (we are using int because there is no void task completion source, yes it is ugly) to signal when the insert is actually completed.
The problem with this code is that it’s not so elegant and contain too much boilerplate code, with .Net 4.5 this become much easier:

public async Task Deposit(double amount)  
{
  Task task = new Task(() =>
  {
    m_balance += amount;

    await Task.Factory.StartNew(
      () => m_depositRepository.Insert(AccountId, amount));
  });
  task.Start();

  await task;
}

Summary

Using the thread pool is for the example but if our system is handling a lot of requests we can exhaust the thread pool with the blocking operations, the better approach will be to use the Async Pattern of .Net, we will see an example in our next article in the series.

Another thing worth mentioning, when writing a single thread code the gate (the place we transition into our thread) will not actually be in our business objects as in our example, I’m only doing it there for sake of the example, it will probably be in higher layers (application layer, infrastructure of the facade, depend on your system architecture), I will discuss this in future article in the series.

.Net Async Programming - Part 1

Introduction

When working on high performance systems (thousands of operation per second) very fast you understand that you cannot use locks (to synchronize access to state) in your code and also cannot do any blocking operation (like writing to a file, socket or a database).

The remain question is, How can you write you code without a single lock and without any blocking operation?
In this part I will cover how to write code with zero locks (Blocking operation will be covered in next articles).

Single Thread Programming

I want to take you years back when we only had one processor and this processor had one core and even before threads and multi-threading exist.
If I told that all of your code is running on a single thread? If everything is running on a single thread you don’t need locks, and if everything is running on one thread you actually not using multithreading.

You are probably thinking but one thread won’t be fast enough, well you are wrong (in the next article of the series we are going to cover multi-threads as well).
When you really need more than one thread (I really doubt it) the magic trick is to not have shared state between the threads, each state belong to one thread and only one thread can read and write from it, but more about this in the next articles.

If you are building a system that doesn’t need to support more then 1000 (and actually even much higher numbers) operation per second (I’m assumening that the operation is short and without blocking IO) you should use one thread, of course there are more variables but this is a rule of thumb.

SingleThreadTaskScheduler

So after the long introduction, how do we use only one thread in C# ? Actually everything already exist in C#, we just taking 3 different component and combine them together: BlockingCollection, TaskSchdeuker and Thread, following is a simple implementation:

public class SingleThreadTaskScheduler : TaskScheduler,  
  IDisposable, IOrderedScheduler
{
  private BlockingCollection<Task> m_queue = new BlockingCollection<Task>();
  private ThreadLocal<bool> m_isSchedulerThread = new ThreadLocal<bool>(() => false);
  private Task m_workerTask;
  private bool m_disposed = false;

  public SingleThreadTaskScheduler()
  {
    m_workerTask = Task.Factory.StartNew(() => 
      Worker(), TaskCreationOptions.LongRunning);
  }

  ~SingleThreadTaskScheduler()
  {
    Dispose();
  }

  private void Worker()
  {
    m_isSchedulerThread.Value = true;
    try
    {
      foreach (Task task in m_queue.GetConsumingEnumerable())
      {
        base.TryExecuteTask(task);
      }
    }
    finally
    {
      m_isSchedulerThread.Value = false;
    }
  }

  protected override IEnumerable<Task> GetScheduledTasks()
  {
    return m_queue.ToArray();
  }

  protected override void QueueTask(Task task)
  {
    m_queue.Add(task);
  }

  protected override bool TryExecuteTaskInline(Task task, 
    bool taskWasPreviouslyQueued)
  {
    return m_isSchedulerThread.Value && TryExecuteTask(task);
  }

  public void Dispose()
  {
    if (!m_disposed)
    {
      m_queue.CompleteAdding();
      m_workerTask.Wait();
      m_disposed = true;

      GC.SuppressFinalize(this);
    }
  }
}

Async Programming

Now our application receiving input from somewhere (for the example let’s assume WCF service) our example will be a Brokerage service which provide the client with the current balance, before using the Task Scheduler the code will probably look something like this:

public double GetBalance(int accountId)  
{
  Account account = m_accountRepository.GetAccountById(accountId);

  return account.GetBalance();
}

Now the simplest thing to do is to wrap this code with Task and invoke on our new task scheduler:

public double GetBalance(int accountId)  
{
  Task<double> task =
    new Task<double>(() =>
      {
        Account account = m_accountRepository.GetAccountById(accountId);
        return account.GetBalance();
      });

  task.Start(m_singleThreadTaskScheduler);
  task.Wait();

  return task.Result;
}

But this code is not encapsulated, somebody can call the method without wrapping with task and run it on wrong task scheduler, we can insert the wrapping into the Account class:

class Account  
{
  private readonly SingleThreadTaskScheduler m_taskScheduler;
  private double m_balance = 0;

  public Account(SingleThreadTaskScheduler taskScheduler)
  {
    m_taskScheduler = taskScheduler;
  }

  public double GetBalance()
  {
    Task<double> task = new Task<double>(() => m_balance);
    task.Start(m_taskScheduler);
    task.Wait();

    return task.Result;
  }
}

Now the account class is encapsulated and making sure nobody can access the class state from another thread, the SingleThreadTaskSchduler will be injected to all of our classes or it can also be static (we already said we will have only one thread).
We can improve our Account class a little more and return a task instead of the double, this will come very handy in .Net 4.5 with the await keyword, but more about this in the next articles in the series, for the example we also adding another method to account to show both methods write and read the state.

class Account  
{
  private readonly SingleThreadTaskScheduler m_taskScheduler;
  private double m_balance = 0;

  public Account(SingleThreadTaskScheduler taskScheduler)
  {
    m_taskScheduler = taskScheduler;
  }

  public Task<double> GetBalance()
  {
    Task<double> task = new Task<double>(() => m_balance);
    task.Start(m_taskScheduler);
    task.Wait();

    return task;
  }

  public Task Deposit(double amount)
  {
    Task task = new Task(() =>
      {
        m_balance += amount;
      });

    return task;
  }
}

And for last we can now implement the WCF service with the async pattern model using the task from the Account.GetBalance and our system is completely async:

public IAsyncResult BeginGetBalance(int accountId, AsyncCallback  
  callback, object state)
{
  Account account = m_accountRepository.GetAcountById(accountId);

  Task<double> task = account.GetBalance();

  // using Task extension method from 
  // http://blogs.msdn.com/b/pfxteam/archive/2011/06/27/10179452.aspx
  return task.ToAPM(callback, state);
}

public double EndGetBalance(IAsyncResult asyncResult)  
{
  return ((Task<double>) asyncResult).Result;
}

In .Net 4.5 WCF contracts can have methods that return tasks so the GetBalance method can return the Account.GetBalance directly, like so:

public Task<double> GetBalanceAsync(int accountId)  
{
  Account account = m_accountRepository.GetAccountById(accountId);

  return account.GetBalance();
}

In the next article I will cover calling IO blocking methods without blocking our one and only thread.

NetMQ Scheduler - NetMQ and Task Library (TPL)

.Net 4.0 Task Library (TPL) is cool, it makes it easier to develop concurrency application.

ZeroMQ / NetMQ is very cool as well, but NetMQ and Task Library are not working together at the moment.

NetMQScheduler is Task Library (TPL) scheduler which let you run Tasks on NetMQ socket thread.

Let’s say you develop client-server application using NetMQ, now you want to send a message to the server from multiple threads in the application.
So you create a client class with two Sockets, one connected to the server and one inproc socket which listen to requests from other threads in the application, once message arrive on the inproc socket you immediately send it to the server. You also have a poller which listen to messages from both sockets (or only from the inproc if it one way communication).

Now instead of waiting to messages from the inproc you create a NetMQScheduler which request a NetMQ poller in the constructor, now any tasks that you run on the scheduler will run on the same thread of the poller (the thread that own the client socket).

Let’s see some code:

  public class Client : IDisposable
  {
    private readonly NetMQContext m_context;
    private readonly string m_address;
    private Poller m_poller;
    private NetMQScheduler m_scheduler;
    private NetMQSocket m_clientSocket;

    public Client(NetMQContext context, string address)
    {
      m_context = context;
      m_address = address;
    }

    public void Start()
    {
      m_poller = new Poller();

      m_clientSocket = m_context.CreateDealerSocket();

      m_clientSocket.Bind(m_address);

      m_scheduler = new NetMQScheduler(m_context, m_poller);

      Task.Factory.StartNew(m_poller.Start, TaskCreationOptions.LongRunning);
    }

    public void SendMessage(byte[] message)
    {
      // instead of creating inproc socket which listen to messages 
      // and then send to the server we just creating task and run a code on
      // the poller thread which the the thread of the m_clientSocket
      Task task = new Task(() => m_clientSocket.Send(message));
      task.Start(m_scheduler);

      // this is optional, we can also return the task which make the method async, 
      // can be used with async keyword in C# 5.0
      task.Wait();
    }

    public void Dispose()
    {
      m_scheduler.Dispose();
      m_clientSocket.Dispose();

      m_poller.Stop();
    }
  }

Another example can be in case of a publisher from the server side, multiple threads in the server want to publish messages and you only have one publisher socket which publish messages to the client, of course you can use inproc subscriber that listen to messages and publish to clients but it easier and you don’t need to develop inproc protocol.

It can also make life easier to retrieve information hold by the NetMQSocket thread, let’s say we have Server class which hold all the clients that connected so far and we want to retrieve that information from another thread, we can develop inproc protocol that retrieve the information or we can just use NetMQScheduler and possibilities are endless.

public IEnumerable<string> GetClientList()  
{
  Task<IEnumerable<string>> task = new Task<IEnumerable<string>>(() => 
    new List<string>(m_clients));
  task.Wait();

  return task.Result;      
}

To summarize the NetMQScheduler can make it easier to threads of NetMQ sockets to communicate with other threads and bring NetMQ to the world of Task Library.

Securing NetMQ

Inspired by the coming ZMTP v3.0 and CurveZMQ I started to develop security  layer for NetMQ as well.

As CurveZMQ the library is currently working over NetMQ sockets, in the future with ZMTP v3.0 it will probably be part of the library.

NetMQ Secure Channel is based on TLS 1.2 and DTLS 1.2, after spending a lot of time with both RFC (TLS & DTLS) I’m happy to say that the library covered most of the the features.

Before diving into some code I need to say that the library is not yet ready for production use.

Client

using (var socket = context.CreateDealerSocket())  
{
  socket.Connect("tcp://127.0.0.1:5556");

  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Client);

  // we are not using signed certificate so we need to validate 
  // the certificate of the server, by default the secure channel 
  // is checking that the source of the 
  // certitiface is a root certificate authority
  secureChannel.SetVerifyCertificate(c =&gt;  true);

  IList outgoingMessages = new List();

  // call the process message with null as the incoming message 
  // because the client is initiating the connection
  secureChannel.ProcessMessage(null, outgoingMessages);

  // the process message method fill the outgoing messages list with 
  // messages to send over the socket
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // waiting for a message from the server
  NetMQMessage incomingMessage= socket.ReceiveMessage();

  // calling ProcessMessage until ProcessMessage return true 
  // and the SecureChannel is ready to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();

    incomingMessage = socket.ReceiveMessage();  
  }

  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // you can now use the secure channel to encrypt messages
  NetMQMessage plainMessage = new NetMQMessage();
  plainMessage.Append("Hello");

  // encrypting the message and sending it over the socket
  socket.SendMessage(secureChannel.EncryptApplicationMessage(plainMessage));
}

Server

// we are using dealer here, but we can use router as well, we just have to manager
// SecureChannel for each identity
using (var socket = context.CreateDealerSocket())  
{
  socket.Bind("tcp://*:5556");

  SecureChannel secureChannel = new SecureChannel(ConnectionEnd.Server);

  // we need to set X509Certificate with a private key for the server
  X509Certificate2 certificate = new X509Certificate2("NetMQ.Testing.pfx", "1");
  secureChannel.Certificate = certificate;

  IList outgoingMessages = new List();

  // waiting for message from client
  NetMQMessage incomingMessage = socket.ReceiveMessage();

  // calling ProcessMessage until ProcessMessage return true 
  // and the SecureChannel is ready to encrypt and decrypt messages
  while (!secureChannel.ProcessMessage(incomingMessage, outgoingMessages))
  {
    foreach (NetMQMessage outgoingMessage in outgoingMessages)
    {
      socket.SendMessage(outgoingMessage);
    }
    outgoingMessages.Clear();

    incomingMessage = socket.ReceiveMessage();
  }
  foreach (NetMQMessage outgoingMessage in outgoingMessages)
  {
    socket.SendMessage(outgoingMessage);
  }
  outgoingMessages.Clear();

  // this message is now encrypted
  NetMQMessage cipherMessage = socket.ReceiveMessage();

  // decrypting the message
  NetMQMessage plainMessage = secureChannel.DecryptApplicationMessage(cipherMessage);
  Console.WriteLine(plainMessage.First.ConvertToString());
}

Code

Both client and server should call ProcessMessage until the method return true.

First call to client ProcessMessage should be with null as the incomingMessage because client is the one initiating the connection.

Calling ProcessMessage fill the outgoingMessages list with messages that need to be send to the other peer. Make sure to send those.

Summary

The code is not yet merge into NetMQ, you can find the source code at my github.

Learning from past mistakes the SecureChannel protocol is versioned, so any breaking changes will happen in new a version.
Server will be able to support multiple versions of clients.

Some notes about the library:

  • Only RSA, AES (128/256) and SHA (1/256) combination exist.
  • Renegotiation is not supported inside the library, however it’s very simple to implement over the library.
  • Alert layer from TLS is missing completely from the layer, might be implement in future versions.
  • Only block ciphers are supported, compression is not supported.
  • Client cannot authenticate with a certificate.
  • You can send multipart messages.
  • The messages doesn’t have to be ordered (like DTLS)
  • Make sure to catch and handle NetMQSecurityException.

Array Length in C#

As you know in C# you can query the length of the array with Length property, but it’s not so obvious, in c++ for example you allocate the array to the size you want but the length is unknown after the allocation.

My question is, how in C# the language can tell you the length of the array, or to ask this question in another way, where does C# save the array length, my first guess was that the first item in an array is the array length, I decide to go and search for it. First I tried to search in the first item:

  static void Main(string[] args)
  {
    int[] array = new int[6];

    array[0] = 0;
    array[1] = 1;
    array[2] = 2;
    array[3] = 3;
    array[4] = 4;

    var handle = GCHandle.Alloc(array, GCHandleType.Pinned);

    IntPtr address = handle.AddrOfPinnedObject();

    int value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    handle.Free();

    Console.ReadLine();
  }

And the output:

3

So you can see my guess was wrong, the first item is actually the first item.

Then I tried the 7 item (last + 1), which make no sense because it’s has to be the first otherwise the language won’t know where to search for it.

Finally I tried to search before the first item which actually prove right:

  static void Main(string[] args)
  {
    int[] array = new int[6];

    array[0] = 0;
    array[1] = 1;
    array[2] = 2;
    array[3] = 3;
    array[4] = 4;

    var handle = GCHandle.Alloc(array, GCHandleType.Pinned);

    IntPtr address = handle.AddrOfPinnedObject();

    int value = Marshal.ReadInt32(address-4);
    Console.WriteLine(value);

    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    address += 4;
    value = Marshal.ReadInt32(address);
    Console.WriteLine(value);

    handle.Free();

    Console.ReadLine();
  }

And the output was:
4

To summarize C# save the length of the array one item before the first item of the array (or 4 bytes before the beginning of the array).
Which actually make the size of the array numofitems * sizeofitem + 4 (which is size of integer holding the array size).

Structures in C# & .Net

You are probably not sure why I’m writing post about structures in the Advanced C# category. I hope that by the end of the post you will be sure, because structures have some interesting attributes in .net that worth understanding, especially when writing high performance and low level code in C#.

Structures are value type

What does that mean? Let’s dive directly into an example:

struct Number  
{
  public int Value;
}

static void Main(string[] args)  
{
  Number n;
  n.Value = 5;

  Number n2 = n;
  Console.WriteLine("Value of n2 after copyied from n: {0}", n2.Value);

  n2.Value = 10;
  Console.WriteLine("Value of n2 after a change: {0}", n2.Value);
  Console.WriteLine("Value of n after the change to n2: {0}", n.Value);
}

Before showing you the result, let’s talk about some interesting points in this code, I didn’t initialize the n variable and didn’t get null reference exception, why is that? because when you create a structure variable it’s created on the stack, it’s not a pointer to memory on the heap. Now let’s take a look at the result:

2

Strange? doing the same with classes would show 10 for both n and n2, but with structures n keep the original value. Why? because it’s value type, it’s not like a class that when you are doing assignment you just copy the reference, you copy the entire structure. n and n2 are two different location on memory, when the assignment happen it just copied all the memory from n to n2, when we changed n2 the n didn’t change.

Deep Copy

So we know that every time we use assignment with structure we are actually coping memory, this copy thing can be a good for case when you want an object to have deep copy capability, instead of manual copy all the object and fields or using serialization solutions.

The rule for using structure for deep copy is that the structure will only hold primitives and other structures (which hold primitives), string will be copied as well but arrays won't be deep copeied.

Some example can be the memento pattern, which is a pattern when an object can go back to previous state, you can hold all the state of the object (which is a class) in a structure and when you start editing copy the structure to another field inside the class, when you want to rollback just copy the structure back to the main field.

Serialization

I think is the fastest way to serialize an object in .net, what I’m I talking about? because structure is a block of memory, you can copy this block of memory into a byte array, no serialization and no other stuff, just copy into a byte array and your are ready to save to a file, send with socket or write to a stream. How you do this? following is a short example:

int size = Marshal.SizeOf(n);  
byte[] bytes = new byte[size];

var handle = GCHandle.Alloc(n, GCHandleType.Pinned);  
IntPtr ptr = handle.AddrOfPinnedObject();

Marshal.Copy(ptr, bytes, 0, size);

handle.Free();  

What happened here? We got the memory location of the structure with line 4 & 5 (this trick is usually used with interop) then copied the memory to the byte array and released the pointer and handle.

NetMQ Lesson #1 – Basics

So you probably already know that NetMQ is port of ZeroMQ (also spelled ØMQ, 0MQ or ZMQ) to .Net.

If you are not familiar with ZeroMQ or NetMQ please read my previous post about introducing NetMQ.

Transport

In NetMQ you can choose from different transport, in this post I will focus on TCP.

  • TCP – communication over the network.
  • INPROC – in process communication, or inter-thread communication.
  • PGM – reliable multicast implemented with Microsoft implementation of PGM.
  • IPC – inter process communication, on NetMQ it’s exactly the same as TCP.

Patterns

As with transports NetMQ comes with different communication patterns, in this post I will cover request-response pattern.

  • Request/Response – client send request and server answer with response.
  • Pub/Sub – client subscribe for messages and server distribute message to all subscribed clients.
  • Dealer/Router – in simple words its client-server communication, dealer is the client and the router is the server, more about this in the future.
  • Push/Pull – one peer push messages and one or more workers peek those messages, good to build a pipeline.
  • Pair – one to one connection, for use with INPROC transport.
  • Dealer/Dealer, Router/Router – more advanced patterns for some cases.

Multi-threading

When you are entering the world of high performance systems (like trading platforms) you soon find out that everything you learned about multi-threading is wrong.

When you are writing high performance systems you don’t use locks, not reader writer lock, not mutex, not monitor and not any other type of lock. If you are writing very good code you even not using .net concurrent collection (except blocking collection, which is one pattern you can use) or interlocked. Now there is very easy way to do it, just don’t share data between threads, but this is another topic worth posting about.

The reason I’m telling you this is that NetMQ (and ZeroMQ) is coming from that world, you are not sharing the socket (which I explain later what is) between threads, the socket is belonging to the thread that create it. If you want to pass data to another thread you can use NetMQ with INPROC as transport (or blocking collection or ring buffer or any other inter-thread communication library). The topic of multi-threading is covered very good in the ZeroMQ guide.

Socket like API

So the basic of NetMQ is the socket object, no matter how many clients you have connected you have one socket, and not matter to how may servers you are connected to you have one socket. When you want to send a message you just call the Send method and when you want to receive a message you just call the Receive message.

Code

So let’s start with simple request response client server example (you need to add a reference to NetMQ, you can find the library on nuget):

  class Program
  {
    static void Main(string[] args)
    {
      using (NetMQContext context = NetMQContext.Create())
      {
        Task serverTask = Task.Factory.StartNew(() => Server(context));
        Task clientTask = Task.Factory.StartNew(() => Client(context));
        Task.WaitAll(serverTask, clientTask);
      }
    }

    static void Server(NetMQContext context)
    {
      using (NetMQSocket serverSocket = context.CreateResponseSocket())
      {
        serverSocket.Bind("tcp://*:5555");

        while (true)
        {
          string message = serverSocket.ReceiveString();

          Console.WriteLine("Receive message {0}", message);

          serverSocket.Send("World");

          if (message == "exit")
          {
            break;
          }
        }
      }
    }

    static void Client(NetMQContext context)
    {
      using (NetMQSocket clientSocket = context.CreateRequestSocket())
      {
        clientSocket.Connect("tcp://127.0.0.1:5555");

        while (true)
        {
          Console.WriteLine("Please enter your message:");
          string message = Console.ReadLine();
          clientSocket.Send(message);

          string answer = clientSocket.ReceiveString();

          Console.WriteLine("Answer from server: {0}", answer);

          if (message == "exit")
          {
            break;
          }
        }
      }
    }
  }

Remember what I told you about not sharing anything between threads? So you are actually allowed to share the NetMQContext between threads. It’s the only NetMQ object that you allowed to share between threads.

So as you can see using NetMQ is pretty easy. One important thing to note, string is not the only thing you can send, NetMQ is actually about delivering binary data, the Send method that receives string is just another overload of the receive method, more usually you will pass byte array and do the object serialization your self or with another library.

You can create different application for the client and for the server and launch multiple clients, in the request response pattern you don’t have to specify which client you sending the response to, the socket just know. In more advanced scenario (dealer/router pattern) you can specify to which client you are sending the message.

Introducing NetMQ - .net port of ZeroMQ

Update: this post was written more than a year ago and since then the NetMQ has become the default ZeroMQ choice for .Net developers. Also the project has a growing community with more than 4,000 downloads on nuget and it is a production ready library.

NetMQ is C# port of ZeroMQ.

ZeroMQ

ZeroMQ  is a high-performance asynchronous messaging library aimed at use in scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ZeroMQ system can run without a dedicated message broker. The library is designed to have a familiar socket-style API.

If you are not familiar with ZeroMQ, (also spelled ØMQ, 0MQ or ZMQ) you should be, becuse besides being a very fast messaging library it’s also changes the way you think about programming. If you really want to learn about ZeroMQ, make sure to read Pieter Hintjens book “Code Connected”. The ebook version is free and you can get it here or you can also read it on the ZeroMQ website.

This book will change the way you think about distributed enterprise systems and about programming all together. It did for me and I read it 3 6 times already, front to back and I will probably read it again.

How it all started

So the NetMQ project started as a weekend project (I have plenty of those, but most of them stay on my computer).

The original reason was that the library was missing some features that I really needed in order to push ZeroMQ at my company. But not being familiar with the library and C++ not being my native programming language I decided to port it to C#.

Lucky for me, somebody already did part of the job by porting ZeroMQ to Java. With java being more close to C# then C++, I took the java project as my base project. And 30 hours later and after a lot of coffee and no sleep ZeroMQ was ported to C#. It took me around 4 more month to do the extra 20% and make it ready for use.

Why NetMQ?

So if you want to use ZeroMQ or already are using ZeroMQ and you are developing with .Net, I would suggest that your try out NetMQ. You might be asking yourself, “why should I use NetMQ and not ZeroMQ with the C# binding?” I have multiple answers for this question:

  • Running unmanaged code inside a managed application can do some nasty stuff like memory leak and weird no access errors.
  • It is easier to debug with native c# code and you can download the code and just debug your system.
  • You can contribute. If you are using .Net, then C#  is probably better than your C++, and NetMQ gives you the opportunity to give something back to the community and get your name on the contributors list.
  • Update: As of 2014 C# binding (CLRZMQ) is no longer maintained and NetMQ is the default choice for ZeroMQ and .Net.
  • Update: As of 2014 NetMQ is a stable project with a growing community and is in production use by multiple companies.

NetMQ is being maintained by a community. You can visit our website at github, and download the nuget package.