I have managed to recreate a simple example that illustrates the interaction between Apache Beam and Google Pubsub. The basic setting is as follows:

  • Topic 1 has Subscription 1
  • Topic 2 has Subscription 2
  • Program publishes messages to Topic 1
  • An Apache Beam program reads from Subscription 1 and forwards the messages to Topic 2
  • Program reads messages from Subscription 2

Program that publishes to Topic 1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
import json
import numpy as np
from google.auth import jwt
from google.cloud import pubsub_v1


if __name__ == "__main__":
    # Publishes 200 numbered test messages to Topic1, one per second.

    # Load the service-account key; the context manager closes the file
    # handle instead of leaving it open for the life of the process.
    key_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
    with open(key_path) as key_file:
        service_account_info = json.load(key_file)

    publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
    credentials = jwt.Credentials.from_service_account_info(
        service_account_info, audience=publisher_audience
    )
    publisher = pubsub_v1.PublisherClient(credentials=credentials)

    project_id = "marketpsych-demo"
    topic_name = "Topic1"
    topic_path = publisher.topic_path(project_id, topic_name)

    for n in range(200, 400):
        data = u'Message number {}'.format(n).encode('utf-8')
        print(data)
        # publish() returns a future; "source" and "sender" are sent as
        # message attributes alongside the payload.
        future = publisher.publish(topic_path, data=data, source="refinitiv", sender="rk")
        # Wait for the publish to complete so a failure raises here
        # instead of being dropped silently.
        future.result()
        time.sleep(1)

Program that receives messages from Subscription 1 and pumps the messages to Topic 2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os
from google.auth import jwt
from google.cloud import pubsub_v1

# Module-level configuration for the Beam relay: credentials plus the fully
# qualified Pub/Sub paths of the source subscription and destination topic.
service_account_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
# Exported so that libraries relying on Application Default Credentials can
# locate the key (presumably Beam's Pub/Sub IO does -- confirm for your runner).
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = service_account_path

# Read the key with a context manager so the file handle is closed promptly.
with open(service_account_path) as key_file:
    service_account_info = json.load(key_file)

subscriber_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
credentials_sub = jwt.Credentials.from_service_account_info(
    service_account_info, audience=subscriber_audience
)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials_sub)

publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_pub = jwt.Credentials.from_service_account_info(
    service_account_info, audience=publisher_audience
)
publisher = pubsub_v1.PublisherClient(credentials=credentials_pub)

project_id = "marketpsych-demo"

# In this script the two clients are used only to format resource paths.
topic_name = "Topic2"
topic_path = publisher.topic_path(project_id, topic_name)

subscription_name = "Subscription1"
subscription_path = subscriber.subscription_path(project_id, subscription_name)

if __name__ == "__main__":
    # Build a streaming pipeline that relays every message read from
    # Subscription1 to Topic2.
    pipeline_options = PipelineOptions()
    standard_options = pipeline_options.view_as(StandardOptions)
    standard_options.streaming = True

    pipeline = beam.Pipeline(options=pipeline_options)

    # Source: messages pulled from the subscription.
    incoming = pipeline | beam.io.ReadFromPubSub(subscription=subscription_path)
    # Sink: the same messages published to the destination topic.
    relayed = incoming | beam.io.WriteToPubSub(topic_path)

    # Start the pipeline, then block until it finishes.
    run_result = pipeline.run()
    run_result.wait_until_finish()


               

Program that receives messages from Subscription 2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import json
import numpy as np
from google.auth import jwt
from google.cloud import pubsub_v1


if __name__ == "__main__":
    # Listens on Subscription2 for up to 120 seconds, printing and
    # acknowledging every message received.

    # Imported here because it is only needed by this script's shutdown logic.
    # On Python versions before 3.11 this is a different class from the
    # built-in TimeoutError, so it is imported explicitly.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    # Load the service-account key; the context manager closes the file handle.
    key_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
    with open(key_path) as key_file:
        service_account_info = json.load(key_file)

    audience = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
    credentials = jwt.Credentials.from_service_account_info(service_account_info, audience=audience)
    subscriber = pubsub_v1.SubscriberClient(credentials=credentials)

    project_id = "marketpsych-demo"
    subscription_name = "Subscription2"
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    def callback(message):
        """Print the received message and acknowledge it."""
        print('Received message: {}'.format(message))
        message.ack()

    pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))
    try:
        pull_future.result(timeout=120)
    except (FutureTimeoutError, KeyboardInterrupt):
        # Expected ways to stop: the 120 s window elapsed or the user hit
        # Ctrl-C. Trigger the shutdown and wait for it to complete.
        pull_future.cancel()
        pull_future.result()
    except Exception:
        # Unexpected failure: still stop the pull, but surface the error
        # instead of swallowing it.
        pull_future.cancel()
        raise

Takeaways

I came across this example two weeks ago, when I had no clue about Google Pub/Sub, Hadoop infrastructure and many other things, and only a hazy view of what Apache Beam can do. Now I am far better equipped to understand Beam.