I have managed to recreate a simple example that illustrates the interaction between Apache Beam and Google Pubsub. The basic setting is as follows:
- Topic 1 has Subscription 1
- Topic 2 has Subscription 2
- Program publishes messages to Topic 1
- An Apache Beam program reads from Subscription 1 and pumps the messages into Topic 2
- Program reads messages from Subscription 2
Program that publishes to Topic 1:
import time
import json
import numpy as np
from google.auth import jwt
from google.cloud import pubsub_v1
if __name__ == "__main__":
    # Publisher demo: sends 200 numbered text messages to Topic1.

    # Load the service-account key. The context manager closes the file
    # handle as soon as the JSON has been parsed instead of leaking it.
    key_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
    with open(key_path) as key_file:
        service_account_info = json.load(key_file)

    # JWT credentials bound to the Pub/Sub Publisher audience.
    publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
    credentials = jwt.Credentials.from_service_account_info(
        service_account_info, audience=publisher_audience
    )
    publisher = pubsub_v1.PublisherClient(credentials=credentials)

    # Fully-qualified path of the destination topic.
    project_id = "marketpsych-demo"
    topic_name = "Topic1"
    topic_path = publisher.topic_path(project_id, topic_name)

    # NOTE(review): the original listing lost its indentation; the one-second
    # pause is assumed to belong inside the loop (one message per second).
    for n in range(200, 400):
        data = u'Message number {}'.format(n)
        data = data.encode('utf-8')  # the payload is sent as UTF-8 bytes
        print(data)
        # When you publish a message, the client returns a future.
        # `source` and `sender` are passed as extra keyword arguments
        # alongside the payload.
        publisher.publish(topic_path, data=data, source="refinitiv", sender="rk")
        time.sleep(1)
|
Program that receives messages from Subscription 1 and pumps the messages to Topic 2:
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os
from google.auth import jwt
from google.cloud import pubsub_v1
# Module-level setup: credentials plus the fully-qualified Pub/Sub paths
# (Subscription1 as input, Topic2 as output) used further down.
service_account_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"

# Export the key location through the environment as well.
# NOTE(review): presumably so that libraries relying on Application Default
# Credentials can authenticate without explicit credentials - confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = service_account_path

# Parse the key file; the context manager closes the handle right away
# instead of leaving it open for the lifetime of the process.
with open(service_account_path) as key_file:
    service_account_info = json.load(key_file)

# Subscriber client, authenticated with JWT credentials for the Subscriber
# audience.
subscriber_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
credentials_sub = jwt.Credentials.from_service_account_info(
    service_account_info, audience=subscriber_audience
)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials_sub)

# Publisher client, authenticated with JWT credentials for the Publisher
# audience.
publisher_audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_pub = jwt.Credentials.from_service_account_info(
    service_account_info, audience=publisher_audience
)
publisher = pubsub_v1.PublisherClient(credentials=credentials_pub)

# The two clients are used here to build the resource paths.
project_id = "marketpsych-demo"
topic_name = "Topic2"
topic_path = publisher.topic_path(project_id, topic_name)
subscription_name = "Subscription1"
subscription_path = subscriber.subscription_path(project_id, subscription_name)
if __name__ == "__main__":
    # Bridge job: forward everything read from the input subscription to the
    # output topic.

    # Enable streaming mode on the pipeline options.
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True

    pipeline = beam.Pipeline(options=pipeline_options)

    # Name the two ends of the pipeline, then wire them together.
    source = beam.io.ReadFromPubSub(subscription=subscription_path)
    sink = beam.io.WriteToPubSub(topic_path)
    pubsub_data = pipeline | source | sink

    # Start the job and block until it finishes.
    pipeline.run().wait_until_finish()
|
Program that receives messages from Subscription 2:
import time
import json
import numpy as np
from google.auth import jwt
from google.cloud import pubsub_v1
if __name__ == "__main__":
    # Subscriber demo: prints and acknowledges messages from Subscription2
    # for up to 120 seconds.

    # Local import: needed only to name the timeout exception raised by
    # `pull_future.result(timeout=...)`.
    from concurrent import futures

    # Load the service-account key; the context manager closes the file
    # handle as soon as the JSON has been parsed instead of leaking it.
    key_path = "C:/garage/reuters/trmi/api/auth-json/marketpsych-demo-bcd601e3493b.json"
    with open(key_path) as key_file:
        service_account_info = json.load(key_file)

    # JWT credentials bound to the Pub/Sub Subscriber audience.
    audience = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
    credentials = jwt.Credentials.from_service_account_info(
        service_account_info, audience=audience
    )
    subscriber = pubsub_v1.SubscriberClient(credentials=credentials)

    project_id = "marketpsych-demo"
    subscription_name = "Subscription2"
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    def callback(message):
        """Print the received message, then acknowledge it."""
        print('Received message: {}'.format(message))
        message.ack()

    # Register the callback; messages are delivered to it in the background
    # while this thread waits on the returned future.
    pull_future = subscriber.subscribe(subscription_path, callback=callback)
    print('Listening for messages on {}'.format(subscription_path))

    try:
        pull_future.result(timeout=120)
    except (futures.TimeoutError, KeyboardInterrupt):
        # Expected ways to stop listening: the 120 s window elapsed or the
        # user pressed Ctrl-C.
        pull_future.cancel()
    except Exception:
        # Unexpected failure: still stop the pull, but surface the error
        # instead of silently swallowing it as the bare `except:` did.
        pull_future.cancel()
        raise
|
Takeaways
I came across this example two weeks ago, when I had no clue about Google Pub/Sub, Hadoop infrastructure, and many other things. I had only a hazy view of what Apache Beam could do. Now I am far better equipped to understand Beam.