From 76be56d06e3d46bf42c382a2f0ed1670dbdaccf6 Mon Sep 17 00:00:00 2001 From: CarlosCoelhoSL Date: Fri, 3 Jan 2025 12:17:37 +0000 Subject: [PATCH] assign entities only if there are unknown entities --- digital_land/commands.py | 171 ++++++++++++++------------- digital_land/utils/add_data_utils.py | 10 ++ 2 files changed, 98 insertions(+), 83 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index efab9203..8514b32b 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -67,6 +67,7 @@ get_issue_summary, is_date_valid, is_url_valid, + proceed, ) from .register import hash_value @@ -730,14 +731,8 @@ def add_data( # We need to delete the collection log to enable consecutive runs due to collection.load() clear_log(collection_dir, endpoint_resource_info["endpoint"]) - user_response = ( - input("Do you want to continue processing this resource? (yes/no): ") - .strip() - .lower() - ) - - if user_response != "yes": - print("Operation cancelled by user.") + # Ask if user wants to proceed + if not proceed("Do you want to continue processing this resource? (yes/no): "): return if not cache_dir: @@ -827,93 +822,103 @@ def add_data( ) print(entity_summary) - user_response = ( - input("Do you want to assign entities for this resource? (yes/no): ") - .strip() - .lower() - ) - - if user_response != "yes": - print("Operation cancelled by user.") - return - - # Resource has been processed, run assign entities and reprocess + # Check for unknown entities and assign them + if ( + "unknown entity" in issue_summary + or "unknown entity - missing reference" in issue_summary + ): + # Ask if user wants to proceed + print("\nThere are unknown entities") + if not proceed( + "Do you want to assign entities for this resource? (yes/no): " + ): + return - # Copy pipeline dir to cache dir so we can modify it - cache_pipeline_dir = cache_dir / "pipeline" - copy_tree(str(pipeline_dir), str(cache_pipeline_dir)) + # Resource has been processed, run assign entities and reprocess - assign_entities( - resource_file_paths=[endpoint_resource_info["resource_path"]], - collection=collection, - dataset=dataset, - organisation=[endpoint_resource_info["organisation"]], - pipeline_dir=cache_pipeline_dir, - specification_dir=specification_dir, - organisation_path=organisation_path, - endpoints=[endpoint_resource_info["endpoint"]], - tmp_dir=cache_dir, - ) + # Copy pipeline dir to cache dir so we can modify it + cache_pipeline_dir = cache_dir / "pipeline" + copy_tree(str(pipeline_dir), str(cache_pipeline_dir)) - pipeline = Pipeline(cache_pipeline_dir, dataset) - - # Now rerun pipeline with new assigned entities - try: - pipeline_run( - dataset, - pipeline, - Specification(specification_dir), - endpoint_resource_info["resource_path"], - output_path=output_path, - collection_dir=collection_dir, - issue_dir=issue_dir, - operational_issue_dir=operational_issue_dir, - column_field_dir=column_field_dir, - dataset_resource_dir=dataset_resource_dir, - converted_resource_dir=converted_resource_dir, + assign_entities( + resource_file_paths=[endpoint_resource_info["resource_path"]], + collection=collection, + dataset=dataset, + organisation=[endpoint_resource_info["organisation"]], + pipeline_dir=cache_pipeline_dir, + specification_dir=specification_dir, organisation_path=organisation_path, endpoints=[endpoint_resource_info["endpoint"]], - organisations=[endpoint_resource_info["organisation"]], - resource=endpoint_resource_info["resource"], - output_log_dir=output_log_dir, - converted_path=os.path.join( - converted_dir, endpoint_resource_info["resource"] + ".csv" - ), - ) - except Exception as e: - raise RuntimeError( - f"Pipeline failed to process resource with the following error: {e}" + tmp_dir=cache_dir, ) - entity_summary = get_entity_summary( - endpoint_resource_info, output_path, dataset, issue_dir, cache_pipeline_dir - ) - print(entity_summary) + pipeline = Pipeline(cache_pipeline_dir, dataset) - # Check if there are unassigned entities in the issue summary - raise exception if they still exist - issue_summary = get_issue_summary(endpoint_resource_info, issue_dir) - if ( - "unknown entity" in issue_summary - or "unknown entity - missing reference" in issue_summary - ): - print(issue_summary) - raise Exception( - f"Unknown entities remain in resource {endpoint_resource_info['resource']} after assigning entities" + # Now rerun pipeline with new assigned entities + try: + pipeline_run( + dataset, + pipeline, + Specification(specification_dir), + endpoint_resource_info["resource_path"], + output_path=output_path, + collection_dir=collection_dir, + issue_dir=issue_dir, + operational_issue_dir=operational_issue_dir, + column_field_dir=column_field_dir, + dataset_resource_dir=dataset_resource_dir, + converted_resource_dir=converted_resource_dir, + organisation_path=organisation_path, + endpoints=[endpoint_resource_info["endpoint"]], + organisations=[endpoint_resource_info["organisation"]], + resource=endpoint_resource_info["resource"], + output_log_dir=output_log_dir, + converted_path=os.path.join( + converted_dir, endpoint_resource_info["resource"] + ".csv" + ), + ) + except Exception as e: + raise RuntimeError( + f"Pipeline failed to process resource with the following error: {e}" + ) + + entity_summary = get_entity_summary( + endpoint_resource_info, + output_path, + dataset, + issue_dir, + cache_pipeline_dir, ) + print(entity_summary) - user_response = ( - input("Do you want to save changes made in this session? (yes/no): ") - .strip() - .lower() - ) + # Check if there are unassigned entities in the issue summary - raise exception if they still exist + issue_summary = get_issue_summary(endpoint_resource_info, issue_dir) + if ( + "unknown entity" in issue_summary + or "unknown entity - missing reference" in issue_summary + ): + print(issue_summary) + raise Exception( + f"Unknown entities remain in resource {endpoint_resource_info['resource']} after assigning entities" + ) - if user_response != "yes": - print("Operation cancelled by user.") - return + # Ask if user wants to proceed + if not proceed( + "Do you want to save changes made in this session? (yes/no): " + ): + return + + # Save changes to lookup.csv + shutil.copy(cache_pipeline_dir / "lookup.csv", pipeline_dir / "lookup.csv") + else: + # Ask if user wants to proceed + if not proceed( + "Do you want to save changes made in this session? (yes/no): " + ): + return - # Save changes to collection and lookup.csv + # Save changes to collection collection.save_csv() - shutil.copy(cache_pipeline_dir / "lookup.csv", pipeline_dir / "lookup.csv") def add_endpoints_and_lookups( @@ -1062,7 +1067,7 @@ def assign_entities( print("") print("======================================================================") - print("New Lookups") + print("New Entities") print("======================================================================") new_lookups = [] diff --git a/digital_land/utils/add_data_utils.py b/digital_land/utils/add_data_utils.py index a2b54415..43d507cc 100644 --- a/digital_land/utils/add_data_utils.py +++ b/digital_land/utils/add_data_utils.py @@ -48,6 +48,16 @@ def is_date_valid(date, date_type): return True, "" +def proceed(message): + user_response = input(message).strip().lower() + + if user_response != "yes": + print("Operation cancelled by user.") + return False + + return True + + def clear_log(collection_dir, endpoint): collector = Collector(collection_dir=collection_dir) log_path = collector.log_path(datetime.utcnow(), endpoint)