From 753b40a3b77ef104ad6b3e8fe58687f40d8dbdd1 Mon Sep 17 00:00:00 2001
From: eveleighoj <35256612+eveleighoj@users.noreply.github.com>
Date: Wed, 15 Jan 2025 18:51:31 +0000
Subject: [PATCH] start range in correct place

---
 digital_land/commands.py                | 68 +++++++++++++------------
 digital_land/package/dataset_parquet.py |  3 +-
 2 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/digital_land/commands.py b/digital_land/commands.py
index 010c42c3..641e9c8f 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -25,7 +25,8 @@
     ConvertedResourceLog,
 )
 from digital_land.organisation import Organisation
-from digital_land.package.dataset import DatasetPackage
+
+# from digital_land.package.dataset import DatasetPackage
 from digital_land.package.dataset_parquet import DatasetParquetPackage
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
@@ -392,37 +393,37 @@ def dataset_create(
         sys.exit(2)
 
     # Set up initial objects
-    organisation = Organisation(
-        organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
-    )
+    # organisation = Organisation(
+    #     organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
+    # )
 
     # create the sqlite dataset package as before and load in data that isn't in the parquet package yet
-    package = DatasetPackage(
-        dataset,
-        organisation=organisation,
-        path=output_path,
-        specification_dir=None,  # TBD: package should use this specification object
-    )
-    package.create()
-    for path in input_paths:
-        path_obj = Path(path)
-        package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
-        package.load_dataset_resource(
-            dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
-        )
-
-    old_entity_path = Path(pipeline.path) / "old-entity.csv"
-    if old_entity_path.exists():
-        package.load_old_entities(old_entity_path)
-
-    issue_paths = issue_dir / dataset
-    if issue_paths.exists():
-        for issue_path in os.listdir(issue_paths):
-            package.load_issues(os.path.join(issue_paths, issue_path))
-    else:
-        logging.warning("No directory for this dataset in the provided issue_directory")
-
-    package.add_counts()
+    # package = DatasetPackage(
+    #     dataset,
+    #     organisation=organisation,
+    #     path=output_path,
+    #     specification_dir=None,  # TBD: package should use this specification object
+    # )
+    # package.create()
+    # for path in input_paths:
+    #     path_obj = Path(path)
+    #     package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
+    #     package.load_dataset_resource(
+    #         dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
+    #     )
+
+    # old_entity_path = Path(pipeline.path) / "old-entity.csv"
+    # if old_entity_path.exists():
+    #     package.load_old_entities(old_entity_path)
+
+    # issue_paths = issue_dir / dataset
+    # if issue_paths.exists():
+    #     for issue_path in os.listdir(issue_paths):
+    #         package.load_issues(os.path.join(issue_paths, issue_path))
+    # else:
+    #     logging.warning("No directory for this dataset in the provided issue_directory")
+
+    # package.add_counts()
 
     # Repeat for parquet
     # Set up cache directory to store parquet files.
     # The sqlite files created from this will be saved in the dataset
@@ -433,12 +434,13 @@ def dataset_create(
         dataset,
         path=dataset_parquet_path,
         specification_dir=None,  # TBD: package should use this specification object
+        duckdb_path=cache_dir / "overflow.duckdb",
     )
     # pqpackage.create_temp_table(input_paths)
-    pqpackage.load_facts(transformed_parquet_dir)
-    pqpackage.load_fact_resource(transformed_parquet_dir)
+    # pqpackage.load_facts(transformed_parquet_dir)
+    # pqpackage.load_fact_resource(transformed_parquet_dir)
     pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
-    pqpackage.load_to_sqlite(output_path)
+    # pqpackage.load_to_sqlite(output_path)
 
 
 def dataset_dump(input_path, output_path):
diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py
index 17b5f645..94a55907 100644
--- a/digital_land/package/dataset_parquet.py
+++ b/digital_land/package/dataset_parquet.py
@@ -227,6 +227,7 @@ def load_entities_range(
         query = f"""
             SELECT DISTINCT REPLACE(field,'-','_')
             FROM parquet_scan('{transformed_parquet_dir}/*.parquet')
+            WHERE entity >= {entity_range[0]} AND entity < {entity_range[1]}
         """
         # distinct_fields - list of fields in the 'field' column of fact
@@ -397,7 +398,7 @@ def load_entities(self, transformed_parquet_dir, resource_path, organisation_pat
         entity_limit = 1000000
         if total_entities > entity_limit:
             logger.info(f"total entities {total_entities} exceeds limit {entity_limit}")
-            _ = 0
+            _ = min_entity
             file_count = 1
             while _ < max_entity:
                 temp_output_path = (
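
Review note: the behavioural change in this patch is twofold. In load_entities_range, the
DISTINCT field scan is now restricted by the entity-range predicate, so each chunk only
considers fields that actually occur in its own slice of entities. In load_entities, the
chunk cursor now starts at min_entity rather than 0, which matters when a dataset's entity
numbers begin well above zero. Below is a minimal sketch of the corrected chunking pattern;
iter_entity_ranges and chunk_size are hypothetical illustrations (the real code drives its
loop inline inside load_entities), while min_entity and max_entity mirror the diff.

    # Hypothetical sketch, not part of this patch: the half-open chunking
    # pattern load_entities follows after the fix.
    def iter_entity_ranges(min_entity, max_entity, chunk_size=1_000_000):
        start = min_entity  # the fix: previously the cursor started at 0
        while start < max_entity:
            # Half-open range, matching the patched SQL predicate:
            #   WHERE entity >= {start} AND entity < {start + chunk_size}
            yield start, start + chunk_size
            start += chunk_size

    # Example: entities numbered from 11,000,000 up to 13,500,000
    for entity_range in iter_entity_ranges(11_000_000, 13_500_000):
        print(entity_range)
    # (11000000, 12000000)
    # (12000000, 13000000)
    # (13000000, 14000000)

With the old "_ = 0" start, a dataset whose entities begin at 11,000,000 would emit eleven
empty chunks before reaching real data; starting the cursor at min_entity removes those
no-op passes, and the new WHERE clause keeps each chunk's field list consistent with the
entities it actually loads.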