
I have a program as follows

    class Program
    {
        public static int TaskCount { get; set; }
        public static BlockingCollection<string> queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
        static void Main(string[] args)
        {
            TaskCount = 3;
            Task.Factory.StartNew(() => Producer());

            for (int i = 0; i < TaskCount; i++)
                Task.Factory.StartNew(() => Consumer());
            Console.ReadKey();
        }

        private static void Producer()
        {
            using (StreamWriter sw = File.AppendText(@"C:\pcadder.txt"))
            {
                for (int i = 0; i < 15; i++)
                {
                    queue.Add("Item: " + (i+1).ToString());
                    var message = string.Format("{2}.Item added: Item {0} at {1}", (i+1).ToString(), DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),i+1);
                    Console.WriteLine(message);
                    sw.WriteLine(message);

                }
                queue.CompleteAdding();
            }
        }
        private static void Consumer()
        {
            int count = 1;
            foreach (var item in queue.GetConsumingEnumerable())
            {
                var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
                        Thread.CurrentThread.ManagedThreadId,count);
                Console.WriteLine(message);

                using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
                    sw.WriteLine(message);
                count += 1;
            }
        }
    }

The Output

1.Item added: Item 1 at 2017.07.06 09:58:49.784734
2.Item added: Item 2 at 2017.07.06 09:58:49.784734
3.Item added: Item 3 at 2017.07.06 09:58:49.784734
4.Item added: Item 4 at 2017.07.06 09:58:49.784734
5.Item added: Item 5 at 2017.07.06 09:58:49.784734
6.Item added: Item 6 at 2017.07.06 09:58:49.784734
7.Item added: Item 7 at 2017.07.06 09:58:49.784734
8.Item added: Item 8 at 2017.07.06 09:58:49.784734
9.Item added: Item 9 at 2017.07.06 09:58:49.784734
10.Item added: Item 10 at 2017.07.06 09:58:49.784734
11.Item added: Item 11 at 2017.07.06 09:58:49.784734
12.Item added: Item 12 at 2017.07.06 09:58:49.784734
13.Item added: Item 13 at 2017.07.06 09:58:49.784734
14.Item added: Item 14 at 2017.07.06 09:58:49.784734
15.Item added: Item 15 at 2017.07.06 09:58:49.784734

1.Item taken: Item: 3 at 2017.07.06 09:58:49.784734 by thread 7.
1.Item taken: Item: 2 at 2017.07.06 09:58:49.784734 by thread 4.
1.Item taken: Item: 1 at 2017.07.06 09:58:49.784734 by thread 5.
2.Item taken: Item: 5 at 2017.07.06 09:58:49.784734 by thread 4.
2.Item taken: Item: 4 at 2017.07.06 09:58:49.784734 by thread 7.
2.Item taken: Item: 6 at 2017.07.06 09:58:49.784734 by thread 5.
3.Item taken: Item: 7 at 2017.07.06 09:58:49.784734 by thread 4.
3.Item taken: Item: 8 at 2017.07.06 09:58:49.784734 by thread 7.
3.Item taken: Item: 9 at 2017.07.06 09:58:49.784734 by thread 5.
4.Item taken: Item: 11 at 2017.07.06 09:58:49.784734 by thread 7.
4.Item taken: Item: 12 at 2017.07.06 09:58:49.784734 by thread 5.
5.Item taken: Item: 13 at 2017.07.06 09:58:49.784734 by thread 7.
5.Item taken: Item: 14 at 2017.07.06 09:58:49.784734 by thread 5.
6.Item taken: Item: 15 at 2017.07.06 09:58:49.784734 by thread 7.

After almost every run of the program, one item is missing from the consumer logs (here, Item 10 is missing). I couldn't figure out why this is happening.

  1. Why is this item not processed?
  2. When using multiple tasks as consumers, is FIFO processing order lost? If I want to keep/force FIFO processing order inside the consumer method, should I avoid using multiple tasks? (Processing may include I/O and networking operations.)
ibubi

2 Answers


Here

using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
    sw.WriteLine(message);

You rapidly write to the same file from multiple threads. That's not a good idea, and this code actually throws an exception. It goes unnoticed because you don't handle any exceptions and it happens on a background thread, so it does not crash your application. That explains why you have missing items in your log. You can write to the same file like this, for example:

// create the writer outside Consumer (e.g. in Main) and wrap it with TextWriter.Synchronized
using (var taker = TextWriter.Synchronized(File.AppendText(@"pctaker.txt"))) {
    TaskCount = 3;
    Task.Factory.StartNew(() => Producer());
    //Producer();
    for (int i = 0; i < TaskCount; i++)
        // pass to consumer
        Task.Factory.StartNew(() => Consumer(taker));
    Console.ReadKey();
}

private static void Consumer(TextWriter writer)
{
    int count = 1;
    foreach (var item in queue.GetConsumingEnumerable())
    {
        var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
                Thread.CurrentThread.ManagedThreadId, count);
        Console.WriteLine(message);                                
        writer.WriteLine(message);
        writer.Flush();
        count += 1;
    }
}

Or just put a lock around writing to the file.
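For example, a minimal sketch of the lock approach (the fileLock field and WriteLog helper are names I'm adding for illustration, not from the original code):

// Shared lock object so only one thread appends to the log file at a time.
private static readonly object fileLock = new object();

private static void WriteLog(string message)
{
    lock (fileLock)
    {
        // File.AppendAllText opens, appends and closes the file in one call.
        File.AppendAllText(@"C:\pctaker.txt", message + Environment.NewLine);
    }
}

Each consumer then calls WriteLog(message) instead of creating its own StreamWriter.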

As for the second question - consumers still pull items from the queue in FIFO order, but since you have multiple consumers, the order of processing is of course not guaranteed, because all consumers process items in parallel. Consumer A pulls item 1; consumer B pulls item 2 at the same time. Consumer A takes 100 ms to process item 1, consumer B takes 10 ms to process item 2. As a result, item 2 is processed (that is, written to your log) before item 1.
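If strict processing order matters more than throughput, the simplest option is the one you hinted at yourself: run a single consumer task. A minimal sketch of that variant (same Producer and Consumer as in your question, just one consumer):

TaskCount = 1; // only one consumer, so items are processed strictly in the order they were added
Task.Factory.StartNew(() => Producer());
Task.Factory.StartNew(() => Consumer());
Console.ReadKey();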

Evk

If you want to ensure that the output from a parallel processing block is in the same order as its input, and you want to stay with BlockingCollection, you will have to mess around with something like a priority queue - see the sketch below.
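For illustration, a rough sketch of that idea: tag each item with a sequence number when it is produced, and have the consumers hand their results to a single writer that buffers out-of-order results until the next expected sequence arrives. The SequencedWriter class below is purely hypothetical, not from any library:

using System.Collections.Generic;
using System.IO;

class SequencedWriter
{
    private readonly SortedDictionary<int, string> pending = new SortedDictionary<int, string>();
    private int nextToWrite = 0;
    private readonly object sync = new object();

    // Called by any consumer thread when it finishes processing an item.
    public void Write(int sequence, string line, TextWriter output)
    {
        lock (sync)
        {
            pending[sequence] = line;

            // Flush every result that is now contiguous with what was already written.
            while (pending.TryGetValue(nextToWrite, out var ready))
            {
                output.WriteLine(ready);
                pending.Remove(nextToWrite);
                nextToWrite++;
            }
        }
    }
}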

However, there is a much easier way if you're prepared to use the more modern DataFlow library (part of the Task Parallel Library).

Here's a sample program. Note that this uses await but that's not necessary to use DataFlow. The item type in the queue is int, but you can specify any type - int just happens to be a simple way to demonstrate.

The important thing to note about this sample program is that the input items are processed in parallel by multiple threads, but the final output is still in the same order as it was input.

If you look at the output "Returning X from thread Y", you'll see that X is not always in the same order in which it was queued. The worker threads can return the data out of input order.

But if you look at the output "Outputting X" you'll see that X is in the same order that it was queued (monotonically increasing).

The output queue has ensured that the output is in the correct order.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp1
{
    public class Program
    {
        static void Main()
        {
            var inQueue  = new TransformBlock<int, int>(item => process(item), processBlockOptions());
            var outQueue = new ActionBlock<int>(item => output(item), outputBlockOptions());

            inQueue.LinkTo(outQueue, new DataflowLinkOptions {PropagateCompletion = true});

            var task = queueData(inQueue);

            Console.WriteLine("Waiting for task to complete in thread " + Thread.CurrentThread.ManagedThreadId);
            task.Wait();
            Console.WriteLine("Completed.");
        }

        static async Task queueData(TransformBlock<int, int> executor)
        {
            await enqueue(executor);

            Console.WriteLine("Indicating that no more data will be queued.");
            executor.Complete(); // Indicate that no more items will be queued.

            Console.WriteLine("Waiting for queue to empty.");
            await executor.Completion; // Wait for executor queue to empty.
        }

        static async Task enqueue(TransformBlock<int, int> executor)
        {
            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queuing data " + i);
                int v = i;
                await executor.SendAsync(v); // Queues a method that returns v.
            }
        }

        static int process(int value)  // Process value by adding 1000 to it.
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item {value}");
            value += 1000;
            Thread.Sleep(150+nextRand());  // Simulate work.
            Console.WriteLine($"Returning {value} from thread {Thread.CurrentThread.ManagedThreadId}");

            return value;
        }

        static void output(int value)
        {
            Console.WriteLine($"Outputting {value}");
        }

        static ExecutionDataflowBlockOptions processBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 8
            };
        }

        static ExecutionDataflowBlockOptions outputBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static int nextRand()
        {
            lock (rngLock)
            {
                return rng.Next(250);
            }
        }

        static Random rng = new Random();
        static object rngLock = new object();
    }
}

It's interesting to play around with the values of MaxDegreeOfParallelism and BoundedCapacity returned from processBlockOptions().

For example, try it with MaxDegreeOfParallelism = 8 and BoundedCapacity = 16, as shown below.
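That variant only changes the numbers returned from processBlockOptions():

static ExecutionDataflowBlockOptions processBlockOptions()
{
    return new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 8, // more items processed in parallel
        BoundedCapacity = 16        // more items allowed to queue inside the block
    };
}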


[EDIT] To answer your question "Why is this item not processed?" - I suspect it's because your output logging is not thread-safe (as per the good answer from Evk).

Matthew Watson
  • You answered it before I could ask it in a separate post, after you shed some light on the problem I currently have. As I am new to TPL, I need to digest it first; I appreciate this answer. – ibubi Jul 06 '17 at 08:59
  • @Downvoter: Is there anything wrong with suggesting to use DataFlow (and providing a working example) to solve the OP's second question? I think most people would definitely recommend using DataFlow to solve this kind of problem. – Matthew Watson Jul 06 '17 at 09:25
  • Thinking of the producer/consumer approach, is the TPL Dataflow approach better than the BlockingCollection approach I used? I inspected your code and ran it; it works as I expected. It is really neat! Have you ever seen a document comparing the performance of these two approaches? – ibubi Jul 06 '17 at 10:06
  • @ibubi There's some discussion here: https://stackoverflow.com/questions/21163487/tpl-dataflow-vs-blockingcollection; basically BlockingCollection predates DataFlow which often (but not always) provides a better, higher-level approach. I recommend you read [Stephen Cleary's blog on DataFlow](https://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html) for a better idea. – Matthew Watson Jul 06 '17 at 10:21
  • In my real-world case my program will run indefinitely (polling the database continuously for new records for the queue), so I couldn't build a multiple-producer codebase. On Stephen Cleary's blog he handles multiple producers manually by breaking them up into multiple queues (I guess so as not to break FIFO order in the demo). Is there any way to handle this dynamically without breaking the FIFO order? – ibubi Jul 10 '17 at 08:19