
Introduction

I am solving the following problem:

  • Given a C# type X with one or more publicly visible constants, what are all the C# types in the solution that depend on the constants in the type X?

Because constants are inlined at compile time, there is no point in inspecting the binaries. We need to inspect the source code using the Roslyn API.

I am not going to use semantic analysis, because it would be very expensive. Instead, I use a regex to check whether a given file seems to use the constants, and then verify each match using the syntax tree. It is not bulletproof, but it is good enough and relatively fast.
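A minimal sketch of such a regex prefilter, assuming the constants are referenced in qualified form (X.Name). False positives such as Other.X.Name are left for the syntax tree check, while unqualified references inside X, using static imports and type aliases would be missed entirely. BuildConstantRegex is a hypothetical helper, not the author's GetRegex:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class ConstantRegexSketch
{
    // Hypothetical helper: matches "TypeName.ConstName" for any of the given constant names,
    // tolerating whitespace around the dot. Only qualified references are detected.
    public static Regex BuildConstantRegex(string typeName, IEnumerable<string> constNames)
    {
        // Longest names first so the alternation tries "TimeoutMs" before "Timeout";
        // the trailing \b rejects partial identifiers such as "TimeoutSeconds".
        var alternatives = string.Join("|", constNames
            .OrderByDescending(n => n.Length)
            .Select(Regex.Escape));

        // The leading \b keeps "MyConsts.Timeout" from matching when typeName is "Consts".
        var pattern = $@"\b{Regex.Escape(typeName)}\s*\.\s*(?<name>{alternatives})\b";
        return new Regex(pattern, RegexOptions.Compiled | RegexOptions.CultureInvariant);
    }
}
```

The named group gives the constant that was matched, so the syntax tree check only has to confirm that the match position really is a member access on the expected type.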

Right now the stats are:

  • Project Count: 333
  • File Count: 45280
  • SSD drive

My implementation

The overall scheme is:

  1. Produce a stream of the relevant Microsoft.Build.Evaluation.Project objects, which give us the list of C# files.
  2. From the C# files, produce a stream of C# file contents.
  3. For each C# file's content, match the given regex and, for every match, use the syntax tree of that content to determine whether the relevant constant is actually used. If so, report the respective type.

I do not want to overload you with details, but I would like to provide enough of them to explain my difficulty. It is not much, so please bear with me.

I use several small auxiliary types:

ProjectItem

private class ProjectItem
{
    public readonly string AssemblyName;
    public readonly CSharpParseOptions ParseOptions;
    public readonly IEnumerable<string> CSFilePaths;

    public ProjectItem(TypeMap typeMap, string asmName)
    {
        AssemblyName = asmName;
        var asmProps = typeMap.Assemblies[asmName];
        ParseOptions = asmProps.GetParseOptions();
        CSFilePaths = new Project(asmProps.ProjectPath).GetItems("Compile").Select(item => item.GetMetadataValue("FullPath"));
    }

    public IEnumerable<TextItem> YieldTextItems() => CSFilePaths.Select(csFilePath => new TextItem(this, csFilePath, File.ReadAllText(csFilePath)));
}

Here TypeMap is a registry of all the types and assemblies used in our solutions, built previously by some other code. Consider it an oracle that can answer questions like "Give me the parse options (or project path) for the given assembly". However, it does not know the list of C# files used by a project. For that we need to instantiate the respective Microsoft.Build.Evaluation.Project, which is expensive.

TextItem

private class TextItem
{
    public readonly string AssemblyName;
    public readonly CSharpParseOptions ParseOptions;
    public readonly string CSFilePath;
    public readonly string Text;

    public TextItem(ProjectItem item, string csFilePath, string text)
    {
        AssemblyName = item.AssemblyName;
        ParseOptions = item.ParseOptions;
        CSFilePath = csFilePath;
        Text = text;
    }

    public IEnumerable<TypeDefKey> YieldDependentTypes(TypeMap typeMap, TypeDefKey constTypeDefKey, Regex regex)
    {
        ...
        SyntaxTree syntaxTree = null;
        foreach (Match m in regex.Matches(Text))
        {
            if (syntaxTree == null)
            {
                syntaxTree = CSharpSyntaxTree.ParseText(Text, ParseOptions, CSFilePath);
                ...
            }

            ...
            if (IsTheRegexMatchIndeedCorrespondsToTheGivenConstantType(syntaxTree, ...))
            {
                var typeDefKey = GetTheType(syntaxTree, ...);
                yield return typeDefKey;
            }
        }
    }
}

Given the aforementioned types, I came up with this simple TPL Dataflow pipeline:

var regex = GetRegex(...);
var dependentAssemblies = GetDependentAssemblies(...);
var dependentTypes = new ConcurrentDictionary<TypeDefKey, object>();

var produceCSFilePaths = new TransformManyBlock<ICollection<string>, ProjectItem>(asmNames => asmNames.Select(asmName => new ProjectItem(typeMap, asmName)));
var produceCSFileText = new TransformManyBlock<ProjectItem, TextItem>(p => p.YieldTextItems());
var produceDependentTypes = new TransformManyBlock<TextItem, TypeDefKey>(t => t.YieldDependentTypes(typeMap, constTypeDefKey, regex));
var getDependentTypes = new ActionBlock<TypeDefKey>(typeDefKey => dependentTypes.TryAdd(typeDefKey, null));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

produceCSFilePaths.LinkTo(produceCSFileText, linkOptions);
produceCSFileText.LinkTo(produceDependentTypes, linkOptions);
produceDependentTypes.LinkTo(getDependentTypes, linkOptions);

produceCSFilePaths.Post(dependentAssemblies);
produceCSFilePaths.Complete();
getDependentTypes.Completion.Wait();
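For the file-reading stage of the pipeline above, TransformManyBlock<TInput,TOutput> already has a constructor overload that accepts a Func<TInput, Task<IEnumerable<TOutput>>>, so the texts can be read with File.ReadAllTextAsync. The trade-off is that all texts of one project are buffered before the first is emitted. A minimal sketch, with string[] of paths standing in for ProjectItem, (Path, Text) tuples standing in for TextItem, and a line count standing in for the regex and syntax tree stage; the MaxDegreeOfParallelism and BoundedCapacity values are illustrative assumptions, not measured optima:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncReadPipelineSketch
{
    // Counts, per file, how many lines contain the marker (placeholder for the real analysis).
    public static async Task<IDictionary<string, int>> RunAsync(
        IEnumerable<string[]> projects, string marker)
    {
        var results = new ConcurrentDictionary<string, int>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // IO stage: the async overload awaits the reads instead of blocking a thread.
        var readFiles = new TransformManyBlock<string[], (string Path, string Text)>(
            async paths =>
            {
                var items = new List<(string Path, string Text)>();
                foreach (var path in paths)
                    items.Add((path, await File.ReadAllTextAsync(path)));
                return items;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // CPU stage: one worker per core; BoundedCapacity limits how many texts wait in memory.
        var analyze = new ActionBlock<(string Path, string Text)>(
            item => results[item.Path] = item.Text.Split('\n').Count(l => l.Contains(marker)),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 100
            });

        readFiles.LinkTo(analyze, linkOptions);
        foreach (var project in projects)
            await readFiles.SendAsync(project);
        readFiles.Complete();
        await analyze.Completion;
        return results;
    }
}
```

Giving the CPU-bound stage its own MaxDegreeOfParallelism matters here: with the defaults, every block in the original pipeline, including the regex and parsing stage, processes one message at a time.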

The problems and the questions

  1. It is slow: it takes about 50 seconds, and CPU utilization is low. I realize there is a lot of IO here, but applying the regex and parsing the content into syntax trees still takes CPU.
  2. I do not understand how I can use TransformManyBlock with async IO. The ProjectItem.YieldTextItems function could return an IObservable<TextItem> or an IAsyncEnumerable<TextItem>, but the TransformManyBlock would not recognize it. I am new to TPL Dataflow, so it is unclear to me how to work around this. That is why I am using the blocking File.ReadAllText instead of File.ReadAllTextAsync.
  3. I think my pipeline uses ThreadPool threads (through the default TaskScheduler), but shouldn't it use dedicated threads, like the ones created with Task.Factory.StartNew(..., TaskCreationOptions.LongRunning)? Does it use the "proper" threads, and if not, how do I fix that? I have seen recommendations to implement a custom TaskScheduler, but I could not find an example. The existing ones seem to rely on internal implementation details, so it is not clear how to proceed.
  4. I tried to increase MaxDegreeOfParallelism for the ProjectItem and TextItem production, since these two stages are mostly IO and thus much slower than the last one (examining the C# file content). But it did not improve performance much. My understanding is that the slower a stage of the pipeline, the more parallelism it should get. On the other hand, I do not know how much parallelism an SSD can sustain for reads, and it is entirely unclear to me how to profile this.
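Regarding question 3: any dataflow block can be given a scheduler through ExecutionDataflowBlockOptions.TaskScheduler. A minimal sketch of a scheduler that runs queued tasks on a fixed set of dedicated (non-ThreadPool) threads; the class name DedicatedThreadTaskScheduler is hypothetical. Note that it only changes which threads run the block's delegates, not how much IO can be in flight at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: executes tasks on a fixed number of dedicated background threads.
public sealed class DedicatedThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
    private readonly Thread[] _threads;

    public DedicatedThreadTaskScheduler(int threadCount)
    {
        _threads = Enumerable.Range(0, threadCount).Select(i => new Thread(() =>
        {
            // Each thread drains the shared queue until CompleteAdding is called.
            foreach (var task in _queue.GetConsumingEnumerable())
                TryExecuteTask(task);
        })
        { IsBackground = true, Name = $"Dedicated-{i}" }).ToArray();

        foreach (var thread in _threads) thread.Start();
    }

    public override int MaximumConcurrencyLevel => _threads.Length;

    protected override void QueueTask(Task task) => _queue.Add(task);

    // Refuse inlining, so the work only ever runs on the dedicated threads.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();

    // Stops accepting work, lets the threads finish the queued tasks, then releases the queue.
    public void Dispose()
    {
        _queue.CompleteAdding();
        foreach (var thread in _threads) thread.Join();
        _queue.Dispose();
    }
}
```

Usage would be along the lines of new ActionBlock<T>(action, new ExecutionDataflowBlockOptions { TaskScheduler = scheduler, MaxDegreeOfParallelism = 2 }). With an async delegate, only the synchronous parts (up to the first incomplete await, and continuations that capture the scheduler) run on these threads.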
Edited by halfer; asked by mark.
  • It seems this problem is too large and complex. It's better to split it into smaller pieces and investigate them separately, asking different questions – AndrewSilver Sep 11 '21 at 06:59
  • FYI, adding support for `IAsyncEnumerable` to the `TransformManyBlock` is in Microsoft's plans, but the team couldn't finish it in time for .NET 6, and so [it has been postponed for .NET 7](https://github.com/dotnet/runtime/issues/30863#issuecomment-899982598). For now, you may get some ideas about how to surpass this limitation by looking at [this](https://stackoverflow.com/questions/68315122/asynchronous-task-video-buffering/68376176#68376176) answer. – Theodor Zoulias Sep 11 '21 at 12:13
  • @TheodorZoulias - I suppose the idea is not to link `produceCSFileText` to `produceDependentTypes`, and instead let the former send the `TextItem` objects using the `SendAsync` method, and then manually handle the propagation of completion between the two. – mark Sep 12 '21 at 00:12
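The approach described in the last comment might look roughly like this minimal sketch: the producer is not linked to its target; it awaits SendAsync for each item and then propagates completion (or failure) by hand. All names are hypothetical, and an async iterator stands in for reading the C# files of a project.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualPropagationSketch
{
    // Hypothetical async source, standing in for reading the C# files of one project.
    public static async IAsyncEnumerable<string> ReadItemsAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield(); // placeholder for e.g. File.ReadAllTextAsync
            yield return $"item {i}";
        }
    }

    // Pushes every item from the async source into the target, then completes or faults it.
    public static async Task FeedAsync<T>(IAsyncEnumerable<T> source, ITargetBlock<T> target)
    {
        try
        {
            await foreach (var item in source)
            {
                // SendAsync honors BoundedCapacity; false means the target declined (e.g. it faulted).
                if (!await target.SendAsync(item)) return;
            }
            target.Complete();
        }
        catch (Exception ex)
        {
            target.Fault(ex);
        }
    }
}
```

With several producers feeding one target, each producer would skip the individual Complete call, and the target would be completed once Task.WhenAll over all the FeedAsync tasks has finished.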

1 Answer


Here is a drop-in replacement for the TransformManyBlock<TInput,TOutput> dataflow component, with a constructor that accepts a Func<TInput, IAsyncEnumerable<TOutput>> delegate. It exposes the full API and supports all the options (but see the caveat about BoundedCapacity at the end). Internally it is composed of three linked components: a TransformBlock, an ActionBlock, and a BufferBlock:

public class TransformManyBlockEx<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
    private readonly TransformBlock<TInput, (long, TInput)> _input;
    private readonly ActionBlock<(long, TInput)> _transformer;
    private readonly BufferBlock<TOutput> _output;
    private readonly Dictionary<long, (Queue<TOutput> Queue, bool Completed)> _byIndex;
    private readonly CancellationToken _cancellationToken;
    private long currentIndex = 0L;
    private long minIndex = 0L;

    public TransformManyBlockEx(Func<TInput, IAsyncEnumerable<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        // Arguments validation omitted
        dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();
        _cancellationToken = dataflowBlockOptions.CancellationToken;
        if (dataflowBlockOptions.EnsureOrdered)
            _byIndex = new Dictionary<long, (Queue<TOutput>, bool)>();

        _input = new TransformBlock<TInput, (long, TInput)>(item =>
        {
            return (currentIndex++, item); // No parallelism here
        }, new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            CancellationToken = _cancellationToken
        });

        _transformer = new ActionBlock<(long, TInput)>(async entry =>
        {
            var (index, item) = entry;
            Queue<TOutput> queue = null;
            if (_byIndex != null)
            {
                // EnsureOrdered is enabled
                queue = new Queue<TOutput>();
                lock (_byIndex) _byIndex.Add(index, (queue, false));
            }
            var resultSequence = transform(item);
            await foreach (var result in resultSequence
                .WithCancellation(_cancellationToken))
            {
                if (_byIndex != null)
                {
                    lock (queue) queue.Enqueue(result);
                    if (!await SendPendingResultsAsync()) return;
                }
                else
                {
                    if (!await _output.SendAsync(result, _cancellationToken)) return;
                }
            }
            if (_byIndex != null)
            {
                lock (_byIndex) _byIndex[index] = (queue, true); // Mark as completed
                await SendPendingResultsAsync();
            }
        }, dataflowBlockOptions);

        _input.LinkTo(_transformer, new() { PropagateCompletion = true });

        _output = new BufferBlock<TOutput>(dataflowBlockOptions);

        Task transformerPostCompletion = _transformer.Completion.ContinueWith(t =>
        {
            if (_byIndex != null)
            {
                int pendingCount;
                lock (_byIndex)
                {
                    pendingCount = _byIndex.Count;
                    _byIndex.Clear(); // Cleanup
                }
                if (t.IsCompletedSuccessfully && pendingCount > 0)
                    throw new InvalidOperationException(
                    "The transformer completed before emitting all queued results.");
            }
        }, TaskScheduler.Default);

        // The Task.WhenAll aggregates nicely the exceptions of the two tasks
        PropagateCompletion(
            Task.WhenAll(_transformer.Completion, transformerPostCompletion),
            _output);
    }

    private static async void PropagateCompletion(Task sourceCompletion,
        IDataflowBlock target)
    {
        try { await sourceCompletion.ConfigureAwait(false); } catch { }
        var ex = sourceCompletion.IsFaulted ? sourceCompletion.Exception : null;
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    private async Task<bool> SendPendingResultsAsync()
    {
        // Returns false in case the BufferBlock rejected a result
        // This may happen in case of cancellation
        while (TrySendNextPendingResult(out var sendTask))
        {
            if (!await sendTask) return false;
        }
        return true;
    }

    private bool TrySendNextPendingResult(out Task<bool> sendTask)
    {
        // Returns false in case currently there is no pending result
        sendTask = null;
        lock (_byIndex)
        {
            while (true)
            {
                if (!_byIndex.TryGetValue(minIndex, out var entry))
                    return false; // The next queue is not in the dictionary yet

                var (queue, completed) = entry; // We found the next queue

                lock (queue)
                {
                    if (queue.TryDequeue(out var result))
                    {
                        // We found the next result
                        // Send the result while holding the lock on _byIndex
                        // The BufferBlock respects the order of submitted items
                        sendTask = _output.SendAsync(result, _cancellationToken);
                        return true;
                    }
                }

                // Currently the queue is empty
                // If it's not completed yet, return. It may have more items later.
                if (!completed) return false;

                // OK, the queue is now both empty and completed
                _byIndex.Remove(minIndex); // Remove it
                minIndex++; // Continue with the next queue in order
            }
        }
    }

    public TransformManyBlockEx(Func<TInput, Task<IEnumerable<TOutput>>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        : this(ToAsyncEnumerable(transform), dataflowBlockOptions) { }

    public TransformManyBlockEx(Func<TInput, IEnumerable<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        : this(ToAsyncEnumerable(transform), dataflowBlockOptions) { }

    public Task Completion => _output.Completion;
    public void Complete() => _input.Complete();
    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_input).Fault(exception);

    public int InputCount
        => _input.InputCount + _input.OutputCount + _transformer.InputCount;

    public int OutputCount
    {
        get
        {
            int count = _output.Count;
            if (_byIndex == null) return count;
            lock (_byIndex) return count + _byIndex.Values
                .Select(e => { lock (e.Queue) return e.Queue.Count; }).Sum();
        }
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
            => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
        => ((IReceivableSourceBlock<TOutput>)_output).TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<TOutput> items)
        => ((IReceivableSourceBlock<TOutput>)_output).TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader messageHeader, TInput messageValue,
        ISourceBlock<TInput> source, bool consumeToAccept)
            => ((ITargetBlock<TInput>)_input).OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target, out bool messageConsumed)
            => ((ISourceBlock<TOutput>)_output).ConsumeMessage(messageHeader, target,
                out messageConsumed);

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target)
            => ((ISourceBlock<TOutput>)_output).ReserveMessage(messageHeader, target);

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<TOutput> target)
            => ((ISourceBlock<TOutput>)_output).ReleaseReservation(messageHeader,
                target);

    private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
        Func<TInput, Task<IEnumerable<TOutput>>> transform)
    {
        return (item) => Iterator(item);
        async IAsyncEnumerable<TOutput> Iterator(TInput item)
        {
            foreach (var result in await transform(item)) yield return result;
        }
    }

    private static Func<TInput, IAsyncEnumerable<TOutput>> ToAsyncEnumerable(
        Func<TInput, IEnumerable<TOutput>> transform)
    {
        return (item) => Iterator(item);
        async IAsyncEnumerable<TOutput> Iterator(TInput item)
        {
            foreach (var result in transform(item)) yield return result;
            await Task.CompletedTask; // Suppress CS1998
        }
    }
}

The main difficulty is the EnsureOrdered option, because we can't rely on the built-in ordering of any of the composing blocks. The chosen solution is based on ideas from this question: a Dictionary with long keys, in which the results of each input are queued under that input's index, and a long counter (minIndex) that holds the minimum index that has not been fully emitted yet.
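The reordering idea can be isolated from the dataflow plumbing. A minimal, single-threaded sketch; the class name ReorderBuffer is hypothetical, and unlike TransformManyBlockEx it stores one result per index instead of a queue per input. Results arrive tagged with their input index in any order and are released only once every lower index has been released.

```csharp
using System.Collections.Generic;

// Hypothetical helper illustrating the Dictionary<long, ...> + minIndex technique.
public sealed class ReorderBuffer<T>
{
    private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
    private long _minIndex = 0; // smallest index not yet released

    // Stores a result and returns all results that can now be released in order.
    public List<T> Add(long index, T result)
    {
        _pending.Add(index, result);
        var ready = new List<T>();
        while (_pending.TryGetValue(_minIndex, out var next))
        {
            _pending.Remove(_minIndex);
            ready.Add(next);
            _minIndex++;
        }
        return ready;
    }

    // Results held back because a lower index has not arrived yet.
    public int PendingCount => _pending.Count;
}
```

If the result for a low index is slow to arrive, every later result accumulates in the dictionary, which is why this kind of reordering buffer is effectively unbounded.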

This implementation has the usual drawbacks of all custom dataflow components that are built by composition:

  1. The exceptions are wrapped in extra AggregateExceptions. This usually makes it necessary to Flatten the exception propagated by the last block in the pipeline.
  2. The BoundedCapacity is multiplied by the number of linked internal components: the effective BoundedCapacity of the TransformManyBlockEx is three times the value passed in the options, plus the results stored in the internal reordering buffer (when the EnsureOrdered option is enabled). This buffer is effectively unbounded. Unfortunately, fixing this flaw is not trivial.
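For the first drawback, a minimal sketch of unwrapping the nested AggregateExceptions with AggregateException.Flatten when waiting on the Completion of the last block; the helper name GetRootExceptions is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CompletionErrorsSketch
{
    // Waits for a pipeline's final Completion and returns the non-aggregate exceptions, if any.
    public static IReadOnlyList<Exception> GetRootExceptions(Task completion)
    {
        try
        {
            completion.Wait();
            return Array.Empty<Exception>();
        }
        catch (AggregateException ae)
        {
            // Flatten removes every level of AggregateException nesting.
            return ae.Flatten().InnerExceptions;
        }
    }
}
```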
Answered by Theodor Zoulias
  • I will have to grok and test it. Might take some time. But +1 for sure. – mark Sep 13 '21 at 05:09
  • @mark assuming that it's not buggy, it might serve you as a temporary solution, until Microsoft releases the real thing (hopefully in .NET 7). – Theodor Zoulias Sep 13 '21 at 05:15
  • @mark I added a robustification feature in the implementation: In case the internal `transformer` block completes successfully without emitting all the queued results (due to some bug), an `InvalidOperationException` is propagated through the `Completion` task. – Theodor Zoulias Sep 13 '21 at 16:27
  • @mark I just found a bug in the implementation. When multiple producers are sending items concurrently to the `TransformManyBlockEx`, it might deadlock. It is probably related to the `ImmediateTaskScheduler`. I am looking at it. – Theodor Zoulias Sep 14 '21 at 09:26
  • @mark all known bugs are currently fixed. The `BoundedCapacity` is now three times the value passed in the options, since all three internal blocks have the same capacity. – Theodor Zoulias Sep 14 '21 at 13:28
  • Does it make sense for you to publish it on GitHub? Seems a useful thing until Microsoft implements it. – mark Sep 14 '21 at 17:02
  • @mark yeah, I may publish it, as soon as I learn how to publish things on GitHub. :-) – Theodor Zoulias Sep 14 '21 at 17:08