
# -*- coding: UTF-8 -*-
"""Classes to facilitate connections to Pub/Sub streams.

.. contents::
   :local:
   :depth: 2

.. note::

    This module relies on :mod:`pittgoogle.auth` to authenticate API calls.
    The examples given below assume the use of a :ref:`service account <service account>` and
    :ref:`environment variables <set env vars>`. In this case, :mod:`pittgoogle.auth` does not
    need to be called explicitly.
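
    As a quick sketch of that setup (the values are placeholders; the two variable names are
    the standard ones used by Google Cloud auth):

    .. code-block:: python

        import os

        # hypothetical values -- replace with your own project ID and key file path
        os.environ["GOOGLE_CLOUD_PROJECT"] = "my-project-id"
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account-key.json"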

Usage Examples
---------------

.. code-block:: python

    import pittgoogle

Create a subscription to the "ztf-loop" topic:

.. code-block:: python

    subscription = pittgoogle.pubsub.Subscription(
        "my-ztf-loop-subscription",
        # topic only required if the subscription does not yet exist in Google Cloud
        topic=pittgoogle.pubsub.Topic("ztf-loop", pittgoogle.utils.ProjectIds.pittgoogle)
    )
    subscription.touch()
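
After `touch()` succeeds, the subscription exists in Google Cloud and its attached topic has
been validated. You can inspect the fully qualified paths:

.. code-block:: python

    print(subscription.path)
    print(subscription.topic.path)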

Pull a small batch of alerts. Helpful for testing. Not recommended for long-running listeners.

.. code-block:: python

    alerts = pittgoogle.pubsub.pull_batch(subscription, max_messages=4)
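
Each returned :class:`pittgoogle.pubsub.Alert` gives lazy access to the message payload and
metadata. A minimal sketch (the `objectId` field assumes the ZTF alert schema):

.. code-block:: python

    for alert in alerts:
        # the payload is deserialized on first access
        print(alert.dict["objectId"], alert.metadata["message_id"])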

Open a streaming pull. Recommended for long-running listeners. This will pull and process
messages in the background, indefinitely. You must supply a callback that processes a single
message. It should accept a :class:`pittgoogle.pubsub.Alert` and return a
:class:`pittgoogle.pubsub.Response`. Optionally, you can also provide a callback that processes
a batch of messages. Note that messages are acknowledged (and thus permanently deleted) *before*
the batch callback runs, so it is recommended to do as much processing as possible in the
message callback and use a batch callback only when necessary.

.. code-block:: python

    def my_msg_callback(alert):
        # process the message here. we'll just print the ID.
        print(f"processing message: {alert.metadata['message_id']}")

        # return a Response. include a result if using a batch callback.
        return pittgoogle.pubsub.Response(ack=True, result=alert.dict)

    def my_batch_callback(results):
        # process the batch of results (list of results returned by my_msg_callback)
        # we'll just print the number of results in the batch
        print(f"batch processing {len(results)} results)

    consumer = pittgoogle.pubsub.Consumer(
        subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
    )

    # open the stream in the background and process messages through the callbacks
    # this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
    consumer.stream()
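
To keep the main thread free instead, open the stream without blocking. This sketch relies on
the `block` parameter and :meth:`~Consumer.stop` described below; note that a `batch_callback`
requires `block=True`:

.. code-block:: python

    consumer.stream(block=False)
    # ... do other work, then close the stream gracefully
    consumer.stop()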

Delete the subscription from Google Cloud.

.. code-block:: python

    subscription.delete()

API
----

"""
import logging
import queue
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from typing import Any, ByteString, Callable, List, Optional, Union

from attrs import converters, define, field
from attrs.validators import gt, instance_of, is_callable, optional
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1

from .auth import Auth
from .exceptions import OpenAlertError
from .utils import Cast

LOGGER = logging.getLogger(__name__)


def msg_callback_example(alert: "Alert") -> "Response":
    """Example message callback. Print the message ID, then ack and return the alert dict."""
    print(f"processing message: {alert.metadata['message_id']}")
    return Response(ack=True, result=alert.dict)


def batch_callback_example(batch: list) -> None:
    """Example batch callback. `batch` is the list of results returned by the message callback.

    With :func:`msg_callback_example` above, each result is an alert dictionary.
    """
    oids = set(alert_dict["objectId"] for alert_dict in batch)
    print(f"num oids: {len(oids)}")
    print(f"batch length: {len(batch)}")


def pull_batch(
    subscription: Union[str, "Subscription"],
    max_messages: int = 1,
    **subscription_kwargs,
) -> List["Alert"]:
    """Pull a single batch of messages from the `subscription`.

    Parameters
    ----------
    subscription : `str` or :class:`pittgoogle.pubsub.Subscription`
        Subscription to be pulled. If `str`, the name of the subscription.
    max_messages : `int`
        Maximum number of messages to be pulled.
    subscription_kwargs
        Keyword arguments sent to :class:`pittgoogle.pubsub.Subscription`.
        Ignored if `subscription` is a :class:`pittgoogle.pubsub.Subscription`.
    """
    if isinstance(subscription, str):
        subscription = Subscription(subscription, **subscription_kwargs)

    response = subscription.client.pull(
        {"subscription": subscription.path, "max_messages": max_messages}
    )

    message_list = [Alert(msg=msg.message) for msg in response.received_messages]

    ack_ids = [msg.ack_id for msg in response.received_messages]
    if len(ack_ids) > 0:
        subscription.client.acknowledge({"subscription": subscription.path, "ack_ids": ack_ids})

    return message_list


@define
class Topic:
    """Basic attributes of a Pub/Sub topic.

    Parameters
    ----------
    name : `str`
        Name of the Pub/Sub topic.
    projectid : `str`
        The topic owner's Google Cloud project ID. Note: :attr:`pittgoogle.utils.ProjectIds`
        is a registry containing Pitt-Google's project IDs.
    """

    name: str = field()
    projectid: str = field()

    @property
    def path(self) -> str:
        """Fully qualified path to the topic."""
        return f"projects/{self.projectid}/topics/{self.name}"

    @classmethod
    def from_path(cls, path) -> "Topic":
        """Parse the `path` and return a new `Topic`."""
        _, projectid, _, name = path.split("/")
        return cls(name, projectid)


@define
class Subscription:
    """Basic attributes of a Pub/Sub subscription and methods to manage it.

    Parameters
    ----------
    name : `str`
        Name of the Pub/Sub subscription.
    auth : :class:`pittgoogle.auth.Auth`, optional
        Credentials for the Google Cloud project that owns this subscription. If not provided,
        it will be created from environment variables.
    topic : :class:`pittgoogle.pubsub.Topic`, optional
        Topic this subscription should be attached to. Required only when the subscription
        needs to be created.
    client : `pubsub_v1.SubscriberClient`, optional
        Pub/Sub client that will be used to access the subscription. This kwarg is useful if
        you want to reuse a client. If None, a new client will be created.
    """

    name: str = field()
    auth: Auth = field(factory=Auth, validator=instance_of(Auth))
    topic: Optional[Topic] = field(default=None, validator=optional(instance_of(Topic)))
    _client: Optional[pubsub_v1.SubscriberClient] = field(
        default=None, validator=optional(instance_of(pubsub_v1.SubscriberClient))
    )

    @property
    def projectid(self) -> str:
        """Subscription owner's Google Cloud project ID."""
        return self.auth.GOOGLE_CLOUD_PROJECT

    @property
    def path(self) -> str:
        """Fully qualified path to the subscription."""
        return f"projects/{self.projectid}/subscriptions/{self.name}"

    @property
    def client(self) -> pubsub_v1.SubscriberClient:
        """Pub/Sub client that will be used to access the subscription.

        If not provided, a new client will be created using `self.auth.credentials`.
        """
        if self._client is None:
            self._client = pubsub_v1.SubscriberClient(credentials=self.auth.credentials)
        return self._client

    def touch(self) -> None:
        """Test the connection to the subscription, creating it if necessary.

        Note that messages published to the topic before the subscription was created are
        not available to the subscription.

        Raises
        ------
        `TypeError`
            if the subscription needs to be created but no topic was provided.
        `NotFound`
            if the subscription needs to be created but the topic does not exist in
            Google Cloud.
        `AssertionError`
            if the subscription exists but it is not attached to self.topic and self.topic
            is not None.
        """
        try:
            subscrip = self.client.get_subscription(subscription=self.path)
            LOGGER.info(f"subscription exists: {self.path}")
        except NotFound:
            subscrip = self._create()
            LOGGER.info(f"subscription created: {self.path}")

        self._validate_topic(subscrip.topic)

    def _create(self) -> pubsub_v1.types.Subscription:
        if self.topic is None:
            raise TypeError("The subscription needs to be created but no topic was provided.")

        try:
            return self.client.create_subscription(name=self.path, topic=self.topic.path)

        # this error message is not very clear. let's help.
        except NotFound as nfe:
            raise NotFound(f"The topic does not exist: {self.topic.path}") from nfe

    def _validate_topic(self, connected_topic_path) -> None:
        if (self.topic is not None) and (connected_topic_path != self.topic.path):
            raise AssertionError(
                f"The subscription is attached to topic {connected_topic_path}. "
                f"Expected {self.topic.path}"
            )
        self.topic = Topic.from_path(connected_topic_path)
        LOGGER.debug("topic validated")

    def delete(self) -> None:
        """Delete the subscription."""
        try:
            self.client.delete_subscription(subscription=self.path)
        except NotFound:
            LOGGER.info(f"nothing to delete. subscription not found: {self.path}")
        else:
            LOGGER.info(f"deleted subscription: {self.path}")


@define()
class Consumer:
    """Consumer class to pull a Pub/Sub subscription and process messages.

    Parameters
    ----------
    subscription : `str` or :class:`pittgoogle.pubsub.Subscription`
        Pub/Sub subscription to be pulled (it must already exist in Google Cloud).
    msg_callback : `callable`
        Function that will process a single message. It should accept a
        :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
    batch_callback : `callable`, optional
        Function that will process a batch of results. It should accept a list of the
        results returned by the `msg_callback`.
    batch_maxn : `int`, optional
        Maximum number of messages in a batch. This has no effect if `batch_callback` is None.
    batch_maxwait : `int`, optional
        Maximum number of seconds to wait between messages before processing a batch.
        This has no effect if `batch_callback` is None.
    max_backlog : `int`, optional
        Maximum number of pulled but unprocessed messages before pausing the pull.
    max_workers : `int`, optional
        Maximum number of workers for the `executor`. This has no effect if an `executor`
        is provided.
    executor : `concurrent.futures.ThreadPoolExecutor`, optional
        Executor to be used by the Google API to pull and process messages in the background.
    """

    _subscription: Union[str, Subscription] = field(validator=instance_of((str, Subscription)))
    msg_callback: Callable[["Alert"], "Response"] = field(validator=is_callable())
    batch_callback: Optional[Callable[[list], None]] = field(
        default=None, validator=optional(is_callable())
    )
    batch_maxn: int = field(default=100, converter=int)
    batch_maxwait: int = field(default=30, converter=int)
    max_backlog: int = field(default=1000, validator=gt(0))
    max_workers: Optional[int] = field(default=None, validator=optional(instance_of(int)))
    _executor: ThreadPoolExecutor = field(
        default=None, validator=optional(instance_of(ThreadPoolExecutor))
    )
    _queue: queue.Queue = field(factory=queue.Queue, init=False)
    streaming_pull_future: pubsub_v1.subscriber.futures.StreamingPullFuture = field(
        default=None, init=False
    )

    @property
    def subscription(self) -> Subscription:
        """Subscription to be consumed."""
        if isinstance(self._subscription, str):
            self._subscription = Subscription(self._subscription)
            self._subscription.touch()
        return self._subscription

    @property
    def executor(self) -> ThreadPoolExecutor:
        """Executor to be used by the Google API for a streaming pull."""
        if self._executor is None:
            self._executor = ThreadPoolExecutor(self.max_workers)
        return self._executor

    def stream(self, block: bool = True) -> None:
        """Open the stream in a background thread and process messages through the callbacks.

        Recommended for long-running listeners.

        Parameters
        ----------
        block : `bool`
            Whether to block the main thread while the stream is open. If `True`, block
            indefinitely (use `Ctrl-C` to close the stream and unblock). If `False`, open the
            stream and then return (use :meth:`~Consumer.stop()` to close the stream).
            This must be `True` in order to use a `batch_callback`.
        """
        # open a streaming-pull and process messages through the callback, in the background
        self._open_stream()

        if not block:
            msg = "The stream is open in the background. Use consumer.stop() to close it."
            print(msg)
            LOGGER.info(msg)
            return

        try:
            self._process_batches()
        # catch all exceptions and attempt to close the stream before raising
        except (KeyboardInterrupt, Exception):
            self.stop()
            raise

    def _open_stream(self) -> None:
        """Open a streaming pull and process messages in the background."""
        LOGGER.info(f"opening a streaming pull on subscription: {self.subscription.path}")
        self.streaming_pull_future = self.subscription.client.subscribe(
            self.subscription.path,
            self._callback,
            flow_control=pubsub_v1.types.FlowControl(max_messages=self.max_backlog),
            scheduler=pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=self.executor),
            await_callbacks_on_shutdown=True,
        )

    def _callback(self, message: pubsub_v1.types.PubsubMessage) -> None:
        """Unpack the message, run the :attr:`~Consumer.msg_callback` and handle the response."""
        response = self.msg_callback(Alert(msg=message))  # Response

        if response.result is not None:
            self._queue.put(response.result)

        if response.ack:
            message.ack()
        else:
            message.nack()

    def _process_batches(self):
        """Run the batch callback if provided, otherwise just sleep.

        This never returns -- it runs until it encounters an error.
        """
        # if there's no batch_callback there's nothing to do except wait until the process is killed
        if self.batch_callback is None:
            while True:
                sleep(60)

        batch, count = [], 0
        while True:
            try:
                batch.append(self._queue.get(block=True, timeout=self.batch_maxwait))

            except queue.Empty:
                # hit the max wait. process the batch
                self.batch_callback(batch)
                batch, count = [], 0

            # catch anything else and try to process the batch before raising
            except (KeyboardInterrupt, Exception):
                self.batch_callback(batch)
                raise

            else:
                self._queue.task_done()
                count += 1
                if count == self.batch_maxn:
                    # hit the max number of results. process the batch
                    self.batch_callback(batch)
                    batch, count = [], 0

    def stop(self) -> None:
        """Attempt to shutdown the streaming pull and exit the background threads gracefully."""
        LOGGER.info("closing the stream")
        self.streaming_pull_future.cancel()  # trigger the shutdown
        self.streaming_pull_future.result()  # block until the shutdown is complete

    def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
        """Pull a single batch of messages.

        Recommended for testing. Not recommended for long-running listeners (use the
        :meth:`~Consumer.stream` method instead).

        Parameters
        ----------
        max_messages : `int`
            Maximum number of messages to be pulled.
        """
        return pull_batch(self.subscription, max_messages)


@define(kw_only=True)
class Alert:
    """Pitt-Google container for a Pub/Sub message.

    Typical usage is to instantiate an `Alert` using only a `msg`, and then the other
    attributes will be automatically extracted and returned (lazily).

    All parameters are keyword only.

    Parameters
    ----------
    bytes : `bytes`, optional
        The message payload, as returned by Pub/Sub. It may be Avro or JSON serialized
        depending on the topic.
    dict : `dict`, optional
        The message payload as a dictionary.
    metadata : `dict`, optional
        The message metadata.
    msg : `google.cloud.pubsub_v1.types.PubsubMessage`, optional
        The Pub/Sub message object, documented at
        `<https://googleapis.dev/python/pubsub/latest/types.html>`__.
    """

    _bytes: Optional[ByteString] = field(default=None)
    _dict: Optional[dict] = field(default=None)
    _metadata: Optional[dict] = field(default=None)
    msg: Optional["pubsub_v1.types.PubsubMessage"] = field(default=None)
    """Original Pub/Sub message object."""

    @property
    def bytes(self) -> bytes:
        """Message payload in original format (Avro or JSON serialized bytes)."""
        if self._bytes is None:
            # add try-except when we know what we're looking for
            self._bytes = self.msg.data
            if self._bytes is None:
                # if we add a "path" attribute for the path to an avro file on disk
                # we can load it like this:
                # with open(self.path, "rb") as f:
                #     self._bytes = f.read()
                pass
        return self._bytes

    @property
    def dict(self) -> dict:
        """Message payload as a dictionary.

        Raises
        ------
        :class:`pittgoogle.exceptions.OpenAlertError`
            if unable to deserialize the alert bytes.
        """
        if self._dict is None:
            # this should be rewritten to catch specific errors
            # for now, just try avro then json, catching basically all errors in the process
            try:
                self._dict = Cast.avro_to_dict(self.bytes)
            except Exception:
                try:
                    self._dict = Cast.json_to_dict(self.bytes)
                except Exception:
                    raise OpenAlertError("failed to deserialize the alert bytes")
        return self._dict

    @property
    def metadata(self) -> dict:
        """Message metadata as a flat dictionary."""
        if self._metadata is None:
            self._metadata = {
                "message_id": self.msg.message_id,
                "publish_time": self.msg.publish_time,
                # ordering must be enabled on the subscription for this to be useful
                "ordering_key": self.msg.ordering_key,
                # flatten the dict containing our custom attributes
                **self.msg.attributes,
            }
        return self._metadata


@define(kw_only=True, frozen=True)
class Response:
    """Container for a response, to be returned by a :meth:`pittgoogle.pubsub.Consumer.msg_callback`.

    Parameters
    ----------
    ack : `bool`
        Whether to acknowledge the message. Use `True` if the message was processed
        successfully, `False` if an error was encountered and you would like Pub/Sub to
        redeliver the message at a later time. Note that once a message is acknowledged to
        Pub/Sub it is permanently deleted (unless the subscription has been explicitly
        configured to retain acknowledged messages).
    result : `Any`
        Anything the user wishes to return. If not `None`, the Consumer will collect the
        results in a list and pass the list to the user's batch callback for further
        processing. If there is no batch callback the results will be lost.
    """

    ack: bool = field(default=True, converter=converters.to_bool)
    result: Any = field(default=None)