Source code for pittgoogle.utils

# -*- coding: UTF-8 -*-
"""Functions to support working with alerts and related data."""
import json
import logging
from base64 import b64decode, b64encode
from collections import OrderedDict
from io import BytesIO
from typing import ClassVar

import fastavro
import pandas as pd
from astropy.table import Table
from astropy.time import Time
from attrs import define

LOGGER = logging.getLogger(__name__)


[docs]@define class ProjectIds: """Registry of Google Cloud Project IDs.""" pittgoogle: ClassVar[str] = "ardent-cycling-243415" """Pitt-Google's production project.""" pittgoogle_dev: ClassVar[str] = "avid-heading-329016" """Pitt-Google's development project.""" # pittgoogle_billing: ClassVar[str] = "light-cycle-328823" # """Pitt-Google's billing project.""" elasticc: ClassVar[str] = "elasticc-challenge" """Project running a classifier for ELAsTiCC alerts and reporting to DESC."""
[docs]@define class Cast: """Methods to convert data types."""
[docs] @staticmethod def bytes_to_b64utf8(bytes_data): """Convert bytes data to UTF-8. Parameters ----------- bytes_data : `bytes` Data to be converted to UTF-8. Returns ----------- data : `dict` ``bytes_data`` converted to a string in UTF-8 format """ if bytes_data is not None: return b64encode(bytes_data).decode("utf-8")
[docs] @staticmethod def json_to_dict(bytes_data): """Convert json serialized bytes data to a dict. Parameters ----------- bytes_data : `bytes` Data to be converted to a dictionary. Returns: data : `dict` ``bytes_data`` unpacked into a dictionary. """ if bytes_data is not None: return json.loads(bytes_data)
[docs] @staticmethod def b64json_to_dict(bytes_data): """Convert base64 encoded, json serialized bytes data to a dict. Parameters ----------- bytes_data : `Base64` Data to be converted to a dictionary. Returns: data : `dict` ``bytes_data`` unpacked into a dictionary. """ if bytes_data is not None: return Cast.json_to_dict(b64decode(bytes_data))
[docs] @staticmethod def avro_to_dict(bytes_data): """Convert Avro serialized bytes data to a dict. The schema must be attached in the header. Parameters ------------ bytes_data : `bytes` Avro serialized bytes data to be converted to a dictionary Returns -------- data : `dict` ``bytes_data`` unpacked into a dictionary. """ if bytes_data is not None: with BytesIO(bytes_data) as fin: alert_dicts = list(fastavro.reader(fin)) # list with single dict if len(alert_dicts) != 1: LOGGER.warning(f"Expected 1 Avro record. Found {len(alert_dicts)}.") return alert_dicts[0]
[docs] @staticmethod def b64avro_to_dict(bytes_data): """Convert base64 encoded, Avro serialized bytes data to a dict. Parameters ----------- bytes_data : `bytes`: base64 encoded, Avro serialized bytes to be converted to a dictionary Returns --------- data : `dict` ``bytes_data`` unpacked into a dictionary. """ return Cast.avro_to_dict(b64decode(bytes_data))
# --- Work with alert dictionaries
[docs] @staticmethod def alert_dict_to_dataframe(alert_dict: dict) -> pd.DataFrame: """Package a ZTF alert dictionary into a dataframe. Adapted from: https://github.com/ZwickyTransientFacility/ztf-avro-alert/blob/master/notebooks/Filtering_alerts.ipynb """ dfc = pd.DataFrame(alert_dict["candidate"], index=[0]) df_prv = pd.DataFrame(alert_dict["prv_candidates"]) df = pd.concat([dfc, df_prv], ignore_index=True, sort=True) df = df[dfc.columns] # return to original column ordering # we'll attach some metadata # note this may not be preserved after all operations # https://stackoverflow.com/questions/14688306/adding-meta-information-metadata-to-pandas-dataframe df.objectId = alert_dict["objectId"] return df
[docs] @staticmethod def alert_dict_to_table(alert_dict: dict) -> Table: """Package a ZTF alert dictionary into an Astopy Table.""" # collect rows for the table candidate = OrderedDict(alert_dict["candidate"]) rows = [candidate] for prv_cand in alert_dict["prv_candidates"]: # astropy 3.2.1 cannot handle dicts with different keys (fixed by 4.1) prv_cand_tmp = {key: prv_cand.get(key, None) for key in candidate.keys()} rows.append(prv_cand_tmp) # create and return the table table = Table(rows=rows) table.meta["comments"] = f"ZTF objectId: {alert_dict['objectId']}" return table
@staticmethod def _strip_cutouts_ztf(alert_dict: dict) -> dict: """Drop the cutouts from the alert dictionary. Args: alert_dict: ZTF alert formated as a dict Returns: `alert_data` with the cutouts (postage stamps) removed """ cutouts = ["cutoutScience", "cutoutTemplate", "cutoutDifference"] alert_stripped = {k: v for k, v in alert_dict.items() if k not in cutouts} return alert_stripped # dates
[docs] @staticmethod def jd_to_readable_date(jd): """Convert a Julian date to a human readable string. Parameters ----------- jd : `float` Datetime value in julian format Returns -------- date : `str` ``jd`` in the format 'day mon year hour:min' """ return Time(jd, format="jd").strftime("%d %b %Y - %H:%M:%S")
# --- Survey-specific
[docs]def ztf_fid_names() -> dict: """Return a dictionary mapping the ZTF `fid` (filter ID) to the common name.""" return {1: "g", 2: "r", 3: "i"}