
I'm trying to get my head around controlling dataflow in TPL Dataflow. I have a very fast producer and a very slow consumer. (My real code is more complex, but nonetheless this is a pretty good model, and it reproduces the problem.)

When I run it, the code starts drinking memory like it's going out of style--the output queue on the producer fills up as fast as it can. What I'd really prefer to see is the Producer stop running for a while, until the Consumer has a chance to catch up. From my reading of the documentation, this is what is supposed to happen: that is, I thought the producer waits until the consumer has space.

This isn't the case, clearly. How do I fix it so that the queue doesn't go crazy?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f =>
            {
            });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
            CreateData.LinkTo(ParseFile, linkOptions);
            ParseFile.LinkTo(EndOfTheLine, linkOptions);

            Task t = new Task(() =>
            {
                while (true)
                {
                    Console.WriteLine("CreateData: " + Report(CreateData));
                    Console.WriteLine("ParseData:  " + Report(ParseFile));
                    Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount);
                    Thread.Sleep(1000);
                }

            });
            t.Start();

            CreateData.SendAsync(0);
            CreateData.Complete();

            EndOfTheLine.Completion.Wait();
        }

        public static string Report<T, U>(TransformManyBlock<T, U> block)
        {
            return String.Format("INPUT: {0}   OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
        }


    }
}
  • Have you tried lowering the `BoundedCapacity` on `ParseFile` to something a little smaller like maybe 10 just to make sure it works? – Los Frijoles Dec 21 '13 at 05:33
  • Los Frijoles, good thought. I have tried that; it keeps the queue on `CreateData` even bigger. The major challenge I'm trying to solve here is how to make sure that `CreateData` pauses when the `ParseFile` queue is too big (or when its own output queue is too big). – Danyel Fisher Dec 22 '13 at 21:25
  • For an alternative `TransformManyBlock` implementation that is output bounded and supports all `ExecutionDataflowBlockOptions`, look [here](https://stackoverflow.com/questions/30994544/tpl-dataflow-block-consumes-all-available-memory/62385415#62385415). – Theodor Zoulias Aug 05 '20 at 00:16

1 Answer


Normally, what you would do in a situation like this is to also set BoundedCapacity of the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when filling the output queue from a single IEnumerable.

What you can do instead is to create an extension method that iterates the collection and uses SendAsync() to send each item only when the target can accept it:

/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
/// 
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
// As an extension method, this must be declared inside a static class.
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            await target.SendAsync(item);
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}

Usage:

// Must be called from within an async method.
var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s, i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();

If you still wanted to have CreateData block that would behave similarly to your original code, you could have two bounded BufferBlocks, SendAllAsync() between them and then use Encapsulate() to make them look like one block:

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(
        async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());

                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}
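For completeness, here is a sketch of how the encapsulated block could replace `CreateData` in the question's pipeline (the block and variable names come from the question; this wiring is an illustration, not part of the original answer):

```csharp
// Sketch: using the bounded block in place of the unbounded
// TransformManyBlock from the question. Assumes the blocks
// ParseFile and EndOfTheLine are defined as in the question,
// and CreateBoundedTransformManyBlock as above.
var CreateData = CreateBoundedTransformManyBlock<int, string>(
    ignore => Enumerable.Range(0, 1000 * 1000 * 1000)
                        .Select(i => "Hello, World " + i),
    boundedCapacity: 100);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
CreateData.LinkTo(ParseFile, linkOptions);
ParseFile.LinkTo(EndOfTheLine, linkOptions);

await CreateData.SendAsync(0);
CreateData.Complete();
await EndOfTheLine.Completion;
```

Because the internal `SendAllAsync()` loop only produces items as the output buffer drains, at most `boundedCapacity` items sit in each queue at any time, so memory use stays bounded regardless of how large the source enumerable is.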
– svick
  • thank you for the response! That seems to solve the immediate problem, I think, although I'm still puzzling my way through your code. In particular, this model makes things happen in batch mode--I get a `boundedCapacity` worth of data once, then it pours out. Then a new `boundedCapacity`. And so on. I'd rather see, if it was possible, each unit sent individually. – Danyel Fisher Dec 23 '13 at 01:29
  • And, to articulate a little on @svick's comment: It turns out that TransformMany only checks queues BETWEEN input entries; in order to make this scenario work, I would need to check WITHIN an entry. – Danyel Fisher Dec 24 '13 at 01:55