.Net Async Programming - Part 1

Introduction

When working on high performance systems (thousands of operation per second) very fast you understand that you cannot use locks (to synchronize access to state) in your code and also cannot do any blocking operation (like writing to a file, socket or a database).

The remain question is, How can you write you code without a single lock and without any blocking operation?
In this part I will cover how to write code with zero locks (Blocking operation will be covered in next articles).

Single Thread Programming

I want to take you years back when we only had one processor and this processor had one core and even before threads and multi-threading exist.
If I told that all of your code is running on a single thread? If everything is running on a single thread you don’t need locks, and if everything is running on one thread you actually not using multithreading.

You are probably thinking but one thread won’t be fast enough, well you are wrong (in the next article of the series we are going to cover multi-threads as well).
When you really need more than one thread (I really doubt it) the magic trick is to not have shared state between the threads, each state belong to one thread and only one thread can read and write from it, but more about this in the next articles.

If you are building a system that doesn’t need to support more then 1000 (and actually even much higher numbers) operation per second (I’m assumening that the operation is short and without blocking IO) you should use one thread, of course there are more variables but this is a rule of thumb.

SingleThreadTaskScheduler

So after the long introduction, how do we use only one thread in C# ? Actually everything already exist in C#, we just taking 3 different component and combine them together: BlockingCollection, TaskSchdeuker and Thread, following is a simple implementation:

public class SingleThreadTaskScheduler : TaskScheduler,  
  IDisposable, IOrderedScheduler
{
  private BlockingCollection<Task> m_queue = new BlockingCollection<Task>();
  private ThreadLocal<bool> m_isSchedulerThread = new ThreadLocal<bool>(() => false);
  private Task m_workerTask;
  private bool m_disposed = false;

  public SingleThreadTaskScheduler()
  {
    m_workerTask = Task.Factory.StartNew(() => 
      Worker(), TaskCreationOptions.LongRunning);
  }

  ~SingleThreadTaskScheduler()
  {
    Dispose();
  }

  private void Worker()
  {
    m_isSchedulerThread.Value = true;
    try
    {
      foreach (Task task in m_queue.GetConsumingEnumerable())
      {
        base.TryExecuteTask(task);
      }
    }
    finally
    {
      m_isSchedulerThread.Value = false;
    }
  }

  protected override IEnumerable<Task> GetScheduledTasks()
  {
    return m_queue.ToArray();
  }

  protected override void QueueTask(Task task)
  {
    m_queue.Add(task);
  }

  protected override bool TryExecuteTaskInline(Task task, 
    bool taskWasPreviouslyQueued)
  {
    return m_isSchedulerThread.Value && TryExecuteTask(task);
  }

  public void Dispose()
  {
    if (!m_disposed)
    {
      m_queue.CompleteAdding();
      m_workerTask.Wait();
      m_disposed = true;

      GC.SuppressFinalize(this);
    }
  }
}

Async Programming

Now our application receiving input from somewhere (for the example let’s assume WCF service) our example will be a Brokerage service which provide the client with the current balance, before using the Task Scheduler the code will probably look something like this:

public double GetBalance(int accountId)  
{
  Account account = m_accountRepository.GetAccountById(accountId);

  return account.GetBalance();
}

Now the simplest thing to do is to wrap this code with Task and invoke on our new task scheduler:

public double GetBalance(int accountId)  
{
  Task<double> task =
    new Task<double>(() =>
      {
        Account account = m_accountRepository.GetAccountById(accountId);
        return account.GetBalance();
      });

  task.Start(m_singleThreadTaskScheduler);
  task.Wait();

  return task.Result;
}

But this code is not encapsulated, somebody can call the method without wrapping with task and run it on wrong task scheduler, we can insert the wrapping into the Account class:

class Account  
{
  private readonly SingleThreadTaskScheduler m_taskScheduler;
  private double m_balance = 0;

  public Account(SingleThreadTaskScheduler taskScheduler)
  {
    m_taskScheduler = taskScheduler;
  }

  public double GetBalance()
  {
    Task<double> task = new Task<double>(() => m_balance);
    task.Start(m_taskScheduler);
    task.Wait();

    return task.Result;
  }
}

Now the account class is encapsulated and making sure nobody can access the class state from another thread, the SingleThreadTaskSchduler will be injected to all of our classes or it can also be static (we already said we will have only one thread).
We can improve our Account class a little more and return a task instead of the double, this will come very handy in .Net 4.5 with the await keyword, but more about this in the next articles in the series, for the example we also adding another method to account to show both methods write and read the state.

class Account  
{
  private readonly SingleThreadTaskScheduler m_taskScheduler;
  private double m_balance = 0;

  public Account(SingleThreadTaskScheduler taskScheduler)
  {
    m_taskScheduler = taskScheduler;
  }

  public Task<double> GetBalance()
  {
    Task<double> task = new Task<double>(() => m_balance);
    task.Start(m_taskScheduler);
    task.Wait();

    return task;
  }

  public Task Deposit(double amount)
  {
    Task task = new Task(() =>
      {
        m_balance += amount;
      });

    return task;
  }
}

And for last we can now implement the WCF service with the async pattern model using the task from the Account.GetBalance and our system is completely async:

public IAsyncResult BeginGetBalance(int accountId, AsyncCallback  
  callback, object state)
{
  Account account = m_accountRepository.GetAcountById(accountId);

  Task<double> task = account.GetBalance();

  // using Task extension method from 
  // http://blogs.msdn.com/b/pfxteam/archive/2011/06/27/10179452.aspx
  return task.ToAPM(callback, state);
}

public double EndGetBalance(IAsyncResult asyncResult)  
{
  return ((Task<double>) asyncResult).Result;
}

In .Net 4.5 WCF contracts can have methods that return tasks so the GetBalance method can return the Account.GetBalance directly, like so:

public Task<double> GetBalanceAsync(int accountId)  
{
  Account account = m_accountRepository.GetAccountById(accountId);

  return account.GetBalance();
}

In the next article I will cover calling IO blocking methods without blocking our one and only thread.