zarr file format support #37

Merged
merged 9 commits, Jan 22, 2025
5 changes: 4 additions & 1 deletion .gitignore
@@ -112,4 +112,7 @@ venv.bak/
_catalogs/
_old/

.DS_Store
.DS_Store
*.zarr
*.nc
db_init
2 changes: 1 addition & 1 deletion api/Dockerfile
@@ -6,7 +6,7 @@ RUN apt update && apt install -y cron curl

WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt --break-system-packages
COPY app /app
EXPOSE 80

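A note on the recurring pip change in this and the other Dockerfiles below: --break-system-packages opts out of the PEP 668 externally-managed-environment guard that newer Debian-based Python images enforce, without which pip refuses to install into the system interpreter.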
28 changes: 22 additions & 6 deletions api/app/endpoint_handlers/file.py
@@ -1,18 +1,26 @@
"""Module with functions to handle file related endpoints"""
import io
import os
import zipfile
from pathlib import Path
from zipfile import ZipFile

from fastapi.responses import FileResponse


from dbmanager.dbmanager import DBManager, RequestStatus
from starlette.requests import Request
from starlette.responses import HTMLResponse, RedirectResponse, StreamingResponse
from starlette.staticfiles import StaticFiles

from utils.api_logging import get_dds_logger
from utils.metrics import log_execution_time
import exceptions as exc

log = get_dds_logger(__name__)


@log_execution_time(log)
def download_request_result(request_id: int):
def download_request_result(request_id: int, filename: str = None):
"""Realize the logic for the endpoint:

`GET /download/{request_id}`
@@ -60,7 +68,15 @@ def download_request_result(request_id: int):
download_details.location_path,
)
raise FileNotFoundError
return FileResponse(
path=download_details.location_path,
filename=download_details.location_path.split(os.sep)[-1],
)

if download_details.location_path.endswith(".zarr"):
log.info("Zarr detected")
return FileResponse(
path=f'{download_details.location_path}/{filename}',
filename=filename,
)
else:
return FileResponse(
path=download_details.location_path,
filename=download_details.location_path.split(os.sep)[-1],
)
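When the stored result is a Zarr directory store, the handler now serves individual member files from inside it. For illustration, a minimal sketch of fetching the store's consolidated metadata through this code path, assuming a deployment at http://localhost:8080 and a finished request with id 42 (both illustrative):

import requests

# .zmetadata is the consolidated-metadata file that to_zarr(consolidated=True) writes
resp = requests.get("http://localhost:8080/download/42/.zmetadata")
resp.raise_for_status()
print(resp.json()["zarr_consolidated_format"])  # 1 for Zarr v2 consolidated stores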
49 changes: 49 additions & 0 deletions api/app/main.py
@@ -355,3 +355,52 @@ async def download_request_result(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!"
) from err

@app.get("/download/{request_id}/{filename}", tags=[tags.REQUEST])
@timer(
app.state.api_request_duration_seconds,
labels={"route": "GET /download/{request_id}/{filename}"},
)
# @requires([scopes.AUTHENTICATED]) # TODO: manage download auth in the web component
async def download_request_result(
request: Request,
request_id: int,
filename: str,
):
"""Download result of the request"""
app.state.api_http_requests_total.inc(
{"route": "GET /download/{request_id}/{filename}"}
)
try:
return file_handler.download_request_result(request_id=request_id, filename=filename)
except exc.BaseDDSException as err:
raise err.wrap_around_http_exception() from err
except FileNotFoundError as err:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!"
) from err

@app.get("/download/{request_id}/{filename}/{subfile}", tags=[tags.REQUEST])
@timer(
app.state.api_request_duration_seconds,
labels={"route": "GET /download/{request_id}/{filename}/{subfile}"},
)
# @requires([scopes.AUTHENTICATED])
async def download_request_result(
request: Request,
request_id: int,
filename: str,
subfile: str,
):
"""Download result of the request"""
app.state.api_http_requests_total.inc(
{"route": "GET /download/{request_id}/{filename}/{subfile}"}
)
try:
return file_handler.download_request_result(request_id=request_id, filename=f'{filename}/{subfile}')
except exc.BaseDDSException as err:
raise err.wrap_around_http_exception() from err
except FileNotFoundError as err:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!"
) from err
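Together, the two routes let a Zarr-aware client read the store over plain HTTP: the first serves top-level members such as .zmetadata, the second serves chunk files such as temperature/0.0.0 (variable name illustrative). A minimal sketch, assuming fsspec with aiohttp is installed and using the same illustrative deployment URL and request id as above:

import xarray as xr

# lazy open; each chunk access turns into GET /download/42/{filename}/{subfile}
ds = xr.open_zarr("http://localhost:8080/download/42", consolidated=True)
print(ds)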
2 changes: 1 addition & 1 deletion datastore/Dockerfile
@@ -3,7 +3,7 @@ ARG TAG=latest
FROM $REGISTRY/geolake-drivers:$TAG

COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt --break-system-packages
COPY ./datastore /app/datastore
COPY ./workflow /app/workflow
COPY ./dbmanager /app/dbmanager
3 changes: 3 additions & 0 deletions datastore/datastore/datastore.py
@@ -453,6 +453,9 @@ def _process_query(kube, query: GeoQuery, compute: None | bool = False):
else:
method = "nearest"
kube = kube.sel(vertical=vertical, method=method)
if query.resample:
Datastore._LOG.debug("Applying resample...")
kube = kube.resample(**query.resample)
return kube.compute() if compute else kube

@staticmethod
1 change: 1 addition & 0 deletions datastore/geoquery/geoquery.py
@@ -9,6 +9,7 @@
class GeoQuery(BaseModel, extra="allow"):
variable: Optional[Union[str, List[str]]]
# TODO: Check how `time` is to be represented
resample: Optional[Dict[str,str]]
time: Optional[Union[Dict[str, str], Dict[str, List[str]]]]
area: Optional[Dict[str, float]]
location: Optional[Dict[str, Union[float, List[float]]]]
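The new resample field is forwarded verbatim as keyword arguments to the datacube's resample call in _process_query above, so its keys must follow geokube's resample signature. A minimal sketch of a query using it; the key names "operator" and "frequency" (and the time keys) are illustrative assumptions, not confirmed by this PR:

from geoquery.geoquery import GeoQuery  # import path as laid out in this repo's datastore package

query = GeoQuery(
    variable="tas",  # illustrative variable name
    time={"start": "2020-01-01", "stop": "2020-12-31"},
    resample={"operator": "mean", "frequency": "1D"},  # applied as kube.resample(operator=..., frequency=...)
)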
183 changes: 109 additions & 74 deletions docker-compose.yaml
@@ -1,85 +1,120 @@
version: "3"
version: "3.9"
services:
api:
build:
context: ./
dockerfile: ./api/Dockerfile
db:
image: postgres:latest
environment:
- POSTGRES_USER=dds
- POSTGRES_PASSWORD=dds
- POSTGRES_DB=dds
ports:
- "8080:80"
- "5432:5432"
expose:
- 5432
healthcheck:
test: [ "CMD-SHELL", "pg_isready", "-d", "db_prod" ]
interval: 30s
timeout: 60s
retries: 5
start_period: 80s

restore-db:
image: rg.fr-par.scw.cloud/geodds-production/backup:v0.1a2
depends_on:
- broker
- db
links:
- broker
- db
environment:
CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml
POSTGRES_DB: dds
POSTGRES_USER: dds
POSTGRES_PASSWORD: dds
POSTGRES_HOST: db
POSTGRES_PORT: 5432
db:
condition: service_healthy
command:
- /bin/bash
- -c
- |
/db_init/restore.sh
tail -f /dev/null
volumes:
- downloads:/downloads:ro
command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]
executor:
build:
context: ./
dockerfile: ./executor/Dockerfile
depends_on:
- broker
- db
links:
- broker
- db
ports:
- 8787:8787
- ./db_init:/db_init
environment:
EXECUTOR_TYPES: query,info,estimate
CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml
POSTGRES_DB: dds
POSTGRES_USER: dds
POSTGRES_PASSWORD: dds
POSTGRES_HOST: db
POSTGRES_PORT: 5432
- POSTGRES_HOST=db
- FILENAME=postgres_localhost-2024_10_03_09_53_46-dump-blank.sql
- POSTGRES_USER=dds
- POSTGRES_PASSWORD=dds
- POSTGRES_DB=dds

catalog:
image: rg.fr-par.scw.cloud/geolake/geolake-datastore:latest
build: ./datastore
volumes:
- downloads:/downloads:rw
command: ["./wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"]
broker:
image: rabbitmq:3
db:
image: postgres:14.1
restart: always
- ./catalog:/catalog
command:
- /bin/bash
- -c
- |
trap : TERM INT
sleep infinity &
wait

api:
image: geolago-api:dev
build: ./api
command:
- ./../wait-for-it.sh
- $(BROKER_SERVICE_HOST):5672
- --
- uvicorn
- main:app
- --host
- 0.0.0.0
- --port
- '80'
environment:
- DB_SERVICE_HOST=db
- DB_SERVICE_PORT=5432
- BROKER_SERVICE_HOST=broker
- POSTGRES_USER=dds
- POSTGRES_PASSWORD=dds
- POSTGRES_DB=dds
- CATALOG_PATH=/catalog/catalog.yaml
- CACHE_PATH=/catalog/.cache
- MESSAGE_SEPARATOR='\'
volumes:
- ./db/scripts:/docker-entrypoint-initdb.d
- ./catalog:/catalog
- ./downloads:/downloads
- ./catalog/datasets:/data
expose:
- '8080'
ports:
- 5432:5432
environment:
POSTGRES_DB: dds
POSTGRES_USER: dds
POSTGRES_PASSWORD: dds
POSTGRES_HOST: db
POSTGRES_PORT: 5432
web:
build:
context: ./
dockerfile: ./web/Dockerfile
- '8080:80'

broker:
image: rabbitmq:3.12-management-alpine
ports:
- "8080:80"
depends_on:
- db
links:
- db
- "5672:5672"
- "15672:15672"
expose:
- 5672

executor:
image: geolago-executor:dev
build: ./executor
command:
- ./../wait-for-it.sh
- $(BROKER_SERVICE_HOST):5672
- --
- python
- main.py
environment:
CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml
POSTGRES_DB: dds
POSTGRES_USER: dds
POSTGRES_PASSWORD: dds
POSTGRES_HOST: db
POSTGRES_PORT: 5432
- BROKER_SERVICE_HOST=broker
- DB_SERVICE_HOST=db
- DB_SERVICE_PORT=5432
- POSTGRES_USER=dds
- POSTGRES_PASSWORD=dds
- POSTGRES_DB=dds
- EXECUTOR_TYPES=query
- MESSAGE_SEPARATOR='\'
- SLEEP_SEC=10
- RESULT_CHECK_RETRIES=360
- CATALOG_PATH=/catalog/catalog.yaml
- CACHE_PATH=/catalog/.cache
- HDF5_USE_FILE_LOCKING='FALSE'
- LD_LIBRARY_PATH=/usr/lib/x86_64-linux-gnu
volumes:
- downloads:/downloads:ro
command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "web.main:app", "--host", "0.0.0.0", "--port", "80"]

volumes:
downloads:
- ./catalog:/catalog
- ./downloads:/downloads
- ./catalog/datasets:/data
5 changes: 3 additions & 2 deletions drivers/Dockerfile
@@ -1,8 +1,9 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geokube
#ARG TAG=v0.2.6b2
ARG TAG=2024.05.03.10.36
#ARG TAG=2024.05.03.10.36
ARG TAG=v0.2.7.1
FROM $REGISTRY/geokube:$TAG

COPY dist/intake_geokube-0.1a0-py3-none-any.whl /
RUN pip install /intake_geokube-0.1a0-py3-none-any.whl
RUN pip install /intake_geokube-0.1a0-py3-none-any.whl --break-system-packages
RUN rm /intake_geokube-0.1a0-py3-none-any.whl
2 changes: 1 addition & 1 deletion executor/Dockerfile
@@ -7,7 +7,7 @@ RUN apt update && apt install -y cron curl

WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir -r /code/requirements.txt
RUN pip install --no-cache-dir -r /code/requirements.txt --break-system-packages
COPY app /app

COPY ./healtcheck.* /opt/
5 changes: 5 additions & 0 deletions executor/app/main.py
@@ -1,4 +1,5 @@
import os
import tempfile
import time
import datetime
import pika
@@ -119,6 +120,9 @@ def persist_datacube(
case "geojson":
full_path = os.path.join(base_path, f"{path}.json")
kube.to_geojson(full_path)
case "zarr":
full_path = os.path.join(base_path, f"{path}.zarr")
kube.to_zarr(full_path, mode='w', consolidated=True)
case _:
raise ValueError(f"format `{format}` is not supported")
return full_path
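For illustration, a minimal sketch of checking the store the new zarr branch writes, assuming zarr-python v2 and an illustrative result path of result.zarr:

import zarr

store = zarr.open_consolidated("result.zarr")  # a single read of .zmetadata instead of many small requests
print(list(store.array_keys()))                # the data variables written by to_zarr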
@@ -455,6 +459,7 @@ def get_size(self, location_path):
dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1))
dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto")
dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8))
dask_cluster_opts['local_directory'] = os.getenv("LOCAL_DIR", tempfile.mkdtemp(f'{os.uname()[1]}'))


executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts)
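For context, a minimal sketch of where these options would land if the executor builds a dask.distributed.LocalCluster (an assumption; note also that LocalCluster's actual keyword is threads_per_worker, not thread_per_worker as the options dict spells it):

from dask.distributed import LocalCluster

cluster = LocalCluster(
    n_workers=dask_cluster_opts["n_workers"],
    threads_per_worker=dask_cluster_opts["thread_per_worker"],
    memory_limit=dask_cluster_opts["memory_limit"],
    local_directory=dask_cluster_opts["local_directory"],  # keeps Dask spill/scratch files per host
)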