I have finally managed to get my Beam code to run on Google Cloud Dataflow.
The following is a simple program that performs a map-reduce style count on data stored in a Google Cloud Storage bucket.
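Before the Dataflow submission itself, here is a minimal local sketch of the same transform chain, run with Beam's default DirectRunner on a few in-memory lines. The record layout (an identifier in the first field, a department name in the fourth) and the sample rows are assumptions made for illustration, not taken from the actual dept-data.txt:

    import apache_beam as beam

    # Hypothetical records shaped like the ones the pipeline below expects:
    # id, name, some field, department, date
    sample_lines = [
        '149633CM,Marco,10,Accounts,1-01-2019',
        '149633CM,Marco,10,Accounts,2-01-2019',
        '212539MU,Rebekah,10,HR,1-01-2019',
    ]

    with beam.Pipeline() as p:  # DirectRunner by default, runs locally
        (p
         | 'Create' >> beam.Create(sample_lines)
         | 'Split' >> beam.Map(lambda x: x.split(','))
         | 'Keep Accounts rows' >> beam.Filter(lambda x: x[3] == 'Accounts')
         | 'Key by id' >> beam.Map(lambda x: (x[0], 1))
         | 'Count per key' >> beam.CombinePerKey(sum)
         | 'Print' >> beam.Map(print))  # expect ('149633CM', 2)

The real program below is the same chain, except that it reads from and writes to GCS and runs on the DataflowRunner.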
import argparse
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline():
    # Point the Google client libraries at the service-account key used to
    # authenticate against GCS and Dataflow.
    service_account_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = service_account_path

    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://mydataflow_ground/input/dept-data.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://mydataflow_ground/output/dept',
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args()

    # Any arguments not consumed above are passed through to Beam, together
    # with the Dataflow-specific settings needed to run the job on Google Cloud.
    pipeline_args.extend([
        '--runner=DataflowRunner',
        '--project=marketpsych-demo',
        '--staging_location=gs://mydataflow_ground/staging',
        '--temp_location=gs://mydataflow_ground/temp',
        '--job_name=dept-count-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)

    p = beam.Pipeline(options=pipeline_options)
    crunch_data = (p
                   | 'Read from file' >> beam.io.ReadFromText(known_args.input)
                   | 'Split the data' >> beam.Map(lambda x: x.split(","))
                   | 'Filter' >> beam.Filter(lambda x: x[3] == 'Accounts')
                   | 'Create Key value' >> beam.Map(lambda x: (x[0], 1))
                   | 'Group and Sum' >> beam.CombinePerKey(sum)
                   | 'Write the key values' >> beam.io.WriteToText(known_args.output))

    p.run()


if __name__ == '__main__':
    run_pipeline()
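Note that p.run() returns as soon as the job has been submitted to Dataflow. If you want the script to block until the job actually completes, the PipelineResult returned by p.run() can be waited on; this is an optional tweak of mine, not part of the original snippet:

    # Alternative ending for run_pipeline(): wait for the Dataflow job to finish.
    result = p.run()                # submit the job to Dataflow
    result.wait_until_finish()      # block until the job reaches a terminal state
    print('Job finished in state:', result.state)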