I have finally managed to get my beam code to run on Google Data Flow:

The following is the simple program that does a simple map reduce task on data stored in S3 bucket

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import argparse
import os
def run_pipeline():
    service_account_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = service_account_path
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://mydataflow_ground/input/dept-data.txt',
                        help='Input file to process.')

    parser.add_argument('--output',
                        dest='output',
                        default='gs://mydataflow_ground/output/dept',
                        help='Output file to write results to.')

    known_args, pipeline_args = parser.parse_known_args()
    pipeline_args.extend([
        '--runner=DataflowRunner',
        '--project=marketpsych-demo',
        '--staging_location=gs://mydataflow_ground/staging',
        '--temp_location=gs://mydataflow_ground/temp',
        '--job_name=dept-count-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options = pipeline_options )
    crunch_data= (p
                  |'Read from file' >> beam.io.ReadFromText(known_args.input)
                  |'Split the data' >> beam.Map(lambda x: x.split(","))
                  |'Filter'  >> beam.Filter(lambda x : x[3]=='Accounts')
                  |'Create Key value' >> beam.Map(lambda x : (x[0],1))
                  |'Group and Sum'  >> beam.CombinePerKey(sum)
                  |'Write the key values' >> beam.io.WriteToText(known_args.output)
    )
    p.run()

if __name__ == '__main__':
    run_pipeline()