Commit

start range in correct place
eveleighoj committed Jan 15, 2025
1 parent ccd1dda commit 753b40a
Showing 2 changed files with 37 additions and 34 deletions.
68 changes: 35 additions & 33 deletions digital_land/commands.py
@@ -25,7 +25,8 @@
     ConvertedResourceLog,
 )
 from digital_land.organisation import Organisation
-from digital_land.package.dataset import DatasetPackage
+
+# from digital_land.package.dataset import DatasetPackage
 from digital_land.package.dataset_parquet import DatasetParquetPackage
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
@@ -392,37 +393,37 @@ def dataset_create(
         sys.exit(2)

     # Set up initial objects
-    organisation = Organisation(
-        organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
-    )
+    # organisation = Organisation(
+    #     organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
+    # )

     # create sqlite dataset package as before and load in data that isn't in the parquet package yet
-    package = DatasetPackage(
-        dataset,
-        organisation=organisation,
-        path=output_path,
-        specification_dir=None,  # TBD: package should use this specification object
-    )
-    package.create()
-    for path in input_paths:
-        path_obj = Path(path)
-        package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
-        package.load_dataset_resource(
-            dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
-        )
-
-    old_entity_path = Path(pipeline.path) / "old-entity.csv"
-    if old_entity_path.exists():
-        package.load_old_entities(old_entity_path)
-
-    issue_paths = issue_dir / dataset
-    if issue_paths.exists():
-        for issue_path in os.listdir(issue_paths):
-            package.load_issues(os.path.join(issue_paths, issue_path))
-    else:
-        logging.warning("No directory for this dataset in the provided issue_directory")
-
-    package.add_counts()
+    # package = DatasetPackage(
+    #     dataset,
+    #     organisation=organisation,
+    #     path=output_path,
+    #     specification_dir=None,  # TBD: package should use this specification object
+    # )
+    # package.create()
+    # for path in input_paths:
+    #     path_obj = Path(path)
+    #     package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
+    #     package.load_dataset_resource(
+    #         dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
+    #     )
+
+    # old_entity_path = Path(pipeline.path) / "old-entity.csv"
+    # if old_entity_path.exists():
+    #     package.load_old_entities(old_entity_path)
+
+    # issue_paths = issue_dir / dataset
+    # if issue_paths.exists():
+    #     for issue_path in os.listdir(issue_paths):
+    #         package.load_issues(os.path.join(issue_paths, issue_path))
+    # else:
+    #     logging.warning("No directory for this dataset in the provided issue_directory")
+
+    # package.add_counts()

     # Repeat for parquet
     # Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
@@ -433,12 +434,13 @@
         dataset,
         path=dataset_parquet_path,
         specification_dir=None,  # TBD: package should use this specification object
+        duckdb_path=cache_dir / "overflow.duckdb",
     )
     # pqpackage.create_temp_table(input_paths)
-    pqpackage.load_facts(transformed_parquet_dir)
-    pqpackage.load_fact_resource(transformed_parquet_dir)
+    # pqpackage.load_facts(transformed_parquet_dir)
+    # pqpackage.load_fact_resource(transformed_parquet_dir)
     pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
-    pqpackage.load_to_sqlite(output_path)
+    # pqpackage.load_to_sqlite(output_path)


 def dataset_dump(input_path, output_path):
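Net effect on dataset_create: the sqlite DatasetPackage path is disabled, and the parquet package is the only builder that still runs, now taking a duckdb_path argument (an on-disk DuckDB database, presumably so large intermediate results can overflow from memory to disk). A minimal sketch of the surviving call sequence, assuming the digital_land package is installed; all paths and the dataset name below are stand-ins:

from pathlib import Path

from digital_land.package.dataset_parquet import DatasetParquetPackage

# Stand-in paths; in dataset_create these arrive from the CLI layer.
cache_dir = Path("var/cache")
dataset_parquet_path = cache_dir / "dataset"
transformed_parquet_dir = cache_dir / "transformed"

pqpackage = DatasetParquetPackage(
    "conservation-area",  # hypothetical dataset name
    path=dataset_parquet_path,
    specification_dir=None,  # TBD upstream: should use the specification object
    duckdb_path=cache_dir / "overflow.duckdb",  # new in this commit
)
pqpackage.load_entities(transformed_parquet_dir, "resource.csv", "organisation.csv")
# load_facts, load_fact_resource and load_to_sqlite are commented out in this
# commit, so only the entity tables are rebuilt by this code path.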
3 changes: 2 additions & 1 deletion digital_land/package/dataset_parquet.py
@@ -227,6 +227,7 @@ def load_entities_range(
         query = f"""
             SELECT DISTINCT REPLACE(field,'-','_')
             FROM parquet_scan('{transformed_parquet_dir}/*.parquet')
+            WHERE entity >= {entity_range[0]} AND entity < {entity_range[1]}
         """

         # distinct_fields - list of fields in the field in fact
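The added WHERE clause makes each batch derive its DISTINCT field list only from facts whose entity id falls inside the half-open range being written, so the columns created for a batch reflect that batch's data rather than a scan of every fact in the directory. The same pattern in isolation, as a sketch (the directory and range values are made up; duckdb and some parquet files with field/entity columns are assumed):

import duckdb

transformed_parquet_dir = "var/cache/transformed"  # stand-in directory
entity_range = (1000000, 2000000)  # hypothetical half-open range

# Restrict the field scan to the current entity range, mirroring the diff.
query = f"""
    SELECT DISTINCT REPLACE(field, '-', '_')
    FROM parquet_scan('{transformed_parquet_dir}/*.parquet')
    WHERE entity >= {entity_range[0]} AND entity < {entity_range[1]}
"""
distinct_fields = [row[0] for row in duckdb.connect().execute(query).fetchall()]
print(distinct_fields)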
@@ -397,7 +398,7 @@ def load_entities(self, transformed_parquet_dir, resource_path, organisation_path
         entity_limit = 1000000
         if total_entities > entity_limit:
             logger.info(f"total entities {total_entities} exceeds limit {entity_limit}")
-            _ = 0
+            _ = min_entity
             file_count = 1
             while _ < max_entity:
                 temp_output_path = (
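This one-line change is what the commit message describes: when a dataset has more than entity_limit entities, the writer loops over entity ranges, but the range counter previously started at 0 rather than at the dataset's lowest entity number, so when entity numbering starts well above zero the early ranges matched nothing. A self-contained sketch of the corrected loop shape, with hypothetical bounds and a print standing in for the real per-range write:

entity_limit = 1000000
min_entity, max_entity = 4400000, 7200000  # hypothetical dataset bounds

_ = min_entity  # the fix: previously `_ = 0`, which put the start of the range in the wrong place
file_count = 1
while _ < max_entity:
    entity_range = (_, _ + entity_limit)  # half-open, as in the WHERE clause above
    print(f"file {file_count}: entities {entity_range[0]} to {entity_range[1] - 1}")
    _ += entity_limit
    file_count += 1

With these bounds the loop writes three files covering the real data; starting from 0 would have produced four empty leading ranges before reaching the first entity.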
