I have a Dataflow pipeline that consists of multiple blocks processing heterogeneous documents (XLS, PDF, etc.). Each type of document is processed by a dedicated `TransformBlock`. At the end of the pipeline there is an `ActionBlock` that receives all the processed documents and uploads them one by one to a web server. My problem is that I can't find a way to satisfy the requirement of uploading the documents in the same order they initially entered the pipeline. For example, I can't use the `EnsureOrdered` option to my advantage, because this option configures the behavior of a single block, not the behavior of multiple blocks working in parallel. My requirements are:
- Insert the documents in the pipeline in a specific order.
- Process each document differently, depending on its type.
- The documents of a specific type should be processed sequentially.
- Documents of different types can (and should) be processed in parallel.
- All documents should be uploaded ASAP after they are processed.
- The documents must be uploaded sequentially, and in the same order they were entered in the pipeline.
For example, document #8 must be uploaded after document #7, even if it happens to be processed before document #7.
The fifth requirement means that I can't wait for all the documents to be processed, then sort them by index, and finally upload them. The uploading must happen concurrently with the processing.
Here is a minimal example of what I'm trying to do. For simplicity I am not feeding the blocks with instances of an `IDocument` interface, but with simple integers. The value of each integer represents the order in which it entered the pipeline, and the order in which it must be uploaded:
```csharp
var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});
xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());
await uploader.Completion;
```
The output is:

```
Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10
```
The desirable order is #1, #2, #3, #4, #5, #6, #7, #8, #9, #10.
How can I restore the order of the processed documents before sending them to the `uploader` block?
Clarification: Drastically changing the schema of the pipeline, by replacing the multiple type-specific `TransformBlock`s with a single generic `TransformBlock`, is not an option. The ideal scenario would be to insert a single intermediate block between the processors and the uploader that restores the order of the documents.
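To illustrate the kind of intermediate block I have in mind, here is a rough, untested sketch: a propagator (built with `DataflowBlock.Encapsulate`) that buffers out-of-order items and releases them in index order. The name `CreateReorderingBlock` is mine, and the sketch assumes each item carries its original index; with the integers above, each item is its own index, and `firstIndex` would be `1`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static IPropagatorBlock<int, int> CreateReorderingBlock(int firstIndex)
{
    var buffer = new Dictionary<int, int>(); // index -> buffered item
    int nextIndex = firstIndex;              // next index to emit
    var source = new BufferBlock<int>();

    // Default ActionBlock options => items are handled one at a time,
    // so the dictionary and counter need no extra synchronization.
    var target = new ActionBlock<int>(item =>
    {
        buffer.Add(item, item); // here the item is its own index
        // Flush every consecutive item that is now available.
        while (buffer.TryGetValue(nextIndex, out var next))
        {
            buffer.Remove(nextIndex);
            source.Post(next);
            nextIndex++;
        }
    });

    // Propagate completion (and faults) from the target side to the source side.
    target.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)source).Fault(t.Exception);
        else source.Complete();
    });

    return DataflowBlock.Encapsulate(target, source);
}
```

The idea would be to link both `xlsBlock` and `pdfBlock` to such a block, and link it in turn to the `uploader`, so items reach the uploader strictly in index order while still being released as soon as their turn comes.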