Paul Selles

Computers and cats

Category Archives: Task

Task Parallelism: Passing Values into Tasks

One roadblock that I stumbled upon when I started working with Task Parallelism was passing values into my Tasks. Here is a quick guide to help you get your head around this issue.

 

We can easily create a single Task that uses values that do not change within a function, run it, and get the expected results:

int i = 0;
int j = 1;
int k = 2;

Task task = Task.Factory.StartNew(() =>
    {
        Console.WriteLine(string.Format("i='{1}'{0}j='{2}'{0}k='{3}'{0}{0}", Environment.NewLine, i, j, k));
    });

task.Wait();
Console.ReadLine();

Results:

i='0'
j='1'
k='2'

 

Once the values start changing, the Tasks will run against the latest values rather than the values from when the Task was created. The lambda captures the variables themselves, not a copy of their values, and those variables live in the scope of the parent function rather than in the scope of the Task, so they do not preserve the values they held when the Task was created:

int i = 0;
int j = 1;
int k = 2;

Task task = Task.Factory.StartNew(() =>
    {
        Console.WriteLine(string.Format("i='{1}'{0}j='{2}'{0}k='{3}'{0}{0}", Environment.NewLine, i, j, k));
    });

i = -1;
j = -1;
k = -1;

task.Wait();
Console.ReadLine();

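The effect is most visible when the Tasks are started from inside three nested for loops and use the loop variables i, j, and k (each running from 0 to 1) directly, without making local copies.

Results: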

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

i='2'
j='2'
k='2'

What happened here is that all the loops completed before any of the Tasks had a chance to run (each loop variable had been incremented to 2, failing its for condition, by the time the Tasks read it).
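The behaviour can be reproduced deterministically with a minimal sketch. It is an assumption of this sketch, not the code from the run above, that the Tasks are created cold with the Task constructor and started only once the loops have finished, so the outcome does not depend on the scheduler. CaptureDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CaptureDemo
{
    // Creates one task per (i, j, k) combination. Each lambda captures the
    // loop variables themselves, not the values they held at creation time.
    public static List<string> Run()
    {
        var tasks = new List<Task<string>>();

        for (int i = 0; i < 2; i++)
        {
            for (int j = 0; j < 2; j++)
            {
                for (int k = 0; k < 2; k++)
                {
                    // Assumption: cold tasks, started after the loops, to
                    // remove the race that Task.Factory.StartNew would add
                    tasks.Add(new Task<string>(() =>
                        string.Format("i='{0}' j='{1}' k='{2}'", i, j, k)));
                }
            }
        }

        // By now every loop variable has been incremented to 2
        foreach (Task<string> task in tasks)
        {
            task.Start();
        }
        Task.WaitAll(tasks.ToArray());

        return tasks.Select(t => t.Result).ToList();
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line); // i='2' j='2' k='2', eight times
        }
    }
}
```

With Task.Factory.StartNew the same capture happens, but a Task that gets scheduled early may still observe an intermediate value.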

 

We can get around the problem in a couple of ways. First, we can reassign the values to unchanging variables that share the same scope as our Task. This ensures that when our Tasks are created, the values that they use are assigned uniquely for that instance. If we choose this route then our code will look like this:

List<Task> tasks = new List<Task>();

for (int i = 0; i < 2; i++)
{
	for (int j = 0; j < 2; j++)
	{
		for (int k = 0; k < 2; k++)
		{
			int I = i;
			int J = j;
			int K = k;

			tasks.Add(Task.Factory.StartNew(() =>
			{
				Console.WriteLine(string.Format("i='{1}'{0}j='{2}'{0}k='{3}'{0}{0}", Environment.NewLine, I, J, K));
			}));
		}
	}
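}

// Wait for every Task before reading the output
Task.WaitAll(tasks.ToArray());
Console.ReadLine();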

Results:

i='0'
j='0'
k='0'

i='0'
j='1'
k='0'

i='0'
j='1'
k='1'

i='1'
j='0'
k='0'

i='1'
j='1'
k='0'

i='1'
j='0'
k='1'

i='0'
j='0'
k='1'

i='1'
j='1'
k='1'

Though the Tasks may not have completed in order, all the tasks are present and all the values have been accounted for.

 

Finally, my favourite solution: passing all the variables as an Anonymous Type to the Task [1]. Within the Task the object has to be cast to a dynamic type to access the Anonymous Type’s members [2]. The reason why I like this solution is that it lends itself well to working with more complicated data types without having to reassign a large number of variables locally. For our above example our solution will look like this:

List<Task> tasks = new List<Task>();

for (int i = 0; i < 2; i++)
{
    for (int j = 0; j < 2; j++)
    {
        for (int k = 0; k < 2; k++)
        {
            tasks.Add(Task.Factory.StartNew((Object obj) =>
            {
                var data = (dynamic)obj;
                Console.WriteLine(string.Format("i='{1}'{0}j='{2}'{0}k='{3}'{0}{0}", Environment.NewLine, data.i, data.j, data.k));
            }, new { i = i, j = j, k = k }));
        }
    }
}

Task.WaitAll(tasks.ToArray());
Console.ReadLine();

And the results will be similar to those above.
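As a hedged variation on the same idea, the state object can also be a Tuple, which avoids the dynamic cast at the cost of less descriptive member names (Item1, Item2, Item3). This is a swapped-in technique, not the Anonymous Type approach above. StateDemo and Run are hypothetical names, and the Tasks are deliberately started only after the loops finish to show that each one kept its own snapshot of the values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class StateDemo
{
    // Creates one cold task per (i, j, k) combination and hands each task
    // its own state object holding the values at creation time.
    public static List<string> Run()
    {
        var tasks = new List<Task<string>>();

        for (int i = 0; i < 2; i++)
        {
            for (int j = 0; j < 2; j++)
            {
                for (int k = 0; k < 2; k++)
                {
                    // Tuple.Create runs now, so the tuple holds the current values
                    tasks.Add(new Task<string>(state =>
                    {
                        var data = (Tuple<int, int, int>)state;
                        return string.Format("i='{0}' j='{1}' k='{2}'",
                            data.Item1, data.Item2, data.Item3);
                    }, Tuple.Create(i, j, k)));
                }
            }
        }

        // Start the tasks only after the loops have finished
        foreach (Task<string> task in tasks)
        {
            task.Start();
        }
        Task.WaitAll(tasks.ToArray());

        // Results are returned in creation order, not completion order
        return tasks.Select(t => t.Result).ToList();
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Even though every loop variable has reached 2 by the time the Tasks run, each Task reports the combination it was created with.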

 

Paul

 

References

 

[1] Anonymous Types (C# Programming Guide). MSDN Library.

[2] Using Type dynamic (C# Programming Guide). MSDN Library.

Renaming File with Team Foundation Server API: Extending Workspace to Rename a Large Group of Files

The Problem

We have TFS check-in policies that enforce naming conventions for certain files. The policy allows the developer to programmatically rename any files that do not match the acceptable naming convention. Behind the scenes the renaming is done using the Workspace.PendRename method [1]. This policy works fine for one or two files, but for a large number of files we start having performance issues. Anyone who has experience renaming multiple files on TFS knows that it is a painfully slow ordeal. So how can we speed it up?

The Solution

I have solved this issue by adding an extension method to the Microsoft.TeamFoundation.VersionControl.Client.Workspace class and incorporating Task Parallelism [2][3][4]. Since the original Workspace.PendRename returns the number of files that it has renamed as an Int32, we want to make sure that we preserve that functionality. I have only created a solution for the Workspace.PendRename(string, string) case since this is the only variation of the method that I am currently using; expanding this to all Workspace.PendRename overloads is trivial.

This is the extension method:

public static class WorkSpaceExtension
{
    // Extend PendRename(String, String)
    public static int PendRename(this Workspace workspace,
        string[] oldPaths,
        string[] newPaths)
    {
        // Make sure that the oldPaths and newPaths match
        if (oldPaths.Length != newPaths.Length)
            throw new ArgumentException("Every oldPath must have a corresponding newPath");

        // This list will contain all our tasks
        List<Task<int>> pendRenameTasks = new List<Task<int>>();

        // Loop through all newPath and oldPath pairs
        for (int i = 0; i < oldPaths.Length; i++)
        {
            // Add each new task to our task container
            pendRenameTasks.Add(Task.Factory.StartNew((Object obj) =>
            {
                // We are going to pass the current old path and the
                // current new path into the task as an anonymous type
                var path = (dynamic)obj;

                // Cast back to string so that the call binds statically to
                // PendRename(string, string) and the task is a Task<int>
                return workspace.PendRename((string)path.oldPath, (string)path.newPath);
            }, new { oldPath = oldPaths[i], newPath = newPaths[i] }));
        }

        // Wait for all tasks to complete
        Task.WaitAll(pendRenameTasks.ToArray());

        // Sum up the results of all the original PendRename calls
        int taskCount = 0;
        foreach (Task<int> pendRenameTask in pendRenameTasks)
        {
            taskCount += pendRenameTask.Result;
        }

        // Return the number of file names changed
        return taskCount;
    }
}
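The same fan-out-and-sum pattern can be tried without a TFS server. In the sketch below the delegate parameter rename is a hypothetical stand-in for Workspace.PendRename(string, string), and FanOutSum and RenameAll are illustrative names rather than part of any TFS API.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutSum
{
    // Runs `rename` once per path pair in parallel and sums the results.
    // `rename` is a hypothetical stand-in for Workspace.PendRename(string, string).
    public static int RenameAll(
        string[] oldPaths,
        string[] newPaths,
        Func<string, string, int> rename)
    {
        if (oldPaths.Length != newPaths.Length)
            throw new ArgumentException("Every oldPath must have a corresponding newPath");

        // Select supplies a fresh oldPath and index to each lambda, so no
        // loop variable is shared between tasks
        Task<int>[] tasks = oldPaths
            .Select((oldPath, index) =>
                Task.Factory.StartNew<int>(() => rename(oldPath, newPaths[index])))
            .ToArray();

        // Wait for all tasks, then add up the individual counts
        Task.WaitAll(tasks);
        return tasks.Sum(t => t.Result);
    }

    public static void Main()
    {
        // Fake rename that reports one renamed item per call
        int renamed = RenameAll(
            new[] { "a.txt", "b.txt" },
            new[] { "A.txt", "B.txt" },
            (oldPath, newPath) => 1);

        Console.WriteLine(renamed); // 2
    }
}
```

Whether the real Workspace object tolerates concurrent PendRename calls is not something this sketch can show; it only illustrates how the counts are collected.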

References

[1] Workspace.PendRename Method. MSDN Library.

[2] Extension Methods (C# Programming Guide). MSDN Library.

[3] Workspace Class. MSDN Library.

[4] Extension Methods (C# Programming Guide). MSDN Library.

Threading: Task Parallelism and Task Waiting with EventWaitHandle, ManualResetEvent, and AutoResetEvent

Introduction to the Task Parallel Library

Since the introduction of parallelism with the .NET Framework 4, we’ve had access to new runtime libraries that give .NET developers control over multithreading in their projects [1]. Working with Tasks allows the developer to concentrate on performing specific tasks (for lack of a better word) rather than on low-level worker threads. For .NET developers, this means working with the Task Parallel Library (TPL), where MSDN identifies a task as being

an asynchronous operation. In some ways, a task resembles a thread or ThreadPool work item, but at a higher level of abstraction [2].

Traditional Task Waiting

Here is a really simple tasking example where we want to print a list to the Console in a certain order [3]. We want the console output to have a certain order, established by a for loop, and a certain hierarchy (modelled after XML). Here is an example of a function using an asynchronous task with no wait handler:

static void NoWaitHandlerExample(int events=1)
{
    // Open Events
    Console.WriteLine("Start Events");

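    for (int i = 0; i < events; i++)
    {
        // Copy the loop variable so that each task captures its own id
        int eventId = i;

        // Open Event Id=eventId+1
        Console.WriteLine("\tStart Event: " + (eventId + 1).ToString());

        // Start the task without waiting for it
        // (_random is assumed to be a static Random field of the containing class)
        Task.Factory.StartNew(() =>
        {
            // Open and close Task Id=eventId+1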
            Console.WriteLine("\t\tStart Task: " + (eventId + 1).ToString());
            Thread.Sleep(_random.Next(500));
            Console.WriteLine("\t\tEnd Task: " + (eventId + 1).ToString());
        });

        // Close Event Id=eventId+1
        Console.WriteLine("\tEnd Event: " + (eventId + 1).ToString());
    }

    // Close Events
    Console.WriteLine("End Events");
}

If we are to call the function with 5 events, here is a sample of the output:

Start Events
        Start Event: 1
        End Event: 1
        Start Event: 2
        End Event: 2
        Start Event: 3
        End Event: 3
        Start Event: 4
        End Event: 4
        Start Event: 5
                Start Task: 1
        End Event: 5
End Events
                Start Task: 3
                Start Task: 4
                Start Task: 2
                Start Task: 5
                End Task: 4
                End Task: 1
                End Task: 2
                End Task: 3
                End Task: 5

As you can see, neither the order nor the hierarchy is preserved. Most of the errors are caused by the main thread moving on, and even finishing, before the tasks start and end (with the exception of the first task, which started before the main thread finished). To fix this we can implement a task waiting solution as outlined in the TPL documentation [2].

static void TaskWaitExample(int events=1)
{
    // Open Events
    Console.WriteLine("Start Events");


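    for (int i = 0; i < events; i++)
    {
        // Copy the loop variable so that each task captures its own id
        int eventId = i;

        // Open Event Id=eventId+1
        Console.WriteLine("\tStart Event: " + (eventId + 1).ToString());

        Task task = Task.Factory.StartNew(() =>
        {
            // Open and close Task Id=eventId+1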
            Console.WriteLine("\t\tStart Task: " + (eventId + 1).ToString());
            Thread.Sleep(_random.Next(500));
            Console.WriteLine("\t\tEnd Task: " + (eventId + 1).ToString());
        });

        // *** Wait for task to complete ***
        task.Wait();

        // Close Event Id=eventId+1
        Console.WriteLine("\tEnd Event: " + (eventId + 1).ToString());
    }

    // Close Events
    Console.WriteLine("End Events");
}

The only difference is highlighted with *** in the comments which from this point on will represent task waiting events. In the code above, we wait for each task to complete before continuing. As for the results:

Start Events
        Start Event: 1
                Start Task: 1
                End Task: 1
        End Event: 1
        Start Event: 2
                Start Task: 2
                End Task: 2
        End Event: 2
        Start Event: 3
                Start Task: 3
                End Task: 3
        End Event: 3
        Start Event: 4
                Start Task: 4
                End Task: 4
        End Event: 4
        Start Event: 5
                Start Task: 5
                End Task: 5
        End Event: 5
End Events

The output preserves both order and hierarchy.

 

Task Waiting Through Event Wait Handling using EventWaitHandle, ManualResetEvent, and AutoResetEvent Classes

A very powerful but sometimes overlooked set of tools for task wait handling is the group of System.Threading classes EventWaitHandle, ManualResetEvent, and AutoResetEvent [4],[5],[6],[7],[8]. The principle behind these classes is the same: the event object has two basic states, signalled or not signalled; for convenience I will refer to them as ON and OFF. While the state is OFF, any thread that calls the WaitOne method is blocked there. The thread stays blocked until another thread calls the Set method on the object, which sets the state to ON. We can then reset the signal back to OFF. The difference between the two EventResetMode values (ManualReset and AutoReset) is how the signal is reset [9]. With AutoReset, the signal is reset to OFF automatically as soon as a single waiting thread has been released from WaitOne; if there are no waiting threads when Set is called, the signal remains ON until a thread calls WaitOne (which then returns immediately and resets the signal) or until Reset is called. With ManualReset, the signal remains ON, releasing every thread that waits on it, until the Reset method is called. The MSDN entries for the EventWaitHandle methods Set, WaitOne, and Reset provide more details on thread signalling [6],[10],[11],[12]. WaitOne comes in different flavours that allow for more functionality, such as timeouts.
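The difference between the two reset modes can be checked on a single thread. The sketch below is a minimal illustration that probes the state with WaitOne(0), which returns immediately with true if the event is ON and false if it is OFF; ResetModeDemo and Probe are hypothetical names.

```csharp
using System;
using System.Threading;

public static class ResetModeDemo
{
    // Returns the ON/OFF observations made after each step
    public static bool[] Probe()
    {
        var results = new bool[5];

        using (var auto = new AutoResetEvent(false))
        using (var manual = new ManualResetEvent(false))
        {
            // AutoReset: the first successful wait consumes the signal
            auto.Set();
            results[0] = auto.WaitOne(0);   // ON, and reset to OFF by this wait
            results[1] = auto.WaitOne(0);   // OFF

            // ManualReset: the signal stays ON until Reset is called
            manual.Set();
            results[2] = manual.WaitOne(0); // ON
            results[3] = manual.WaitOne(0); // still ON
            manual.Reset();
            results[4] = manual.WaitOne(0); // OFF
        }

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Probe()));
        // True, False, True, True, False
    }
}
```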

We can rewrite our program to use the ManualResetEvent Class:

static void ManualResetEventExample(int events=1)
{
    // *** New reset event unsignaled by default
    ManualResetEvent manualResetEvent = new ManualResetEvent(false);

    // Open Events
    Console.WriteLine("Start Events");

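    for (int i = 0; i < events; i++)
    {
        // Copy the loop variable so that each task captures its own id
        int eventId = i;

        // Open Event Id=eventId+1
        Console.WriteLine("\tStart Event: " + (eventId + 1).ToString());

        Task.Factory.StartNew(() =>
        {
            // Open and close Task Id=eventId+1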
            Console.WriteLine("\t\tStart Task: " + (eventId + 1).ToString());
            Thread.Sleep(_random.Next(500));
            Console.WriteLine("\t\tEnd Task: " + (eventId + 1).ToString());

            // *** Set: Sets ManualResetEvent to signaled ***
            manualResetEvent.Set();
        });

        // *** WaitOne: Block thread until ManualResetEvent is signalled ***
        manualResetEvent.WaitOne();

        // Close Event Id=eventId+1
        Console.WriteLine("\tEnd Event: " + (eventId + 1).ToString());

        // *** Reset: Set state to not signalled ***
        manualResetEvent.Reset();
    }

    // Close Events
    Console.WriteLine("End Events");
}

and the AutoResetEvent Class:

static void AutoResetEventExample(int events=1)
{
    // *** New reset event unsignaled by default
    AutoResetEvent autoResetEvent = new AutoResetEvent(false);

    // Open Events
    Console.WriteLine("Start Events");

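    for (int i = 0; i < events; i++)
    {
        // Copy the loop variable so that each task captures its own id
        int eventId = i;

        // Open Event Id=eventId+1
        Console.WriteLine("\tStart Event: " + (eventId + 1).ToString());

        Task.Factory.StartNew(() =>
        {
            // Open and close Task Id=eventId+1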
            Console.WriteLine("\t\tStart Task: " + (eventId + 1).ToString());
            Thread.Sleep(_random.Next(500));
            Console.WriteLine("\t\tEnd Task: " + (eventId + 1).ToString());

            // *** Set: Sets AutoResetEvent to signalled ***
            autoResetEvent.Set();
        });

        // *** WaitOne: Block thread until AutoResetEvent is signaled and clear signalled state ***
        autoResetEvent.WaitOne();

        // Close Event Id=eventId+1
        Console.WriteLine("\tEnd Event: " + (eventId + 1).ToString());
    }

    // Close Events
    Console.WriteLine("End Events");
}

Using EventWaitHandle directly is trivial, as we state whether the reset is manual or automatic with the EventResetMode enumeration. We can reuse the code above with the following constructors.

    // *** New reset event unsignaled by default and set for a manual EventResetMode
    EventWaitHandle manualResetEvent = new EventWaitHandle(false, EventResetMode.ManualReset);

    // *** New reset event unsignaled by default and set for an automatic EventResetMode
    EventWaitHandle autoResetEvent = new EventWaitHandle(false, EventResetMode.AutoReset);

For short wait times where performance is a concern, and where you are not planning on using all the features that come with the ManualResetEvent class, you can use ManualResetEventSlim [13]. ManualResetEventSlim uses busy spinning for a short time while waiting for the event to become signalled. If the event is still not signalled after that, it falls back to a kernel-based wait like the ManualResetEvent class.
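A minimal sketch of the slim variant follows. Note that ManualResetEventSlim exposes Wait and IsSet rather than WaitOne. SlimDemo and Run are hypothetical names, and the 50 ms sleep and 5 second timeout are arbitrary values chosen for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SlimDemo
{
    // Returns true if the task signalled the event before the timeout
    public static bool Run()
    {
        using (var done = new ManualResetEventSlim(false))
        {
            Task task = Task.Factory.StartNew(() =>
            {
                // Simulate a short piece of work, then signal
                Thread.Sleep(50);
                done.Set();
            });

            // Block until the event is signalled or the timeout expires
            bool signalled = done.Wait(TimeSpan.FromSeconds(5));

            // Make sure the task has finished before the event is disposed
            task.Wait();

            return signalled && done.IsSet;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run() ? "Signalled" : "Timed out");
    }
}
```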

 

Server Example

Now that we are more comfortable with using these wait events we can start working on a real-world example. For this example we will create a simple TCP server that accepts a connection, reads a string, prints it to the Console, reverses it, and finally returns it to the caller. The class is an extension of an example provided by the MSDN Library under the NetworkStream.BeginRead Method [14].

Now without further ado, here is our simple server example making full use of the ManualResetEvent class:

public class ManualResetEventTcpServer
{
    // *** Listener wait handle, initial state is not signalled (false)
    private static ManualResetEvent ManualResetEvent = new ManualResetEvent(false);

    // Buffer length
    private int _bufferLength = 2048;

    // Will be accessed by multiple threads
    private volatile bool _running;

    // Start listener loop
    public void Start()
    {
        _running = true;
        
        TcpListener listener = new TcpListener(IPAddress.Any, 3000);
        listener.Start();

        try
        {
            while (_running)
            {
                // *** Reset to unsignalled
                ManualResetEvent.Reset();

                listener.BeginAcceptTcpClient(
                    ListenerCallback,
                    listener);

                // *** Block until signalled
                ManualResetEvent.WaitOne();
            }
        }
        catch {}
    }

    // Stop listener loop
    public void Stop()
    {
        _running = false;
    }

    // Async BeginAcceptTcpClient listener callback
    private void ListenerCallback(IAsyncResult ar)
    {
        try
        {
            // *** Signal
            ManualResetEvent.Set();

            // Get listener and end async AcceptTcpClient
            TcpListener listener = ar.AsyncState as TcpListener;
            TcpClient client = listener.EndAcceptTcpClient(ar);

            using (NetworkStream stream = client.GetStream())
            {
                // Read message
                string message = ReadMessage(stream);       

                // Write to console
                Console.WriteLine(message);

                // Reverse and return message
                WriteMessage(stream, new string(message.ToArray().Reverse().ToArray()));
            }        
        }
        catch {}
    }

    // Read message
    private string ReadMessage(NetworkStream networkStream)
    {
        // Incoming buffer and message string
        byte[] buffer = new byte[_bufferLength];
        string message = string.Empty;
     
        // Loop through the entire stream in case the message is bigger than our buffer
        while (networkStream.DataAvailable)
        {
            int bytesRead = networkStream.Read(buffer, 0, _bufferLength);
            message += Encoding.UTF8.GetString(buffer, 0, bytesRead);
        }

        return message;
    }

    // Return message
    private void WriteMessage(NetworkStream networkStream, string message)
    {
        // Outgoing buffer built from the message string
        byte[] buffer = Encoding.UTF8.GetBytes(message);
        networkStream.Write(buffer, 0, buffer.Length);
    }

}

This class can be attached to a Console Application where Start and Stop will start and stop the listener. The server is currently configured to listen on port 3000; choose a port that works for you.

Looking through the class above, your attention should be focused on the Start function and the ListenerCallback function, as they are the only two functions using our wait handle. Within Start we loop over the TcpListener class's asynchronous method BeginAcceptTcpClient [15],[14]. We want to be able to handle multiple TCP clients, but we don’t want to be spinning in the loop, so before we begin to accept a client we Reset the wait handle to the OFF state. The asynchronous method BeginAcceptTcpClient is then called and the main thread is blocked at WaitOne. Once our listener picks up a client, the callback function is called and the wait handle is set to the ON state, removing the block at WaitOne, and the main thread can continue through the while loop.

Of course this is no fun without a client, so to mix things up here is our PowerShell client:

#Simple Tcp Client
Set-StrictMode -Version Latest

## Message as argument
$Message = "$Args"
	
## Create message buffer
$BufferOut = [System.Text.Encoding]::UTF8.GetBytes($Message);

## Address and port
$Address = "PAULS-W530-W8.iQmetrixHO.local"
$DnsAddress = [System.Net.Dns]::GetHostAddresses($Address)
$Port = 3000

## Connect Tcp Client
$Client = New-Object System.Net.Sockets.TcpClient
$Client.Connect($DnsAddress,$Port)
	
## Get stream and write message buffer
$Stream = $Client.GetStream()
$Stream.Write($BufferOut, 0, $BufferOut.Length)
$Stream.Flush()

## Setup input buffer
$BufferInLength = 1024
$BufferIn = New-Object byte[] $BufferInLength
$BufferIndex = 0

## Set a read timeout
$TimeOut = [TimeSpan]"00:00:15"
$StartTime = Get-Date
do {
	if ($Stream.CanRead -and $Stream.DataAvailable) { break }
} while (((Get-Date) - $StartTime) -le $TimeOut)
	
if (!($Stream.CanRead -and $Stream.DataAvailable))
{
	Write-Error "Timeout"
}

## Read data
while ($Stream.DataAvailable)
{
	$BytesRead = $Stream.Read($BufferIn, 0, $BufferInLength)
	$Message = [System.Text.Encoding]::UTF8.GetString($BufferIn, 0, $BytesRead)

	Write-Host $Message -NoNewline
}

Write-Host ""

Paul

 

References

[1]  Parallel Programming in the .NET Framework. MSDN Library

[2] Task Parallelism (Task Parallel Library). MSDN Library

[3] Console Class. MSDN Library

[4] System.Threading Namespace. MSDN Library

[5] EventWaitHandle, AutoResetEvent, CountdownEvent, ManualResetEvent. MSDN Library

[6] EventWaitHandle Class. MSDN Library

[7] ManualResetEvent Class. MSDN Library

[8] AutoResetEvent Class. MSDN Library

[9] EventResetMode Enumeration. MSDN Library

[10] EventWaitHandle.Set. MSDN Library

[11] EventWaitHandle.WaitOne. MSDN Library

[12] EventWaitHandle.Reset. MSDN Library

[13] ManualResetEventSlim Class. MSDN Library

[14] NetworkStream.BeginRead Method. MSDN Library

[15] TcpListener Class. MSDN Library
