pittgoogle.pubsub
Classes to facilitate connections to Pub/Sub streams.
Note
This module relies on pittgoogle.auth to authenticate API calls. The examples below assume the use of a service account and environment variables. In this case, pittgoogle.auth does not need to be called explicitly.
Usage Examples
import pittgoogle
Create a subscription to the “ztf-loop” topic:
subscription = pittgoogle.pubsub.Subscription(
    "my-ztf-loop-subscription",
    # topic only required if the subscription does not yet exist in Google Cloud
    topic=pittgoogle.pubsub.Topic("ztf-loop", pittgoogle.utils.ProjectIds.pittgoogle),
)
subscription.touch()
Pull a small batch of alerts. This is helpful for testing but not recommended for long-running listeners.
alerts = pittgoogle.pubsub.pull_batch(subscription, max_messages=4)
Open a streaming pull. Recommended for long-running listeners. This will pull and process messages in the background, indefinitely. The user must supply a callback that processes a single message. It should accept a pittgoogle.pubsub.Alert and return a pittgoogle.pubsub.Response.
Optionally, the user can provide a callback that processes a batch of messages. Note that messages are acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended to do as much processing as possible in the message callback and to use a batch callback only when necessary.
def my_msg_callback(alert):
    # process the message here. we'll just print the ID.
    print(f"processing message: {alert.metadata['message_id']}")
    # return a Response. include a result if using a batch callback.
    return pittgoogle.pubsub.Response(ack=True, result=alert.dict)

def my_batch_callback(results):
    # process the batch of results (list of results returned by my_msg_callback)
    # we'll just print the number of results in the batch
    print(f"batch processing {len(results)} results")

consumer = pittgoogle.pubsub.Consumer(
    subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)
# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()
Delete the subscription from Google Cloud.
subscription.delete()
- class pittgoogle.pubsub.Alert(*, bytes=None, dict=None, metadata=None, msg=None)
Pitt-Google container for a Pub/Sub message.
Typical usage is to instantiate an Alert using only a msg; the other attributes are then extracted automatically and lazily, on first access.
All parameters are keyword only.
- Parameters
bytes (bytes, optional) – The message payload, as returned by Pub/Sub. It may be Avro or JSON serialized depending on the topic.
dict (dict, optional) – The message payload as a dictionary.
metadata (dict, optional) – The message metadata.
msg (google.cloud.pubsub_v1.types.PubsubMessage, optional) – The Pub/Sub message object, documented at https://googleapis.dev/python/pubsub/latest/types.html.
- property bytes: bytes
Message payload in original format (Avro or JSON serialized bytes).
- property dict: dict
Message payload as a dictionary.
- Raises
pittgoogle.exceptions.OpenAlertError – if unable to deserialize the alert bytes.
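For JSON-serialized topics, the conversion from bytes to a dictionary can be pictured roughly as follows. This is only a sketch, not pittgoogle's implementation: OpenAlertError here is a local stand-in for pittgoogle.exceptions.OpenAlertError, payload_to_dict is a hypothetical helper, the example payload is made up, and Avro payloads (which need an Avro library) are not handled.

```python
import json


class OpenAlertError(Exception):
    """Local stand-in for pittgoogle.exceptions.OpenAlertError (assumption)."""


def payload_to_dict(payload: bytes) -> dict:
    """Deserialize a JSON-encoded message payload (hypothetical helper)."""
    try:
        result = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # wrap low-level errors in the single exception type callers expect
        raise OpenAlertError("unable to deserialize the alert bytes") from exc
    if not isinstance(result, dict):
        raise OpenAlertError("payload is not a JSON object")
    return result


# made-up payload, for illustration only
alert_dict = payload_to_dict(b'{"objectId": "ZTF18abc", "candid": 123}')
```

Wrapping the decoding errors in one exception type lets a message callback catch a single error class regardless of how the payload was malformed.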
- property metadata: dict
Message metadata as a flat dictionary.
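As an illustration of what "flat" means here, the sketch below merges the top-level fields of a Pub/Sub message with its custom attributes into a single dictionary. Which keys pittgoogle actually includes is an assumption (only message_id appears in the usage example above); flatten_metadata is a hypothetical helper, and a SimpleNamespace stands in for a real PubsubMessage.

```python
from types import SimpleNamespace


def flatten_metadata(msg) -> dict:
    """Merge standard message fields and custom attributes into one flat dict."""
    flat = {
        "message_id": msg.message_id,
        "publish_time": msg.publish_time,
        "ordering_key": msg.ordering_key,
    }
    # custom attributes are copied in alongside the standard fields;
    # an attribute with the same name as a standard field would overwrite it
    flat.update(dict(msg.attributes))
    return flat


# stand-in for a PubsubMessage, with made-up values
fake_msg = SimpleNamespace(
    message_id="1234",
    publish_time="2022-01-01T00:00:00Z",
    ordering_key="",
    attributes={"survey": "ztf"},
)
metadata = flatten_metadata(fake_msg)
```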
- msg: Optional[PubsubMessage]
Original Pub/Sub message object.
- class pittgoogle.pubsub.Consumer(subscription, msg_callback, batch_callback=None, batch_maxn=100, batch_maxwait=30, max_backlog=1000, max_workers=None, executor=None)
Consumer class to pull a Pub/Sub subscription and process messages.
- Parameters
subscription (str or pittgoogle.pubsub.Subscription) – Pub/Sub subscription to be pulled (it must already exist in Google Cloud).
msg_callback (callable) – Function that will process a single message. It should accept a pittgoogle.pubsub.Alert and return a pittgoogle.pubsub.Response.
batch_callback (callable, optional) – Function that will process a batch of results. It should accept a list of the results returned by the msg_callback.
batch_maxn (int, optional) – Maximum number of messages in a batch. This has no effect if batch_callback is None.
batch_maxwait (int, optional) – Maximum number of seconds to wait between messages before processing a batch. This has no effect if batch_callback is None.
max_backlog (int, optional) – Maximum number of pulled but unprocessed messages before pausing the pull.
max_workers (int, optional) – Maximum number of workers for the executor. This has no effect if an executor is provided.
executor (concurrent.futures.ThreadPoolExecutor, optional) – Executor to be used by the Google API to pull and process messages in the background.
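The interaction of batch_maxn and batch_maxwait can be illustrated with a small stand-alone loop. This is a sketch of one plausible way to collect results into batches, not pittgoogle's actual implementation; collect_batch and the queue of results are hypothetical.

```python
import queue


def collect_batch(results: queue.Queue, batch_maxn: int = 100, batch_maxwait: float = 30) -> list:
    """Collect results until the batch is full or no new result arrives in time."""
    batch = []
    while len(batch) < batch_maxn:
        try:
            # wait at most batch_maxwait seconds for the next result
            batch.append(results.get(timeout=batch_maxwait))
        except queue.Empty:
            break  # waited too long between results; close the batch early
    return batch


results = queue.Queue()
for i in range(5):
    results.put(i)

# the first batch fills up to batch_maxn; the second closes on the timeout
first = collect_batch(results, batch_maxn=3, batch_maxwait=0.1)
second = collect_batch(results, batch_maxn=3, batch_maxwait=0.1)
```

In this sketch a batch closes on whichever limit is reached first, so a quiet stream still produces (smaller) batches instead of waiting indefinitely for batch_maxn results.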
- property executor: ThreadPoolExecutor
Executor to be used by the Google API for a streaming pull.
- pull_batch(max_messages=1)
Pull a single batch of messages.
Recommended for testing. Not recommended for long-running listeners (use the stream() method instead).
- Parameters
max_messages (int) – Maximum number of messages to be pulled.
- Return type
List[Alert]
- stop()
Attempt to shut down the streaming pull and exit the background threads gracefully.
- Return type
None
- stream(block=True)
Open the stream in a background thread and process messages through the callbacks.
Recommended for long-running listeners.
- Parameters
block (bool) – Whether to block the main thread while the stream is open. If True, block indefinitely (use Ctrl-C to close the stream and unblock). If False, open the stream and then return (use stop() to close the stream). This must be True in order to use a batch_callback.
- Return type
None
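When block=False, the caller is responsible for closing the stream. A minimal sketch of that pattern follows. The helper stream_for is hypothetical, and FakeConsumer is a stand-in that only records calls so the example runs without Google Cloud access; a real pittgoogle.pubsub.Consumer (created without a batch_callback, since that requires block=True) would be passed in the same way.

```python
import time


def stream_for(consumer, seconds: float) -> None:
    """Open a non-blocking stream, wait `seconds`, then always close it."""
    consumer.stream(block=False)
    try:
        time.sleep(seconds)  # the main thread is free to do other work here
    finally:
        consumer.stop()  # close the stream even if an exception was raised


class FakeConsumer:
    """Stand-in that only records calls (not part of pittgoogle)."""

    def __init__(self):
        self.calls = []

    def stream(self, block=True):
        self.calls.append(("stream", block))

    def stop(self):
        self.calls.append(("stop",))


fake = FakeConsumer()
stream_for(fake, seconds=0.01)
```

Putting stop() in a finally clause keeps the background threads from outliving the code that opened the stream.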
- property subscription: Subscription
Subscription to be consumed.
- class pittgoogle.pubsub.Response(*, ack=True, result=None)
Container for a response, to be returned by a pittgoogle.pubsub.Consumer.msg_callback().
- Parameters
ack (bool) – Whether to acknowledge the message. Use True if the message was processed successfully, False if an error was encountered and you would like Pub/Sub to redeliver the message at a later time. Note that once a message is acknowledged to Pub/Sub it is permanently deleted (unless the subscription has been explicitly configured to retain acknowledged messages).
result (Any) – Anything the user wishes to return. If not None, the Consumer will collect the results in a list and pass the list to the user’s batch callback for further processing. If there is no batch callback the results will be lost.
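The ack flag is typically decided inside the message callback. The sketch below assumes that any exception raised during processing should lead to redelivery. The Response dataclass is a local stand-in mirroring the signature of pittgoogle.pubsub.Response, and process, along with the "objectId" key it reads, is a hypothetical placeholder.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class Response:
    """Local stand-in for pittgoogle.pubsub.Response (assumption)."""

    ack: bool = True
    result: Any = None


def process(alert_dict: dict) -> dict:
    """Hypothetical processing step; fails if a required key is missing."""
    return {"id": alert_dict["objectId"]}


def my_msg_callback(alert) -> Response:
    try:
        result = process(alert.dict)
    except Exception:
        # do not acknowledge, so Pub/Sub redelivers the message later
        return Response(ack=False)
    # acknowledge and hand the result on to the batch callback, if any
    return Response(ack=True, result=result)


# stand-ins for Alert objects: one that processes cleanly, one that fails
ok = my_msg_callback(SimpleNamespace(dict={"objectId": "ZTF18abc"}))
failed = my_msg_callback(SimpleNamespace(dict={}))
```

Returning ack=False for every failure is the simplest policy; a message that can never be processed will keep being redelivered, so a real callback may prefer to acknowledge and log permanently malformed messages.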
- class pittgoogle.pubsub.Subscription(name, auth=_Nothing.NOTHING, topic=None, client=None)
Basic attributes of a Pub/Sub subscription and methods to manage it.
- Parameters
name (str) – Name of the Pub/Sub subscription.
auth (pittgoogle.auth.Auth, optional) – Credentials for the Google Cloud project that owns this subscription. If not provided, it will be created from environment variables.
topic (pittgoogle.pubsub.Topic, optional) – Topic this subscription should be attached to. Required only when the subscription needs to be created.
client (pubsub_v1.SubscriberClient, optional) – Pub/Sub client that will be used to access the subscription. This kwarg is useful if you want to reuse a client. If None, a new client will be created.
- property client: SubscriberClient
Pub/Sub client that will be used to access the subscription. If not provided, a new client will be created using self.auth.credentials.
- property path: str
Fully qualified path to the subscription.
- property projectid: str
Subscription owner’s Google Cloud project ID.
- touch()
Test the connection to the subscription, creating it if necessary.
Note that messages published to the topic before the subscription was created are not available to the subscription.
- Raises
TypeError – if the subscription needs to be created but no topic was provided.
NotFound – if the subscription needs to be created but the topic does not exist in Google Cloud.
AssertionError – if the subscription exists but it is not attached to self.topic and self.topic is not None.
- Return type
None
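The decision flow described for touch() can be sketched with in-memory stand-ins for Google Cloud. This is not pittgoogle's implementation: touch_sketch is hypothetical, the order of the checks is an assumption, a dict and a set replace the real Pub/Sub API, and LookupError stands in for the NotFound exception.

```python
def touch_sketch(name, topic, existing_subscriptions, existing_topics):
    """Ensure the subscription exists, creating it if necessary (in-memory sketch).

    existing_subscriptions maps subscription name -> attached topic name;
    existing_topics is a set of topic names.
    """
    if name in existing_subscriptions:
        attached = existing_subscriptions[name]
        # an existing subscription must match the requested topic, if one is given
        if topic is not None and attached != topic:
            raise AssertionError(f"{name} is attached to {attached}, not {topic}")
        return
    if topic is None:
        raise TypeError("a topic is required to create the subscription")
    if topic not in existing_topics:
        raise LookupError(f"topic {topic} does not exist")  # stands in for NotFound
    existing_subscriptions[name] = topic


subs = {}
topics = {"ztf-loop"}
touch_sketch("my-ztf-loop-subscription", "ztf-loop", subs, topics)  # creates it
touch_sketch("my-ztf-loop-subscription", None, subs, topics)  # exists; no topic needed
```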
- class pittgoogle.pubsub.Topic(name, projectid)
Basic attributes of a Pub/Sub topic.
- Parameters
name (str) – Name of the Pub/Sub topic.
projectid (str) – The topic owner’s Google Cloud project ID. Note: pittgoogle.utils.ProjectIds is a registry containing Pitt-Google’s project IDs.
- property path: str
Fully qualified path to the topic.
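Pub/Sub resource names follow the forms projects/{project}/topics/{topic} and projects/{project}/subscriptions/{subscription}. The sketch below builds such strings by hand; it assumes the path properties documented here return the same form, and the helper names and the project ID "my-project" are hypothetical.

```python
def topic_path(projectid: str, name: str) -> str:
    """Build a fully qualified Pub/Sub topic path."""
    return f"projects/{projectid}/topics/{name}"


def subscription_path(projectid: str, name: str) -> str:
    """Build a fully qualified Pub/Sub subscription path."""
    return f"projects/{projectid}/subscriptions/{name}"


# "my-project" is a placeholder project ID
t_path = topic_path("my-project", "ztf-loop")
s_path = subscription_path("my-project", "my-ztf-loop-subscription")
```

Note that the topic and the subscription may live in different projects: the topic path uses the topic owner's project ID, while the subscription path uses the subscriber's.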
- pittgoogle.pubsub.pull_batch(subscription, max_messages=1, **subscription_kwargs)
Pull a single batch of messages from the subscription.
- Parameters
subscription (str or pittgoogle.pubsub.Subscription) – Subscription to be pulled. If str, the name of the subscription.
max_messages (int) – Maximum number of messages to be pulled.
subscription_kwargs – Keyword arguments sent to pittgoogle.pubsub.Subscription. Ignored if subscription is a pittgoogle.pubsub.Subscription.
- Return type
List[Alert]