Using NetMQ and ASP.NET

From time to time a question regarding how to use NetMQ in ASP.NET application is popping up in the NetMQ mailing list so I have decided to write a post on the subject.

WebAPI

For the examples in the post I will use WebAPI 2.0, but it should work for other asp.net application types. Also I’m changing the way WebAPI is configured, here is WebApiConfig class:

public static class WebApiConfig  
{
  public static void Register(HttpConfiguration config)
  {
    // Web API configuration and services

    // Web API routes
    config.MapHttpAttributeRoutes();

    config.Routes.MapHttpRoute(
        name: "DefaultApi",
        routeTemplate: "api/{controller}/{action}",
        defaults: new { id = RouteParameter.Optional }
    );
  }
}      

The only difference is using /{action} instead of /{id} and this is because I’m not writing a REST service.

I’m using Autofac as dependency injection for the examples in the post.

Calculator

Through out the post I will use a simple calculator example, following is the code of the calculator server application:

class Program  
{
  static void Main(string[] args)
  {
    using (NetMQContext context = NetMQContext.Create())
    {
      using (var responseSocket = context.CreateResponseSocket())
      {
        responseSocket.Bind("tcp://*:10001");

        while (true)
        {
          var requestMessage = responseSocket.ReceiveMessage();
          string a = requestMessage.Pop().ConvertToString();
          string b = requestMessage.Pop().ConvertToString();

          int aNumber = Convert.ToInt32(a);
          int bNumber = Convert.ToInt32(b);

          string result = (aNumber + bNumber).ToString();

          NetMQMessage responseMessage = new NetMQMessage();
          responseMessage.Append(result);

          responseSocket.SendMessage(responseMessage);
        }
      }
    }
  }
}

Simple Pattern

In the simple pattern each controller will create and connect it’s own socket. The NetMQ context will be created once and will be injected into the controllers.

SimpleController.cs

public class SimpleController : ApiController  
{
  private readonly NetMQContext m_context;
  private string m_serviceAddress;

  public SimpleController(NetMQContext context, string serviceAddress)
  {
    m_context = context;
    m_serviceAddress = serviceAddress;
  }

  [HttpGet]
  public int Calc(int a, int b)
  {
    using (var requestSocket = m_context.CreateRequestSocket())
    {
      requestSocket.Connect(m_serviceAddress);            

      NetMQMessage message = new NetMQMessage();

      // converting to string, not most efficient but will do for our example
      message.Append(a.ToString()); 
      message.Append(b.ToString());

      requestSocket.SendMessage(message);

      var replyMessage = requestSocket.ReceiveMessage();
      string result = replyMessage.Pop().ConvertToString();

      return Convert.ToInt32(result);
    }
  }
}

Global.asax.cs:

public class WebApiApplication : System.Web.HttpApplication  
{
  protected void Application_Start()
  {
    string address = "tcp://127.0.0.1:10001";

    var builder = new ContainerBuilder();

    // Register the NetMQ context
    builder.RegisterInstance(NetMQContext.Create()).SingleInstance();
    builder.RegisterType().WithParameter("serviceAddress", address);

    // Build the container.
    var container = builder.Build();

    // Create the dependency resolver.
    var resolver = new AutofacWebApiDependencyResolver(container);

    // Configure Web API with the dependency resolver.
    GlobalConfiguration.Configuration.DependencyResolver = resolver;

    GlobalConfiguration.Configure(WebApiConfig.Register);
  }
}

The advantage of the simple pattern is that it’s very simple.

However we are creating and connecting a TCP socket on each request, this is not efficient and can take time (Because of TCP and ZMTP handshake process).

We can fix this easily with a device in the middle, and this take us to the next solution.

Simple Device Pattern

Device in ZeroMQ/NetMQ is a component that sits in the middle of zeromq applications and forward messages between them, you can learn more about devices at the zeromq guide.

The simple device bind on a inproc address and connect to the calculator service.

Any request coming from the inproc is forward to the service and responses are routing back to the inproc socket.

We don’t change anything in the SimpleController except injecting the inproc address instead of the service address.

Let’s take a look at the Device class:

public class Device : IDisposable, IStartable  
{
  private readonly NetMQContext m_context;
  private readonly string m_backEndAddress;
  private readonly string m_frontEndAddress;
  private Poller m_poller;
  private RouterSocket m_frontendSocket;
  private DealerSocket m_backendSocket;

  public Device(NetMQContext context, string backEndAddress, 
    string frontEndAddress)
  {
    m_context = context;
    m_backEndAddress = backEndAddress;
    m_frontEndAddress = frontEndAddress;
  }

  public void Start()
  {
    Task.Factory.StartNew(() =>
    {
      m_poller = new Poller();

      using (m_frontendSocket = m_context.CreateRouterSocket())
      {
          using (m_backendSocket = m_context.CreateDealerSocket())
        {
          m_backendSocket.Connect(m_backEndAddress);
          m_frontendSocket.Bind(m_frontEndAddress);

          m_backendSocket.ReceiveReady += OnBackEndReady;
          m_frontendSocket.ReceiveReady += OnFrontEndReady;

          m_poller.AddSocket(m_backendSocket);
          m_poller.AddSocket(m_frontendSocket);

          m_poller.Start();
        }
      }
    }, TaskCreationOptions.LongRunning);
  }

  private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_frontendSocket.ReceiveMessage();
    m_backendSocket.SendMessage(message);
  }

  private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_backendSocket.ReceiveMessage();
    m_frontendSocket.SendMessage(message);
  }

  public void Dispose()
  {
      m_poller.Stop(true);
  }
}

The device class will start automatically by the Autofac because it is inherited from IStartable.

Let’s take a look at the global.asax.cs file:

public class WebApiApplication : System.Web.HttpApplication  
{
  protected void Application_Start()
  {
    const string serviceAddress = "tcp://127.0.0.1:10001";
    const string inprocAddress = "inproc://broker";        

    var builder = new ContainerBuilder();

    // Register the NetMQ context
    builder.RegisterInstance(NetMQContext.Create()).SingleInstance();
    builder.RegisterType().
      WithParameter("serviceAddress", inprocAddress).
      InstancePerRequest();
    builder.RegisterType().SingleInstance().As().
      WithParameter("backEndAddress", serviceAddress).
      WithParameter("frontEndAddress", inprocAddress);

    // Build the container.
    var container = builder.Build();

    // Create the dependency resolver.
    var resolver = new AutofacWebApiDependencyResolver(container);

    // Configure Web API with the dependency resolver.
    GlobalConfiguration.Configuration.DependencyResolver = resolver;

    GlobalConfiguration.Configure(WebApiConfig.Register);
  }
}

With the simple device only one place connects to the calculator service and in rest of the web application we send the request to the device using inproc transport, also as you see we didn’t have to change anything in the controller code.

The simple device pattern will usually be enough for most asp.net web application, however we still have some problems that the simple device pattern doesn’t solve:

  1. We are still creating and disposing a lot of NetMQ sockets and sockets are an expensive resource.
  2. The controllers code is blocking, we occupy a thread until the response is arrived
  3. Timeout is not handled, what if the response is gone? or the calculator service is down?

For the third problem I suggest reading the Reliable Request-Reply chapter at the zeromq guide.

For a quick fix we can set the ReceiveTimeout of the socket and catch the AgainException, like this:

[HttpGet]
public IHttpActionResult Calc(int a, int b)  
{
  using (var requestSocket = m_context.CreateRequestSocket())
  {
    requestSocket.Options.ReceiveTimeout = TimeSpan.FromSeconds(10);
    requestSocket.Connect(m_serviceAddress);

    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString()); 
    message.Append(b.ToString());

    requestSocket.SendMessage(message);

    try
    {
      var replyMessage = requestSocket.ReceiveMessage();
      string result = replyMessage.Pop().ConvertToString();

      return Ok(Convert.ToInt32(result));
    }
    catch (AgainException ex)
    {
      return BadRequest();
    }
  }
}

For the blocking problem we need convert our controllers to be asynchronous, imagine we can wrote this:

[HttpGet]
public async Task Calc(int a, int b)  
{
  using (var requestSocket = m_context.CreateRequestSocket())
  {
    requestSocket.Options.ReceiveTimeout = TimeSpan.FromSeconds(10);
    requestSocket.Connect(m_serviceAddress);

    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString()); 
    message.Append(b.ToString());

    requestSocket.SendMessage(message);

    try
    {
      var replyMessage = await requestSocket.ReceiveMessageAsync();
      string result = replyMessage.Pop().ConvertToString();

      return Ok(Convert.ToInt32(result));
    }
    catch (AgainException ex)
    {
      return BadRequest();
    }
  }
}

But sadly we cannot write this code, yet.

AsyncSocket

In the last pattern in the post I will write an asynchronous wrapper for NetMQ socket in order to be able to use the async/await keywords of .Net 4.5.

So the code here is getting a little complicated, we will have to use TaskCompletionSource and NetMQScheduler, take a look:

public class AsyncSocket : IDisposable  
{
  private readonly NetMQContext m_context;
  private readonly string m_serviceAddress;
  private NetMQScheduler m_scheduler;
  private Poller m_poller;
  private NetMQSocket m_requestSocket;
  private TaskCompletionSource<NetMQMessage> m_taskCompletionSource; 

  public AsyncSocket(NetMQContext context, string address)
  {
    m_context = context;
    m_serviceAddress = address;

    m_requestSocket = context.CreateRequestSocket();
    m_requestSocket.ReceiveReady += OnReceiveReady;
    m_requestSocket.Connect(address);

    m_poller = new Poller(m_requestSocket);
    m_scheduler = new NetMQScheduler(m_context, m_poller);

    Task.Factory.StartNew(() => m_poller.Start(), 
      TaskCreationOptions.LongRunning);
  }

  public Task<NetMQMessage> SendAndReceiveAsync(NetMQMessage message)
  {
    var task = new Task<Task<NetMQMessage>>(() =>
    {
      m_taskCompletionSource = 
        new TaskCompletionSource<NetMQMessage>();

      m_requestSocket.SendMessage(message);

      return m_taskCompletionSource.Task;
    });

    // will start the task on the scheduler which 
    /  the same thread as the Poller thread
    task.Start(m_scheduler);
    return task.Result;
  }

  private void OnReceiveReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_requestSocket.ReceiveMessage();
    m_taskCompletionSource.SetResult(message);
    m_taskCompletionSource = null;
  }

  public void Dispose()
  {
    m_scheduler.Dispose();
    m_poller.Stop(true);
    m_requestSocket.Dispose();            
  }
}

And the Controller code:

[HttpGet]
public async Task Calc(int a, int b)  
{
  using (var asyncSocket = 
    new AsyncSocket(m_context, m_serviceAddress))
  {
    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString());
    message.Append(b.ToString());

    var replyMessage = await asyncSocket.SendAndReceiveAsync(message);

    string result = replyMessage.Pop().ConvertToString();

    return Ok(Convert.ToInt32(result));
  }
}

So now we have an async socket with an async controller, but we created a new problem, we now create a background thread for every request coming, how can we fix that?

We can change the AsyncSocket to handle multiple events, the pure zeromq way to write this is complicated and actually not very neat (NetMQ cannot pass objects, we have to pass the TaskCompletionSource between sockets, the only way to that is using there address, which is not very neat).

The way I’m going to implement it will force us to share AsyncSocket between threads, however it will be completely safe and lock-free with the magic of NetMQScheduler.

public class AsyncSocket : IStartable, IDisposable  
{
  private readonly NetMQContext m_context;
  private readonly string m_serviceAddress;
  private NetMQScheduler m_scheduler;
  private Poller m_poller;
  private NetMQSocket m_dealerSocket;
  private Dictionary<int, TaskCompletionSource<NetMQMessage>> m_requests;
  private int m_requestId;

  public AsyncSocket(NetMQContext context, string address)
  {
    m_context = context;
    m_serviceAddress = address;
    m_requests = 
      new Dictionary<int, TaskCompletionSource<NetMQMessage>>();
    m_requestId = 0;       
  }

  public void Start()
  {
    m_dealerSocket = m_context.CreateDealerSocket();
    m_dealerSocket.ReceiveReady += OnReceiveReady;
    m_dealerSocket.Connect(m_serviceAddress);

    m_poller = new Poller(m_dealerSocket);
    m_scheduler = new NetMQScheduler(m_context, m_poller);

    Task.Factory.StartNew(() => m_poller.Start(), 
      TaskCreationOptions.LongRunning);
  }

  public Task<NetMQMessage> SendAndReceiveAsync(NetMQMessage message)
  {
    // duplicate the message because we are not the owner of the message
    NetMQMessage duplicteMessage = new NetMQMessage(message);

    var task = new Task<Task<NetMQMessage>>(() =>
    {
      var taskCompletionSource = 
        new TaskCompletionSource<NetMQMessage>();

      // because we are using a dealer we have to push the delimiter
      duplicteMessage.PushEmptyFrame();

      // sending the request id the request identifier
      duplicteMessage.Push(m_requestId.ToString());

      // add the request to the pending request dictionary
      m_requests.Add(m_requestId, taskCompletionSource);

      // increase the request id for the next request
      m_requestId++;

      m_dealerSocket.SendMessage(duplicteMessage);

      return taskCompletionSource.Task;
    });

    // will start the task on the scheduler which 
    // is the same thread as the Poller thread
    task.Start(m_scheduler);
    return task.Result;
  }

  private void OnReceiveReady(object sender, NetMQSocketEventArgs e)
  {
    NetMQMessage message = m_dealerSocket.ReceiveMessage();

    // pop the request id
    string identity = message.Pop().ConvertToString();

    // pop the delimiter
    message.Pop();

    int requestId = Convert.ToInt32(identity);

    TaskCompletionSource<NetMQMessage> taskCompletionSource;

    // getting the task completion source, if we were also try to 
    // handle timeout the request will be 
    // gone and the response will be dropped
    if (m_requests.TryGetValue(requestId, out taskCompletionSource))
    {
      taskCompletionSource.SetResult(message);
      m_requests.Remove(requestId);
    }                
  }

  public void Dispose()
  {
    m_scheduler.Dispose();
    m_poller.Stop(true);
    m_dealerSocket.Dispose();            
  }  
}

Please note that with new implementation of AsyncSocket we don't need the SimpleDevice anymore and the AsyncSocket can connect directly to the service, the reason is that we know only have one socket connecting to the service.

This how our new controller looks:

public class AsyncController : ApiController  
{
  private AsyncSocket m_asyncSocket;

  public AsyncController(AsyncSocket asyncSocket)
  {
    m_asyncSocket = asyncSocket;
  }

  [HttpGet]
  public async Task Calc(int a, int b)
  {
    NetMQMessage message = new NetMQMessage();
    message.Append(a.ToString()); 
    message.Append(b.ToString());

    var replyMessage = await m_asyncSocket.SendAndReceiveAsync(message);

    string result = replyMessage.Pop().ConvertToString();

    return Ok(Convert.ToInt32(result));
  }
}

And the autofac magic:

protected void Application_Start()  
{
  const string serviceAddress = "tcp://127.0.0.1:10001";
  const string inprocAddress = "inproc://broker";        

  var builder = new ContainerBuilder();

  // Register the NetMQ context
  builder.RegisterInstance(NetMQContext.Create()).
    SingleInstance();
  builder.RegisterType().
    WithParameter("serviceAddress", inprocAddress).
    InstancePerRequest();        
  builder.RegisterType().SingleInstance().
    As().
    AsSelf().
    WithParameter("address", serviceAddress);

  // Build the container.
  var container = builder.Build();

  // Create the dependency resolver.
  var resolver = new AutofacWebApiDependencyResolver(container);

  // Configure Web API with the dependency resolver.
  GlobalConfiguration.Configuration.DependencyResolver = resolver;

  GlobalConfiguration.Configure(WebApiConfig.Register);
}

As you can see I’m handling timeouts in the code, good handling of timeouts and reliability is out of the scope for the this post, however if you already read the zeromq guide you probably know how to handle reliability. An easy fix here would be to also record the request time of each request and use NetMQTimer to remove timed-out request (we can call SetException on the TaskCompletionSource).

Summary

In the post we explore 3 patterns to use NetMQ inside ASP.NET application, the Simple Pattern,
the Simple Device Pattern and the Async Socket Pattern.

Although the AsyncSocket is not pure NetMQ solution and we have to share the object between threads it is my favorite, using the AsyncSocket we can write very fast lock-free ASP.NET controllers without blocking an ASP.NET thread.

What would be a nicer solution? what about writing the entire web server using NetMQ? without ASP.NET at all? would it be nice to write the following code?

using (NetMQContext context = NetMQContext.Create())  
{
  using (var responseSocket = context.CreateResponseSocket())
  {
    responseSocket.Bind("http://localhost:80/api/Calculator/Calc");

    while (true)
    {
      var requestMessage = responseSocket.ReceiveMessage();

      string a = requestMessage.Pop().ConvertToString();
      string b = requestMessage.Pop().ConvertToString();

      int aNumber = Convert.ToInt32(a);
      int bNumber = Convert.ToInt32(b);

      string result = (aNumber + bNumber).ToString();

      NetMQMessage responseMessage = new NetMQMessage();
      responseMessage.Append(result);

      responseSocket.SendMessage(responseMessage);
    }
  }
}

Maybe one day…

comments powered by Disqus