diff --git a/.gitignore b/.gitignore index 1856070..75c6166 100644 --- a/.gitignore +++ b/.gitignore @@ -112,4 +112,7 @@ venv.bak/ _catalogs/ _old/ -.DS_Store \ No newline at end of file +.DS_Store +*.zarr +*.nc +db_init \ No newline at end of file diff --git a/api/Dockerfile b/api/Dockerfile index ad73842..e5016fa 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -6,7 +6,7 @@ RUN apt update && apt install -y cron curl WORKDIR /app COPY requirements.txt /code/requirements.txt -RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt +RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt --break-system-packages COPY app /app EXPOSE 80 diff --git a/api/app/endpoint_handlers/file.py b/api/app/endpoint_handlers/file.py index 04cf562..1b589a9 100644 --- a/api/app/endpoint_handlers/file.py +++ b/api/app/endpoint_handlers/file.py @@ -1,8 +1,17 @@ """Module with functions to handle file related endpoints""" +import io import os +import zipfile +from pathlib import Path +from zipfile import ZipFile from fastapi.responses import FileResponse + + from dbmanager.dbmanager import DBManager, RequestStatus +from starlette.requests import Request +from starlette.responses import HTMLResponse, RedirectResponse, StreamingResponse +from starlette.staticfiles import StaticFiles from utils.api_logging import get_dds_logger from utils.metrics import log_execution_time @@ -10,9 +19,8 @@ log = get_dds_logger(__name__) - @log_execution_time(log) -def download_request_result(request_id: int): +def download_request_result(request_id: int, filename: str = None): """Realize the logic for the endpoint: `GET /download/{request_id}` @@ -60,7 +68,15 @@ def download_request_result(request_id: int): download_details.location_path, ) raise FileNotFoundError - return FileResponse( - path=download_details.location_path, - filename=download_details.location_path.split(os.sep)[-1], - ) + + if download_details.location_path.endswith(".zarr"): + log.info("Zarr detected") + return FileResponse( + path=f'{download_details.location_path}/{filename}', + filename=filename, + ) + else: + return FileResponse( + path=download_details.location_path, + filename=download_details.location_path.split(os.sep)[-1], + ) diff --git a/api/app/main.py b/api/app/main.py index d0f5ad8..b7703f8 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -355,3 +355,52 @@ async def download_request_result( raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!" ) from err + +@app.get("/download/{request_id}/{filename}", tags=[tags.REQUEST]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /download/{request_id}/{filename}"}, +) +# @requires([scopes.AUTHENTICATED]) # TODO: mange download auth in the web component +async def download_request_result( + request: Request, + request_id: int, + filename: str, +): + """Download result of the request""" + app.state.api_http_requests_total.inc( + {"route": "GET /download/{request_id}/{filename}"} + ) + try: + return file_handler.download_request_result(request_id=request_id, filename=filename) + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + except FileNotFoundError as err: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!" + ) from err + +@app.get("/download/{request_id}/{filename}/{subfile}", tags=[tags.REQUEST]) +@timer( + app.state.api_request_duration_seconds, + labels={"route": "GET /download/{request_id}/{filename}/{subfile}"}, +) +# @requires([scopes.AUTHENTICATED]) +async def download_request_result( + request: Request, + request_id: int, + filename: str, + subfile: str, +): + """Download result of the request""" + app.state.api_http_requests_total.inc( + {"route": "GET /download/{request_id}/{filename}/{subfile}"} + ) + try: + return file_handler.download_request_result(request_id=request_id, filename=f'{filename}/{subfile}') + except exc.BaseDDSException as err: + raise err.wrap_around_http_exception() from err + except FileNotFoundError as err: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="File was not found!" + ) from err \ No newline at end of file diff --git a/datastore/Dockerfile b/datastore/Dockerfile index 7e051cc..77fc265 100644 --- a/datastore/Dockerfile +++ b/datastore/Dockerfile @@ -3,7 +3,7 @@ ARG TAG=latest FROM $REGISTRY/geolake-drivers:$TAG COPY requirements.txt /app/requirements.txt -RUN pip install --no-cache-dir -r /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt --break-system-packages COPY ./datastore /app/datastore COPY ./workflow /app/workflow COPY ./dbmanager /app/dbmanager diff --git a/datastore/datastore/datastore.py b/datastore/datastore/datastore.py index b754fc0..3fb2bfc 100644 --- a/datastore/datastore/datastore.py +++ b/datastore/datastore/datastore.py @@ -453,6 +453,9 @@ def _process_query(kube, query: GeoQuery, compute: None | bool = False): else: method = "nearest" kube = kube.sel(vertical=vertical, method=method) + if query.resample: + Datastore._LOG.debug("Applying resample...") + kube = kube.resample(**query.resample) return kube.compute() if compute else kube @staticmethod diff --git a/datastore/geoquery/geoquery.py b/datastore/geoquery/geoquery.py index 8446660..540ea6f 100644 --- a/datastore/geoquery/geoquery.py +++ b/datastore/geoquery/geoquery.py @@ -9,6 +9,7 @@ class GeoQuery(BaseModel, extra="allow"): variable: Optional[Union[str, List[str]]] # TODO: Check how `time` is to be represented + resample: Optional[Dict[str,str]] time: Optional[Union[Dict[str, str], Dict[str, List[str]]]] area: Optional[Dict[str, float]] location: Optional[Dict[str, Union[float, List[float]]]] diff --git a/docker-compose.yaml b/docker-compose.yaml index 995b7ca..81736dd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,85 +1,120 @@ -version: "3" +version: "3.9" services: - api: - build: - context: ./ - dockerfile: ./api/Dockerfile + db: + image: postgres:latest + environment: + - POSTGRES_USER=dds + - POSTGRES_PASSWORD=dds + - POSTGRES_DB=dds ports: - - "8080:80" + - "5432:5432" + expose: + - 5432 + healthcheck: + test: [ "CMD-SHELL", "pg_isready", "-d", "db_prod" ] + interval: 30s + timeout: 60s + retries: 5 + start_period: 80s + + restore-db: + image: rg.fr-par.scw.cloud/geodds-production/backup:v0.1a2 depends_on: - - broker - - db - links: - - broker - - db - environment: - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds - POSTGRES_HOST: db - POSTGRES_PORT: 5432 + db: + condition: service_healthy + command: + - /bin/bash + - -c + - | + /db_init/restore.sh + tail -f /dev/null volumes: - - downloads:/downloads:ro - command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"] - executor: - build: - context: ./ - dockerfile: ./executor/Dockerfile - depends_on: - - broker - - db - links: - - broker - - db - ports: - - 8787:8787 + - ./db_init:/db_init environment: - EXECUTOR_TYPES: query,info,estimate - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds - POSTGRES_HOST: db - POSTGRES_PORT: 5432 + - POSTGRES_HOST=db + - FILENAME=postgres_localhost-2024_10_03_09_53_46-dump-blank.sql + - POSTGRES_USER=dds + - POSTGRES_PASSWORD=dds + - POSTGRES_DB=dds + + catalog: + image: rg.fr-par.scw.cloud/geolake/geolake-datastore:latest + build: ./datastore volumes: - - downloads:/downloads:rw - command: ["./wait-for-it.sh", "broker:5672", "--", "python", "./app/main.py"] - broker: - image: rabbitmq:3 - db: - image: postgres:14.1 - restart: always + - ./catalog:/catalog + command: + - /bin/bash + - -c + - | + trap : TERM INT + sleep infinity & + wait + + api: + image: geolago-api:dev + build: ./api + command: + - ./../wait-for-it.sh + - $(BROKER_SERVICE_HOST):5672 + - -- + - uvicorn + - main:app + - --host + - 0.0.0.0 + - --port + - '80' + environment: + - DB_SERVICE_HOST=db + - DB_SERVICE_PORT=5432 + - BROKER_SERVICE_HOST=broker + - POSTGRES_USER=dds + - POSTGRES_PASSWORD=dds + - POSTGRES_DB=dds + - CATALOG_PATH=/catalog/catalog.yaml + - CACHE_PATH=/catalog/.cache + - MESSAGE_SEPARATOR='\' volumes: - - ./db/scripts:/docker-entrypoint-initdb.d + - ./catalog:/catalog + - ./downloads:/downloads + - ./catalog/datasets:/data + expose: + - '8080' ports: - - 5432:5432 - environment: - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds - POSTGRES_HOST: db - POSTGRES_PORT: 5432 - web: - build: - context: ./ - dockerfile: ./web/Dockerfile + - '8080:80' + + broker: + image: rabbitmq:3.12-management-alpine ports: - - "8080:80" - depends_on: - - db - links: - - db + - "5672:5672" + - "15672:15672" + expose: + - 5672 + + executor: + image: geolago-executor:dev + build: ./executor + command: + - ./../wait-for-it.sh + - $(BROKER_SERVICE_HOST):5672 + - -- + - python + - main.py environment: - CATALOG_PATH: /code/app/resources/catalogs/catalog.yaml - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds - POSTGRES_HOST: db - POSTGRES_PORT: 5432 + - BROKER_SERVICE_HOST=broker + - DB_SERVICE_HOST=db + - DB_SERVICE_PORT=5432 + - POSTGRES_USER=dds + - POSTGRES_PASSWORD=dds + - POSTGRES_DB=dds + - EXECUTOR_TYPES=query + - MESSAGE_SEPARATOR='\' + - SLEEP_SEC=10 + - RESULT_CHECK_RETRIES=360 + - CATALOG_PATH=/catalog/catalog.yaml + - CACHE_PATH=/catalog/.cache + - HDF5_USE_FILE_LOCKING='FALSE' + - LD_LIBRARY_PATH=/urs/lib/x86_64-linux-gnu volumes: - - downloads:/downloads:ro - command: ["./wait-for-it.sh", "broker:5672", "--", "uvicorn", "web.main:app", "--host", "0.0.0.0", "--port", "80"] - -volumes: - downloads: \ No newline at end of file + - ./catalog:/catalog + - ./downloads:/downloads + - ./catalog/datasets:/data \ No newline at end of file diff --git a/drivers/Dockerfile b/drivers/Dockerfile index 978eaa1..722bfa7 100644 --- a/drivers/Dockerfile +++ b/drivers/Dockerfile @@ -1,8 +1,9 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geokube #ARG TAG=v0.2.6b2 -ARG TAG=2024.05.03.10.36 +#ARG TAG=2024.05.03.10.36 +ARG TAG=v0.2.7.1 FROM $REGISTRY/geokube:$TAG COPY dist/intake_geokube-0.1a0-py3-none-any.whl / -RUN pip install /intake_geokube-0.1a0-py3-none-any.whl +RUN pip install /intake_geokube-0.1a0-py3-none-any.whl --break-system-packages RUN rm /intake_geokube-0.1a0-py3-none-any.whl \ No newline at end of file diff --git a/executor/Dockerfile b/executor/Dockerfile index e3404e5..3399feb 100644 --- a/executor/Dockerfile +++ b/executor/Dockerfile @@ -7,7 +7,7 @@ RUN apt update && apt install -y cron curl WORKDIR /app COPY requirements.txt /code/requirements.txt -RUN pip install --no-cache-dir -r /code/requirements.txt +RUN pip install --no-cache-dir -r /code/requirements.txt --break-system-packages COPY app /app COPY ./healtcheck.* /opt/ diff --git a/executor/app/main.py b/executor/app/main.py index 5daf7e6..b6b5fa5 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -1,4 +1,5 @@ import os +import tempfile import time import datetime import pika @@ -119,6 +120,9 @@ def persist_datacube( case "geojson": full_path = os.path.join(base_path, f"{path}.json") kube.to_geojson(full_path) + case "zarr": + full_path = os.path.join(base_path, f"{path}.zarr") + kube.to_zarr(full_path, mode='w', consolidated=True) case _: raise ValueError(f"format `{format}` is not supported") return full_path @@ -455,6 +459,7 @@ def get_size(self, location_path): dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1)) dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto") dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8)) + dask_cluster_opts['local_directory'] = os.getenv("LOCAL_DIR", tempfile.mkdtemp(f'{os.uname()[1]}')) executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts)