From d73f348d521add087cfadffe28547fec63174934 Mon Sep 17 00:00:00 2001 From: Steve Pothier Date: Wed, 27 Nov 2024 12:37:44 -0700 Subject: [PATCH] does query of exposures with consdb as source --- notebooks_tsqr/NightLog.ipynb | 230 ++++++++---------- .../lsst/ts/logging_and_reporting/consdb.py | 157 ++++++++++++ .../logging_and_reporting/source_adapters.py | 54 +++- 3 files changed, 305 insertions(+), 136 deletions(-) create mode 100644 python/lsst/ts/logging_and_reporting/consdb.py diff --git a/notebooks_tsqr/NightLog.ipynb b/notebooks_tsqr/NightLog.ipynb index 491a6ca..906e9a7 100644 --- a/notebooks_tsqr/NightLog.ipynb +++ b/notebooks_tsqr/NightLog.ipynb @@ -71,7 +71,8 @@ "from lsst.ts.logging_and_reporting.all_reports import AllReports\n", "import lsst.ts.logging_and_reporting.utils as ut\n", "from lsst.ts.logging_and_reporting.reports import md, mdlist, mdpathlink\n", - "from lsst.ts.logging_and_reporting.reports import html_draft, html_beta" + "from lsst.ts.logging_and_reporting.reports import html_draft, html_beta\n", + "import lsst.ts.logging_and_reporting.consdb as cdb" ] }, { @@ -119,114 +120,67 @@ "id": "5", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "con_src = cdb.ConsdbAdapter(\n", + " server_url=\"https://usdf-rsp-dev.slac.stanford.edu\",\n", + " min_dayobs=\"2024-11-25\",\n", + " max_dayobs=\"2024-11-26\",\n", + " verbose=True,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "6", + "metadata": {}, + "source": [ + "TODO use min,max for exposures." + ] }, { "cell_type": "code", "execution_count": null, - "id": "6", + "id": "7", "metadata": {}, "outputs": [], "source": [ - "from lsst.ts.logging_and_reporting.consolidated_database import CDBc\n", - "\n", - "print(dirs(consolidated_database))\n", - "\n", - "\n", - "class ConsDbAdapter:\n", - " \"\"\"Create and manage the connection to the Consolidated Database\n", - " including knowledge of the schemas and instruments\"\"\"\n", - "\n", - " def __init__(self, url, day_obs):\n", - " self.url = url\n", - " self.day_obs = day_obs\n", - " self.day_obs_int = int(\n", - " day_obs.replace(\"-\", \"\")\n", - " ) # Day Obs needs to be an int like 20241013\n", - " self.client = CDBc(url) if have_consdb else None\n", - " os.environ[\"no_proxy\"] += \",.consdb\"\n", - " # Something about token from consdb usage page needs to happen\n", - "\n", - " def query_visit(self, instrument: str, type: str = \"visit1\"):\n", - " \"\"\"Query visit1 and visit1_quicklook tables and join the data on\n", - " visit_id, type can also be ccdvisit1\"\"\"\n", - " visit1 = f\"\"\"SELECT * FROM cdb_{instrument}.{type}\n", - " where day_obs = {self.day_obs_int}\"\"\"\n", - " ccdvisit1_quicklook = f\"SELECT * FROM cdb_{instrument}.{type}_quicklook\"\n", - "\n", - " try:\n", - " visits = self.client.query(visit1)\n", - " quicklook = self.client.query(ccdvisit1_quicklook)\n", - " except Exception as erry:\n", - " print(f\"{erry=}\")\n", - " return None\n", - "\n", - " # Join both on visit_id so we can access obs_start for a time axis\n", - " return visits.join(quicklook, on=\"visit_id\", lsuffix=\"\", rsuffix=\"_q\")\n", - "\n", - " def query_exposure(self, instrument: str, type: str = \"exposure\"):\n", - " \"\"\"Query exposure table and return data,\n", - " Type may also be ccdexposure\"\"\"\n", - " exposure_query = f\"\"\"SELECT * FROM cdb_{instrument}.{type}\n", - " where day_obs = {self.day_obs_int}\"\"\"\n", - " try:\n", - " exposures = self.client.query(exposure_query)\n", - " except Exception as erry:\n", - " print(f\"{erry=}\")\n", - " return None\n", - "\n", - " return exposures\n", - "\n", - "\n", - "def plot(y, x):\n", - " \"\"\"Plot the given x and y data.\"\"\"\n", - " fig = plt.figure(figsize=(6, 6))\n", - " ax = fig.subplots()\n", - " ax.scatter(x, y)\n", - " plt.show()\n", - "\n", - "\n", - "def plot_ra_dec(y, x):\n", - " \"\"\"I expect this plot type will be different\"\"\"\n", - " fig = plt.figure(figsize=(6, 6))\n", - " ax = fig.subplots()\n", - " ax.scatter(x, y)\n", - " plt.show()\n", - "\n", - "\n", - "def make_plots(day_obs, instruments=[\"latiss, lsstcomcamsim, lsstcomcam\"]):\n", - " URL = \"http://consdb-pq.consdb:8080/consdb\"\n", - "\n", - " for instrument in instruments:\n", - " db_client = ConsDbAdapter(URL, day_obs)\n", - " visits = db_client.query_visit(instrument=instrument)\n", - " exposures = db_client.query_exposure(instrument=instrument)\n", - " if visits:\n", - " # This is our time axis for each visit\n", - " obs_start = visits[\"obs_start\"]\n", - "\n", - " psf_area = visits[\"psf_area\"]\n", - " plot(psf_area, obs_start)\n", - " sky_bg = visits[\"sky_bg\"]\n", - " plot(sky_bg, obs_start)\n", - " zero_point = visits[\"zero_point\"]\n", - " plot(zero_point, obs_start)\n", - "\n", - " if exposures:\n", - " ra = exposures[\"s_ra\"]\n", - " dec = exposures[\"s_dec\"]\n", - " plot_ra_dec(dec, ra)\n", - "\n", - "\n", - "print(date.date())\n", + "con_src.get_exposures(20241125)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "con_src.schemas" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "from lsst.summit.utils.utils import getSite\n", "\n", - "make_plots(day_obs=date.date(), instruments=[\"lsstcomcam\"])" + "getSite()" ] }, { "cell_type": "code", "execution_count": null, - "id": "7", + "id": "10", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", "metadata": {}, "outputs": [], "source": [ @@ -245,7 +199,7 @@ }, { "cell_type": "markdown", - "id": "8", + "id": "12", "metadata": {}, "source": [ "----------" @@ -254,7 +208,7 @@ { "cell_type": "code", "execution_count": null, - "id": "9", + "id": "13", "metadata": {}, "outputs": [], "source": [ @@ -267,7 +221,7 @@ }, { "cell_type": "markdown", - "id": "10", + "id": "14", "metadata": {}, "source": [ "# Table of Contents\n", @@ -291,7 +245,7 @@ }, { "cell_type": "markdown", - "id": "11", + "id": "15", "metadata": {}, "source": [ "## Night Report BETA \n", @@ -303,7 +257,15 @@ { "cell_type": "code", "execution_count": null, - "id": "12", + "id": "16", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17", "metadata": {}, "outputs": [], "source": [ @@ -314,7 +276,7 @@ }, { "cell_type": "markdown", - "id": "13", + "id": "18", "metadata": {}, "source": [ "## Almanac BETA \n", @@ -326,7 +288,7 @@ { "cell_type": "code", "execution_count": null, - "id": "14", + "id": "19", "metadata": {}, "outputs": [], "source": [ @@ -337,7 +299,7 @@ }, { "cell_type": "markdown", - "id": "15", + "id": "20", "metadata": {}, "source": [ "## Summary plots of whole night DRAFT" @@ -345,7 +307,7 @@ }, { "cell_type": "markdown", - "id": "16", + "id": "21", "metadata": {}, "source": [ "(content not yet defined in storyboard)\n", @@ -356,7 +318,7 @@ { "cell_type": "code", "execution_count": null, - "id": "17", + "id": "22", "metadata": {}, "outputs": [], "source": [ @@ -367,7 +329,7 @@ }, { "cell_type": "markdown", - "id": "18", + "id": "23", "metadata": {}, "source": [ "## Links to related resources BETA\n", @@ -377,7 +339,7 @@ { "cell_type": "code", "execution_count": null, - "id": "19", + "id": "24", "metadata": {}, "outputs": [], "source": [ @@ -396,7 +358,7 @@ }, { "cell_type": "markdown", - "id": "20", + "id": "25", "metadata": {}, "source": [ "## Time Accounting BETA\n", @@ -413,7 +375,7 @@ { "cell_type": "code", "execution_count": null, - "id": "21", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -425,7 +387,7 @@ }, { "cell_type": "markdown", - "id": "22", + "id": "27", "metadata": {}, "source": [ "- (1) *Expected* slew time as per Schedular\n", @@ -434,7 +396,7 @@ }, { "cell_type": "markdown", - "id": "23", + "id": "28", "metadata": {}, "source": [ "## Jira Tickets BETA \n", @@ -446,7 +408,7 @@ { "cell_type": "code", "execution_count": null, - "id": "24", + "id": "29", "metadata": {}, "outputs": [], "source": [ @@ -460,7 +422,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "30", "metadata": {}, "outputs": [], "source": [ @@ -481,7 +443,7 @@ }, { "cell_type": "markdown", - "id": "26", + "id": "31", "metadata": {}, "source": [ "## BLOCKS Observed DRAFT\n", @@ -490,7 +452,7 @@ }, { "cell_type": "markdown", - "id": "27", + "id": "32", "metadata": {}, "source": [ "## Data Log BETA\n", @@ -500,7 +462,7 @@ { "cell_type": "code", "execution_count": null, - "id": "28", + "id": "33", "metadata": {}, "outputs": [], "source": [ @@ -517,7 +479,7 @@ }, { "cell_type": "markdown", - "id": "29", + "id": "34", "metadata": {}, "source": [ "## Narrative Log BETA \n", @@ -530,7 +492,7 @@ { "cell_type": "code", "execution_count": null, - "id": "30", + "id": "35", "metadata": { "editable": true, "slideshow": { @@ -547,7 +509,7 @@ }, { "cell_type": "markdown", - "id": "31", + "id": "36", "metadata": {}, "source": [ "-----------\n", @@ -556,7 +518,7 @@ }, { "cell_type": "markdown", - "id": "32", + "id": "37", "metadata": {}, "source": [ "# Developer Only Section REMOVE\n", @@ -567,7 +529,7 @@ }, { "cell_type": "markdown", - "id": "33", + "id": "38", "metadata": {}, "source": [ "## Overview \n" @@ -576,7 +538,7 @@ { "cell_type": "code", "execution_count": null, - "id": "34", + "id": "39", "metadata": {}, "outputs": [], "source": [ @@ -609,7 +571,7 @@ }, { "cell_type": "markdown", - "id": "35", + "id": "40", "metadata": {}, "source": [ "## Data Status\n", @@ -619,7 +581,7 @@ { "cell_type": "code", "execution_count": null, - "id": "36", + "id": "41", "metadata": {}, "outputs": [], "source": [ @@ -630,7 +592,7 @@ }, { "cell_type": "markdown", - "id": "37", + "id": "42", "metadata": {}, "source": [ "## This report uses the following data sources\n", @@ -645,7 +607,7 @@ }, { "cell_type": "markdown", - "id": "38", + "id": "43", "metadata": {}, "source": [ "## Where was this run?\n", @@ -660,7 +622,7 @@ }, { "cell_type": "markdown", - "id": "39", + "id": "44", "metadata": {}, "source": [ "## Section overviews moved here" @@ -669,7 +631,7 @@ { "cell_type": "code", "execution_count": null, - "id": "40", + "id": "45", "metadata": {}, "outputs": [], "source": [ @@ -684,7 +646,7 @@ { "cell_type": "code", "execution_count": null, - "id": "41", + "id": "46", "metadata": {}, "outputs": [], "source": [ @@ -698,7 +660,7 @@ { "cell_type": "code", "execution_count": null, - "id": "42", + "id": "47", "metadata": {}, "outputs": [], "source": [ @@ -708,7 +670,7 @@ }, { "cell_type": "markdown", - "id": "43", + "id": "48", "metadata": {}, "source": [ "## Finale" @@ -717,7 +679,7 @@ { "cell_type": "code", "execution_count": null, - "id": "44", + "id": "49", "metadata": {}, "outputs": [], "source": [ @@ -742,7 +704,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.9" + "version": "3.11.10" } }, "nbformat": 4, diff --git a/python/lsst/ts/logging_and_reporting/consdb.py b/python/lsst/ts/logging_and_reporting/consdb.py new file mode 100644 index 0000000..60d4de2 --- /dev/null +++ b/python/lsst/ts/logging_and_reporting/consdb.py @@ -0,0 +1,157 @@ +# Used efd.py as a starting point. Layered in consolidated_database.py + +import os +from collections import defaultdict + +import pandas as pd +from lsst.ts.logging_and_reporting.source_adapters import SourceAdapter + +# curl -X 'POST' \ +# 'https://usdf-rsp.slac.stanford.edu/consdb/query' \ +# -H 'accept: application/json' \ +# -H 'Content-Type: application/json' \ +# -d '{ +# "query": "SELECT * FROM cdb_lsstcomcam.exposure LIMIT 2" +# }' + + +class ConsdbAdapter(SourceAdapter): + # See https://usdf-rsp.slac.stanford.edu/consdb/docs + service = "consdb" + endpoints = [ + "schema", # => list of instruments + "schema/{instrument}", # => list of tables + "schema/{instrument}/{table}", # => schema; dict(fname)=[type,dflt] + "query", # POST dict(query)=sql_string + ] + primary_endpoint = "NA" + + def __init__( + self, + *, + server_url=None, + min_dayobs=None, # INCLUSIVE: default=Yesterday + max_dayobs=None, # EXCLUSIVE: default=Today other=YYYY-MM-DD + limit=None, + verbose=False, + ): + super().__init__( + server_url=server_url, + max_dayobs=max_dayobs, + min_dayobs=min_dayobs, + limit=limit, + verbose=verbose, + ) + try: + import lsst.rsp + + self.token = lsst.rsp.get_access_token() + except Exception as err: + if self.verbose: + print(f"Could not get_access_token: {err}") + self.token = os.environ.get("ACCESS_TOKEN") + + self.instruments = list() + self.tables = defaultdict(list) # tables[instrument]=>[tab1, ...] + self.schemas = defaultdict(dict) # schemas[instrum][fname]=[type,dflt] + self.load_schemas() + + # get schemas to facilitate generation of SQL + def load_schemas(self): + # get instruments + if self.verbose: + print("Loading schema: instruments") + endpoint = f"{self.server}/{self.service}/schema" + url = endpoint + ok, result, code = self.protected_get(url, token=self.token) + if not ok: # failure + status = dict( + endpoint_url=url, + number_of_records=None, + error=result, + ) + return status + # success + self.instruments = result + if self.verbose: + print(f"Loaded {self.instruments=}") + + # get tables[instrument] => [table1, ...] + if self.verbose: + print("Loading schema: tables[instrument]") + for instrument in self.instruments: + endpoint = f"{self.server}/{self.service}/schema" + url = f"{endpoint}/{instrument}" + ok, result, code = self.protected_get(url, token=self.token) + if not ok: # failure + status = dict( + endpoint_url=url, + number_of_records=None, + error=result, + ) + return status + # success + if self.verbose: + print(f"Stuffing {self.tables=}") + + self.tables[instrument] = result + if self.verbose: + print(f"Loaded {self.tables[instrument]=}") + + # get schemas[instrument][fname] => [type,default] + if self.verbose: + print("Loading schema: fields [instrument][table]") + for instrument in self.instruments: + for table in self.tables[instrument]: + endpoint = f"{self.server}/{self.service}/schema" + url = f"{endpoint}/{instrument}/{table}" + ok, result, code = self.protected_get(url, token=self.token) + if not ok: # failure + status = dict( + endpoint_url=url, + number_of_records=None, + error=result, + ) + return status + # success + self.schemas[instrument][table] = result + if self.verbose: + print(f"Loaded {self.schemas[instrument][table]=}") + + if self.verbose: + print(f"Loaded Consolidated Databased schemas: {self.schemas=}") + # END load_schemas() + + def query(self, sql): + url = f"{self.server}/{self.service}/query" + if self.verbose: + print(f"DEBUG query: {url=}") + qdict = dict(query=sql) + ok, result, code = self.protected_post(url, qdict, token=self.token) + if not ok: # failure + status = dict( + endpoint_url=url, + number_of_records=None, + error=result, + ) + print(f"DEBUG query: {status=}") + return status + records = result + return pd.DataFrame.from_records(records["data"], columns=records["columns"]) + + def get_sample_of_each(self, day_obs): + instrument = self.instruments[0] + exposure_sql = ( + f"SELECT * FROM cdb_{instrument}.exposure WHERE day_obs = {day_obs}" + ) + s1 = self.query(exposure_sql) + return s1 + + def get_exposures(self, day_obs, instrument="lsstcomcam"): + outfields = "exposure_id,day_obs," "seq_num,exp_time,shut_time,dark_time" + sql = ( + f"SELECT {outfields} " + f"FROM cdb_{instrument}.exposure " + f"WHERE day_obs = {day_obs}" + ) + return self.query(sql) diff --git a/python/lsst/ts/logging_and_reporting/source_adapters.py b/python/lsst/ts/logging_and_reporting/source_adapters.py index 84ad880..067aa6c 100644 --- a/python/lsst/ts/logging_and_reporting/source_adapters.py +++ b/python/lsst/ts/logging_and_reporting/source_adapters.py @@ -149,7 +149,51 @@ def __init__( assert self.min_date < self.max_date self.min_dayobs = ut.datetime_to_dayobs(self.min_date) - def protected_get(self, url, timeout=None): + def protected_post(self, url, jsondata, token=None, timeout=None): + """Do a POST against an API url. + Do NOT stop processing when we have a problem with a URL. There + have been cases where the problem has been with + connectivity or API functioning. We want to process as many of our + sources as possible even if one or more fail. But we want to + KNOW that we had a problem so we can report it to someone. + + RETURN: If the POST works well: ok=True, result=json + RETURN: If the POST is bad: ok=False, result=error_msg_string + """ + ok = True + code = 200 + timeout = timeout or self.timeout + if self.verbose: + print(f"DEBUG protected_post({url=},{timeout=})") + try: + auth = ("user", token) + response = requests.post(url, json=jsondata, auth=auth, timeout=timeout) + if self.verbose: + print( + f"DEBUG protected_post({url=},{auth=},{timeout=}) => " + f"{response.status_code=} {response.reason}" + ) + response.raise_for_status() + except requests.exceptions.HTTPError as err: + # Invalid URL?, etc. + ok = False + code = err.response.status_code + result = f"Error getting data from API at {url}. " + result += str(err) + except requests.exceptions.ConnectionError as err: + # No VPN? Broken API? + ok = False + code = None + result = f"Error connecting to {url} (with timeout={timeout}). " + result += str(err) + else: # No exception. Could something else be wrong? + result = response.json() + + if self.verbose and not ok: + print(f"DEBUG protected_post: FAIL: {result=}") + return ok, result, code + + def protected_get(self, url, token=None, timeout=None): """Do a GET against an API url. Do NOT stop processing when we have a problem with a URL. There have been cases where the problem has been with @@ -163,10 +207,16 @@ def protected_get(self, url, timeout=None): ok = True code = 200 timeout = timeout or self.timeout + auth = ("user", token) if self.verbose: print(f"DEBUG protected_get({url=},{timeout=})") try: - response = requests.get(url, timeout=timeout) + response = requests.get(url, auth=auth, timeout=timeout) + if self.verbose: + print( + f"DEBUG protected_get({url=},{auth=},{timeout=}) => " + f"{response.status_code=} {response.reason}" + ) response.raise_for_status() except requests.exceptions.HTTPError as err: # Invalid URL?, etc.