
I am trying to create a Google Dataflow template, but I can't seem to find a way to do it without producing the following exception:

WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=null}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
        at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
        at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
        at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
        at java.lang.Thread.run(Thread.java:748)

I can reproduce it with a slightly modified version of the MinimalWordCount example from Beam.

package org.apache.beam.examples;

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MyMinimalWordCount {

    public interface WordCountOptions extends PipelineOptions {
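        // The input path is a ValueProvider so it can be supplied when the
        // template is executed, rather than when the template is created.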
        @Description("Path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> valueProvider);
    }

    public static void main(String[] args) {

        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);

        Pipeline p = Pipeline.create(options);

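        // TextIO.read().from(...) accepts a ValueProvider directly, so the
        // file path can be resolved at run time rather than at graph construction.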
        p.apply(TextIO.read().from(options.getInputFile()))

                .apply(FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
                .apply(Filter.by((String word) -> !word.isEmpty()))
                .apply(Count.perElement())
                .apply(MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

        // Having the waitUntilFinish causes an NPE when trying to create a Dataflow template
        //p.run().waitUntilFinish();

        p.run();
    }
}

I can run the example locally with:

mvn compile exec:java \
     -Pdirect-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--inputFile=pom.xml " 

It also runs on Google Dataflow with:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --inputFile=gs://[bucket]/input.csv "

But when I try to create a Google Dataflow template with the following, I get the error:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --stagingLocation=gs://[bucket]/staging \
                  --templateLocation=gs://[bucket]/templates/MyMinimalWordCountTemplate " 

The other confusing thing is that the Maven build continues and ends with BUILD SUCCESS.

So my questions are:

Q1) Should I be able to create a Google Dataflow template like this (using ValueProviders to provide TextIO input at runtime)?

Q2) Is the exception during the build a real error, or just a WARNING as the logging seems to indicate?

Q3) If the answers to Q1 and Q2 are 'yes' and 'just a warning', and I try to create a job from the uploaded template, why does it not have any metadata or know about my input options?

[Google Dataflow screenshot]

References I have used:

Oliver Henlich
  • I can answer Q3: I thought the Maven/Dataflow build would produce the required metadata automatically. It does not. A good description of how to supply your own metadata can be found here: https://cloud.google.com/dataflow/docs/templates/creating-templates#example-metadata-file (a sketch of such a file follows these comments). – Oliver Henlich May 15 '18 at 07:29
  • Once you've compiled your program, it generates the template for you at the path you defined in `--templateLocation=`. Next, go to the screen shown in your screenshot and run the job with the template location you added. Finally, click `add item` under `additional parameters`, use `inputFile` as the key and the bucket location of the input file as the value. Once that is done, run the job and you shouldn't have issues anymore. – Haris Nadeem May 15 '18 at 07:39
  • Thanks @haris, so does that mean the exception is just a warning? – Oliver Henlich May 15 '18 at 08:36
  • The error occurs because it expects you to pass `--inputFile` as an argument, which you didn't. So technically you told the program to create a template and run the job; when it saw no job, it produced an error. At least that is my understanding; I have never had to work with templates directly. The template should still have been created, though. – Haris Nadeem May 15 '18 at 08:51
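
A minimal sketch of such a metadata file, following the format described in the linked documentation. The description, label, helpText and regex values below are illustrative assumptions, not taken from the original pipeline; per the documentation the file is named after the template with a `_metadata` suffix (here `MyMinimalWordCountTemplate_metadata`) and stored in the same Cloud Storage folder as the template:

{
  "name": "MyMinimalWordCount",
  "description": "Counts words in a text file supplied at run time",
  "parameters": [
    {
      "name": "inputFile",
      "label": "Input file",
      "helpText": "Cloud Storage path of the file to read from, e.g. gs://[bucket]/input.csv",
      "regexes": ["^gs:\\/\\/[^\\n\\r]+$"],
      "isOptional": false
    }
  ]
}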

2 Answers


The correct answer is that you do not have to provide the input when creating the template; the template takes the input as a value at run time. The exception is an internal issue in Google Dataflow and should be removed in the future.
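
As an illustration of supplying the input at run time, the created template could then be launched with something like the following (the job name is a made-up placeholder, and the exact flags may vary with your gcloud SDK version):

gcloud dataflow jobs run my-wordcount-job \
    --gcs-location gs://[bucket]/templates/MyMinimalWordCountTemplate \
    --parameters inputFile=gs://[bucket]/input.csv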

Shahin Vakilinia

I believe that the --inputFile values are bundled in with the template when the template is created.

Please see note [1]: "In addition to the template file, templated pipeline execution also relies on files that were staged and referenced at the time of template creation. If the staged files are moved or removed, your pipeline execution will fail."

This thread seems relevant as well [2].

brugz