
I have a Dataflow pipeline that consists of multiple blocks processing heterogeneous documents (XLS, PDF etc.). Each type of document is processed by a dedicated TransformBlock. At the end of the pipeline there is an ActionBlock that receives all the processed documents and uploads them one by one to a web server. My problem is that I can't find a way to satisfy the requirement of uploading the documents in the same order they initially entered the pipeline. For example, I can't use the EnsureOrdered option to my advantage, because this option configures the behavior of a single block, not the behavior of multiple blocks working in parallel. My requirements are:

  1. Insert the documents in the pipeline in a specific order.
  2. Process each document differently, depending on its type.
  3. The documents of a specific type should be processed sequentially.
  4. Documents of different types can (and should) be processed in parallel.
  5. All documents should be uploaded ASAP after they are processed.
  6. The documents must be uploaded sequentially, and in the same order they were entered in the pipeline.

For example, document #8 must be uploaded after document #7, even if it is processed before document #7.

The fifth requirement means that I can't wait for all documents to be processed, then sort them by index, and finally upload them. The uploading must happen concurrently with the processing.

Here is a minimal example of what I'm trying to do. For simplicity I am not feeding the blocks with instances of the IDocument interface, but with simple integers. The value of each integer represents the order in which it entered the pipeline, and the order in which it must be uploaded:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());

await uploader.Completion;

The output is:

Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10

(Try it on Fiddle)

The desirable order is #1, #2, #3, #4, #5, #6, #7, #8, #9, #10.

How can I restore the order of the processed documents, before sending them to the uploader block?

Clarification: Drastically changing the shape of the pipeline, by replacing the multiple specific TransformBlocks with a single generic TransformBlock, is not an option. The ideal scenario would be to insert a single block between the processors and the uploader that restores the order of the documents.

Theodor Zoulias
  • The normal method is to add a sequence number at the beginning of the transfer so blocks can be reassembled in the correct order. – jdweng Dec 18 '20 at 14:14
  • @jdweng you can assume that the sequence number is already a property of the `document` objects. I am allowed to add a `public long SequenceNumber` property to them, and initialize it properly. The problem is how to reassemble them after they are processed by all these different blocks. – Theodor Zoulias Dec 18 '20 at 14:18
  • The uploader need to add a sequence number in front of each block so when the server receives the block the server can combine in the correct order. You cannot use a number that gets removed during the processing. – jdweng Dec 18 '20 at 14:24
  • @jdweng I can't delegate the order restoration to the web server (if that's what you mean). I must do it in my own program. – Theodor Zoulias Dec 18 '20 at 14:28
  • Then you have to do it before upload and you cannot upload in parallel. – jdweng Dec 18 '20 at 14:34
  • @jdweng processing and uploading in parallel is part of my requirements. I can't change them. If I was allowed to change the requirements, I wouldn't be asking this question right now. – Theodor Zoulias Dec 18 '20 at 14:42
  • You cannot upload in parallel if you need also to combine in order and have no control on recombining. You need to check the requirements especially on the server end of how items are combined. – jdweng Dec 18 '20 at 14:54
  • @jdweng if you think that my requirements are impossible to be satisfied, you could consider posting it as an answer. – Theodor Zoulias Dec 18 '20 at 15:03
  • I said you need to be able to combine in order at server. You said you couldn't. I think you are wrong. You must be able to control the order of recombining if you are sending in parallel. – jdweng Dec 18 '20 at 15:19
  • @jdweng I can't be wrong. If one of us knows better what this specific web api can and can't do, then that person is me. I have privileged information about this subject! – Theodor Zoulias Dec 18 '20 at 15:26
  • Then you cannot upload in parallel. There is no guarantee the the data will get recombined properly. – jdweng Dec 18 '20 at 15:30
  • Btw a [nasty bug](https://github.com/dotnet/runtime/issues/31513 "Data loss during parallelization of BufferBlock") currently exists in Dataflow configurations that follow the pattern one-to-many-to-one. In case the intermediate `TransformBlock`s are configured with `BoundedCapacity` different than `-1`, they should be also configured with `EnsureOrdered = false`, otherwise some messages might be lost. – Theodor Zoulias Nov 24 '22 at 17:38

2 Answers


The uploader should add each document to some sorted list of completed documents, and check whether the added document is the one that should be uploaded next. If it is, it should remove and upload documents from the sorted list until one is missing.

There is also a synchronization problem. Access to this sorted list must be synchronized across threads. But you want all threads to be doing something instead of waiting for other threads to complete their work. So the uploader should work with the list like this:

  • Within the sync lock, add the new document to the list, and release the lock
  • In a loop:
    • enter the same sync lock again,
    • if the upload_in_progress flag is set then do nothing and return,
    • check if the document on top of the list should be uploaded next,
      • if not then reset the upload_in_progress flag, and return,
      • otherwise remove the document from the list,
      • set the upload_in_progress flag,
      • release the lock,
      • upload the document.

I hope I imagined it right. As you can see, it's tricky to make it both safe and efficient. There is surely a way to do it with only one lock in most cases, but it wouldn't add much to the efficiency. The upload_in_progress flag is shared between tasks, like the list itself.
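The steps above can be sketched as follows. This is only an illustrative sketch, not code from the answer: the names (`OrderedUploader`, `_uploadInProgress`, etc.) are hypothetical, and a `SortedList` stands in for "some sorted list of completed documents":

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch of the lock + sorted-list algorithm described above.
public class OrderedUploader
{
    private readonly object _lock = new object();
    private readonly SortedList<long, string> _completed = new SortedList<long, string>();
    private readonly Func<string, Task> _uploadAsync;
    private long _nextIndex;
    private bool _uploadInProgress;

    public OrderedUploader(long startingIndex, Func<string, Task> uploadAsync)
    {
        _nextIndex = startingIndex;
        _uploadAsync = uploadAsync;
    }

    public async Task Add(long index, string document)
    {
        // Within the sync lock, add the new document to the list, then release the lock.
        lock (_lock) _completed.Add(index, document);
        while (true)
        {
            string next;
            lock (_lock) // enter the same sync lock again
            {
                // If an upload is already in progress, that caller will drain the list.
                if (_uploadInProgress) return;
                // Check if the document on top of the list is the next one to upload.
                if (_completed.Count == 0 || _completed.Keys[0] != _nextIndex) return;
                next = _completed.Values[0];
                _completed.RemoveAt(0);   // remove the document from the list
                _nextIndex++;
                _uploadInProgress = true; // set the upload_in_progress flag
            } // release the lock
            await _uploadAsync(next);     // upload the document outside the lock
            lock (_lock) _uploadInProgress = false;
        }
    }
}
```

The key point of the design is that the actual upload happens outside the lock, so processing threads are never blocked behind an upload; they just deposit their document and leave.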

Dialecticus
  • Thanks Dialecticus for the answer! I think that your idea leans towards a possible solution. Do you think that I could use the [`SortedList`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.sortedlist-2) class to solve this problem? – Theodor Zoulias Dec 18 '20 at 15:13
  • Indeed. `TKey` is the document ordinal, `TValue` is the document itself. BTW, access to the list (and also uploading itself) must be guarded by some synchronization mechanism, like a `lock`. This is now tricky to implement, but it's also a good new question for Stack Overflow, once you can see and formalize the issue. – Dialecticus Dec 18 '20 at 15:17
  • OK, I'll try to implement your idea using a `SortedList`, and see where it goes. – Theodor Zoulias Dec 18 '20 at 15:25
  • I added a possible solution to the synchronization problem. Good luck. – Dialecticus Dec 18 '20 at 15:36
  • Thanks! Btw the `uploader` uploads the documents sequentially (one at a time), so I think that I can get away without synchronization. But I'll keep that in mind, in case the requirements change in the future. – Theodor Zoulias Dec 18 '20 at 15:38
  • If there is only one uploader working at any time then there is no synchronization problem, and the algorithm above is not needed, but I believe there can be more than one working at times. – Dialecticus Dec 18 '20 at 15:41
1 Answer Score

I managed to implement a dataflow block that restores the order of my shuffled pipeline, based on Dialecticus' idea of a sorted list containing the processed documents. Instead of a SortedList I ended up using a simple Dictionary, which seems to work just as well.

/// <summary>Creates a dataflow block that restores the order of
/// a shuffled pipeline.</summary>
public static IPropagatorBlock<T, T> CreateRestoreOrderBlock<T>(
    Func<T, long> indexSelector,
    long startingIndex = 0L,
    DataflowBlockOptions options = null)
{
    if (indexSelector == null) throw new ArgumentNullException(nameof(indexSelector));
    var executionOptions = new ExecutionDataflowBlockOptions();
    if (options != null)
    {
        executionOptions.CancellationToken = options.CancellationToken;
        executionOptions.BoundedCapacity = options.BoundedCapacity;
        executionOptions.EnsureOrdered = options.EnsureOrdered;
        executionOptions.TaskScheduler = options.TaskScheduler;
        executionOptions.MaxMessagesPerTask = options.MaxMessagesPerTask;
        executionOptions.NameFormat = options.NameFormat;
    }

    var buffer = new Dictionary<long, T>();
    long minIndex = startingIndex;

    IEnumerable<T> Transform(T item)
    {
        // No synchronization needed because MaxDegreeOfParallelism = 1
        long index = indexSelector(item);
        if (index < startingIndex)
            throw new InvalidOperationException($"Index {index} is out of range.");
        if (index < minIndex)
            throw new InvalidOperationException($"Index {index} has been consumed.");
        if (!buffer.TryAdd(index, item)) // .NET Core only API
            throw new InvalidOperationException($"Index {index} is not unique.");
        while (buffer.Remove(minIndex, out var minItem)) // .NET Core only API
        {
            minIndex++;
            yield return minItem;
        }
    }

    // Ideally the assertion buffer.Count == 0 should be checked on the completion
    // of the block.
    return new TransformManyBlock<T, T>(Transform, executionOptions);
}

Usage example:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var orderRestorer = CreateRestoreOrderBlock<int>(
    indexSelector: document => document, startingIndex: 1L);

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(orderRestorer);
pdfBlock.LinkTo(orderRestorer);
orderRestorer.LinkTo(uploader, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => orderRestorer.Complete());

await uploader.Completion;

Output:

09:24:18.846 Uploading document #1
09:24:19.436 Uploading document #2
09:24:19.936 Uploading document #3
09:24:20.441 Uploading document #4
09:24:20.942 Uploading document #5
09:24:21.442 Uploading document #6
09:24:21.941 Uploading document #7
09:24:22.441 Uploading document #8
09:24:22.942 Uploading document #9
09:24:23.442 Uploading document #10

(Try it on Fiddle, featuring a .NET Framework compatible version)
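For reference, the two Dictionary APIs marked as ".NET Core only" in the factory (TryAdd, and Remove with an out parameter) can be approximated on .NET Framework with small extension methods like the sketch below. The `Compat` names are made up for this illustration:

```csharp
using System.Collections.Generic;

// Hypothetical stand-ins for the .NET Core-only Dictionary APIs used in
// CreateRestoreOrderBlock, for projects targeting .NET Framework.
public static class DictionaryCompat
{
    // Mimics Dictionary<TKey, TValue>.TryAdd: adds the pair only if the key
    // is absent, and reports whether the addition happened.
    public static bool TryAddCompat<TKey, TValue>(
        this Dictionary<TKey, TValue> source, TKey key, TValue value)
    {
        if (source.ContainsKey(key)) return false;
        source.Add(key, value);
        return true;
    }

    // Mimics Dictionary<TKey, TValue>.Remove(key, out value): removes the
    // entry and returns its value in a single call.
    public static bool RemoveCompat<TKey, TValue>(
        this Dictionary<TKey, TValue> source, TKey key, out TValue value)
    {
        if (!source.TryGetValue(key, out value)) return false;
        source.Remove(key);
        return true;
    }
}
```

With these in place, the `buffer.TryAdd(...)` and `buffer.Remove(minIndex, out var minItem)` calls in the factory can be swapped for `buffer.TryAddCompat(...)` and `buffer.RemoveCompat(minIndex, out var minItem)` without touching the rest of the logic.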

Theodor Zoulias