Add data step 1 (#285)
* first stage added to submodule

* refactors log analysis in add-data

* adds happy path test and start of acceptance test

* adds/fixes tests

* adds pipeline and collection validation

* adds acceptance tests

* tidies logs and comments

* adds consecutive run handling (temporary)

* parametrizes tests

* raises an exception when a duplicate endpoint is added

* fixes log endpoint message

* modifies collection.load() to handle logs without entry in source.csv

* adds log message to collection.load()

* adds extra url validity check

* adds tests for collection and is_url_valid

* removes unnecessary raise

* removes unnecessary pipeline_dir fixtures

* uses specification and organisation classes, moves validation to utils

* removes log_path from Collector.fetch return

* raises HTTPError when failing to collect from URL

* adds logging to try except in collection.py

* changes default logging in collection.py to True

* renames logging boolean

* renames error logging boolean

* removes collection.py edits, now deletes log after exiting

---------

Co-authored-by: averheecke-tpx <[email protected]>
2 people authored and cjohns-scottlogic committed Dec 9, 2024
1 parent 290ea3e commit 0019785
Showing 9 changed files with 1,074 additions and 0 deletions.
31 changes: 31 additions & 0 deletions digital_land/cli.py
@@ -30,6 +30,7 @@
organisation_check,
save_state,
compare_state,
add_data,
)

from digital_land.command_arguments import (
@@ -346,6 +347,36 @@ def retire_endpoints_cmd(config_collections_dir, csv_path):
return collection_retire_endpoints_and_sources(config_collections_dir, csv_path)


@cli.command("add-data")
@click.argument("csv-path", nargs=1, type=click.Path())
@click.argument("collection-name", nargs=1, type=click.STRING)
@click.option("--collection-dir", "-c", nargs=1, type=click.Path(exists=True))
@click.option(
"--specification-dir", "-s", type=click.Path(exists=True), default="specification/"
)
@click.option(
"--organisation-path",
"-o",
type=click.Path(exists=True),
default="var/cache/organisation.csv",
)
def add_data_cmd(
csv_path, collection_name, collection_dir, specification_dir, organisation_path
):
csv_file_path = Path(csv_path)
if not csv_file_path.is_file():
logging.error(f"CSV file not found at path: {csv_path}")
sys.exit(2)

return add_data(
csv_file_path,
collection_name,
collection_dir,
specification_dir,
organisation_path,
)


# edit to add collection_name in
@cli.command("add-endpoints-and-lookups")
@click.argument("csv-path", nargs=1, type=click.Path())
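
The new add-data command can be exercised end-to-end once a collection directory, a specification/ directory, and var/cache/organisation.csv are in place. A minimal sketch using click's test runner; the CSV path, collection name, and collection directory below are illustrative placeholders, not values from this change:

    from click.testing import CliRunner
    from digital_land.cli import cli

    runner = CliRunner()
    # "import.csv" and "ancient-woodland" are placeholder arguments for illustration
    result = runner.invoke(
        cli,
        ["add-data", "import.csv", "ancient-woodland", "--collection-dir", "collection/"],
    )
    print(result.exit_code, result.output)
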
188 changes: 188 additions & 0 deletions digital_land/commands.py
@@ -8,8 +8,10 @@
from packaging.version import Version
import pandas as pd
from pathlib import Path
from datetime import datetime

import geojson
from requests import HTTPError
import shapely

from digital_land.package.organisation import OrganisationPackage
@@ -57,6 +59,7 @@
from digital_land.configuration.main import Config
from digital_land.api import API
from digital_land.state import State
from digital_land.utils.add_data_utils import clear_log, is_date_valid, is_url_valid

from .register import hash_value
from .utils.gdal_utils import get_gdal_version
@@ -531,6 +534,191 @@ def collection_add_source(entry, collection, endpoint_url, collection_dir):
add_source_endpoint(entry, directory=collection_dir)


def validate_and_add_data_input(
csv_file_path, collection_name, collection_dir, specification_dir, organisation_path
):
expected_cols = [
"pipelines",
"organisation",
"documentation-url",
"endpoint-url",
"start-date",
"licence",
]

specification = Specification(specification_dir)
organisation = Organisation(organisation_path=organisation_path)

collection = Collection(name=collection_name, directory=collection_dir)
collection.load()
# ===== FIRST VALIDATION BASED ON IMPORT.CSV INFO
# - Check licence, url, date, organisation

# read and process each record of the new endpoints csv at csv_file_path, i.e. import.csv

with open(csv_file_path) as new_endpoints_file:
reader = csv.DictReader(new_endpoints_file)
csv_columns = reader.fieldnames

# validate the columns in input .csv
for expected_col in expected_cols:
if expected_col not in csv_columns:
raise Exception(f"required column ({expected_col}) not found in csv")

for row in reader:
# validate licence
if row["licence"] == "":
raise ValueError("Licence is blank")
elif not specification.licence.get(row["licence"], None):
raise ValueError(
f"Licence '{row['licence']}' is not a valid licence according to the specification."
)
# check if urls are not blank and valid urls
is_endpoint_valid, endpoint_valid_error = is_url_valid(
row["endpoint-url"], "endpoint_url"
)
is_documentation_valid, documentation_valid_error = is_url_valid(
row["documentation-url"], "documentation_url"
)
if not is_endpoint_valid or not is_documentation_valid:
raise ValueError(
f"{endpoint_valid_error} \n {documentation_valid_error}"
)

# if there is no start-date, do we want to populate it with today's date?
if row["start-date"]:
valid_date, error = is_date_valid(row["start-date"], "start-date")
if not valid_date:
raise ValueError(error)

# validate organisation
if row["organisation"] == "":
raise ValueError("The organisation must not be blank")
elif not organisation.lookup(row["organisation"]):
raise ValueError(
f"The given organisation '{row['organisation']}' is not in our valid organisations"
)

# validate pipeline(s) - do they exist and are they in the collection
pipelines = row["pipelines"].split(";")
for pipeline in pipelines:
if not specification.dataset.get(pipeline, None):
raise ValueError(
f"'{pipeline}' is not a valid dataset in the specification"
)
collection_in_specification = specification.dataset.get(
pipeline, None
).get("collection")
if collection_name != collection_in_specification:
raise ValueError(
f"'{pipeline}' does not belong to provided collection {collection_name}"
)

# VALIDATION DONE, NOW ADD TO COLLECTION
print("======================================================================")
print("Endpoint and source details")
print("======================================================================")
print("Endpoint URL: ", row["endpoint-url"])
print("Endpoint Hash:", hash_value(row["endpoint-url"]))
print("Documentation URL: ", row["documentation-url"])
print()

endpoints = []
# if the endpoint already exists, add_source_endpoint will report it and the function exits here
if collection.add_source_endpoint(row):
endpoint = {
"endpoint-url": row["endpoint-url"],
"endpoint": hash_value(row["endpoint-url"]),
"end-date": row.get("end-date", ""),
"plugin": row.get("plugin"),
"licence": row["licence"],
}
endpoints.append(endpoint)
else:
# We rely on the add_source_endpoint function to log why it couldn't be added
raise Exception(
"Endpoint and source could not be added - is this a duplicate endpoint?"
)

# if successfully added we can now attempt to fetch from endpoint
collector = Collector(collection_dir=collection_dir)
endpoint_resource_info = {}
for endpoint in endpoints:
status = collector.fetch(
url=endpoint["endpoint-url"],
endpoint=endpoint["endpoint"],
end_date=endpoint["end-date"],
plugin=endpoint["plugin"],
)
try:
log_path = collector.log_path(datetime.utcnow(), endpoint["endpoint"])
with open(log_path, "r") as f:
log = json.load(f)
except Exception as e:
print(
f"Error: The log file for {endpoint} could not be read from path {log_path}.\n{e}"
)
break

status = log.get("status", None)
# Raise exception if status is not 200
if not status or status != "200":
exception = log.get("exception", None)
raise HTTPError(
f"Failed to collect from URL with status: {status if status else exception}"
)

# Resource and path will only be printed if downloaded successfully, which should only happen if status is 200
resource = log.get("resource", None)
if resource:
print(
"Resource collected: ",
resource,
)
print(
"Resource Path is: ",
Path(collection_dir) / "resource" / resource,
)

print(f"Log Status for {endpoint['endpoint']}: The status is {status}")
endpoint_resource_info.update(
{
"endpoint": endpoint["endpoint"],
"resource": log.get("resource"),
"pipelines": row["pipelines"].split(";"),
}
)

return collection, endpoint_resource_info


def add_data(
csv_file_path, collection_name, collection_dir, specification_dir, organisation_path
):
# Potentially track a list of files to clean up at the end of session? e.g log file

# First validate the input .csv and collect from the endpoint
collection, endpoint_resource_info = validate_and_add_data_input(
csv_file_path,
collection_name,
collection_dir,
specification_dir,
organisation_path,
)
# At this point the endpoint will have been added to the collection

user_response = (
input("Do you want to continue processing this resource? (yes/no): ")
.strip()
.lower()
)

if user_response != "yes":
print("Operation cancelled by user.")
clear_log(collection_dir, endpoint_resource_info["endpoint"])
return


def add_endpoints_and_lookups(
csv_file_path,
collection_name,
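
For illustration, a record that would pass the column and format checks in validate_and_add_data_input might look like the dictionary below, as produced by csv.DictReader from import.csv. Every value here is invented; the organisation and licence codes would still need to exist in the local organisation data and specification for validation to succeed:

    # hypothetical import.csv record (all values invented for illustration)
    row = {
        "pipelines": "ancient-woodland",
        "organisation": "local-authority:ABC",
        "documentation-url": "https://example.gov.uk/ancient-woodland",
        "endpoint-url": "https://example.gov.uk/ancient-woodland.csv",
        "start-date": "2024-01-01",
        "licence": "ogl3",
    }
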
7 changes: 7 additions & 0 deletions digital_land/specification.py
@@ -39,6 +39,7 @@ def __init__(self, path="specification"):
self.schema_field = {}
self.typology = {}
self.pipeline = {}
self.licence = {}
self.load_dataset(path)
self.load_schema(path)
self.load_dataset_schema(path)
@@ -48,6 +49,7 @@ def __init__(self, path="specification"):
self.load_typology(path)
self.load_pipeline(path)
self.load_dataset_field(path)
self.load_licence(path)

self.index_field()
self.index_schema()
@@ -111,6 +113,11 @@ def load_pipeline(self, path):
for row in reader:
self.pipeline[row["pipeline"]] = row

def load_licence(self, path):
reader = csv.DictReader(open(os.path.join(path, "licence.csv")))
for row in reader:
self.licence[row["licence"]] = row

def index_schema(self):
self.schema_dataset = {}
for dataset, d in self.dataset_schema.items():
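
A short sketch of how the new licence index is consumed by the validation added in commands.py, assuming a local specification/ directory that contains licence.csv:

    from digital_land.specification import Specification

    specification = Specification("specification/")
    # licence codes are the keys of the licence dict loaded from licence.csv
    if not specification.licence.get("not-a-real-licence", None):
        print("not a valid licence according to the specification")
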
50 changes: 50 additions & 0 deletions digital_land/utils/add_data_utils.py
@@ -0,0 +1,50 @@
import os
from datetime import datetime
from urllib.parse import urlparse

from digital_land.collect import Collector


def is_url_valid(url, url_type):
if not url or url.strip() == "":
return False, f"The {url_type} must be populated"

parsed_url = urlparse(url)
# is the url scheme valid, i.e. does it start with http:// or https://
if parsed_url.scheme not in ["http", "https"] or not parsed_url.scheme:
return False, f"The {url_type} must start with 'http://' or 'https://'"

# does url have domain
if not parsed_url.netloc:
return False, f"The {url_type} must have a domain"

# ensure domain has correct format
if "." not in parsed_url.netloc:
return (
False,
f"The {url_type} must have a valid domain with a top-level domain (e.g., '.gov.uk', '.com')",
)

return True, ""


def is_date_valid(date, date_type):
if len(date) == 0:
return False, "Date is blank"
try:
date = datetime.strptime(date, "%Y-%m-%d").date()
# need to catch ValueError here, otherwise datetime will raise its own error rather than the clear message we want
except ValueError:
return False, f"{date_type} {date} must be format YYYY-MM-DD"

if date > datetime.today().date():
return False, f"The {date_type} {date} cannot be in the future"

return True, ""


def clear_log(collection_dir, endpoint):
collector = Collector(collection_dir=collection_dir)
log_path = collector.log_path(datetime.utcnow(), endpoint)
if os.path.isfile(log_path):
os.remove(log_path)
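
The URL and date helpers return a (valid, error_message) tuple rather than raising, leaving the caller to decide how to report the problem. A minimal sketch of typical results:

    from digital_land.utils.add_data_utils import is_date_valid, is_url_valid

    ok, error = is_url_valid("example.gov.uk/data.csv", "endpoint_url")
    # ok is False: the scheme is missing, so the message asks for 'http://' or 'https://'

    ok, error = is_date_valid("2024-13-01", "start-date")
    # ok is False: strptime raises ValueError, so the message asks for YYYY-MM-DD format
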
