
I am trying to understand Tasks in C#, but I am still having some problems. I am trying to create a video application. The main purpose is to read video from a file (I am using Emgu.CV), send it via TCP/IP to a board for processing, and then receive it back as a real-time stream.

First, I implemented it serially: read a Bitmap, send it to and receive it from the board, and plot it. But reading the bitmaps and plotting them takes too much time. I would like to have transmit and receive FIFO buffers that hold the video frames, and a separate task that does the job of sending and receiving each frame, so that the stages run in parallel. I thought I should create 3 tasks:

        tasks.Add(Task.Run(() => Video_load(video_path)));
        tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
        tasks.Add(Task.Run(() => VideoDisp_hw(32)));

I would like these to run in parallel. What type of object should I use for the buffers? A ConcurrentQueue? A BufferBlock? Or just a List?
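One possible answer to "which object" is a bounded `BlockingCollection<T>` between each pair of tasks: it is a thread-safe FIFO (it wraps a `ConcurrentQueue<T>` by default), whereas a plain `List<T>` is not thread-safe. The following is only a sketch under stated assumptions: frames are simulated by `int` values, and `LoadFrame`, `SendAndReceive` and `Display` are hypothetical stand-ins for the Emgu.CV reading, the TCP/IP round trip and the plotting code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Bounded FIFO buffers: Add blocks when a buffer is full, which caps memory use
var txBuffer = new BlockingCollection<int>(boundedCapacity: 8);
var rxBuffer = new BlockingCollection<int>(boundedCapacity: 8);
var displayed = new List<int>();

// Hypothetical stand-ins for the real video, TCP/IP and plotting code
int LoadFrame(int index) { Thread.Sleep(10); return index; }
int SendAndReceive(int frame) { Thread.Sleep(20); return frame * 2; }
void Display(int frame) { displayed.Add(frame); Console.WriteLine($"Displayed {frame}"); }

var tasks = new List<Task>();

// Stage 1: read frames into the transmit buffer
tasks.Add(Task.Run(() =>
{
    try
    {
        for (int i = 0; i < 20; i++) txBuffer.Add(LoadFrame(i));
    }
    finally { txBuffer.CompleteAdding(); } // signals "no more frames" to stage 2
}));

// Stage 2: send each frame to the board and queue the processed result
tasks.Add(Task.Run(() =>
{
    try
    {
        foreach (var frame in txBuffer.GetConsumingEnumerable())
            rxBuffer.Add(SendAndReceive(frame));
    }
    finally { rxBuffer.CompleteAdding(); } // signals "no more results" to stage 3
}));

// Stage 3: display the processed frames in arrival order
tasks.Add(Task.Run(() =>
{
    foreach (var frame in rxBuffer.GetConsumingEnumerable())
        Display(frame);
}));

Task.WaitAll(tasks.ToArray());
```

Because each stage runs on its own task, loading frame n+1 can overlap with the round trip of frame n and the display of frame n-1, while the bounded capacity stops a fast stage from filling memory.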

Update: Thanks for the advice! I would like to ask something else. I am trying to create a simple console program with two TPL Dataflow blocks. One would be a TransformBlock that takes a message (e.g. "start") and loads data into a List, and the other would be an ActionBlock that just reads the data from the list and prints it. Here is the code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPL_Dataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            Random randn = new Random();

            // Builds the whole list, and returns it only after all 16 items have been added
            var loadData = new TransformBlock<string, List<int>>(async sample_string =>
            {
                List<int> input_data = new List<int>();
                int cnt = 0;

                if (sample_string == "start")
                {
                    Console.WriteLine("Inside loadData");
                    while (cnt < 16)
                    {
                        input_data.Add(randn.Next(1, 255));
                        await Task.Delay(1500);
                        Console.WriteLine("Cnt");
                        cnt++;
                    }
                }
                else
                {
                    Console.WriteLine("Not started yet");
                }
                return input_data;
            });

            // Prints and removes the items of the received list one by one
            var PrintData = new ActionBlock<List<int>>(async input_data =>
            {
                while (input_data.Count > 0)
                {
                    Console.WriteLine("output Data = " + input_data.First());
                    await Task.Delay(1000);
                    input_data.RemoveAt(0);
                }
            });

            // Without PropagateCompletion, PrintData never completes and Wait() below hangs
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            loadData.LinkTo(PrintData, linkOptions, input_data => input_data.Count > 0);

            loadData.Post("start");
            loadData.Complete();
            PrintData.Completion.Wait();
        }
    }
}

But it seems to work serially. What am I doing wrong? I tried making the while loops async. I would like the two things to happen in parallel: as soon as data is available in the List, it should be printed.

Theodor Zoulias
ThKont
  • The TPL Dataflow library, in its [overview](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library), refers to an application that processes image data from a web camera as its archetypal use case. So you may want to give it a try. – Theodor Zoulias Jul 09 '21 at 10:46
  • Hello @TheodorZoulias, and thanks for helping! So, as far as I understand, it could be one source block (loading the video), one propagator block (receiving and transmitting the data) and one target block (writing the data to the image output)? I am trying to understand TPL Dataflow but I am still having some problems... Is there an easier way, with simpler programming, to accomplish the parallelism? Tasks alone are not helping, right? – ThKont Jul 13 '21 at 08:40
  • The idea of TPL Dataflow is to create a pipeline consisting of blocks (usually `TransformBlock`s), then feed the first block with raw data, and let the data flow through the pipeline, being processed along the way. You can see an example [here](https://stackoverflow.com/questions/62602684/c-sharp-process-files-concurrently-and-asynchronously/62613098#62613098). It is much easier to create a multi-step processor with this library than by managing tasks manually. It has a learning curve, but not too steep. One or two days of study, and you'll be able to use it successfully. – Theodor Zoulias Jul 13 '21 at 09:49
  • I added an update to the first post. – ThKont Jul 14 '21 at 08:59
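The source → propagator → target structure discussed in these comments could look roughly like the pipeline below. It is a sketch, not a drop-in solution: frames are simulated by `int` values, and the two `TransformBlock` delegates are hypothetical placeholders for reading a frame and for the TCP/IP round trip. With the default `MaxDegreeOfParallelism` of 1 each block preserves order, and `BoundedCapacity` (an assumed value of 4) keeps the number of in-flight frames limited.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 4 };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
var displayed = new List<int>();

// Stage 1 (hypothetical): load a frame given its index
var loadFrame = new TransformBlock<int, int>(async index =>
{
    await Task.Delay(10); // simulates reading a frame from the file
    return index;
}, options);

// Stage 2 (hypothetical): send the frame to the board and receive the result
var sendReceive = new TransformBlock<int, int>(async frame =>
{
    await Task.Delay(20); // simulates the TCP/IP round trip
    return frame * 2;
}, options);

// Stage 3: display the processed frame
var display = new ActionBlock<int>(frame =>
{
    displayed.Add(frame);
    Console.WriteLine($"Displayed {frame}");
}, options);

loadFrame.LinkTo(sendReceive, linkOptions);
sendReceive.LinkTo(display, linkOptions);

// Feed the first block; SendAsync waits asynchronously while the block is full
for (int i = 0; i < 20; i++)
    await loadFrame.SendAsync(i);

loadFrame.Complete();      // completion flows down the links
await display.Completion;  // finishes after the last frame is displayed
```

Each block processes its own item concurrently with the others, so the three stages overlap without any manually managed tasks or queues.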

1 Answer


You could use a TransformManyBlock<string, int> as the producer block, and an ActionBlock<int> as the consumer block. The TransformManyBlock would be instantiated with the constructor that accepts a Func<string, IEnumerable<int>> delegate, and passed an iterator method (the Produce method in the example below) that yields values one by one:

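using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

Random random = new Random();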

var producer = new TransformManyBlock<string, int>(Produce);

IEnumerable<int> Produce(string message)
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1, 255);
            Console.WriteLine($"Producing #{value}");
            yield return value;
            Thread.Sleep(1500);
            cnt++;
        }
    }
    else
    {
        yield break;
    }
}

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

producer.LinkTo(consumer, new() { PropagateCompletion = true });

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

Unfortunately, the producer has to block the worker thread during the idle period between yielding each value (Thread.Sleep(1500);), because the TransformManyBlock currently does not have a constructor that accepts a Func<string, IAsyncEnumerable<int>>. This will probably be fixed in the next release of the TPL Dataflow library. You could track this GitHub issue to be informed about when this feature is released.
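As far as I know, this constructor did ship later: starting with .NET 8, `TransformManyBlock<TInput, TOutput>` accepts a `Func<TInput, IAsyncEnumerable<TOutput>>`. Assuming .NET 8 or later, a sketch of the same producer that awaits between values instead of blocking a thread could look like this. C# lambdas can't be async iterators, so the delegate is a named local function (`ProduceAsync`, a hypothetical name), and the delays are shortened.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Assumes .NET 8+, where TransformManyBlock accepts Func<TInput, IAsyncEnumerable<TOutput>>
Random random = new Random();
var received = new List<int>();

async IAsyncEnumerable<int> ProduceAsync(string message)
{
    if (message != "start") yield break;
    for (int cnt = 0; cnt < 16; cnt++)
    {
        int value;
        lock (random) value = random.Next(1, 255);
        Console.WriteLine($"Producing #{value}");
        yield return value;
        await Task.Delay(150); // non-blocking pause between values
    }
}

var producer = new TransformManyBlock<string, int>(ProduceAsync);

var consumer = new ActionBlock<int>(async value =>
{
    received.Add(value);
    Console.WriteLine($"Received: {value}");
    await Task.Delay(100);
});

producer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

producer.Post("start");
producer.Complete();
await consumer.Completion;
```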


Alternative solution: Instead of explicitly linking the producer and the consumer, you could keep them unlinked and manually send the values produced by the producer to the consumer. In this case both blocks would be ActionBlocks:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

Random random = new Random();

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

var producer = new ActionBlock<string>(async message =>
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1, 255);
            Console.WriteLine($"Producing #{value}");
            var accepted = await consumer.SendAsync(value);
            if (!accepted) break; // The consumer has failed
            await Task.Delay(1500);
            cnt++;
        }
    }
});

PropagateCompletion(producer, consumer);

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
    try { await source.Completion.ConfigureAwait(false); } catch { }
    var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
    if (ex != null) target.Fault(ex); else target.Complete();
}

The main difficulty with this approach is how to propagate the completion of the producer to the consumer, so that eventually both blocks are completed. Obviously you can't use the new DataflowLinkOptions { PropagateCompletion = true } configuration, since the blocks are not linked explicitly. You also can't manually Complete the consumer, because it would then stop accepting values from the producer prematurely. The solution to this problem is the PropagateCompletion method shown in the above example.
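To see that the PropagateCompletion method also forwards failures, here is a small sketch with a deliberately failing producer (the exception is a hypothetical stand-in for, say, a lost TCP connection). The helper is the same one as in the answer; when the producer faults, the consumer is faulted with the same exception instead of waiting forever.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var consumer = new ActionBlock<int>(value => Console.WriteLine($"Received: {value}"));

// Hypothetical failing producer: sends two values, then throws
var producer = new ActionBlock<string>(async message =>
{
    await consumer.SendAsync(1);
    await consumer.SendAsync(2);
    throw new InvalidOperationException("Simulated producer failure");
});

PropagateCompletion(producer, consumer);

producer.Post("start");
producer.Complete();

try { consumer.Completion.Wait(); }
catch (AggregateException ex)
{
    // Flatten unwraps the nested AggregateException passed to target.Fault
    Console.WriteLine($"Consumer faulted: {ex.Flatten().InnerExceptions[0].Message}");
}

// Same helper as in the answer above
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
    try { await source.Completion.ConfigureAwait(false); } catch { }
    var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
    if (ex != null) target.Fault(ex); else target.Complete();
}
```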

Theodor Zoulias
  • Thanks very much for the help!! I tried to load a bitmap instead of code, but the Image does not update. Should I use a lock or something? Also, after some time the app crashes because of too much memory allocation. Is there any way to have "flow control" in the TransformManyBlock? (Sorry for asking all the time, I am quite new to C# and object-oriented programming...) – ThKont Jul 15 '21 at 07:58
  • Hi @ThKont, I am glad that this was helpful. TPL Dataflow has a great way to keep memory under control: the [`BoundedCapacity`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.boundedcapacity) option. Usually all blocks in a pipeline should be configured with this option, except maybe the first one. Regarding the non-updating images, if you don't figure out a solution, you could consider posting a new question along with a minimal, reproducible example, so that we can take a look. – Theodor Zoulias Jul 15 '21 at 08:17
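A minimal sketch of `BoundedCapacity` as flow control, assuming a slow consumer (the 50 ms delay stands in for displaying a bitmap). With a capacity of 2, `SendAsync` waits asynchronously whenever the block already holds two items, so the producer can't run far ahead and memory stays bounded. `Post`, by contrast, would return `false` and drop the item when the block is full, which is why bounded blocks are normally fed with `SendAsync`.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Slow consumer that holds at most 2 items at a time
var consumer = new ActionBlock<int>(async frame =>
{
    await Task.Delay(50);
    Console.WriteLine($"Displayed frame {frame}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

int maxPending = 0;
for (int i = 0; i < 10; i++)
{
    // Completes only when the consumer has room (or returns false if it was faulted/completed)
    bool accepted = await consumer.SendAsync(i);
    if (!accepted) break;
    maxPending = Math.Max(maxPending, consumer.InputCount);
}

consumer.Complete();
await consumer.Completion;
Console.WriteLine($"Max items waiting in the input buffer: {maxPending}");
```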