NetMQScheduler - NetMQ and the Task Parallel Library (TPL)

The .NET 4.0 Task Parallel Library (TPL) is cool: it makes concurrent applications easier to develop.

ZeroMQ / NetMQ is very cool as well, but NetMQ and the TPL do not work together out of the box.

NetMQScheduler is a TPL task scheduler that lets you run tasks on a NetMQ socket's thread.

Let’s say you are developing a client-server application with NetMQ, and you want to send messages to the server from multiple threads in the application.
The usual approach is to create a client class with two sockets: one connected to the server, and one inproc socket that listens for requests from the other threads in the application. As soon as a message arrives on the inproc socket, you forward it to the server. You also have a poller that listens for messages on both sockets (or only on the inproc socket, if communication is one-way).

Instead of waiting for messages on an inproc socket, you can create a NetMQScheduler, which takes a NetMQ poller in its constructor. Any task you run on the scheduler then executes on the poller's thread (the thread that owns the client socket).
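To make the idea concrete, here is a minimal sketch of a TPL scheduler that runs every task on one dedicated thread. It is not the NetMQScheduler implementation and does not use NetMQ at all: the names SingleThreadTaskScheduler and Demo are hypothetical, and a plain worker thread stands in for the poller thread. It only illustrates the TaskScheduler mechanics that make "run this task on that thread" possible.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only, NOT how NetMQScheduler is implemented.
// Every task queued to this scheduler executes on one dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> m_tasks = new BlockingCollection<Task>();
    private readonly Thread m_thread;

    public SingleThreadTaskScheduler()
    {
        m_thread = new Thread(Run) { IsBackground = true };
        m_thread.Start();
    }

    // Id of the dedicated thread, exposed so callers can verify where tasks ran.
    public int ThreadId { get { return m_thread.ManagedThreadId; } }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    // Stands in for the poller loop: take queued tasks and execute them here.
    private void Run()
    {
        foreach (Task task in m_tasks.GetConsumingEnumerable())
        {
            TryExecuteTask(task);
        }
    }

    protected override void QueueTask(Task task)
    {
        m_tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Run inline only if the caller is already on the dedicated thread.
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return m_tasks.ToArray();
    }

    public void Dispose()
    {
        // Let the worker drain the queue and exit, then release resources.
        m_tasks.CompleteAdding();
        m_thread.Join();
        m_tasks.Dispose();
    }
}

public static class Demo
{
    // Returns true when the task body ran on the scheduler's own thread.
    public static bool RunsOnSchedulerThread()
    {
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            var task = new Task<int>(() => Thread.CurrentThread.ManagedThreadId);
            task.Start(scheduler);
            return task.Result == scheduler.ThreadId;
        }
    }
}
```

The calling pattern is the same as with NetMQScheduler in this post: create a Task, call Start with the scheduler, and optionally wait for it. The difference is that NetMQScheduler runs the work on the thread of the poller it was given rather than on a thread of its own.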

Let’s see some code:

  public class Client : IDisposable
  {
    private readonly NetMQContext m_context;
    private readonly string m_address;
    private Poller m_poller;
    private NetMQScheduler m_scheduler;
    private NetMQSocket m_clientSocket;

    public Client(NetMQContext context, string address)
    {
      m_context = context;
      m_address = address;
    }

    public void Start()
    {
      m_poller = new Poller();

      m_clientSocket = m_context.CreateDealerSocket();

      m_clientSocket.Connect(m_address);

      m_scheduler = new NetMQScheduler(m_context, m_poller);

      Task.Factory.StartNew(m_poller.Start, TaskCreationOptions.LongRunning);
    }

    public void SendMessage(byte[] message)
    {
      // Instead of creating an inproc socket that listens for messages
      // and forwards them to the server, we create a task and run it on
      // the poller thread, which is the thread that owns m_clientSocket.
      Task task = new Task(() => m_clientSocket.Send(message));
      task.Start(m_scheduler);

      // Waiting is optional: we could instead return the task, which makes
      // the method asynchronous and awaitable with the C# 5.0 await keyword.
      task.Wait();
    }

    public void Dispose()
    {
      m_scheduler.Dispose();
      m_clientSocket.Dispose();

      m_poller.Stop();
    }
  }
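The comment in SendMessage mentions returning the task instead of waiting on it. One possible way to package that is sketched below. SchedulerExtensions and Schedule are hypothetical names, not part of NetMQ. The helpers rely only on the standard Task.Factory.StartNew overloads that accept a TaskScheduler, and they assume NetMQScheduler derives from TaskScheduler, as its use with task.Start suggests.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helpers, not part of NetMQ: they start work on the given
// scheduler and return the task to the caller instead of blocking on it.
public static class SchedulerExtensions
{
    // Schedules an action and returns a Task the caller can wait on or await.
    public static Task Schedule(this TaskScheduler scheduler, Action action)
    {
        return Task.Factory.StartNew(
            action, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }

    // Schedules a function and returns a Task<T> carrying its result.
    public static Task<T> Schedule<T>(this TaskScheduler scheduler, Func<T> func)
    {
        return Task.Factory.StartNew(
            func, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }
}
```

With helpers like these, SendMessage in the Client class could return Task and have the single-line body "return m_scheduler.Schedule(() => m_clientSocket.Send(message));". A caller could then either await the result or call Wait() on it.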

Another example is a publisher on the server side: multiple threads in the server want to publish messages, but you have only one publisher socket that publishes to the clients. You could of course use an inproc subscriber that listens for messages and republishes them to the clients, but with NetMQScheduler it is easier, and you don’t need to develop an inproc protocol.

NetMQScheduler can also make it easier to retrieve information held by the NetMQSocket thread. Let’s say we have a Server class that holds the list of all clients connected so far, and we want to retrieve that list from another thread. We could develop an inproc protocol to retrieve the information, or we can simply use NetMQScheduler; the possibilities are endless.

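public IEnumerable<string> GetClientList()
{
  // Copy the list on the socket thread so the caller gets a safe snapshot.
  Task<IEnumerable<string>> task = new Task<IEnumerable<string>>(() =>
    new List<string>(m_clients));

  // The task must be started on the scheduler (assumed here to be a
  // NetMQScheduler field named m_scheduler, as in the Client class);
  // without this call, Wait() would block forever.
  task.Start(m_scheduler);
  task.Wait();

  return task.Result;
}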

To summarize, NetMQScheduler makes it easier for NetMQ socket threads to communicate with other threads, and brings NetMQ into the world of the Task Parallel Library.
