From ce5e228b7f472516cfd9269f34b78f9a9cdffd2f Mon Sep 17 00:00:00 2001 From: CarlosCoelhoSL <110818364+CarlosCoelhoSL@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:51:17 +0000 Subject: [PATCH] Add data step 2 (#307) * adds pipeline_run * Adds column field, issue, entity summary * fixes acceptance tests * adds add_data_utils tests * refines and adds test * adds acceptance test for failing pipeline * Removes unused mock exception fn * read just columns instead of whole .csv * use Path objects * adds 'missing entity' check --- digital_land/cli.py | 18 +- digital_land/commands.py | 116 ++++++- digital_land/utils/add_data_utils.py | 169 ++++++++++ tests/acceptance/test_add_data.py | 110 ++++++- tests/integration/test_add_data_utils.py | 373 ++++++++++++++++++++++- 5 files changed, 777 insertions(+), 9 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index a7b85988..1f4c87ed 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -351,6 +351,7 @@ def retire_endpoints_cmd(config_collections_dir, csv_path): @click.argument("csv-path", nargs=1, type=click.Path()) @click.argument("collection-name", nargs=1, type=click.STRING) @click.option("--collection-dir", "-c", nargs=1, type=click.Path(exists=True)) +@click.option("--pipeline-dir", "-p", nargs=1, type=click.Path(exists=True)) @click.option( "--specification-dir", "-s", type=click.Path(exists=True), default="specification/" ) @@ -360,20 +361,35 @@ def retire_endpoints_cmd(config_collections_dir, csv_path): type=click.Path(exists=True), default="var/cache/organisation.csv", ) +@click.option( + "--cache-dir", + type=click.Path(exists=True), +) def add_data_cmd( - csv_path, collection_name, collection_dir, specification_dir, organisation_path + csv_path, + collection_name, + collection_dir, + pipeline_dir, + specification_dir, + organisation_path, + cache_dir, ): csv_file_path = Path(csv_path) if not csv_file_path.is_file(): logging.error(f"CSV file not found at path: {csv_path}") sys.exit(2) + collection_dir = Path(collection_dir) + pipeline_dir = Path(pipeline_dir) + specification_dir = Path(specification_dir) return add_data( csv_file_path, collection_name, collection_dir, + pipeline_dir, specification_dir, organisation_path, + cache_dir=cache_dir, ) diff --git a/digital_land/commands.py b/digital_land/commands.py index 47a82494..0f32a1f8 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -59,7 +59,14 @@ from digital_land.configuration.main import Config from digital_land.api import API from digital_land.state import State -from digital_land.utils.add_data_utils import clear_log, is_date_valid, is_url_valid +from digital_land.utils.add_data_utils import ( + clear_log, + get_column_field_summary, + get_entity_summary, + get_issue_summary, + is_date_valid, + is_url_valid, +) from .register import hash_value from .utils.gdal_utils import get_gdal_version @@ -202,6 +209,7 @@ def pipeline_run( config_path="var/cache/config.sqlite3", resource=None, output_log_dir=None, + converted_path=None, ): if resource is None: resource = resource_from_path(input_path) @@ -267,6 +275,7 @@ def pipeline_run( dataset_resource_log=dataset_resource_log, converted_resource_log=converted_resource_log, custom_temp_dir=custom_temp_dir, + output_path=converted_path, ), NormalisePhase(skip_patterns=skip_patterns, null_path=null_path), ParsePhase(), @@ -671,13 +680,14 @@ def validate_and_add_data_input( # Resource and path will only be printed if downloaded successfully but should only happen if status is 200 resource = 
log.get("resource", None) if resource: + resource_path = Path(collection_dir) / "resource" / resource print( "Resource collected: ", resource, ) print( "Resource Path is: ", - Path(collection_dir) / "resource" / resource, + resource_path, ) print(f"Log Status for {endpoint['endpoint']}: The status is {status}") @@ -685,7 +695,10 @@ def validate_and_add_data_input( { "endpoint": endpoint["endpoint"], "resource": log.get("resource"), + "resource_path": resource_path, "pipelines": row["pipelines"].split(";"), + "organisation": row["organisation"], + "entry-date": row["entry-date"], } ) @@ -693,7 +706,13 @@ def validate_and_add_data_input( def add_data( - csv_file_path, collection_name, collection_dir, specification_dir, organisation_path + csv_file_path, + collection_name, + collection_dir, + pipeline_dir, + specification_dir, + organisation_path, + cache_dir=None, ): # Potentially track a list of files to clean up at the end of session? e.g log file @@ -707,6 +726,9 @@ def add_data( ) # At this point the endpoint will have been added to the collection + # We need to delete the collection log to enable consecutive runs due to collection.load() + clear_log(collection_dir, endpoint_resource_info["endpoint"]) + user_response = ( input("Do you want to continue processing this resource? (yes/no): ") .strip() @@ -715,9 +737,95 @@ def add_data( if user_response != "yes": print("Operation cancelled by user.") - clear_log(collection_dir, endpoint_resource_info["endpoint"]) return + if not cache_dir: + cache_dir = Path("var/cache/add_data/") + else: + cache_dir = Path(cache_dir) + + output_path = ( + cache_dir / "transformed/" / (endpoint_resource_info["resource"] + ".csv") + ) + + issue_dir = cache_dir / "issue/" + column_field_dir = cache_dir / "column_field/" + dataset_resource_dir = cache_dir / "dataset_resource/" + converted_resource_dir = cache_dir / "converted_resource/" + converted_dir = cache_dir / "converted/" + output_log_dir = cache_dir / "log/" + operational_issue_dir = cache_dir / "performance/ " / "operational_issue/" + + output_path.parent.mkdir(parents=True, exist_ok=True) + issue_dir.mkdir(parents=True, exist_ok=True) + column_field_dir.mkdir(parents=True, exist_ok=True) + dataset_resource_dir.mkdir(parents=True, exist_ok=True) + converted_resource_dir.mkdir(parents=True, exist_ok=True) + converted_dir.mkdir(parents=True, exist_ok=True) + output_log_dir.mkdir(parents=True, exist_ok=True) + operational_issue_dir.mkdir(parents=True, exist_ok=True) + + collection.load_log_items() + for pipeline in endpoint_resource_info["pipelines"]: + print("======================================================================") + print("Run pipeline") + print("======================================================================") + try: + pipeline_run( + pipeline, + Pipeline(pipeline_dir, pipeline), + Specification(specification_dir), + endpoint_resource_info["resource_path"], + output_path=output_path, + collection_dir=collection_dir, + issue_dir=issue_dir, + operational_issue_dir=operational_issue_dir, + column_field_dir=column_field_dir, + dataset_resource_dir=dataset_resource_dir, + converted_resource_dir=converted_resource_dir, + organisation_path=organisation_path, + endpoints=[endpoint_resource_info["endpoint"]], + organisations=[endpoint_resource_info["organisation"]], + resource=endpoint_resource_info["resource"], + output_log_dir=output_log_dir, + converted_path=os.path.join( + converted_dir, endpoint_resource_info["resource"] + ".csv" + ), + ) + except Exception as e: + raise 
RuntimeError( + f"Pipeline failed to process resource with the following error: {e}" + ) + print("======================================================================") + print("Pipeline successful!") + print("======================================================================") + + converted_path = os.path.join( + converted_dir, endpoint_resource_info["resource"] + ".csv" + ) + if not os.path.isfile(converted_path): + # The pipeline doesn't convert .csv resources so direct user to original resource + converted_path = endpoint_resource_info["resource_path"] + print(f"Converted .csv resource path: {converted_path}") + print(f"Transformed resource path: {output_path}") + + column_field_summary = get_column_field_summary( + pipeline, + endpoint_resource_info, + column_field_dir, + converted_dir, + specification_dir, + ) + print(column_field_summary) + + issue_summary = get_issue_summary(endpoint_resource_info, issue_dir) + print(issue_summary) + + entity_summary = get_entity_summary( + endpoint_resource_info, output_path, pipeline, issue_dir, pipeline_dir + ) + print(entity_summary) + def add_endpoints_and_lookups( csv_file_path, diff --git a/digital_land/utils/add_data_utils.py b/digital_land/utils/add_data_utils.py index 4f95f6d7..a2b54415 100644 --- a/digital_land/utils/add_data_utils.py +++ b/digital_land/utils/add_data_utils.py @@ -1,8 +1,13 @@ +import csv import os +import duckdb from datetime import datetime from urllib.parse import urlparse +import pandas as pd + from digital_land.collect import Collector +from digital_land.specification import Specification def is_url_valid(url, url_type): @@ -48,3 +53,167 @@ def clear_log(collection_dir, endpoint): log_path = collector.log_path(datetime.utcnow(), endpoint) if os.path.isfile(log_path): os.remove(log_path) + + +def get_column_field_summary( + pipeline, endpoint_resource_info, column_field_dir, converted_dir, specification_dir +): + column_field_summary = "" + column_field_summary += ( + "\n======================================================================" + ) + column_field_summary += "\nColumn Field Summary" + column_field_summary += ( + "\n======================================================================" + ) + column_field_path = os.path.join( + column_field_dir, endpoint_resource_info["resource"] + ".csv" + ) + column_field_df = pd.read_csv(column_field_path) + + column_field_summary += "\nMapped Columns:" + column_field_summary += "\n" + column_field_summary += column_field_df[["column", "field"]].to_string(index=False) + + column_field_summary += "\n\nUnmapped Columns:" + # Try reading from converted .csv, if FileNotFound then resource is already .csv + try: + with open( + os.path.join(converted_dir, endpoint_resource_info["resource"] + ".csv"), + "r", + ) as f: + reader = csv.DictReader(f) + converted_resource_columns = reader.fieldnames + except FileNotFoundError: + with open(endpoint_resource_info["resource_path"], "r") as f: + reader = csv.DictReader(f) + converted_resource_columns = reader.fieldnames + + # Find columns that are in resource that aren't in the column field log + unmapped_columns = [ + column + for column in converted_resource_columns + if column not in column_field_df["column"].values + ] + + if len(unmapped_columns) > 0: + column_field_summary += "\n" + column_field_summary += ", ".join(unmapped_columns) + else: + column_field_summary += "\nNo unmapped columns!" 
+ + column_field_summary += "\n\nUnmapped Fields:" + mapped_fields = column_field_df["field"].values + specification = Specification(specification_dir) + dataset_fields = specification.dataset_field[pipeline] + + # remove fields from dataset fields that are autogenerated by pipeline + exclude_fields = ["entity", "organisation", "prefix"] + if pipeline != "tree": + exclude_fields.append("point") + dataset_fields = [field for field in dataset_fields if field not in exclude_fields] + + unmapped_fields = [field for field in dataset_fields if field not in mapped_fields] + if len(unmapped_fields) > 0: + column_field_summary += "\n" + column_field_summary += ", ".join(unmapped_fields) + else: + column_field_summary += "\nNo unmapped fields!" + + if "reference" not in mapped_fields: + raise ValueError( + "Reference not found in the mapped fields - does this need mapping?" + ) + + return column_field_summary + + +def get_issue_summary(endpoint_resource_info, issue_dir): + issue_summary = "" + issue_summary += ( + "\n======================================================================" + ) + issue_summary += "\nIssue Summary" + issue_summary += ( + "\n======================================================================" + ) + + issue_df = pd.read_csv( + os.path.join(issue_dir, endpoint_resource_info["resource"] + ".csv") + ) + + issue_summary += "\n" + issue_summary += issue_df.groupby(["issue-type", "field"]).size().to_string() + + return issue_summary + + +def get_entity_summary( + endpoint_resource_info, output_path, pipeline, issue_dir, pipeline_dir +): + entity_summary = "" + entity_summary += ( + "\n======================================================================" + ) + entity_summary += "\nEntity Summary" + entity_summary += ( + "\n======================================================================" + ) + + # Get number of entities in resource from transformed file + transformed_df = pd.read_csv(output_path) + existing_entities = transformed_df["entity"].unique().tolist() + existing_entity_count = len(existing_entities) + entity_summary += ( + f"\nNumber of existing entities in resource: {existing_entity_count}" + ) + + # Get new entities by looking at unknown entity issues + issue_df = pd.read_csv( + os.path.join(issue_dir, endpoint_resource_info["resource"] + ".csv") + ) + new_entities_df = issue_df[ + issue_df["issue-type"].isin( + ["unknown entity", "unknown entity - missing reference"] + ) + ] + new_entity_count = len(new_entities_df) + + if new_entity_count == 0: + entity_summary += "\nWARNING: No new entities in resource" + else: + entity_summary += f"\nNumber of new entities in resource: {new_entity_count}" + entity_summary += "\n\nNew entity breakdown:\n" + # Remove prefix from value column to get just reference + new_entities_df.loc[:, "value"] = new_entities_df[ + new_entities_df["issue-type"] == "unknown entity" + ]["value"].apply(lambda x: x.split(":")[1]) + entity_summary += ( + new_entities_df[["value", "line-number"]] + .astype({"line-number": int}) + .rename({"value": "reference", "line-number": "line-number"}, axis=1) + .to_string(index=False) + ) + + # Get list of entities for this provision from lookup.csv + # Potential for failure here when reading huge lookup file + lookup_path = pipeline_dir / "lookup.csv" + conn = duckdb.connect() + provision_entity_df = conn.execute( + f"SELECT entity FROM '{lookup_path}' WHERE prefix='{pipeline}' AND organisation='{endpoint_resource_info['organisation']}'" + ).df() + if len(provision_entity_df) > 0: + provision_entities 
= provision_entity_df["entity"].values + else: + provision_entities = [] + + # Compare entities found in resource with entities in lookup.csv + missing_entity_count = len( + [entity for entity in provision_entities if entity not in existing_entities] + ) + if missing_entity_count == len(provision_entities) and len(provision_entities) > 0: + entity_summary += f"\nWARNING: NONE of the {len(provision_entities)} entities on the platform for this provision are in the resource - is this correct?" + elif missing_entity_count != 0: + entity_summary += f"\nWARNING: There are {missing_entity_count} entities on the platform for this provision that aren't present in this resource" + + return entity_summary diff --git a/tests/acceptance/test_add_data.py b/tests/acceptance/test_add_data.py index 2cdf96cd..016ea383 100644 --- a/tests/acceptance/test_add_data.py +++ b/tests/acceptance/test_add_data.py @@ -1,6 +1,7 @@ import csv import os import tempfile +from unittest import mock from unittest.mock import Mock from click.testing import CliRunner import pytest @@ -16,6 +17,36 @@ def specification_dir(tmp_path_factory): return specification_dir +@pytest.fixture +def pipeline_dir(tmp_path_factory): + pipeline_dir = tmp_path_factory.mktemp("pipeline") + + collection_name = "ancient-woodland" + # create lookups + row = { + "prefix": collection_name, + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference", + "entity": 44000001, + } + + fieldnames = row.keys() + + with open(os.path.join(pipeline_dir, "lookup.csv"), "w") as f: + dictwriter = csv.DictWriter(f, fieldnames=fieldnames) + dictwriter.writeheader() + dictwriter.writerow(row) + + return pipeline_dir + + +@pytest.fixture +def cache_dir(tmp_path_factory): + return tmp_path_factory.mktemp("cache") + + @pytest.fixture(scope="function") def collection_dir(tmp_path_factory): collection_dir = tmp_path_factory.mktemp("collection") @@ -91,9 +122,8 @@ def organisation_csv(): @pytest.fixture def mock_request_get(mocker): - data = {"reference": "1", "value": "test"} - csv_content = str(data).encode("utf-8") - + data = "reference,documentation-url\n1,url" + csv_content = data.encode("utf-8") mock_response = Mock() mock_response.status_code = 200 mock_response.request.headers = {"test": "test"} @@ -130,6 +160,8 @@ def create_input_csv( def test_cli_add_data( collection_dir, specification_dir, + pipeline_dir, + cache_dir, organisation_csv, mock_request_get, monkeypatch, @@ -159,8 +191,12 @@ def test_cli_add_data( str(collection_dir), "--specification-dir", str(specification_dir), + "--pipeline-dir", + str(pipeline_dir), "--organisation-path", str(organisation_csv), + "--cache-dir", + str(cache_dir), ], ) @@ -170,8 +206,10 @@ def test_cli_add_data( def test_cli_add_data_incorrect_input_data( collection_dir, specification_dir, + pipeline_dir, organisation_csv, mock_request_get, + cache_dir, ): incorrect_input_data = { "organisation": "", @@ -195,8 +233,12 @@ def test_cli_add_data_incorrect_input_data( str(collection_dir), "--specification-dir", str(specification_dir), + "--pipeline-dir", + str(pipeline_dir), "--organisation-path", str(organisation_csv), + "--cache-dir", + str(cache_dir), ], ) assert result.exit_code == 1 @@ -208,9 +250,11 @@ def test_cli_add_data_incorrect_input_data( def test_cli_add_data_consecutive_runs( collection_dir, specification_dir, + pipeline_dir, organisation_csv, mock_request_get, monkeypatch, + cache_dir, ): no_error_input_data = { "organisation": "local-authority:SST", @@ -237,8 +281,12 @@ 
def test_cli_add_data_consecutive_runs( str(collection_dir), "--specification-dir", str(specification_dir), + "--pipeline-dir", + str(pipeline_dir), "--organisation-path", str(organisation_csv), + "--cache-dir", + str(cache_dir), ], ) assert result.exit_code == 0 @@ -255,8 +303,64 @@ def test_cli_add_data_consecutive_runs( str(collection_dir), "--specification-dir", str(specification_dir), + "--pipeline-dir", + str(pipeline_dir), "--organisation-path", str(organisation_csv), + "--cache-dir", + str(cache_dir), ], ) assert result.exit_code == 0 + + +def test_cli_add_data_pipeline_fail( + collection_dir, + specification_dir, + pipeline_dir, + cache_dir, + organisation_csv, + mock_request_get, + monkeypatch, +): + no_error_input_data = { + "organisation": "local-authority:SST", + "documentation-url": "https://www.sstaffs.gov.uk/planning/conservation-and-heritage/south-staffordshires-conservation-areas", + "endpoint-url": "https://www.sstaffs.gov.uk/sites/default/files/2024-11/South Staffs Conservation Area document dataset_1.csv", + "start-date": "", + "pipelines": "conservation-area", + "plugin": "", + "licence": "ogl3", + } + csv_path = create_input_csv(no_error_input_data) + + # Mock in user input + monkeypatch.setattr("builtins.input", lambda _: "yes") + + runner = CliRunner() + with mock.patch("digital_land.commands.pipeline_run") as pipeline_mock: + pipeline_mock.side_effect = Exception("Exception while running pipeline") + result = runner.invoke( + cli, + [ + "add-data", + csv_path, + "conservation-area", + "--collection-dir", + str(collection_dir), + "--specification-dir", + str(specification_dir), + "--pipeline-dir", + str(pipeline_dir), + "--organisation-path", + str(organisation_csv), + "--cache-dir", + str(cache_dir), + ], + ) + + assert result.exit_code == 1 + assert "Pipeline failed to process resource with the following error" in str( + result.exception + ) + assert "Exception while running pipeline" in str(result.exception) diff --git a/tests/integration/test_add_data_utils.py b/tests/integration/test_add_data_utils.py index 64f87ee5..53b581ca 100644 --- a/tests/integration/test_add_data_utils.py +++ b/tests/integration/test_add_data_utils.py @@ -1,7 +1,14 @@ +import csv from datetime import datetime import os +import pytest -from digital_land.utils.add_data_utils import clear_log +from digital_land.utils.add_data_utils import ( + clear_log, + get_column_field_summary, + get_entity_summary, + get_issue_summary, +) def test_clear_logs(tmp_path_factory): @@ -17,3 +24,367 @@ def test_clear_logs(tmp_path_factory): clear_log(collection_dir, endpoint) assert not os.path.isfile(file_path) + + +def test_get_issue_summary(tmp_path_factory): + issue_dir = tmp_path_factory.mktemp("issue") + + resource = "resource" + endpoint_resource_info = {"resource": resource} + + headers = ["issue-type", "field", "value"] + rows = [ + {"issue-type": "issue-type1", "field": "field1", "value": "issue1"}, + {"issue-type": "issue-type1", "field": "field2", "value": "issue2"}, + {"issue-type": "issue-type2", "field": "field1", "value": "issue3"}, + {"issue-type": "issue-type2", "field": "field1", "value": "issue4"}, + ] + with open(os.path.join(issue_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=headers) + writer.writeheader() + writer.writerows(rows) + + issue_summary = get_issue_summary(endpoint_resource_info, issue_dir) + + assert "issue-type1 field1 1\n field2 1" in issue_summary + assert "issue-type2 field1 2" in issue_summary + + +def 
test_get_entity_summary(tmp_path_factory): + issue_dir = tmp_path_factory.mktemp("issue") + transformed_dir = tmp_path_factory.mktemp("tranformed") + pipeline_dir = tmp_path_factory.mktemp("pipeline") + + resource = "resource" + endpoint_resource_info = { + "organisation": "local-authority-eng:SST", + "resource": resource, + } + pipeline = "dataset" + + issue_headers = ["issue-type", "field", "value", "line-number"] + issue_rows = [ + { + "issue-type": "unknown entity", + "field": "field1", + "value": "dataset:reference", + "line-number": 1, + }, + { + "issue-type": "unknown entity - missing reference", + "field": "field1", + "value": "dataset:", + "line-number": 2, + }, + {"issue-type": "known entity", "field": "field1", "value": "n/a"}, + ] + with open(os.path.join(issue_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=issue_headers) + writer.writeheader() + writer.writerows(issue_rows) + + output_path = os.path.join(transformed_dir, resource + ".csv") + transformed_headers = ["entity"] + transformed_rows = [ + {"entity": 1}, + {"entity": 1}, + {"entity": 1}, + {"entity": 2}, + {"entity": 3}, + {"entity": 4}, + {"entity": 4}, + ] + with open(output_path, "w") as f: + writer = csv.DictWriter(f, fieldnames=transformed_headers) + writer.writeheader() + writer.writerows(transformed_rows) + + rows = [ + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference", + "entity": 10, + }, + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference", + "entity": 11, + }, + ] + + fieldnames = rows[0].keys() + + with open(os.path.join(pipeline_dir, "lookup.csv"), "w") as f: + dictwriter = csv.DictWriter(f, fieldnames=fieldnames) + dictwriter.writeheader() + dictwriter.writerows(rows) + + entity_summary = get_entity_summary( + endpoint_resource_info, output_path, pipeline, issue_dir, pipeline_dir + ) + assert "Number of existing entities in resource: 4" in entity_summary + assert "Number of new entities in resource: 2" in entity_summary + assert "reference line-number" in entity_summary + assert "reference 1" in entity_summary + assert "NaN 2" in entity_summary + + +def test_get_entity_summary_missing_entity(tmp_path_factory): + issue_dir = tmp_path_factory.mktemp("issue") + transformed_dir = tmp_path_factory.mktemp("tranformed") + pipeline_dir = tmp_path_factory.mktemp("pipeline") + + resource = "resource" + endpoint_resource_info = { + "organisation": "local-authority:SST", + "resource": resource, + } + pipeline = "dataset" + + issue_headers = ["issue-type", "field", "value", "line-number"] + with open(os.path.join(issue_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=issue_headers) + writer.writeheader() + + output_path = os.path.join(transformed_dir, resource + ".csv") + transformed_headers = ["entity"] + transformed_rows = [ + {"entity": 1}, + {"entity": 2}, + ] + with open(output_path, "w") as f: + writer = csv.DictWriter(f, fieldnames=transformed_headers) + writer.writeheader() + writer.writerows(transformed_rows) + + # create lookups + rows = [ + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference", + "entity": 1, + }, + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference2", + "entity": 2, + }, + { + "prefix": "dataset", + "resource": "", 
+ "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference3", + "entity": 3, + }, + ] + + fieldnames = rows[0].keys() + + with open(os.path.join(pipeline_dir, "lookup.csv"), "w") as f: + dictwriter = csv.DictWriter(f, fieldnames=fieldnames) + dictwriter.writeheader() + dictwriter.writerows(rows) + + entity_summary = get_entity_summary( + endpoint_resource_info, output_path, pipeline, issue_dir, pipeline_dir + ) + assert ( + "WARNING: There are 1 entities on the platform for this provision that aren't present in this resource" + in entity_summary + ) + + +def test_get_entity_summary_missing_all_entity(tmp_path_factory): + issue_dir = tmp_path_factory.mktemp("issue") + transformed_dir = tmp_path_factory.mktemp("tranformed") + pipeline_dir = tmp_path_factory.mktemp("pipeline") + + resource = "resource" + endpoint_resource_info = { + "organisation": "local-authority:SST", + "resource": resource, + } + pipeline = "dataset" + + issue_headers = ["issue-type", "field", "value", "line-number"] + with open(os.path.join(issue_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=issue_headers) + writer.writeheader() + + output_path = os.path.join(transformed_dir, resource + ".csv") + transformed_headers = ["entity"] + transformed_rows = [ + {"entity": 1}, + {"entity": 2}, + ] + with open(output_path, "w") as f: + writer = csv.DictWriter(f, fieldnames=transformed_headers) + writer.writeheader() + writer.writerows(transformed_rows) + + # create lookups + rows = [ + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference", + "entity": 11, + }, + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference2", + "entity": 12, + }, + { + "prefix": "dataset", + "resource": "", + "entry-number": "", + "organisation": "local-authority:SST", + "reference": "reference3", + "entity": 13, + }, + ] + + fieldnames = rows[0].keys() + + with open(os.path.join(pipeline_dir, "lookup.csv"), "w") as f: + dictwriter = csv.DictWriter(f, fieldnames=fieldnames) + dictwriter.writeheader() + dictwriter.writerows(rows) + + entity_summary = get_entity_summary( + endpoint_resource_info, output_path, pipeline, issue_dir, pipeline_dir + ) + assert ( + "WARNING: NONE of the 3 entities on the platform for this provision are in the resource - is this correct?" 
+ in entity_summary + ) + + +# This test also tests the 'exclude fields' functionality +# as some of those fields are present in the test spec +def test_get_column_field_summary(tmp_path_factory): + column_field_dir = tmp_path_factory.mktemp("column_field") + converted_dir = tmp_path_factory.mktemp("converted") + + resource = "resource" + pipeline = "address" + endpoint_resource_info = {"resource": resource} + + specification_dir = "tests/data/specification" + + column_field_headers = ["column", "field"] + column_field_rows = [ + {"column": "column1", "field": "address"}, + {"column": "column2", "field": "address-text"}, + {"column": "column3", "field": "end-date"}, + {"column": "column4", "field": "entry-date"}, + {"column": "column5", "field": "latitude"}, + {"column": "column6", "field": "longitude"}, + {"column": "column7", "field": "name"}, + {"column": "column8", "field": "notes"}, + {"column": "column9", "field": "reference"}, + ] + with open(os.path.join(column_field_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=column_field_headers) + writer.writeheader() + writer.writerows(column_field_rows) + + converted_headers = [ + "column1", + "column2", + "column3", + "column4", + "column5", + "column6", + "column7", + "column8", + "column9", + "new_column", + ] + + with open(os.path.join(converted_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=converted_headers) + writer.writeheader() + + column_field_summary = get_column_field_summary( + pipeline, + endpoint_resource_info, + column_field_dir, + converted_dir, + specification_dir, + ) + + assert "Unmapped Columns:\nnew_column" in column_field_summary + assert "Unmapped Fields:\nstart-date" in column_field_summary + + +def test_column_field_summary_no_reference(tmp_path_factory): + column_field_dir = tmp_path_factory.mktemp("column_field") + converted_dir = tmp_path_factory.mktemp("converted") + + resource = "resource" + pipeline = "address" + endpoint_resource_info = {"resource": resource} + + specification_dir = "tests/data/specification" + + column_field_headers = ["column", "field"] + column_field_rows = [ + {"column": "column1", "field": "address"}, + {"column": "column2", "field": "address-text"}, + {"column": "column3", "field": "end-date"}, + {"column": "column4", "field": "entry-date"}, + {"column": "column5", "field": "latitude"}, + {"column": "column6", "field": "longitude"}, + {"column": "column7", "field": "name"}, + {"column": "column8", "field": "notes"}, + ] + with open(os.path.join(column_field_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=column_field_headers) + writer.writeheader() + writer.writerows(column_field_rows) + + converted_headers = [ + "column1", + "column2", + "column3", + "column4", + "column5", + "column6", + "column7", + "column8", + ] + + with open(os.path.join(converted_dir, resource + ".csv"), "w") as f: + writer = csv.DictWriter(f, fieldnames=converted_headers) + writer.writeheader() + + with pytest.raises(ValueError) as error: + get_column_field_summary( + pipeline, + endpoint_resource_info, + column_field_dir, + converted_dir, + specification_dir, + ) + + assert "Reference not found in the mapped fields" in str(error)
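
A rough sketch of how the add_data entry point extended above can be driven programmatically, mirroring what the acceptance tests exercise through the CLI. The paths and the "conservation-area" collection name are placeholders; the input CSV is assumed to carry the columns the test fixtures write (organisation, documentation-url, endpoint-url, start-date, pipelines, plugin, licence), and the call prompts for confirmation before the pipeline run and the column/issue/entity summaries are printed.

    from pathlib import Path

    from digital_land.commands import add_data

    add_data(
        Path("endpoint.csv"),             # placeholder endpoint CSV, one row per endpoint to add
        "conservation-area",              # collection name (placeholder)
        Path("collection/"),              # as passed via --collection-dir
        Path("pipeline/"),                # as passed via --pipeline-dir; must contain lookup.csv
        Path("specification/"),           # as passed via --specification-dir
        "var/cache/organisation.csv",     # as passed via --organisation-path
        cache_dir="var/cache/add_data/",  # optional; this is also the default when omitted
    )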