Life of a Codeflow – Quick look at the Codeflow state machine

Last time I promised to write a bit about implementing the ICodeflowTracker interface. So here we go…

At its heart, a Codeflow revolves around a finite state machine, which looks like this on paper:

[Figure: the Codeflow state machine]

The Codeflows runtime, the Codeflow instance itself and your ICodeflowTracker work hand in hand to drive this state machine.

CodeflowApplication is your access point to the Codeflows runtime. When you construct a CodeflowApplication instance, you are required to provide an implementation of ICodeflowTracker.

When a Codeflow instance is first created, it should be in the Runnable state. You can call one of the CodeflowApplication.Run() method overloads to begin executing a Codeflow. The first thing the Codeflows runtime does is invoke ICodeflowTracker.PrepareAction.

 

This method receives the codeflowId, the name of the action that the runtime intends to execute, the arguments for that action and the current state of the variables used by the Codeflow.

Implementers of this method can read the current state of the Codeflow from their persistence store and ensure that the action specified by the name parameter is runnable. If it is, the method should change the Codeflow instance's status to Running, increment the dispatch attempts count for that action and store a snapshot of the arguments and variables.

Finally, the method should return the current runnable action name, the number of dispatch attempts seen by the ICodeflowTracker and the Codeflow status via an instance of the CodeflowState class. For example, our CodeflowTracker, which uses SQL Server as its backing store, has the following PrepareAction method.

 

As you can see, it first serializes the arguments and variables to JSON for storage. If the next action stored in the database matches the action the Codeflows runtime intends to run, and the current Codeflow state is Runnable, it updates the status to Running, sets the modified time to the current time and increments the dispatch attempts count. Finally, it uses that data to create the CodeflowState instance returned to the runtime.

When the Codeflows runtime gets control back, it checks whether the action named in CodeflowState is the same as the one it intended to run and whether the Codeflow state is Running. If both conditions are met, it goes on to execute the action in the Codeflow instance.
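The SQL-backed PrepareAction is not reproduced here, so the following is a minimal in-memory sketch of the same logic, together with the check the runtime makes when it gets control back. It is not the Codeflows API. TrackerSketch, StateSnapshot, FlowStatus and RuntimeCheck are hypothetical names, a dictionary stands in for the database table, and System.Text.Json stands in for whichever JSON serializer the real tracker uses.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical status values; the names follow the states described in this post.
public enum FlowStatus { Runnable, Running, Paused, Sleeping, Suspended, Terminated }

// Hypothetical stand-in for CodeflowState: what the tracker reports back to the runtime.
public sealed class StateSnapshot
{
    public StateSnapshot(string nextAction, int dispatchAttempts, FlowStatus status)
    {
        NextAction = nextAction;
        DispatchAttempts = dispatchAttempts;
        Status = status;
    }

    public string NextAction { get; }
    public int DispatchAttempts { get; }
    public FlowStatus Status { get; }
}

// Hypothetical in-memory tracker; a dictionary plays the role of the SQL table.
public sealed class TrackerSketch
{
    sealed class Row
    {
        public string NextAction = "Start";
        public FlowStatus Status = FlowStatus.Runnable;
        public readonly Dictionary<string, int> Attempts = new Dictionary<string, int>();
        public string ArgumentsJson = "{}";
        public string VariablesJson = "{}";
        public DateTime ModifiedUtc;
    }

    readonly Dictionary<Guid, Row> _rows = new Dictionary<Guid, Row>();

    public void Create(Guid codeflowId) => _rows[codeflowId] = new Row();

    public StateSnapshot PrepareAction(Guid codeflowId, string name, object arguments, object variables)
    {
        Row row = _rows[codeflowId];
        // Transition only if the runtime asks for the expected action and the flow is Runnable.
        if (row.NextAction == name && row.Status == FlowStatus.Runnable)
        {
            row.Status = FlowStatus.Running;
            row.ModifiedUtc = DateTime.UtcNow;
            row.Attempts[name] = AttemptsFor(row, name) + 1;
            // Snapshot arguments and variables as JSON, like the SQL-backed tracker.
            row.ArgumentsJson = JsonSerializer.Serialize(arguments);
            row.VariablesJson = JsonSerializer.Serialize(variables);
        }
        // Always report what the store says, so the runtime can detect a mismatch.
        return new StateSnapshot(row.NextAction, AttemptsFor(row, row.NextAction), row.Status);
    }

    static int AttemptsFor(Row row, string action) =>
        row.Attempts.TryGetValue(action, out int count) ? count : 0;
}

public static class RuntimeCheck
{
    // The runtime executes the action only when both conditions hold.
    public static bool ShouldExecute(StateSnapshot state, string intendedAction) =>
        state.NextAction == intendedAction && state.Status == FlowStatus.Running;
}
```

The tracker returns the stored state even when no transition happened. That is what lets the runtime notice a mismatch (for example, a stale action name) and refuse to run the action.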

The action being executed tells the Codeflows runtime what to do next by returning different kinds of messages. Let's look at each type of message currently supported by the Codeflows runtime.

Jump

Transfers control to another action. When this result is returned, the Codeflows runtime invokes the ICodeflowTracker.Complete method before calling PrepareAction to prepare the new action.

 

Implementations of ICodeflowTracker can use this method to persist the result of the current action and to store the next action name, the arguments for the next action and the new state of the variables used by the Codeflow. Finally, the method must also change the Codeflow status back to the status indicated by the status parameter. In the case of a Jump, this status will be Runnable.
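Here is a similarly hedged sketch of the bookkeeping that Complete does for a Jump. FlowRecord and CompleteSketch are hypothetical names used for illustration, and the rule that only a Running Codeflow may complete an action is also an assumption. None of this is part of Codeflows.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical status values; only the names come from the post.
public enum FlowStatus { Runnable, Running, Paused, Sleeping, Suspended, Terminated }

// Hypothetical persisted record for one Codeflow instance.
public sealed class FlowRecord
{
    public string NextAction = "Start";
    public FlowStatus Status = FlowStatus.Running;
    public object LastResult;
    public Dictionary<string, object> NextArguments = new Dictionary<string, object>();
    public Dictionary<string, object> Variables = new Dictionary<string, object>();
}

public static class CompleteSketch
{
    // Persists the finished action's outcome and hands the flow back in the status the
    // runtime asks for (Runnable for a Jump, Paused or Sleeping for the other messages).
    public static void Complete(FlowRecord record, object result, string nextAction,
        IDictionary<string, object> nextArguments, IDictionary<string, object> variables,
        FlowStatus status)
    {
        // Assumption: completing an action that is not Running indicates a bookkeeping bug.
        if (record.Status != FlowStatus.Running)
            throw new InvalidOperationException("Only a Running Codeflow can complete an action.");

        record.LastResult = result;
        record.NextAction = nextAction;
        // Copy, so later changes by the caller do not leak into the stored snapshot.
        record.NextArguments = new Dictionary<string, object>(nextArguments);
        record.Variables = new Dictionary<string, object>(variables);
        record.Status = status;
    }
}
```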

Paused

This is similar to Jump: you can transfer control to a different action via a Paused message. However, the runtime will stop executing the Codeflow until you explicitly execute it again using a CodeflowApplication.Run method overload. When this message is received, the runtime invokes the ICodeflowTracker.CompleteAction method with status Paused.

The Paused message is used in scenarios where you need to pause the execution of a Codeflow until an external event occurs. For example, in an online payment processing scenario, you may want to pause the Codeflow after the VerifyPayment action until the payment gateway calls you back, so that you can resume from the ProcessPaymentVerification action.

Delayed Jump

This is similar to the Paused message, but it can be used to pause the Codeflow for a period of time. When this message is received, the Codeflows runtime invokes the ICodeflowTracker.CompleteAction method with status Sleeping. (We are currently adding APIs to Codeflows to resume these sleeping Codeflows when their timers expire. I will cover this scenario in detail in another post.)

End

This message indicates that the Codeflow has successfully completed its execution. The runtime will invoke the ICodeflowTracker.End method.

In addition to these messages, the Codeflows runtime uses two internal messages to handle two special cases.

First, when there's an unhandled exception in the Codeflow action being executed, the ICodeflowTracker.Suspend method is invoked with the exception details. Implementations of this method should update the Codeflow's status to Suspended.

The second special case is termination. The Codeflows runtime marks a Codeflow as terminated when there are programming errors in the Codeflow, for example, when an action returns a Jump message with an invalid target action name. Terminated Codeflows are considered non-recoverable, whilst suspended Codeflows can be recovered. When a Codeflow is terminated, the runtime notifies the ICodeflowTracker via its Terminate method.
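Putting the messages side by side, the transitions described so far fit in a small table in code. This is a hypothetical sketch. The enum names are mine, and using Completed as the status after End is an assumption, because the post names the tracker callback for that case but not the resulting status.

```csharp
using System;

// Hypothetical names. "Completed" (the status after End) is an assumption.
public enum FlowStatus { Runnable, Running, Paused, Sleeping, Suspended, Terminated, Completed }

// The four public messages plus the two internal cases.
public enum ActionOutcome { Jump, Paused, DelayedJump, End, UnhandledException, InvalidJumpTarget }

public static class StateMachineSketch
{
    // Status a Running Codeflow ends up in after the runtime handles the outcome.
    // The comments name the ICodeflowTracker method the post says is called.
    public static FlowStatus After(ActionOutcome outcome) => outcome switch
    {
        ActionOutcome.Jump => FlowStatus.Runnable,                // Complete, then PrepareAction
        ActionOutcome.Paused => FlowStatus.Paused,                // CompleteAction
        ActionOutcome.DelayedJump => FlowStatus.Sleeping,         // CompleteAction
        ActionOutcome.End => FlowStatus.Completed,                // End
        ActionOutcome.UnhandledException => FlowStatus.Suspended, // Suspend (recoverable)
        ActionOutcome.InvalidJumpTarget => FlowStatus.Terminated, // Terminate (non-recoverable)
        _ => throw new ArgumentOutOfRangeException(nameof(outcome))
    };
}
```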

Well… that's pretty much all you need to know to implement ICodeflowTracker.

Applied Codeflows

I unveiled Codeflows a couple of weeks ago. Since then I've had tremendous feedback, and I've managed to address most of it and publish the latest bits to NuGet.

In this post I'm going to talk a little more about Codeflows and show you how it is used in a real-world application.

First to the obvious question… why Codeflows?

Most applications need to get a number of things done in response to various application events. Sending emails, processing images and accessing external services such as geo-location, social network services and payment gateways are quite common examples. These requirements bring even more complex ones with them, such as keeping track of the status of these interactions, being resilient to failures, compensation, etc. Codeflows is a simple library that helps you program these units of work, then execute and track them in a natural way. Codeflows acts as a coordinator for units of work that interact with each other by passing messages.

In addition, the library aims to provide first-class support for testability, composition and writing software with scalability in mind.

Enough details. Let's take a look at an MVC web application that we converted to use Codeflows. When a user registers in this application, it generates an image representing a QR code and sends out a registration confirmation email. Before migrating to Codeflows, the code for these steps was in the MVC controller action that handled user registration (see below).

 

This approach has two downsides.

1. Since both image generation and emailing happened synchronously, the overall user experience during registration was slow.

2. If something went wrong during one of these steps, there wasn't an easy way to know where it failed, recover and resume the execution without adding extra code.

We are going to improve this with Codeflows in three steps. First, we create a Codeflow to implement our units of work. Then we create a simple WCF service to host the Codeflow. Finally, we modify the above action method to invoke the WCF service, which kicks off the Codeflow, and return to the user immediately.

Creating the Codeflow is the fun part. The completed Codeflow looks like this.

 

The Start action in our Codeflow simply jumps to the GenerateQrTag action by returning a Jump. (Many of you might be wondering whether you could use a LINQ expression to represent your jump target rather than a string. Don't worry, we are working on this feature as I write this post.) The GenerateQrTag action performs the image processing work required to generate the QR tag and jumps to SendActivationEmail.

As you can see, SendActivationEmail is slightly different from the GenerateQrTag method, as it returns a System.Threading.Tasks.Task<Result> instead of a Result. In this action we use SmtpClient to send an email, and we don't want the thread invoking the action to block until SmtpClient completes the underlying IO operation. By returning Task<Result> instead of Result, we tell the Codeflows runtime that it can use that thread for other work (or return it to the thread pool if there's nothing to be done) while the IO is pending.
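To illustrate how a runtime can accept both kinds of action, here is a sketch that invokes an action via reflection and treats Result and Task<Result> the same way. Result, ActionInvoker and RegistrationFlow are hypothetical stand-ins, and Task.Delay takes the place of awaiting SmtpClient.SendMailAsync. This is not how the Codeflows runtime is actually implemented; it is only one way to get the same effect.

```csharp
using System;
using System.Reflection;
using System.Threading.Tasks;

// Hypothetical stand-in for Codeflows' Result type.
public sealed class Result
{
    public Result(string nextAction) => NextAction = nextAction;
    public string NextAction { get; }
}

public static class ActionInvoker
{
    // Invokes a public, parameterless action that returns either Result or Task<Result>
    // and gives the caller a Task<Result> in both cases.
    public static Task<Result> InvokeAsync(object codeflow, string actionName)
    {
        MethodInfo method = codeflow.GetType().GetMethod(actionName, BindingFlags.Public | BindingFlags.Instance);
        if (method == null)
            throw new MissingMethodException(codeflow.GetType().Name, actionName);

        switch (method.Invoke(codeflow, null))
        {
            case Task<Result> pending:
                // Async action: no thread waits here while its IO is in flight.
                return pending;
            case Result completed:
                // Sync action: wrap the finished result.
                return Task.FromResult(completed);
            default:
                throw new InvalidOperationException(actionName + " must return Result or Task<Result>.");
        }
    }
}

// Hypothetical Codeflow-like class with one sync and one async action.
public sealed class RegistrationFlow
{
    public Result GenerateQrTag() => new Result("SendActivationEmail");

    public async Task<Result> SendActivationEmail()
    {
        await Task.Delay(10); // stands in for awaiting SmtpClient.SendMailAsync
        return new Result("End");
    }
}
```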

The SendActivationEmail action also has a parameter of a special type, IActionContext. IActionContext gives you access to runtime values such as DispatchAttemptsCount. In SendActivationEmail, we use this value to retry the emailing operation if it fails the first time.
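Here is a sketch of the retry decision that SendActivationEmail makes. IActionContextSketch, FixedContext and ActivationEmailStep are hypothetical; the post only tells us that IActionContext exposes DispatchAttemptsCount. The sketch also assumes the first dispatch counts as 1, which is consistent with PrepareAction incrementing the count before the action runs.

```csharp
using System;
using System.Net.Mail;

// Hypothetical shape: the post only says IActionContext exposes DispatchAttemptsCount.
public interface IActionContextSketch
{
    int DispatchAttemptsCount { get; }
}

public sealed class FixedContext : IActionContextSketch
{
    public FixedContext(int attempts) => DispatchAttemptsCount = attempts;
    public int DispatchAttemptsCount { get; }
}

public enum EmailOutcome { Sent, Retry, GiveUp }

public static class ActivationEmailStep
{
    // Tries to send once. On SmtpException, asks for another dispatch until maxAttempts
    // dispatches have been made. Assumes the first dispatch is counted as 1.
    public static EmailOutcome Send(IActionContextSketch context, Action send, int maxAttempts)
    {
        try
        {
            send();
            return EmailOutcome.Sent;
        }
        catch (SmtpException)
        {
            return context.DispatchAttemptsCount < maxAttempts ? EmailOutcome.Retry : EmailOutcome.GiveUp;
        }
    }
}
```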

Once we’ve got that Codeflow ready, we can create a WCF service with the following contract to host this Codeflow.

 

There are a couple of interesting things about this WCF operation contract. It's marked IsOneWay because we don't want the MVC application to wait until the Codeflow has completely executed; we want it to return to the user as soon as we have captured enough information to complete the registration process. It also uses the asynchronous signature instead of the synchronous one. This is because our Codeflow is asynchronous: there's no point in making our SendActivationEmail action async if the WCF dispatcher thread has to block until the Codeflow finishes.

The implementation of this service looks as follows:

 

The Begin method creates an instance of CodeflowApplication by passing it an instance of CodeflowTracker (we will talk about this shortly) and runs an instance of the ConsumerRegistrationCompletion Codeflow. Finally, it returns an instance of CodeflowResultBoundAsyncResult, an IAsyncResult implementation (see below) that can wait until the Codeflows runtime finishes one execution cycle.

 

If all of this is a bit overwhelming, don't worry. You don't have to do this if your Codeflow doesn't have an async action; simply invoke the CodeflowApplication.Run method from your WCF operation instead.
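For the async case, CodeflowResultBoundAsyncResult itself is not shown here. One common way to get the same effect from a Task is the Task-to-APM adapter: Task already implements IAsyncResult, so a Begin method can return a TaskCompletionSource's task (created with the caller's state) and invoke the AsyncCallback once the work finishes. ApmAdapter is a hypothetical name, and a string stands in for whatever one execution cycle produces.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical Begin/End pair exposing a Task-based run through the IAsyncResult pattern
// that an asynchronous WCF operation expects.
public static class ApmAdapter
{
    public static IAsyncResult Begin(Func<Task<string>> run, AsyncCallback callback, object state)
    {
        // AsyncState of the returned IAsyncResult must be the caller's state object.
        var tcs = new TaskCompletionSource<string>(state);
        run().ContinueWith(t =>
        {
            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(t.Result);
            // Tell the dispatcher the cycle has finished so it can call End.
            callback?.Invoke(tcs.Task);
        }, TaskScheduler.Default);
        return tcs.Task; // Task<T> implements IAsyncResult
    }

    public static string End(IAsyncResult asyncResult) =>
        ((Task<string>)asyncResult).GetAwaiter().GetResult();
}
```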

Finally we can modify our controller action to the following.

 

In the modified action, we save user details to the database and then we also create an entry in the database to represent the Codeflow for completing that consumer registration process. We do that in a transaction so that we can ensure the integrity of that data.

After committing the transaction, we notify the WCF service to complete the execution of the newly created Codeflow and return to the user.

So we created a testable, composable and scalable Codeflow and integrated that with our MVC web app. But where’s the tracking feature?

Remember the CodeflowTracker instance we passed into the CodeflowApplication constructor? The Codeflows runtime uses the ICodeflowTracker interface to notify a tracking service of the state changes (transitions) that happen during the execution of a Codeflow.

Our application has a CodeflowTracker class that implements the ICodeflowTracker interface to persist tracking information to the application database.

If you read my previous post, you might be wondering what happened to the built-in SQL tracker we talked about. Based on feedback from some initial users, we realized that shipping Codeflows with a separate database or schema makes Codeflows harder to use. For instance, with a separate database, applications that write to both the app database and the tracking database may need distributed transactions. Also, our tracking schema may not be what you are after; you may want to store more information than we think is important. So we decided to leave the implementation of ICodeflowTracker to you.

I will cover ICodeflowTracker implementation and underlying state machine management machinery in Codeflows in a separate post. Following figures depict how tracking information is persisted in the database.

[Figures: tracking information as persisted in the database]

 

Once we persist tracking data like this, it’s easier to build functionality to recover from failures. I will cover those scenarios in a future post.

Alright, enough code snippets and screenshots. I think you get the idea, don't you? Have a play with the version available via NuGet. The source code for this library is also available on CodePlex. If you are interested in hacking on some code, get in touch with me via buddhike.desilva{AT}geeksdiary{DOT}com.

Unveiling Codeflows

Codeflows is a free, open source and lightweight .NET class library for coordinating the execution of units of work. Using Codeflows is easy. First, you write a class that inherits from the Codeflows.Codeflow class. In it, you include the list of actions (methods) that you want Codeflows to manage (Listing 1). Action methods have to be public and should return an instance of the Codeflows.Result type. In addition, the first method you want to run (i.e. the entry point) must be called “Start”.

Listing 1 Simple Codeflow to handle typical password recovery process

Then you can use the primitives in the Codeflows library to control the flow of your actions. The Start action in our RecoverPassword Codeflow verifies that a user account exists with the given email address and jumps to the SendPasswordRecoveryEmail action using the Jump method. With the Jump method, you can also specify the arguments to the target action as an anonymous object.

Listing 2 Jumping to another action with arguments

When we return a Jump result from our Start action, the Codeflow runtime takes care of transferring control to the SendPasswordRecoveryEmail action. You can also pause the execution of the Codeflow until an external event occurs by calling the Pause method. The SendPasswordRecoveryEmail action uses this technique to pause the Codeflow until the user receives the email and clicks the embedded password recovery link. When calling the Pause method, you can specify the next action to be invoked when the Codeflow is resumed.

Listing 3 Pausing the Codeflow until an external event occurs

To notify that Codeflow has finished its execution, you can use End method as shown in Listing 4.

Listing 4 Ending a Codeflow
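The listings are not reproduced here, so the following toy runtime shows the conventions described above. It uses a public Start entry point and actions that return a Result. Jump passes an anonymous object whose properties are matched to the target action's parameters by name. Everything in it (Result, JumpResult, FlowBase, MiniRuntime, RecoverPasswordSketch) is hypothetical and much simpler than Codeflows. It runs purely in memory, with no Pause and no tracking.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical result types; only the Jump/End idea and the "Start" convention come from the post.
public abstract class Result { }

public sealed class JumpResult : Result
{
    public JumpResult(string target, object args) { Target = target; Args = args; }
    public string Target { get; }
    public object Args { get; }
}

public sealed class EndResult : Result { }

public abstract class FlowBase
{
    protected Result Jump(string target, object args = null) => new JumpResult(target, args);
    protected Result End() => new EndResult();
}

public static class MiniRuntime
{
    // Starts at the public "Start" action and follows Jump results until an action returns End.
    public static List<string> Run(FlowBase flow, object startArgs)
    {
        var visited = new List<string>();
        string action = "Start";
        object args = startArgs;
        while (true)
        {
            MethodInfo method = flow.GetType().GetMethod(action, BindingFlags.Public | BindingFlags.Instance);
            if (method == null || !typeof(Result).IsAssignableFrom(method.ReturnType))
                throw new InvalidOperationException("Invalid jump target: " + action);
            visited.Add(action);

            switch (method.Invoke(flow, BindArguments(method, args)))
            {
                case JumpResult jump:
                    action = jump.Target;
                    args = jump.Args;
                    break;
                case EndResult _:
                    return visited;
                default:
                    throw new InvalidOperationException(action + " returned an unknown result.");
            }
        }
    }

    // Matches parameters by name against the properties of the anonymous arguments object.
    static object[] BindArguments(MethodInfo method, object args)
    {
        ParameterInfo[] parameters = method.GetParameters();
        var values = new object[parameters.Length];
        for (int i = 0; i < parameters.Length; i++)
            values[i] = args?.GetType().GetProperty(parameters[i].Name)?.GetValue(args);
        return values;
    }
}

// A cut-down RecoverPassword-style flow for illustration.
public sealed class RecoverPasswordSketch : FlowBase
{
    public string EmailedTo { get; private set; }

    public Result Start(string email) => Jump("SendPasswordRecoveryEmail", new { email });

    public Result SendPasswordRecoveryEmail(string email)
    {
        EmailedTo = email; // a real flow would send the email here
        return End();
    }
}
```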

Sometimes the life of a Codeflow is not as blissful as the scenario we just walked through. For example, our Codeflow may hit small glitches when it tries to send out emails. You can make your Codeflows resilient to these errors by using standard structured exception handling together with the control flow primitives in Codeflows. For instance, we can change our SendNotification action to retry in case of an SmtpException as follows.

Listing 5 Using Retry method with try/catch clauses

Furthermore, you may not want to retry forever if there's an SmtpException. In that case, you can add a parameter called dispatchCount to limit the number of retries (Listing 6).

Listing 6 Using dispatchCount to limit the number of retries

When the Codeflow runtime sees that an action has an integer parameter called dispatchCount, it automatically passes in the current dispatch count maintained by the runtime.
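Here is a sketch of how a runtime could supply dispatchCount by parameter name using reflection. DispatchCountBinder and NotificationFlowSketch are hypothetical, and the strings "Retry" and "GiveUp" stand in for Codeflows Result values.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public static class DispatchCountBinder
{
    // Builds the argument array for an action, filling any int parameter named
    // "dispatchCount" from the runtime's counter instead of the caller's arguments.
    public static object[] Bind(MethodInfo action, IDictionary<string, object> args, int dispatchCount)
    {
        ParameterInfo[] parameters = action.GetParameters();
        var values = new object[parameters.Length];
        for (int i = 0; i < parameters.Length; i++)
        {
            ParameterInfo p = parameters[i];
            if (p.Name == "dispatchCount" && p.ParameterType == typeof(int))
                values[i] = dispatchCount;
            else if (args.TryGetValue(p.Name, out object value))
                values[i] = value;
            else
                throw new ArgumentException("No value for parameter '" + p.Name + "'.");
        }
        return values;
    }
}

// Hypothetical action that stops retrying after three dispatches.
public sealed class NotificationFlowSketch
{
    public int LastDispatchCount;

    public string SendNotification(string email, int dispatchCount)
    {
        LastDispatchCount = dispatchCount;
        return dispatchCount < 3 ? "Retry" : "GiveUp"; // stand-ins for Result values
    }
}
```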

Once you finish your Codeflow, you need to run it, don't you? Running the Codeflow is as easy as the following snippet.

Listing 7 Running a Codeflow

In addition to the Run method, CodeflowApplication also has a Resume method to resume the execution of paused Codeflows (Listing 8).

Listing 8 Resuming a Codeflow

You can create and run CodeflowApplications from virtually any application. However, the most common scenario would be running (or resuming) them from operations in a WCF service.

The CodeflowApplication class can be initialized with a Func<ITracker>. You can either use the built-in SqlTracker or provide your own implementation. The ITracker's responsibility is to track the Codeflow transitions notified by the Codeflow runtime. This way, if something goes wrong in the host process, you can re-run your Codeflows from the last known good execution point. I will leave the details of how these state transitions happen under the covers for another blog post.

Codeflows is still in a very early incubation stage. However, if this sounds interesting, poke around the library on CodePlex and let me hear your thoughts.

Coreinfo – Simple utility to view host processor cache configuration

Today I wrote this little tool to view the host processor's cache configuration to help me with my experiments. I thought it would be nice to put it out here in case someone else finds it useful too.

[Screenshot: the tool's output]

Plugging IoThreadScheduler into System.Threading.Tasks

I'm currently looking at the UMS API in Windows and got slightly distracted by the new Task APIs in .NET 4. One thing I immediately noticed was their extensibility. In fact, I could write a really simple task scheduler that uses IO threads in the CLR thread pool via the IoThreadScheduler I discussed in my previous post.

Then you can simply direct the runtime to use it in your parallel methods via options.
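The scheduler from the post is not shown, so here is a minimal TaskScheduler sketch. It hands each task to any function shaped like ThreadPool.QueueUserWorkItem(WaitCallback, object). IoThreadScheduler.ScheduleCallback has that shape, so it could be passed in. CallbackTaskScheduler and Demo are hypothetical names. The options in question are ParallelOptions, whose TaskScheduler property tells Parallel.For and its siblings where to run. ThreadPool.QueueUserWorkItem is used in the usage example only because IoThreadScheduler is not part of the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Minimal TaskScheduler that delegates to a pluggable "schedule a callback" function.
public sealed class CallbackTaskScheduler : TaskScheduler
{
    readonly Func<WaitCallback, object, bool> _scheduleCallback;

    public CallbackTaskScheduler(Func<WaitCallback, object, bool> scheduleCallback) =>
        _scheduleCallback = scheduleCallback ?? throw new ArgumentNullException(nameof(scheduleCallback));

    // Each queued task becomes one callback on whatever threads the function uses.
    protected override void QueueTask(Task task) =>
        _scheduleCallback(state => TryExecuteTask((Task)state), task);

    // Inline only tasks that were never queued; TryExecuteTask also guards against double runs.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
        !taskWasPreviouslyQueued && TryExecuteTask(task);

    // Tasks are not kept in a local queue, so there is nothing to report to debuggers.
    protected override IEnumerable<Task> GetScheduledTasks() => Array.Empty<Task>();
}

public static class Demo
{
    // Usage: run a parallel loop on the custom scheduler via ParallelOptions.
    public static int SumTo(int n, Func<WaitCallback, object, bool> scheduleCallback)
    {
        var options = new ParallelOptions { TaskScheduler = new CallbackTaskScheduler(scheduleCallback) };
        int sum = 0;
        Parallel.For(1, n + 1, options, i => Interlocked.Add(ref sum, i));
        return sum;
    }
}
```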

I also spent some time writing a lock-free queue for my IoThreadScheduler. However, it looks like (from what I understand via Reflector) the new System.Collections.Concurrent.ConcurrentQueue is already lock-free! There are a few other interesting concurrent implementations in that namespace.

ThreadPool.UnsafeQueueNativeOverlapped

A few years ago, I did an academic exercise to understand why ThreadPool.UnsafeQueueNativeOverlapped generally gives better throughput than ThreadPool.QueueUserWorkItem. At that time, I was digging into WCF and was just curious to know why it was the preferred approach in WCF. Last year, my team and I built an application for running scheduled work in the background. It is a vanilla Windows service with the ability to plug in different types of handlers to handle different types of work. Handling high workloads and running arbitrary work handlers were our main goals. As I was writing the dispatcher core for this toy, I decided to use the IoThreadScheduler I had up my sleeve.
Those who have reflected over the System.ServiceModel assembly may find this name quite familiar. In fact, the first implementation I had was pretty much identical to the one in System.ServiceModel, apart from the fact that it had eight priority levels (which we never ended up using anyway…) instead of two.
Over time, I encountered some interesting scenarios introduced by some work handlers. The more of these subtle cases I encountered, the more I needed to observe the internal behavior of the CLR thread pool. Unfortunately, it does not seem to have any performance counters, and setting up ETW tracing for the CLR's ETW providers turned out to be a bit challenging after .NET 4.
So I built a small tool with custom performance counters to stress test the two scheduling mechanisms and measure their throughput. Based on the observations, I decided to write up this post so that I could get some insights from others in the community poking into this area.
Before I start, though, I must say that I can't guarantee the accuracy of this information. It is based on different experiments I did, observations made in the kernel debugger, information I collected from several different resources (see the resources section at the end of this blog) and brief, random discussions I had with engineering folks on the Windows and CLR teams.

A Quick Tour of Windows IO

When I first started looking at this several years ago, I found myself scratching my head a few times, as I had no idea how IO completion ports play a role in scheduling. While trying to learn it, I found that exploring some Windows APIs was the best way to see how it all fits together.
In Windows, all IO operations are performed on virtual files. This abstraction gives a consistent interface to all IO devices attached to the system. Applications perform IO operations using file management functions. As a .NET developer, you don't invoke these functions directly; APIs provided with the framework (e.g. System.IO.File.Open()) do it for you.
Doing IO usually involves combining a few calls to file management functions, but there are three basic things we always do. First, we start interacting with a device by calling the CreateFile function. A successful call returns a handle to the file/device that can be used with ReadFile and WriteFile to perform IO. Finally, when we are done, we close the device/file by calling the CloseHandle function.
All of these APIs result in calls to the part of the Windows kernel that manages IO: the IO Manager.
The IO manager in Windows is the interface between the system and the devices attached to it. It knows how to interact with a device via its device driver stack. Communication between devices and the IO manager is always asynchronous. However, the IO manager decides whether or not to return to the caller immediately based on the FILE_FLAG_OVERLAPPED flag passed into the CreateFile function.
If it's a synchronous IO call, the thread blocks in the kernel (in the IO manager) until it receives the result of the IO operation. During this time, the thread is in a wait state and will not be scheduled by the kernel thread scheduler.
If it's an asynchronous IO call, on the other hand, the IO manager returns to the caller immediately after acknowledging the requested operation. This is where IO completion ports (IOCP) come into the picture.
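In .NET, this choice surfaces as a FileStream option. The sketch below reads the same file both ways. On Windows, FileOptions.Asynchronous corresponds to FILE_FLAG_OVERLAPPED, and the framework wraps the underlying Win32 file functions for you. FileIoDemo is a hypothetical name.

```csharp
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class FileIoDemo
{
    // Synchronous: the calling thread waits until each read completes.
    public static string ReadBlocking(string path)
    {
        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, options: FileOptions.None))
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            return reader.ReadToEnd();
        }
    }

    // Asynchronous: FileOptions.Asynchronous requests overlapped IO, so the calling
    // thread is free while the read is in flight and resumes when it completes.
    public static async Task<string> ReadOverlappedAsync(string path)
    {
        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, options: FileOptions.Asynchronous))
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            return await reader.ReadToEndAsync();
        }
    }
}
```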

IOCP

An IOCP is basically a queue in the kernel, represented by the _KQUEUE structure (Listing 1).

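typedef struct _KQUEUE
{
  DISPATCHER_HEADER Header;     // dispatcher object header (makes the queue waitable)
  LIST_ENTRY EntryListHead;     // queued entries (completion packets)
  ULONG CurrentCount;           // threads currently active for this queue
  ULONG MaximumCount;           // concurrency limit given to CreateIoCompletionPort
  LIST_ENTRY ThreadListHead;    // threads associated with this queue
} KQUEUE, *PKQUEUE;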

Listing 1 – _KQUEUE structure used for IOCP

You can create an IO completion port and associate file handles with it. After that, asynchronous IO calls on those handles return immediately, and when an IO operation completes, the IO manager notifies the IOCP by enqueuing an item to its queue. This item is known as a queued completion status (Figure 1).
Figure 1 – IO manager queues the IO completion status to IOCP.
That sounds nice. But the obvious question is how to notify the thread that originated the IO that it has completed. A naive way to do this is as follows: when the thread originating the async IO reaches the point where it can no longer do any work without the result of the IO operation, it calls the GetQueuedCompletionStatus function to see whether the IO has completed. If it hasn't, the thread blocks in the kernel at the IOCP. This simple model works when there's only one thread in the process making asynchronous calls one at a time, but that's not the case for the majority of programs using async IO. Difficulties arise because the completion of IO operations can overlap, and an IOCP is a shared queue; in fact, it can be bound to multiple file handles. Therefore, when we call GetQueuedCompletionStatus from the thread initiating the IO, we are not guaranteed to get the completion status for the IO operation we initiated. We cannot simply discard someone else's completion status either. A more general approach is to have some threads waiting on the IOCP to resume work after an IO operation completes.
Figure 2 – Multiple threads waiting on IOCP to pick up completion status
If we apply the model depicted in Figure 2 to a list of tasks that perform async IO, Thread A can do a little bit of one task and, when it issues an async IO operation, pick up another task and continue. When the IO completes, one of the threads waiting on the IOCP completes the remainder of that task. Looks more reasonable, doesn't it?
What if three IO operations complete and threads B, C and D are released to finish those tasks? We no longer have any threads waiting on the IOCP until one of them finishes and returns to wait. As you can see, we are now stepping into thread pooling territory. Luckily, this functionality is already baked into both the native and managed thread pools in Windows.
In fact, both the native and managed thread pools own an IOCP. You can associate a file handle with this IOCP by calling the BindIoCompletionCallback function (or ThreadPool.BindHandle() for the managed thread pool). The thread pool's thread injection and retirement algorithm makes sure there's always a thread waiting on the IOCP to pick up completed IO.
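To make the model in Figure 2 concrete, here is a user-mode simulation rather than real IOCP code. A BlockingCollection plays the completion port, a few threads block on it the way they would in GetQueuedCompletionStatus, and the calling thread plays the IO manager by queuing completions. CompletionQueueSimulation is a hypothetical name. The point it shows is that whichever waiter wakes up handles whichever completion arrives.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// User-mode simulation of Figure 2 (single use: Run completes the queue).
public sealed class CompletionQueueSimulation
{
    readonly BlockingCollection<int> _completions = new BlockingCollection<int>();
    readonly ConcurrentDictionary<int, int> _handledBy = new ConcurrentDictionary<int, int>();

    // Returns a map of operation id -> managed thread id that handled its completion.
    public IDictionary<int, int> Run(int waiterCount, int operationCount)
    {
        var waiters = new List<Thread>();
        for (int i = 0; i < waiterCount; i++)
        {
            var waiter = new Thread(() =>
            {
                // Analogous to GetQueuedCompletionStatus: block until some completion is queued.
                foreach (int operationId in _completions.GetConsumingEnumerable())
                    _handledBy[operationId] = Environment.CurrentManagedThreadId;
            });
            waiter.Start();
            waiters.Add(waiter);
        }

        // Analogous to the IO manager queuing completion statuses as operations finish.
        for (int op = 0; op < operationCount; op++)
            _completions.Add(op);
        _completions.CompleteAdding();

        foreach (Thread waiter in waiters)
            waiter.Join();
        return _handledBy;
    }
}
```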

Self-balancing nature in IOCP

So we have briefly looked at how IOCP works and how the thread pool makes it easier to work with async IO by taking care of thread injection and retirement. Before we relate this to work scheduling, there's one more thing we need to understand, which makes this a compelling approach. When we create an IOCP by calling CreateIoCompletionPort, we have the option to specify the maximum number of active threads. This controls how many threads are released concurrently when the IOCP receives queued completion statuses. Obviously, if we are after maximum throughput from the CPU, this should be the number of processors we have. Once a thread is released via the IOCP, its active thread count is increased by one. The released thread may be doing a CPU-bound operation, in which case the IOCP is right to count it as active. However, the thread could also make a blocking system call (e.g. a synchronous IO operation or a wait on a synchronization primitive such as a Mutex).
The good thing about the latter scenario is that the kernel scheduler can see it happening: because an IOCP is a kernel queue, the scheduler has first-class visibility into it. The trick used by the kernel becomes quite obvious if we look at the _KTHREAD kernel structure used for managing threads (Listing 2).

typedef struct _KTHREAD
{
  DISPATCHER_HEADER Header;
  UINT64 CycleTime;
  ULONG HighCycleTime;
  UINT64 QuantumTarget;
  PVOID InitialStack;
  PVOID StackLimit; 
  PVOID KernelStack;
  ULONG ThreadLock;
  PKQUEUE Queue;        // queue (IOCP) the thread is associated with, if any
  ULONG WaitTime;
} KTHREAD, *PKTHREAD;

Listing 2 – _KTHREAD structure used to represent threads (fields are removed for brevity)

When a thread is released via an IOCP, the Queue field in the _KTHREAD structure representing that thread points to the IOCP it was released through. The scheduler knows when a thread enters a wait state. At that point, it looks at the thread's Queue field to see whether it was released through an IOCP and, if so, decrements the active thread count of that IOCP. This mechanism ensures that CPU time is given to another thread waiting on the queue while the first one is blocked in the system call.
One caveat of this implementation is that when the first thread returns from the blocking call, it does not check the active thread count of its IOCP before becoming ready again. As a result, the number of active threads may end up slightly higher than the maximum number of active threads specified in CreateIoCompletionPort.
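The accounting described above fits in a small deterministic model. This is not kernel code. It is only a sketch of the rules as this post describes them: release a completion only while the active count is below the maximum, give the slot back when a released thread blocks, and skip the limit check when that thread becomes ready again. IocpConcurrencyModel is a hypothetical name.

```csharp
using System;

// Deterministic model of the IOCP active-thread accounting described in this post.
public sealed class IocpConcurrencyModel
{
    public IocpConcurrencyModel(int maximumCount) => MaximumCount = maximumCount;

    public int MaximumCount { get; }
    public int ActiveCount { get; private set; }
    public int PendingCompletions { get; private set; }

    // The IO manager (or PostQueuedCompletionStatus) queues a completion packet.
    public void QueueCompletion() => PendingCompletions++;

    // A waiting thread is released only if a packet is pending and the limit allows it.
    public bool TryRelease()
    {
        if (PendingCompletions == 0 || ActiveCount >= MaximumCount) return false;
        PendingCompletions--;
        ActiveCount++;
        return true;
    }

    // The scheduler sees a released thread enter a wait state, e.g. synchronous IO.
    public void ThreadBlocked() => ActiveCount--;

    // The thread becomes ready again; the limit is not re-checked (the caveat above).
    public void ThreadUnblocked() => ActiveCount++;

    // The thread finishes its work and goes back to waiting on the port.
    public void ThreadReturned() => ActiveCount--;
}
```

Walking through it with a maximum of two shows the caveat. Two completions are released, one released thread blocks, and a third completion takes its slot. When the blocked thread wakes, the active count is three.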

Exploiting the natural balancing power

When developing applications similar to our work item scheduler in .NET, we should be able to use CLR thread pool threads to execute individual work items, that is, by using the ThreadPool.QueueUserWorkItem method. In fact, the CLR thread pool has a very sophisticated thread injection and retirement algorithm designed to produce the best throughput. However, it is also compelling to see whether the kernel-mode scheduling optimizations we saw earlier could make any difference if we manage to do our work on thread pool threads released via the IOCP (let's call them IO threads from now on, but be aware that the term IO thread means something completely different in the context of the native Windows thread pool). This is especially true when the work we perform mixes some IO with CPU-bound work (IMHO, this is how the majority of our work behaves). But wait, didn't we say IOCP is for asynchronous IO? It is indeed. So how can we use those IO threads to perform any other work we have? The answer lies in the PostQueuedCompletionStatus function, which lets us manually post a queued completion status to an IOCP. If we call this API with the handle to the IOCP owned by the thread pool, we can release one of the IO threads to do whatever we like. As a matter of fact, the ThreadPool.UnsafeQueueNativeOverlapped method is built for that purpose.

Putting it all together

If you are still reading, you must be pretty anxious to see some code. Let’s dig in.
When we call ThreadPool.QueueUserWorkItem with a work item (which is a callback function), it goes into an internal queue in the thread pool, and one of the worker threads waiting on this queue executes it later. Because the IOCP in the thread pool is designed for asynchronous IO work, there's no way to get an IO thread to pick up work items from the thread pool's work queue. So we have to come up with a way to steal an IO thread and use it to execute work from our own queue. IoThreadScheduler abstracts this functionality.
To steal IO threads, we invoke ThreadPool.UnsafeQueueNativeOverlapped. This method accepts a pointer to a NativeOverlapped structure as a parameter. We can use the Overlapped.UnsafePack method to create a NativeOverlapped, and during this process we specify the callback function invoked by the IO thread being released. We can easily abstract this functionality in a small helper class, as listed below.

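using System;
using System.Threading;

// Requires compiling with /unsafe. Wraps one NativeOverlapped that is reused for every post.
unsafe sealed class QueuedCompletionStatus
{
  NativeOverlapped* _lpOverlapped;
  Action _schedulerCallback;

  public QueuedCompletionStatus(Action schedulerCallback)
  {
    _schedulerCallback = schedulerCallback;
    // Pack once; OnIoCompletion runs on the IO thread released for this overlapped.
    // It is never freed with Overlapped.Free (see the note below).
    _lpOverlapped = new Overlapped().UnsafePack(OnIoCompletion, null);
  }

  public void Post()
  {
    // Posts a completion packet to the thread pool's IOCP, releasing one IO thread.
    ThreadPool.UnsafeQueueNativeOverlapped(_lpOverlapped);
  }

  // Matches the IOCompletionCallback delegate; the error code and byte count are unused.
  void OnIoCompletion(uint errorCode, uint numBytes, NativeOverlapped* lpOverlapped)
  {
    _schedulerCallback();
  }
}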

Listing 3 – QueuedCompletionStatus helper


If you were reading the code carefully, you may have noticed that we never free the NativeOverlapped we created, which could lead to a memory leak. However, our IoThreadScheduler has only one static instance of QueuedCompletionStatus, because we always have the same callback to execute every time an IO thread is released (Listing 4).

static QueuedCompletionStatus _queuedCompletionStatus = new QueuedCompletionStatus(IoCompletionCallback);

Listing 4 – IoThreadScheduler has a static instance of QueuedCompletionStatus.

Another reason for having a single static instance is that creating a NativeOverlapped is an expensive operation. To mitigate that, Overlapped instances are cached internally, but the cache lookup is linear, which may result in poor performance if we have too many instances moving in and out.
IoThreadScheduler's main entry point is a public method called ScheduleCallback. I've tried to keep its signature consistent with ThreadPool.QueueUserWorkItem so that swapping between IO threads and worker threads is easy. All this method does is queue the incoming work to an internal queue and post a queued completion status to steal an IO thread to process the queue. However, it posts only if there isn't already a pending request to release an IO thread.

[SecurityCritical]
public static bool ScheduleCallback(WaitCallback callback, object state)
{
  var workItem = new WorkItem(callback, state);
  try
  {
  }
  finally
  {
    // A thread abort cannot interrupt a finally block, so the work item is never
    // lost between being enqueued and an IO thread being requested.
    _workItems.Enqueue(workItem);
    // Request an IO thread only if no request is already pending.
    if (Interlocked.CompareExchange(ref _awaitingForIoCompletion, 1, 0) == 0)
      _queuedCompletionStatus.Post();
  }
  return true;
}

Once an IO thread is released, it invokes the registered callback (OnIoCompletion in QueuedCompletionStatus), which calls IoCompletionCallback in IoThreadScheduler. This method turns off the flag indicating that we are waiting for an IO thread to be released and invokes the ProcessQueue method.

[SecurityCritical]
static void IoCompletionCallback()
{
  // An IO thread has been released; clear the flag so new work can request another one.
  Interlocked.CompareExchange(ref _awaitingForIoCompletion, 0, 1);
  ProcessQueue();
}

The ProcessQueue method dequeues the next work item from the queue and sends a request to release another IO thread to process the remainder of the queue. It then executes the dequeued work item. Once the work item finishes, it tries to process the next available work item. If the queue is empty at that point, the thread is returned to the pool.

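[SecurityCritical]
static void ProcessQueue()
{
  while (true)
  {
    WorkItem workItem = null;
    try
    {
    }
    finally
    {
      // Dequeue inside finally for the same abort-safety reason as ScheduleCallback.
      if (_workItems.TryDequeue(out workItem))
      {
        // Release another IO thread to work on the rest of the queue.
        if (Interlocked.CompareExchange(ref _awaitingForIoCompletion, 1, 0) == 0)
          _queuedCompletionStatus.Post();
      }
    }

    // Run the work item on this IO thread, then loop to look for more work.
    if (workItem != null)
      workItem.Callback(workItem.State);
    else
      return; // Queue is empty: return the thread to the pool.
  }
}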

Figure 6: Structure of IoThreadScheduler

Those three methods, together with the lock-free queue used to store work items, sum up IoThreadScheduler.
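To see the queue-and-flag protocol on its own, here is a minimal, portable sketch of the three methods. It is not the published IoThreadScheduler: MiniScheduler is a hypothetical name, the empty try/finally guard is left out, and, as an assumption to keep it runnable anywhere, “posting” queues a callback to a worker thread via ThreadPool.UnsafeQueueUserWorkItem instead of posting a NativeOverlapped to steal an IO thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical, simplified scheduler illustrating the protocol only.
// Assumption: Post() uses a worker thread instead of an IO thread.
public static class MiniScheduler
{
    sealed class WorkItem
    {
        public WorkItem(WaitCallback callback, object state) { Callback = callback; State = state; }
        public WaitCallback Callback { get; }
        public object State { get; }
    }

    static readonly ConcurrentQueue<WorkItem> _workItems = new ConcurrentQueue<WorkItem>();

    // 1 while a post is outstanding and its callback has not started yet.
    static int _awaitingForCompletion;

    public static bool ScheduleCallback(WaitCallback callback, object state)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        _workItems.Enqueue(new WorkItem(callback, state));
        // Post only on the 0 -> 1 transition, so at most one request is pending.
        if (Interlocked.CompareExchange(ref _awaitingForCompletion, 1, 0) == 0)
            Post();
        return true;
    }

    // Stand-in for QueuedCompletionStatus.Post().
    static void Post() => ThreadPool.UnsafeQueueUserWorkItem(_ => CompletionCallback(), null);

    static void CompletionCallback()
    {
        // The pending request has been served; allow the next post.
        Interlocked.CompareExchange(ref _awaitingForCompletion, 0, 1);
        ProcessQueue();
    }

    static void ProcessQueue()
    {
        while (_workItems.TryDequeue(out WorkItem workItem))
        {
            // Request another thread for whatever is left, unless one is pending.
            if (Interlocked.CompareExchange(ref _awaitingForCompletion, 1, 0) == 0)
                Post();
            workItem.Callback(workItem.State);
        }
        // Queue is empty: returning hands the thread back to the pool.
    }
}
```

For example, MiniScheduler.ScheduleCallback(s => Console.WriteLine(s), "hello") runs the lambda on a pool thread. However many items are queued, at most one post is outstanding at a time, and every thread that dequeues an item asks for one more helper unless a request is already pending.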

Loading it up

Now that we have a mechanism to execute work on IO threads, it’s time to compare the throughput of this approach with the throughput of worker threads in the thread pool.
As I said at the beginning, I didn’t manage to get ETW tracing to give me statistics from the CLR thread pool, so I had to look for an alternative. Essentially, to measure throughput, we need to measure the length of our work queue and the number of work items completed per second. Therefore, I created the simple class listed below to wrap the calls to the ThreadPool.QueueUserWorkItem and IoThreadScheduler.ScheduleCallback functions.

using System;
using System.Diagnostics;
using System.Threading;

public class ThroughputMeasuringThreadPool
{
  // Pairs the caller's callback with its state so both travel as one object.
  class Frame
  {
    public Frame(WaitCallback callback, object state)
    {
      if (callback == null) throw new ArgumentNullException("callback");
      Callback = callback;
      State = state;
    }
    public WaitCallback Callback { get; private set; }
    public object State { get; private set; }
  }

  // readOnly: false, so this process can write to the counters.
  readonly PerformanceCounter _numberOfOperationsPerSecond = new PerformanceCounter(
    ThreadPoolThroughputCounters.CategoryName,
    ThreadPoolThroughputCounters.NumberOfOperationsPerSecondCounter,
    false);
  readonly PerformanceCounter _queueLengthCounter = new PerformanceCounter(
    ThreadPoolThroughputCounters.CategoryName,
    ThreadPoolThroughputCounters.QueueLengthCounter,
    false);
  readonly bool _useIoThreads;

  public ThroughputMeasuringThreadPool(bool useIoThreads)
  {
    _useIoThreads = useIoThreads;
    // Schedule a no-op callback so IoThreadScheduler is initialized before measuring.
    IoThreadScheduler.ScheduleCallback(s => { }, null);
  }

  public bool ScheduleWorkItem(WaitCallback callback, object state)
  {
    _queueLengthCounter.Increment(); // one more item waiting
    if (_useIoThreads)
      return IoThreadScheduler.ScheduleCallback(CallbackThunk, new Frame(callback, state));
    return ThreadPool.QueueUserWorkItem(CallbackThunk, new Frame(callback, state));
  }

  // Runs on the pool thread and updates the counters around the real callback.
  void CallbackThunk(object state)
  {
    _queueLengthCounter.Decrement(); // picked up by a thread
    var frame = (Frame)state;
    frame.Callback(frame.State);
    _numberOfOperationsPerSecond.Increment(); // work completed
  }
}

ThroughputMeasuringThreadPool has two performance counters: QueueLengthCounter and NumberOfOperationsPerSecondCounter. It increments QueueLengthCounter every time a new work item is scheduled. It then thunks the callback so that it can decrement QueueLengthCounter when a thread picks up the work item and increment NumberOfOperationsPerSecondCounter when the actual work completes. Some results captured with our custom counters follow. The green line represents the queue length and the blue line represents the number of operations completed per second. The dashed red line represents the CPU usage of the test process.
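If you want to try the thunking idea without registering a performance counter category, the counters can be swapped for plain Interlocked fields. In this sketch ThroughputMeter and FibThenSleep are hypothetical names, only worker threads are used, and the workload follows the first test: compute the 10th Fibonacci number (Fib(10) = 55, counting from Fib(0) = 0) and then block for 100 ms.

```csharp
using System;
using System.Threading;

// Hypothetical meter: same thunk as ThroughputMeasuringThreadPool, but the
// PerformanceCounter instances are replaced by Interlocked counters (assumption).
public sealed class ThroughputMeter
{
    long _queueLength;
    long _completed;

    public long QueueLength => Interlocked.Read(ref _queueLength);
    public long Completed => Interlocked.Read(ref _completed);

    public bool ScheduleWorkItem(WaitCallback callback, object state)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        Interlocked.Increment(ref _queueLength);
        // Wrap (thunk) the callback so the meter sees pickup and completion.
        return ThreadPool.QueueUserWorkItem(_ =>
        {
            Interlocked.Decrement(ref _queueLength); // picked up by a thread
            callback(state);
            Interlocked.Increment(ref _completed);   // actual work finished
        });
    }

    // Naive recursive Fibonacci, indexed from Fib(0) = 0.
    public static int Fib(int n) => n < 2 ? n : Fib(n - 1) + Fib(n - 2);

    // Workload: a little CPU work followed by a 100 ms blocking call.
    public static void FibThenSleep(object _)
    {
        Fib(10);
        Thread.Sleep(100);
    }
}
```

Sampling Completed once a second and taking the difference between consecutive samples gives the operations-per-second figure, while QueueLength corresponds to the green line.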

Calculating 10th Fibonacci number and blocking 100ms on a system call (Thread.Sleep(100))

[Graph: Worker Threads]

[Graph: IO Threads]

As you can see, IO threads struggle a bit at the very beginning but go on to produce more throughput than worker threads as the load grows.

Calculating 10th Fibonacci number and blocking 100ms on a system call (SQL Server call over the network)

This test is identical to the previous one. However, instead of artificially creating a system call delay with Thread.Sleep, I made a call to a SQL Server stored procedure over the network (the stored procedure is written to block for a certain amount of time).

[Graph: Worker Threads]

[Graph: IO Threads]

Although IO threads still give better throughput, CPU usage stays flat at 100% for a long time. I have yet to discover whether this is due to the actual work being done or to the algorithm used by my lock-free queue (which I’m going to discuss in the next post).

Calculating 10th Fibonacci number and blocking 2s on a system call (SQL Server call over the network)

This was an interesting one. In fact, this is the pattern that led me to investigate the throughput of the two mechanisms. The graphs seem to indicate that IO threads produce better throughput, but only for a short while. As the load grows, we begin to notice lots of fluctuations in the throughput line. Throughput on worker threads, on the other hand, seems to be steadier than on IO threads. It seems to end up with the same behavior as the IO threads towards the end, but at least it looks better for most of the graph.

[Graph: Worker Threads]

[Graph: IO Threads]

Under some heavy loads, we encountered this pattern in our work item scheduler. More interestingly, our process died with OutOfMemoryExceptions from time to time (when it shouldn’t have, of course). Our crash dumps didn’t indicate any kind of resource leak; however, they did show a very large number of threads. If we add the thread count and private bytes counters to these graphs, this becomes more obvious.

[Graph: Worker Threads]

[Graph: IO Threads]

As you can see, when using worker threads the thread count steps up in a controlled way, very much like a staircase. In the IO threads case, however, it climbs constantly, and the more threads we have, the more memory we consume. This behavior comes from how the CLR thread pool’s thread injection and retirement algorithm works for the two kinds of threads. For worker threads, it uses a sophisticated algorithm that works out the number of threads based on the input and current resource consumption. For IO threads, after a certain threshold is reached, it creates a new thread every 500 ms (it is obviously more complicated than this, but that’s basically it). Also, IO threads returning from a blocking system call don’t go back and wait on the IOCP before they run. In our tests, we do some long-running IO and a CPU-intensive operation. So when our threads block on IO, the IOCP releases more threads to do work, and those eventually block on IO as well. When they all come back to life, we again have the chaotic situation where all the threads compete for the CPU.
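The thread count and private bytes numbers can also be read from inside the process. This is a sketch (ThreadPoolSnapshot is a hypothetical helper) that uses ThreadPool.GetMinThreads, GetMaxThreads and GetAvailableThreads, which report worker and IO (completion port) threads separately, plus Process.Threads and Process.PrivateMemorySize64. Busy threads are computed as the maximum minus the available count.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper that formats one sample of thread pool and process stats.
public static class ThreadPoolSnapshot
{
    public static string Take()
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        ThreadPool.GetMaxThreads(out int maxWorker, out int maxIo);
        ThreadPool.GetAvailableThreads(out int freeWorker, out int freeIo);

        using (Process self = Process.GetCurrentProcess())
        {
            // Busy = max - available, reported per kind of pool thread.
            return string.Format(
                "worker busy={0} (min {1}, max {2}); io busy={3} (min {4}, max {5}); " +
                "threads={6}; private bytes={7}",
                maxWorker - freeWorker, minWorker, maxWorker,
                maxIo - freeIo, minIo, maxIo,
                self.Threads.Count, self.PrivateMemorySize64);
        }
    }
}
```

Logging ThreadPoolSnapshot.Take() once a second during a load test gives roughly the same picture as the thread count and private bytes graphs.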
From IoThreadScheduler’s perspective, we could implement a smart algorithm that controls how many IO threads are stolen based on the queue length. But that’s not its purpose. Worker threads already do that for us, and we may be better off using them.
A better way to solve this would be to do lengthy IO work asynchronously (e.g. calling SqlCommand.BeginExecuteNonQuery instead of SqlCommand.ExecuteNonQuery in this case). This way we release threads while we are waiting for IO. When the IO completes, the continuation is executed on an IO thread anyway, which helps the IOCP maintain the number of active threads.
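The shape of the Begin/End pattern is sketched below. As an assumption, so that it runs without a database, a FileStream opened with FileOptions.Asynchronous stands in for SqlCommand.BeginExecuteNonQuery/EndExecuteNonQuery, and BeginReadFile is a hypothetical helper. The calling thread is not parked while the read is pending; the callback finishes the operation.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical helper: APM-style asynchronous read used as a stand-in for a
// Begin/End database call.
public static class AsyncIoSketch
{
    public static IAsyncResult BeginReadFile(string path, Action<int> onCompleted)
    {
        var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
            FileShare.Read, 4096, FileOptions.Asynchronous);
        try
        {
            var buffer = new byte[stream.Length];
            return stream.BeginRead(buffer, 0, buffer.Length, ar =>
            {
                // Invoked once the read has completed (normally on a pool thread).
                int bytesRead;
                try { bytesRead = stream.EndRead(ar); }
                finally { stream.Dispose(); }
                onCompleted(bytesRead);
            }, null);
        }
        catch
        {
            // BeginRead failed synchronously; don't leak the handle.
            stream.Dispose();
            throw;
        }
    }
}
```

With SqlCommand the structure is the same: call BeginExecuteNonQuery with a callback and call EndExecuteNonQuery inside it.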


Food for thought
If my observations are correct, I think you would have similar throughput issues if you do long-running (as in 2s) synchronous IO from your WCF services.
Switching to asynchronous IO is undoubtedly the way to go. I think this should be the case regardless of the type of thread pool threads you use. However, it might be challenging to sprinkle the async APIs across large code bases. This is where the new C# compiler goodness, async/await, could come in handy.
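With async/await the same kind of read keeps its sequential structure. Again, FileStream.ReadAsync is an assumed stand-in for a database call such as SqlCommand.ExecuteNonQueryAsync, and ReadFileAsync is a hypothetical helper; no thread is held while each read is pending.

```csharp
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: async/await version of a long-running IO call.
public static class AwaitSketch
{
    public static async Task<int> ReadFileAsync(string path)
    {
        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
            FileShare.Read, 4096, useAsync: true))
        {
            var buffer = new byte[4096];
            int total = 0, read;
            // Each await releases the thread until the read completes.
            while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
                total += read;
            return total;
        }
    }
}
```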
Another interesting thing to look at would be whether a thread returning from IO can be directed back to the IOCP before it is made ready. This way, if we already have the maximum number of active threads executing, we can defer the execution of the thread that just returned until one of the active threads releases the CPU. The user-mode scheduling APIs introduced in 64-bit Windows 7/2008 R2 might be the way to do this.
Last but not least, I’ve made my IoThreadScheduler source code publicly available. It’s important to understand, however, that this is not a recommendation to use this approach over ThreadPool.QueueUserWorkItem. You should still use ThreadPool.QueueUserWorkItem and the new Task APIs when possible. If you have a very specific scenario where you think IoThreadScheduler would be beneficial, give it a shot. Let me know whether you decide to keep it or throw it away.

Resources

http://technet.microsoft.com/en-us/sysinternals/bb963901
http://www.amazon.com/Concurrent-Programming-Windows-Joe-Duffy/dp/032143482X
http://msdn.microsoft.com/en-us/magazine/ff960958.aspx

Who said you can’t return a Queue from a Web Service?

You are right, I wasn’t serious. Well… although you should not do that, there’s nothing to prevent you or your fellow developers from doing it, because Queue<T> is serializable.

What could happen next? How is Queue<T> represented in the auto-generated schema? More importantly, would your clients be able to see it as Queue<T> or as a more generalized collection? What if it’s a Java (JAX-WS) client?

Our new whitepaper covering these scenarios, Data type interoperability between .NET and Java, is published on MSDN. In fact, MSDN just announced a series of whitepapers at the .NET Framework Developer Center.

Let us hear your thoughts, interoperability pains and scenarios.