pittgoogle.pubsub

Classes to facilitate connections to Pub/Sub streams.

Note

This module relies on pittgoogle.auth to authenticate API calls. The examples given below assume the use of a service account and environment variables. In this case, pittgoogle.auth does not need to be called explicitly.

Usage Examples

import pittgoogle

Create a subscription to the “ztf-loop” topic:

subscription = pittgoogle.pubsub.Subscription(
    "my-ztf-loop-subscription",
    # topic only required if the subscription does not yet exist in Google Cloud
    topic=pittgoogle.pubsub.Topic("ztf-loop", pittgoogle.utils.ProjectIds.pittgoogle)
)
subscription.touch()

Pull a small batch of alerts. Helpful for testing. Not recommended for long-running listeners.

alerts = pittgoogle.pubsub.pull_batch(subscription, max_messages=4)

Open a streaming pull. Recommended for long-running listeners. This pulls and processes messages in the background, indefinitely. You must supply a callback that processes a single message; it should accept a pittgoogle.pubsub.Alert and return a pittgoogle.pubsub.Response. Optionally, you can also provide a callback that processes a batch of messages. Note that messages are acknowledged (and thus permanently deleted) before the batch callback runs, so it is recommended to do as much processing as possible in the message callback and to use a batch callback only when necessary.

def my_msg_callback(alert):
    # process the message here. we'll just print the ID.
    print(f"processing message: {alert.metadata['message_id']}")

    # return a Response. include a result if using a batch callback.
    return pittgoogle.pubsub.Response(ack=True, result=alert.dict)

def my_batch_callback(results):
    # process the batch of results (list of results returned by my_msg_callback)
    # we'll just print the number of results in the batch
    print(f"batch processing {len(results)} results")

consumer = pittgoogle.pubsub.Consumer(
    subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)

# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()

Delete the subscription from Google Cloud.

subscription.delete()

class pittgoogle.pubsub.Alert(*, bytes=None, dict=None, metadata=None, msg=None)

Pitt-Google container for a Pub/Sub message.

Typical usage is to instantiate an Alert using only a msg, and then the other attributes will be automatically extracted and returned (lazily).

All parameters are keyword only.

Parameters
  • bytes (bytes, optional) – The message payload, as returned by Pub/Sub. It may be Avro or JSON serialized depending on the topic.

  • dict (dict, optional) – The message payload as a dictionary.

  • metadata (dict, optional) – The message metadata.

  • msg (google.cloud.pubsub_v1.types.PubsubMessage, optional) – The Pub/Sub message object, documented at https://googleapis.dev/python/pubsub/latest/types.html.

property bytes: bytes

Message payload in original format (Avro or JSON serialized bytes).

property dict: dict

Message payload as a dictionary.

Raises

pittgoogle.exceptions.OpenAlertError – if unable to deserialize the alert bytes.

property metadata: dict

Message metadata as a flat dictionary.

msg: Optional[PubsubMessage]

Original Pub/Sub message object.
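
The lazy extraction described above can be pictured with a small stand-in class. This is only an illustrative sketch, not the pittgoogle.pubsub.Alert implementation: the class name LazyAlertSketch is hypothetical, it handles JSON payloads only (not Avro), and the fake message is a simple object with a data attribute standing in for a PubsubMessage.

```python
import json
from types import SimpleNamespace


class LazyAlertSketch:
    """Illustrative stand-in (an assumption), not pittgoogle.pubsub.Alert."""

    def __init__(self, *, bytes=None, dict=None, msg=None):
        self._bytes = bytes
        self._dict = dict
        self.msg = msg

    @property
    def bytes(self):
        # Take the payload from the message on first access only.
        if self._bytes is None:
            self._bytes = self.msg.data
        return self._bytes

    @property
    def dict(self):
        # Deserialize on first access; this sketch assumes a JSON payload.
        if self._dict is None:
            self._dict = json.loads(self.bytes)
        return self._dict


# Stand-in for a Pub/Sub message; only the `data` attribute is used here.
fake_msg = SimpleNamespace(data=b'{"objectId": "ZTF-example"}')
alert = LazyAlertSketch(msg=fake_msg)
print(alert.dict["objectId"])
```

Nothing is deserialized until dict is first accessed, and the result is cached so later accesses return the same object.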

class pittgoogle.pubsub.Consumer(subscription, msg_callback, batch_callback=None, batch_maxn=100, batch_maxwait=30, max_backlog=1000, max_workers=None, executor=None)

Consumer class to pull a Pub/Sub subscription and process messages.

Parameters
  • subscription (str or pittgoogle.pubsub.Subscription) – Pub/Sub subscription to be pulled (it must already exist in Google Cloud).

  • msg_callback (callable) – Function that will process a single message. It should accept a pittgoogle.pubsub.Alert and return a pittgoogle.pubsub.Response.

  • batch_callback (callable, optional) – Function that will process a batch of results. It should accept a list of the results returned by the msg_callback.

  • batch_maxn (int, optional) – Maximum number of messages in a batch. This has no effect if batch_callback is None.

  • batch_maxwait (int, optional) – Maximum number of seconds to wait between messages before processing a batch. This has no effect if batch_callback is None.

  • max_backlog (int, optional) – Maximum number of pulled but unprocessed messages before pausing the pull.

  • max_workers (int, optional) – Maximum number of workers for the executor. This has no effect if an executor is provided.

  • executor (concurrent.futures.ThreadPoolExecutor, optional) – Executor to be used by the Google API to pull and process messages in the background.

property executor: ThreadPoolExecutor

Executor to be used by the Google API for a streaming pull.

pull_batch(max_messages=1)

Pull a single batch of messages.

Recommended for testing. Not recommended for long-running listeners (use the stream() method instead).

Parameters

max_messages (int) – Maximum number of messages to be pulled.

Return type

List[Alert]

stop()

Attempt to shut down the streaming pull and exit the background threads gracefully.

Return type

None

stream(block=True)

Open the stream in a background thread and process messages through the callbacks.

Recommended for long-running listeners.

Parameters

block (bool) – Whether to block the main thread while the stream is open. If True, block indefinitely (use Ctrl-C to close the stream and unblock). If False, open the stream and then return (use stop() to close the stream). This must be True in order to use a batch_callback.

Return type

None
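
A non-blocking stream has to be closed explicitly with stop(). The helper below is a minimal sketch of that pattern; the name stream_for is hypothetical, and the consumer argument is assumed to be a pittgoogle.pubsub.Consumer created without a batch_callback, since a batch callback requires block=True.

```python
import time


def stream_for(consumer, seconds):
    """Open a non-blocking streaming pull, let it run, then close it."""
    # With block=False, stream() returns once the stream is open and
    # messages are processed in background threads.
    consumer.stream(block=False)
    try:
        # The main thread is free to do other work; here it just waits.
        time.sleep(seconds)
    finally:
        # Always close the stream, even if the work above raised.
        consumer.stop()
```

Wrapping the wait in try/finally ensures the background threads are asked to shut down even when the main thread is interrupted.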

property subscription: Subscription

Subscription to be consumed.

class pittgoogle.pubsub.Response(*, ack=True, result=None)

Container for a response, to be returned by a pittgoogle.pubsub.Consumer.msg_callback().

Parameters
  • ack (bool) – Whether to acknowledge the message. Use True if the message was processed successfully, False if an error was encountered and you would like Pub/Sub to redeliver the message at a later time. Note that once a message is acknowledged to Pub/Sub it is permanently deleted (unless the subscription has been explicitly configured to retain acknowledged messages).

  • result (Any) – Anything the user wishes to return. If not None, the Consumer will collect the results in a list and pass the list to the user’s batch callback for further processing. If there is no batch callback the results will be lost.
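
The ack semantics above can be sketched as a message callback that acknowledges only when processing succeeds. The Response and FakeAlert classes below are minimal stand-ins (assumptions, not the library's classes) that mirror the documented signatures so the sketch runs without Google Cloud access; with pittgoogle installed, pittgoogle.pubsub.Response would take the place of the stand-in. The n_fields result is purely illustrative.

```python
class Response:
    """Stand-in mirroring pittgoogle.pubsub.Response(*, ack=True, result=None)."""

    def __init__(self, *, ack=True, result=None):
        self.ack = ack
        self.result = result


class FakeAlert:
    """Stand-in exposing the `dict` and `metadata` attributes of an Alert."""

    def __init__(self, payload, message_id):
        self.dict = payload
        self.metadata = {"message_id": message_id}


def my_msg_callback(alert):
    try:
        payload = alert.dict
        result = {
            "message_id": alert.metadata["message_id"],
            "n_fields": len(payload),
        }
    except Exception:
        # Processing failed: do not acknowledge, so that Pub/Sub
        # redelivers the message at a later time.
        return Response(ack=False)
    # Processing succeeded: acknowledge and hand the result to the
    # batch callback (if one is configured).
    return Response(ack=True, result=result)
```

Returning ack=False on failure keeps the message available for redelivery, whereas an acknowledged message is normally gone for good.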

class pittgoogle.pubsub.Subscription(name, auth=_Nothing.NOTHING, topic=None, client=None)

Basic attributes of a Pub/Sub subscription and methods to manage it.

Parameters
  • name (str) – Name of the Pub/Sub subscription.

  • auth (pittgoogle.auth.Auth, optional) – Credentials for the Google Cloud project that owns this subscription. If not provided, it will be created from environment variables.

  • topic (pittgoogle.pubsub.Topic, optional) – Topic this subscription should be attached to. Required only when the subscription needs to be created.

  • client (pubsub_v1.SubscriberClient, optional) – Pub/Sub client that will be used to access the subscription. This kwarg is useful if you want to reuse a client. If None, a new client will be created.

property client: SubscriberClient

Pub/Sub client that will be used to access the subscription. If not provided, a new client will be created using self.auth.credentials.

delete()

Delete the subscription.

Return type

None

property path: str

Fully qualified path to the subscription.

property projectid: str

Subscription owner’s Google Cloud project ID.

touch()

Test the connection to the subscription, creating it if necessary.

Note that messages published to the topic before the subscription was created are not available to the subscription.

Raises
  • TypeError – if the subscription needs to be created but no topic was provided.

  • NotFound – if the subscription needs to be created but the topic does not exist in Google Cloud.

  • AssertionError – if the subscription exists but it is not attached to self.topic and self.topic is not None.

Return type

None

class pittgoogle.pubsub.Topic(name, projectid)

Basic attributes of a Pub/Sub topic.

Parameters
  • name (str) – Name of the Pub/Sub topic.

  • projectid (str) – The topic owner’s Google Cloud project ID. Note: pittgoogle.utils.ProjectIds is a registry containing Pitt-Google’s project IDs.

classmethod from_path(path)

Parse the path and return a new Topic.

Return type

Topic

property path: str

Fully qualified path to the topic.
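
Pub/Sub topic paths follow the form projects/<projectid>/topics/<name>. The sketch below shows what building and parsing such a path amounts to; the function names are hypothetical, "my-project" is a placeholder project ID, and this is not the library's own implementation of path or from_path.

```python
def build_topic_path(name, projectid):
    """Return the fully qualified Pub/Sub path for a topic."""
    return f"projects/{projectid}/topics/{name}"


def parse_topic_path(path):
    """Split 'projects/<projectid>/topics/<name>' into (name, projectid)."""
    parts = path.split("/")
    if len(parts) != 4 or parts[0] != "projects" or parts[2] != "topics":
        raise ValueError(f"not a Pub/Sub topic path: {path!r}")
    _, projectid, _, name = parts
    return name, projectid


path = build_topic_path("ztf-loop", "my-project")
name, projectid = parse_topic_path(path)
print(path, name, projectid)
```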

pittgoogle.pubsub.pull_batch(subscription, max_messages=1, **subscription_kwargs)

Pull a single batch of messages from the subscription.

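Parameters
  • subscription (str or pittgoogle.pubsub.Subscription) – Subscription to be pulled. If a str, it is the name of the subscription.

  • max_messages (int) – Maximum number of messages to be pulled.

  • subscription_kwargs – Keyword arguments passed to pittgoogle.pubsub.Subscription. Ignored if subscription is already a Subscription.

Return type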

List[Alert]
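
The returned list can be handled like any other list of Alert objects. The sketch below collects the message ID of each alert in a batch; the helper name message_ids is hypothetical, and the batch is built from stand-in objects so the example runs without Google Cloud access.

```python
from types import SimpleNamespace


def message_ids(alerts):
    """Return the Pub/Sub message ID of every alert in a pulled batch."""
    return [alert.metadata["message_id"] for alert in alerts]


# Stand-in batch for illustration; in practice this would be the list
# returned by pittgoogle.pubsub.pull_batch(subscription, max_messages=4).
fake_batch = [SimpleNamespace(metadata={"message_id": str(n)}) for n in (101, 102)]
print(message_ids(fake_batch))
```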