Skip to content

Commit

Permalink
assign entities only if there are unknown entities
Browse files: browse the repository at this point in the history
  • Loading branch information
CarlosCoelhoSL committed Jan 3, 2025
1 parent f2a7833 commit 76be56d
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 83 deletions.
171 changes: 88 additions & 83 deletions digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
get_issue_summary,
is_date_valid,
is_url_valid,
proceed,
)

from .register import hash_value
Expand Down Expand Up @@ -730,14 +731,8 @@ def add_data(
# We need to delete the collection log to enable consecutive runs due to collection.load()
clear_log(collection_dir, endpoint_resource_info["endpoint"])

user_response = (
input("Do you want to continue processing this resource? (yes/no): ")
.strip()
.lower()
)

if user_response != "yes":
print("Operation cancelled by user.")
# Ask if user wants to proceed
if not proceed("Do you want to continue processing this resource? (yes/no): "):
return

if not cache_dir:
Expand Down Expand Up @@ -827,93 +822,103 @@ def add_data(
)
print(entity_summary)

user_response = (
input("Do you want to assign entities for this resource? (yes/no): ")
.strip()
.lower()
)

if user_response != "yes":
print("Operation cancelled by user.")
return

# Resource has been processed, run assign entities and reprocess
# Check for unknown entities and assign them
if (
"unknown entity" in issue_summary
or "unknown entity - missing reference" in issue_summary
):
# Ask if user wants to proceed
print("\nThere are unknown entities")
if not proceed(
"Do you want to assign entities for this resource? (yes/no): "
):
return

# Copy pipeline dir to cache dir so we can modify it
cache_pipeline_dir = cache_dir / "pipeline"
copy_tree(str(pipeline_dir), str(cache_pipeline_dir))
# Resource has been processed, run assign entities and reprocess

assign_entities(
resource_file_paths=[endpoint_resource_info["resource_path"]],
collection=collection,
dataset=dataset,
organisation=[endpoint_resource_info["organisation"]],
pipeline_dir=cache_pipeline_dir,
specification_dir=specification_dir,
organisation_path=organisation_path,
endpoints=[endpoint_resource_info["endpoint"]],
tmp_dir=cache_dir,
)
# Copy pipeline dir to cache dir so we can modify it
cache_pipeline_dir = cache_dir / "pipeline"
copy_tree(str(pipeline_dir), str(cache_pipeline_dir))

pipeline = Pipeline(cache_pipeline_dir, dataset)

# Now rerun pipeline with new assigned entities
try:
pipeline_run(
dataset,
pipeline,
Specification(specification_dir),
endpoint_resource_info["resource_path"],
output_path=output_path,
collection_dir=collection_dir,
issue_dir=issue_dir,
operational_issue_dir=operational_issue_dir,
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
converted_resource_dir=converted_resource_dir,
assign_entities(
resource_file_paths=[endpoint_resource_info["resource_path"]],
collection=collection,
dataset=dataset,
organisation=[endpoint_resource_info["organisation"]],
pipeline_dir=cache_pipeline_dir,
specification_dir=specification_dir,
organisation_path=organisation_path,
endpoints=[endpoint_resource_info["endpoint"]],
organisations=[endpoint_resource_info["organisation"]],
resource=endpoint_resource_info["resource"],
output_log_dir=output_log_dir,
converted_path=os.path.join(
converted_dir, endpoint_resource_info["resource"] + ".csv"
),
)
except Exception as e:
raise RuntimeError(
f"Pipeline failed to process resource with the following error: {e}"
tmp_dir=cache_dir,
)

entity_summary = get_entity_summary(
endpoint_resource_info, output_path, dataset, issue_dir, cache_pipeline_dir
)
print(entity_summary)
pipeline = Pipeline(cache_pipeline_dir, dataset)

# Check if there are unassigned entities in the issue summary - raise exception if they still exist
issue_summary = get_issue_summary(endpoint_resource_info, issue_dir)
if (
"unknown entity" in issue_summary
or "unknown entity - missing reference" in issue_summary
):
print(issue_summary)
raise Exception(
f"Unknown entities remain in resource {endpoint_resource_info['resource']} after assigning entities"
# Now rerun pipeline with new assigned entities
try:
pipeline_run(
dataset,
pipeline,
Specification(specification_dir),
endpoint_resource_info["resource_path"],
output_path=output_path,
collection_dir=collection_dir,
issue_dir=issue_dir,
operational_issue_dir=operational_issue_dir,
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
converted_resource_dir=converted_resource_dir,
organisation_path=organisation_path,
endpoints=[endpoint_resource_info["endpoint"]],
organisations=[endpoint_resource_info["organisation"]],
resource=endpoint_resource_info["resource"],
output_log_dir=output_log_dir,
converted_path=os.path.join(
converted_dir, endpoint_resource_info["resource"] + ".csv"
),
)
except Exception as e:
raise RuntimeError(
f"Pipeline failed to process resource with the following error: {e}"
)

entity_summary = get_entity_summary(
endpoint_resource_info,
output_path,
dataset,
issue_dir,
cache_pipeline_dir,
)
print(entity_summary)

user_response = (
input("Do you want to save changes made in this session? (yes/no): ")
.strip()
.lower()
)
# Check if there are unassigned entities in the issue summary - raise exception if they still exist
issue_summary = get_issue_summary(endpoint_resource_info, issue_dir)
if (
"unknown entity" in issue_summary
or "unknown entity - missing reference" in issue_summary
):
print(issue_summary)
raise Exception(
f"Unknown entities remain in resource {endpoint_resource_info['resource']} after assigning entities"
)

if user_response != "yes":
print("Operation cancelled by user.")
return
# Ask if user wants to proceed
if not proceed(
"Do you want to save changes made in this session? (yes/no): "
):
return

# Save changes to lookup.csv
shutil.copy(cache_pipeline_dir / "lookup.csv", pipeline_dir / "lookup.csv")
else:
# Ask if user wants to proceed
if not proceed(
"Do you want to save changes made in this session? (yes/no): "
):
return

# Save changes to collection and lookup.csv
# Save changes to collection
collection.save_csv()
shutil.copy(cache_pipeline_dir / "lookup.csv", pipeline_dir / "lookup.csv")


def add_endpoints_and_lookups(
Expand Down Expand Up @@ -1062,7 +1067,7 @@ def assign_entities(

print("")
print("======================================================================")
print("New Lookups")
print("New Entities")
print("======================================================================")

new_lookups = []
Expand Down
10 changes: 10 additions & 0 deletions digital_land/utils/add_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ def is_date_valid(date, date_type):
return True, ""


def proceed(message):
    """Ask the user to confirm an action and report whether to continue.

    The prompt is shown via ``input``; the reply is stripped of surrounding
    whitespace and compared case-insensitively.

    Args:
        message: Prompt text shown to the user.

    Returns:
        True only when the reply is exactly "yes" (ignoring case and
        surrounding whitespace). Any other reply, or a closed/exhausted
        stdin, counts as a refusal: "Operation cancelled by user." is
        printed and False is returned.
    """
    try:
        user_response = input(message).strip().lower()
    except EOFError:
        # No reply can be read (e.g. stdin is closed or piped and exhausted).
        # Nobody confirmed, so fall through to the cancellation path instead
        # of crashing the caller with a traceback.
        print()  # finish the prompt line before the cancellation message
        user_response = ""

    if user_response != "yes":
        print("Operation cancelled by user.")
        return False

    return True


def clear_log(collection_dir, endpoint):
collector = Collector(collection_dir=collection_dir)
log_path = collector.log_path(datetime.utcnow(), endpoint)
Expand Down

0 comments on commit 76be56d

Please sign in to comment.