I'd like to embed my Dataflow pipeline inside a Cloud Function without using a template. I ran into an error at first and, according to this answer, I should package up my code as a dependency. This is the structure of my Cloud Function:

file wb_flow.py:

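import apache_beam as beam
import cbsodata
from apache_beam.options.pipeline_options import PipelineOptions


def main(identifier, schema_file):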
    """The main function which creates the pipeline and runs it."""
    table_name = f"wijken_en_buurten_{cbsodata.get_info(identifier)['Period']}"

    pipeline_options = PipelineOptions(
        [
        '--runner', 'DataflowRunner',
        '--project', 'veneficus',
        '--region', 'europe-west4',
        '--temp_location', 'gs://vf_etl/test',
        '--staging_location', 'gs://vf_etl/temp',
        '--setup_file', 'setup.py'
        ]
    )


    p = beam.Pipeline(options=pipeline_options)
    (p
        | 'Read from BQ Table' >> beam.Create(cbsodata.get_data(identifier))
        | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
                f"cbs.{table_name}",
                # NOTE: `schema` is not defined in this snippet; presumably it is loaded from schema_file.
                schema=schema,
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
     )
    p.run()  # .wait_until_finish()

file main.py:

import base64
from wb_flow import main


def run(event, context):
    """Cloud Function entry point: decodes the message and launches the pipeline."""
    message = base64.b64decode(event['data']).decode('utf-8').split(',')
    identifier, schema_file = message[0], message[1]
    main(identifier, schema_file)

and setup.py:

import setuptools

setuptools.setup(
    name='wb_flow',
    version='1.0.0',
    install_requires=[],
    packages=setuptools.find_packages(),
)

I got this error while the Dataflow pipeline was being constructed:

File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/runners/portability/stager.py", line 579, in _build_setup_package
    os.chdir(os.path.dirname(setup_file))
FileNotFoundError: [Errno 2] No such file or directory: '' 

I believe this means that it couldn't find my setup.py. How can I specify the path to my setup file?

Alternatively, I tried to do this without setup.py, and Dataflow reported that it couldn't find the wb_flow module.

Update

When I specified my setup path as /workspace/setup.py, I got this error:

subprocess.CalledProcessError: Command '['/layers/google.python.pip/pip/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmp3rxejr4g']' returned non-zero exit status 1."
pa-nguyen

1 Answer

Try changing

    '--setup_file', 'setup.py'

to

    '--setup_file', './setup.py'

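This worked for me :) The traceback shows Beam calling os.chdir(os.path.dirname(setup_file)); for a bare 'setup.py' the directory part is an empty string, which os.chdir rejects with FileNotFoundError, while './setup.py' yields '.'.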
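For illustration, here is a minimal sketch of why the prefix matters, using only the standard library. `staging_dir` simply mirrors the `os.path.dirname(setup_file)` expression from the traceback, and `absolute_setup_path` is a hypothetical helper (not part of Beam) that builds an absolute path next to a given module file; whether an absolute path also resolves the `sdist` error from the update is not something this sketch can confirm.

```python
import os


def staging_dir(setup_file: str) -> str:
    """Return the directory part of the path, as in the traceback's os.path.dirname call."""
    return os.path.dirname(setup_file)


def absolute_setup_path(module_file: str, name: str = "setup.py") -> str:
    """Hypothetical helper: resolve `name` in the same directory as `module_file`."""
    return os.path.join(os.path.dirname(os.path.abspath(module_file)), name)


# A bare file name has no directory part, so os.chdir would receive ''.
print(repr(staging_dir("setup.py")))    # ''
# With the './' prefix the directory part is '.', which is a valid chdir target.
print(repr(staging_dir("./setup.py")))  # '.'
```

Inside wb_flow.py, one could then pass `absolute_setup_path(__file__)` as the value of `'--setup_file'` instead of a relative path, assuming setup.py is deployed alongside wb_flow.py.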