Add data step 2 (#307)
* adds pipeline_run

* Adds column field, issue, entity summary

* fixes acceptance tests

* adds add_data_utils tests

* refines and adds test

* adds acceptance test for failing pipeline

* Removes unused mock exception fn

* read just columns instead of whole .csv

* use Path objects

* adds 'missing entity' check
CarlosCoelhoSL authored Dec 18, 2024
1 parent cdc48cb commit ce5e228
Showing 5 changed files with 777 additions and 9 deletions.
18 changes: 17 additions & 1 deletion digital_land/cli.py
@@ -351,6 +351,7 @@ def retire_endpoints_cmd(config_collections_dir, csv_path):
@click.argument("csv-path", nargs=1, type=click.Path())
@click.argument("collection-name", nargs=1, type=click.STRING)
@click.option("--collection-dir", "-c", nargs=1, type=click.Path(exists=True))
@click.option("--pipeline-dir", "-p", nargs=1, type=click.Path(exists=True))
@click.option(
"--specification-dir", "-s", type=click.Path(exists=True), default="specification/"
)
@@ -360,20 +361,35 @@ def retire_endpoints_cmd(config_collections_dir, csv_path):
type=click.Path(exists=True),
default="var/cache/organisation.csv",
)
@click.option(
"--cache-dir",
type=click.Path(exists=True),
)
def add_data_cmd(
csv_path, collection_name, collection_dir, specification_dir, organisation_path
csv_path,
collection_name,
collection_dir,
pipeline_dir,
specification_dir,
organisation_path,
cache_dir,
):
csv_file_path = Path(csv_path)
if not csv_file_path.is_file():
logging.error(f"CSV file not found at path: {csv_path}")
sys.exit(2)
collection_dir = Path(collection_dir)
pipeline_dir = Path(pipeline_dir)
specification_dir = Path(specification_dir)

return add_data(
csv_file_path,
collection_name,
collection_dir,
pipeline_dir,
specification_dir,
organisation_path,
cache_dir=cache_dir,
)
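
For context, a minimal usage sketch of the updated command, driven through click's test runner. It assumes add_data_cmd is registered as a click command in digital_land.cli (the registration decorator sits above this hunk), and the file paths and collection name below are placeholders; options declared with click.Path(exists=True) require those paths to exist beforehand.

from click.testing import CliRunner

from digital_land.cli import add_data_cmd

runner = CliRunner()
result = runner.invoke(
    add_data_cmd,
    [
        "endpoints.csv",                        # csv-path argument: endpoints to add
        "ancient-woodland",                     # collection-name argument (example value)
        "--collection-dir", "collection/",
        "--pipeline-dir", "pipeline/",          # new option added in this commit
        "--specification-dir", "specification/",
        "--cache-dir", "var/cache/add_data/",   # new option; directory must already exist
    ],
    input="yes\n",  # answers the "continue processing this resource?" prompt
)
print(result.exit_code)
print(result.output)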


116 changes: 112 additions & 4 deletions digital_land/commands.py
@@ -59,7 +59,14 @@
from digital_land.configuration.main import Config
from digital_land.api import API
from digital_land.state import State
from digital_land.utils.add_data_utils import clear_log, is_date_valid, is_url_valid
from digital_land.utils.add_data_utils import (
clear_log,
get_column_field_summary,
get_entity_summary,
get_issue_summary,
is_date_valid,
is_url_valid,
)

from .register import hash_value
from .utils.gdal_utils import get_gdal_version
@@ -202,6 +209,7 @@ def pipeline_run(
config_path="var/cache/config.sqlite3",
resource=None,
output_log_dir=None,
converted_path=None,
):
if resource is None:
resource = resource_from_path(input_path)
@@ -267,6 +275,7 @@ def pipeline_run(
dataset_resource_log=dataset_resource_log,
converted_resource_log=converted_resource_log,
custom_temp_dir=custom_temp_dir,
output_path=converted_path,
),
NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
ParsePhase(),
@@ -671,29 +680,39 @@ def validate_and_add_data_input(
# Resource and path will only be printed if downloaded successfully, which should only happen if status is 200
resource = log.get("resource", None)
if resource:
resource_path = Path(collection_dir) / "resource" / resource
print(
"Resource collected: ",
resource,
)
print(
"Resource Path is: ",
Path(collection_dir) / "resource" / resource,
resource_path,
)

print(f"Log Status for {endpoint['endpoint']}: The status is {status}")
endpoint_resource_info.update(
{
"endpoint": endpoint["endpoint"],
"resource": log.get("resource"),
"resource_path": resource_path,
"pipelines": row["pipelines"].split(";"),
"organisation": row["organisation"],
"entry-date": row["entry-date"],
}
)

return collection, endpoint_resource_info


def add_data(
csv_file_path, collection_name, collection_dir, specification_dir, organisation_path
csv_file_path,
collection_name,
collection_dir,
pipeline_dir,
specification_dir,
organisation_path,
cache_dir=None,
):
# Potentially track a list of files to clean up at the end of session? e.g log file

@@ -707,6 +726,9 @@ def add_data(
)
# At this point the endpoint will have been added to the collection

# We need to delete the collection log to enable consecutive runs due to collection.load()
clear_log(collection_dir, endpoint_resource_info["endpoint"])

user_response = (
input("Do you want to continue processing this resource? (yes/no): ")
.strip()
@@ -715,9 +737,95 @@ def add_data(

if user_response != "yes":
print("Operation cancelled by user.")
clear_log(collection_dir, endpoint_resource_info["endpoint"])
return

if not cache_dir:
cache_dir = Path("var/cache/add_data/")
else:
cache_dir = Path(cache_dir)

output_path = (
cache_dir / "transformed/" / (endpoint_resource_info["resource"] + ".csv")
)

issue_dir = cache_dir / "issue/"
column_field_dir = cache_dir / "column_field/"
dataset_resource_dir = cache_dir / "dataset_resource/"
converted_resource_dir = cache_dir / "converted_resource/"
converted_dir = cache_dir / "converted/"
output_log_dir = cache_dir / "log/"
operational_issue_dir = cache_dir / "performance" / "operational_issue/"

output_path.parent.mkdir(parents=True, exist_ok=True)
issue_dir.mkdir(parents=True, exist_ok=True)
column_field_dir.mkdir(parents=True, exist_ok=True)
dataset_resource_dir.mkdir(parents=True, exist_ok=True)
converted_resource_dir.mkdir(parents=True, exist_ok=True)
converted_dir.mkdir(parents=True, exist_ok=True)
output_log_dir.mkdir(parents=True, exist_ok=True)
operational_issue_dir.mkdir(parents=True, exist_ok=True)

collection.load_log_items()
for pipeline in endpoint_resource_info["pipelines"]:
print("======================================================================")
print("Run pipeline")
print("======================================================================")
try:
pipeline_run(
pipeline,
Pipeline(pipeline_dir, pipeline),
Specification(specification_dir),
endpoint_resource_info["resource_path"],
output_path=output_path,
collection_dir=collection_dir,
issue_dir=issue_dir,
operational_issue_dir=operational_issue_dir,
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
converted_resource_dir=converted_resource_dir,
organisation_path=organisation_path,
endpoints=[endpoint_resource_info["endpoint"]],
organisations=[endpoint_resource_info["organisation"]],
resource=endpoint_resource_info["resource"],
output_log_dir=output_log_dir,
converted_path=os.path.join(
converted_dir, endpoint_resource_info["resource"] + ".csv"
),
)
except Exception as e:
raise RuntimeError(
f"Pipeline failed to process resource with the following error: {e}"
)
print("======================================================================")
print("Pipeline successful!")
print("======================================================================")

converted_path = os.path.join(
converted_dir, endpoint_resource_info["resource"] + ".csv"
)
if not os.path.isfile(converted_path):
# The pipeline doesn't convert .csv resources so direct user to original resource
converted_path = endpoint_resource_info["resource_path"]
print(f"Converted .csv resource path: {converted_path}")
print(f"Transformed resource path: {output_path}")

column_field_summary = get_column_field_summary(
pipeline,
endpoint_resource_info,
column_field_dir,
converted_dir,
specification_dir,
)
print(column_field_summary)

issue_summary = get_issue_summary(endpoint_resource_info, issue_dir)
print(issue_summary)

entity_summary = get_entity_summary(
endpoint_resource_info, output_path, pipeline, issue_dir, pipeline_dir
)
print(entity_summary)


def add_endpoints_and_lookups(
csv_file_path,
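
And a rough sketch of calling the updated add_data in digital_land/commands.py directly, exercising the new pipeline_dir and cache_dir parameters. The paths and collection name are illustrative only; the function prompts on stdin before running the pipelines, and writes its transformed/, issue/, column_field/ and other working directories under cache_dir (falling back to var/cache/add_data/ when cache_dir is not given).

from pathlib import Path

from digital_land.commands import add_data

add_data(
    Path("endpoints.csv"),                  # csv_file_path: endpoints to add
    "ancient-woodland",                     # collection_name (example value)
    Path("collection/"),                    # collection_dir
    Path("pipeline/"),                      # pipeline_dir (new in this commit)
    Path("specification/"),                 # specification_dir
    "var/cache/organisation.csv",           # organisation_path
    cache_dir=Path("var/cache/add_data/"),  # working outputs land here (new in this commit)
)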
