Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For a given float WMO, a new class to easily open netcdf dataset for local and remote GDAC #429

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion argopy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from .plot import dashboard, ArgoColors # noqa: E402
from .options import set_options, reset_options # noqa: E402
from .data_fetchers import CTDRefDataFetcher # noqa: E402
from .stores import ArgoIndex # noqa: E402
from .stores import ArgoIndex, ArgoFloat # noqa: E402
from .utils import show_versions, show_options # noqa: E402
from .utils import clear_cache, lscache # noqa: E402
from .utils import MonitoredThreadPoolExecutor # noqa: E402, F401
Expand Down Expand Up @@ -67,6 +67,7 @@
"OceanOPSDeployments", # Class
"CTDRefDataFetcher", # Class
"ArgoIndex", # Class
"ArgoFloat", # Class
"ArgoDocs", # Class
"TopoFetcher", # Class
"ArgoDOI", # Class
Expand Down
4 changes: 2 additions & 2 deletions argopy/data_fetchers/gdac_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
has_pyarrow = importlib.util.find_spec('pyarrow') is not None
if has_pyarrow:
from argopy.stores.argo_index_pa import indexstore_pyarrow as indexstore
log.debug("Using pyarrow indexstore")
# log.debug("Using pyarrow indexstore")
else:
from argopy.stores.argo_index_pd import indexstore_pandas as indexstore
# warnings.warn("Consider installing pyarrow in order to improve performances when fetching GDAC data")
log.debug("Using pandas indexstore")
# log.debug("Using pandas indexstore")

access_points = ["wmo", "box"]
exit_formats = ["xarray"]
Expand Down
3 changes: 3 additions & 0 deletions argopy/stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
from .argo_index_pd import indexstore_pandas as indexstore_pd

from .argo_index import ArgoIndex
from .float.argo_float import ArgoFloat


#
__all__ = (
# Classes:
"ArgoIndex",
"ArgoFloat",
"indexstore_pa",
"indexstore_pd",
"filestore",
Expand Down
22 changes: 12 additions & 10 deletions argopy/stores/argo_index_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,28 @@ class ArgoIndexStoreProto(ABC):

def __init__(
self,
host: str = "https://data-argo.ifremer.fr",
host: str = None,
index_file: str = "ar_index_global_prof.txt",
convention: str = None,
cache: bool = False,
cachedir: str = "",
timeout: int = 0,
**kwargs,
):
"""Create an Argo index file store
"""Create an Argo index store

Parameters
----------
host: str, default: ``https://data-argo.ifremer.fr``
Local or remote (ftp, https or s3) path to a `dac` folder (GDAC structure compliant).
host: str, optional, default=OPTIONS["gdac"]
Local or remote (http, ftp or s3) path to a `dac` folder (compliant with GDAC structure).

This parameter takes values like:

- ``https://data-argo.ifremer.fr``
- ``ftp://ftp.ifremer.fr/ifremer/argo``
- ``s3://argo-gdac-sandbox/pub/idx``
- a local absolute path

You can also use the following keywords: ``http``/``https``, ``ftp`` and ``s3``/``aws``, respectively.
- ``https://data-argo.ifremer.fr``, shortcut with ``http`` or ``https``
- ``https://usgodae.org/pub/outgoing/argo``, shortcut with ``us-http`` or ``us-https``
- ``ftp://ftp.ifremer.fr/ifremer/argo``, shortcut with ``ftp``
- ``s3://argo-gdac-sandbox/pub/idx``, shortcut with ``s3`` or ``aws``
index_file: str, default: ``ar_index_global_prof.txt``
Name of the csv-like text file with the index.

Expand All @@ -114,12 +113,15 @@ def __init__(
timeout: int, default: OPTIONS['api_timeout']
Time out in seconds to connect to a remote host (ftp or http).
"""
host = OPTIONS["gdac"] if host is None else host

# Catchup keywords for host:
if str(host).lower() in ["ftp"]:
host = "ftp://ftp.ifremer.fr/ifremer/argo"
elif str(host).lower() in ["http", "https"]:
elif str(host).lower() in ["http", "https", "fr-http", "fr-https"]:
host = "https://data-argo.ifremer.fr"
elif str(host).lower() in ["us-http", "us-https"]:
host = "https://usgodae.org/pub/outgoing/argo"
elif str(host).lower() in ["s3", "aws"]:
host = "s3://argo-gdac-sandbox/pub/idx"
self.host = host
Expand Down
Empty file added argopy/stores/float/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions argopy/stores/float/argo_float.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
If client is online (connected to the web) we work with the 'online' implementation
otherwise we fall back on an offline implementation.

The choice is really meaningful when the client is using a local host. In this case
we don't know if client intends to be online or offline, so we check and implement.

"""

import logging

from ...utils import isconnected


log = logging.getLogger("argopy.stores.ArgoFloat")


# Pick the concrete float store implementation. This runs once, when the
# module is first imported: whatever isconnected() returns at that moment
# decides which class is bound to the name FloatStore for the rest of the session.
if isconnected():
    # Implementation relying on web access
    from .implementations.argo_float_online import ArgoFloatOnline as FloatStore

    log.info("Using ONLINE Argo Float implementation")
else:
    # Fallback implementation for clients without web access
    from .implementations.argo_float_offline import ArgoFloatOffline as FloatStore

    log.info("Using OFFLINE Argo Float implementation")


class ArgoFloat(FloatStore):
    """Argo GDAC float store

    This store makes it easy to load/read data for a given float from any GDAC location and netcdf files.

    The parent class (online or offline implementation) is the one bound to ``FloatStore``
    when this module is imported.

    Examples
    --------
    .. code-block:: python
        :caption: A float store is instantiated with a float WMO number and a host (any access path: local, http, ftp or s3) where float files are to be found.

        >>> from argopy import ArgoFloat
        >>> af = ArgoFloat(WMO)  # Use argopy 'gdac' option by default
        >>> af = ArgoFloat(WMO, host='/home/ref-argo/gdac')  # Use your local GDAC copy
        >>> af = ArgoFloat(WMO, host='http')  # Shortcut for https://data-argo.ifremer.fr
        >>> af = ArgoFloat(WMO, host='ftp')  # Shortcut for ftp://ftp.ifremer.fr/ifremer/argo
        >>> af = ArgoFloat(WMO, host='s3')  # Shortcut for s3://argo-gdac-sandbox/pub

    .. code-block:: python
        :caption: Load/read GDAC netcdf files as a :class:`xarray.Dataset`

        >>> af.list_dataset()  # Return a dictionary with all available datasets for this float
        >>> ds = af.open_dataset('prof')  # Use keys from the available datasets dictionary
        >>> ds = af.open_dataset('meta')
        >>> ds = af.open_dataset('tech')
        >>> ds = af.open_dataset('Rtraj')
        >>> ds = af.open_dataset('Sprof')

    .. code-block:: python
        :caption: Other attributes and methods

        >>> af.N_CYCLES  # Number of cycles (estimated)
        >>> af.path  # Root path for all float datasets
        >>> af.dac  # Name of the DAC this float belongs to
        >>> af.metadata  # A dictionary with all available metadata for this float (from netcdf or fleetmonitoring API)
        >>> af.ls()  # List af.path folder content

    .. code-block:: python
        :caption: Working with float profiles

        >>> af.lsprofiles()  # List float "profiles" folder content
        >>> af.describe_profiles()  # Pandas DataFrame describing all available float profile files

    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded, unchanged, to the selected implementation
        super().__init__(*args, **kwargs)
11 changes: 11 additions & 0 deletions argopy/stores/float/implementations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/env python
# -*coding: UTF-8 -*-
#
# HELP
#
# Created by gmaze on 09/01/2025
__author__ = '[email protected]'

import os
import sys
import xarray as xr
91 changes: 91 additions & 0 deletions argopy/stores/float/implementations/argo_float_offline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pandas as pd
import numpy as np
from pathlib import Path
import logging

from ....errors import InvalidOption
from ..spec import ArgoFloatProto


log = logging.getLogger("argopy.stores.ArgoFloat")


class ArgoFloatOffline(ArgoFloatProto):
    """Offline :class:`ArgoFloat` implementation

    Only works with a local host (``file`` protocol): the DAC name is found by
    scanning the ``dac`` folder of the host, and float meta-data are read from
    the float ``meta`` and ``prof`` datasets.

    Raises
    ------
    :class:`InvalidOption`
        If the host protocol is not ``file``.
    ValueError
        If no DAC folder can be found for this float under the host.
    """

    _online = False

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        if self.host_protocol != "file":
            raise InvalidOption(
                "Trying to work with the offline store using a remote host !"
            )

        # Load some data (in a perfect world, this should be done asynchronously):
        self.load_dac()  # must come before metadata
        self.load_metadata()

    def load_metadata(self):
        """Load float meta-data from the ``meta`` and ``prof`` datasets

        Fill ``self._metadata`` with a dictionary holding the following keys:
        ``deployment`` (with ``launchDate``), ``platform`` (with ``type``),
        ``maker``, ``networks`` and ``cycles``.
        """

        def scalar_str(this_ds, name):
            # Add an axis before indexing to extract a single string.
            # NOTE(review): presumably these variables are 0-d arrays, to be confirmed.
            return this_ds[name].values[np.newaxis][0].strip()

        def infer_network(this_ds):
            # Heuristic: the network list is inferred from the platform family,
            # and more than 4 SENSOR entries is taken as the sign of a BGC float.
            # SENSOR is only read for known families, as in the family-specific branches below.
            family = scalar_str(this_ds, "PLATFORM_FAMILY")
            if family == "FLOAT_DEEP":
                network = ["DEEP"]
                if len(this_ds["SENSOR"].values) > 4:
                    network.append("BGC")
            elif family == "FLOAT":
                if len(this_ds["SENSOR"].values) > 4:
                    network = ["BGC"]
                else:
                    network = ["CORE"]
            else:
                network = ["?"]
            return network

        ds = self.open_dataset("meta")
        self._metadata = {
            "deployment": {
                "launchDate": pd.to_datetime(ds["LAUNCH_DATE"].values, utc=True)
            },
            "platform": {"type": scalar_str(ds, "PLATFORM_TYPE")},
            "maker": scalar_str(ds, "PLATFORM_MAKER"),
            "networks": infer_network(ds),
            "cycles": np.unique(self.open_dataset("prof")["CYCLE_NUMBER"]),
        }

    def load_dac(self):
        """Load the DAC short name for this float

        The DAC name is the name of the folder, under the host ``dac`` folder,
        that contains a folder named after the float WMO.

        Raises
        ------
        ValueError
            If the folder scan fails or if no matching folder is found.
        """
        error_msg = f"DAC name for Float {self.WMO} cannot be found from {self.host}"

        try:
            dac = [
                p.parts[-2]
                for p in Path(self.host).glob(
                    self.host_sep.join(["dac", "*", "%i" % self.WMO])
                )
            ]
        except Exception as e:
            # Keep the original error as the cause, to ease debugging
            raise ValueError(error_msg) from e

        if not dac:
            # An empty scan is an error too: without a DAC name the float
            # files cannot be located
            raise ValueError(error_msg)

        self._dac = dac[0]

        # For the record, another method to get the DAC name, based on the profile index:
        # self._dac = self.idx.search_wmo(self.WMO).read_dac_wmo()[0][0]  # Get DAC from Argo index
78 changes: 78 additions & 0 deletions argopy/stores/float/implementations/argo_float_online.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Union
import xarray as xr
from pathlib import Path
import pandas as pd
import logging

from ....plot import dashboard
from ....utils import check_wmo, isconnected, argo_split_path
from ....options import OPTIONS
from ... import ArgoIndex, httpstore
from ..spec import ArgoFloatProto


log = logging.getLogger("argopy.stores.ArgoFloat")


class ArgoFloatOnline(ArgoFloatProto):
    """:class:`ArgoFloat` implementation using web access

    Float meta-data and technical data are fetched from the Euro-Argo
    fleet-monitoring API, and the DAC name is read from these meta-data.

    Raises
    ------
    ValueError
        If the DAC name cannot be read from the float meta-data.
    """

    _online = True
    _eafleetmonitoring_server = "https://fleetmonitoring.euro-argo.eu"
    _technicaldata = None  # Filled on first access to the 'technicaldata' property

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        if self.host_protocol == "s3":
            self.host = self.host.replace(
                "/idx", ""
            )  # Fix s3 anomaly whereby index files are not at the 'dac' level

        # Load some data (in a perfect world, this should be done asynchronously):
        self.load_metadata()  # must come before load_dac
        self.load_dac()

    def load_metadata(self):
        """Load float meta data from Euro-Argo fleet-monitoring API

        Fill ``self._metadata`` with the json content returned by the API, where
        the deployment launch date is converted with :func:`pandas.to_datetime`.
        """
        # api_point = f"{self._eafleetmonitoring_server}/floats/basic/{self.WMO}"
        api_point = f"{self._eafleetmonitoring_server}/floats/{self.WMO}"
        self._metadata = httpstore(cache=self.cache, cachedir=self.cachedir).open_json(
            api_point
        )

        # Fix data type for some useful keys:
        self._metadata["deployment"]["launchDate"] = pd.to_datetime(
            self._metadata["deployment"]["launchDate"]
        )

    def load_technicaldata(self):
        """Load float technical data from Euro-Argo fleet-monitoring API

        Fill ``self._technicaldata`` with the json content returned by the API.
        """
        # api_point = f"{self._eafleetmonitoring_server}/technical-data/basic/{self.WMO}"
        api_point = f"{self._eafleetmonitoring_server}/technical-data/{self.WMO}"
        self._technicaldata = httpstore(
            cache=self.cache, cachedir=self.cachedir
        ).open_json(api_point)

    @property
    def technicaldata(self) -> dict:
        """A dictionary holding float technical data

        Data are loaded on first access only.
        """
        if self._technicaldata is None:
            self.load_technicaldata()
        return self._technicaldata

    def load_dac(self):
        """Load the DAC short name for this float

        The name is read from the float meta-data, so :meth:`load_metadata`
        must have been called before.

        Raises
        ------
        ValueError
            If the data center name cannot be read from the meta-data.
        """
        try:
            # Get DAC from EA-Metadata API:
            self._dac = self._metadata["dataCenter"]["name"].lower()
        except (KeyError, TypeError, AttributeError) as e:
            # Only lookup failures are converted (missing key, missing or
            # non-dictionary meta-data, name that is not a string), and the
            # original error is kept as the cause
            raise ValueError(
                f"DAC name for Float {self.WMO} cannot be found from {self.host}"
            ) from e

        # For the record, another method to get the DAC name, based on the profile index
        # self._dac = self.idx.search_wmo(self.WMO).read_dac_wmo()[0][0]  # Get DAC from Argo index
Loading
Loading