Use parquet to speed up dataset building.
cjohns-scottlogic committed Dec 2, 2024
1 parent 1be654b commit 5fb1f38
Showing 10 changed files with 1,843 additions and 9 deletions.
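The diff below suggests the overall approach: instead of loading transformed rows into SQLite piecemeal while the dataset is built, intermediate tables are written to Parquet in a cache directory and then bulk-loaded into the final SQLite file. A minimal sketch of that general idea follows; it is not the actual DatasetParquetPackage implementation, and the file names, table name, and the pandas/pyarrow dependency are assumptions made for illustration.

```python
# Minimal sketch of the Parquet-then-SQLite idea, NOT the real
# DatasetParquetPackage: write intermediate rows to Parquet once, then
# bulk-load them into SQLite rather than inserting row by row.
# Paths and the "fact" table name are illustrative; assumes pandas with
# a Parquet engine (pyarrow) installed.
import sqlite3
from pathlib import Path

import pandas as pd


def csvs_to_parquet(input_paths, cache_dir, name="fact"):
    """Concatenate transformed CSV resources into one Parquet file."""
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    frames = [pd.read_csv(path, dtype=str) for path in input_paths]
    out = cache_dir / f"{name}.parquet"
    pd.concat(frames, ignore_index=True).to_parquet(out, index=False)
    return out


def parquet_to_sqlite(parquet_path, sqlite_path, table="fact"):
    """Bulk-load a Parquet file into a SQLite table in a single pass."""
    df = pd.read_parquet(parquet_path)
    conn = sqlite3.connect(sqlite_path)
    try:
        df.to_sql(table, conn, if_exists="replace", index=False)
    finally:
        conn.close()
```

Columnar Parquet files compress well and can be aggregated outside SQLite, so only finished tables are written into the database, which is where the speed-up in the commit title plausibly comes from.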
7 changes: 7 additions & 0 deletions digital_land/cli.py
@@ -22,6 +22,7 @@
operational_issue_save_csv,
convert,
dataset_create,
# dataset_parquet_create,
pipeline_run,
collection_add_source,
add_endpoints_and_lookups,
@@ -140,6 +141,8 @@ def convert_cmd(input_path, output_path):
@column_field_dir
@dataset_resource_dir
@issue_dir
@click.option("--cache-dir", type=click.Path(), default="var/cache/parquet")
@click.option("--resource-path", type=click.Path(), default="collection/resource.csv")
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
@click.pass_context
def dataset_create_cmd(
@@ -150,6 +153,8 @@
column_field_dir,
dataset_resource_dir,
issue_dir,
cache_dir,
resource_path,
):
return dataset_create(
input_paths=input_paths,
@@ -161,6 +166,8 @@
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
issue_dir=issue_dir,
cache_dir=cache_dir,
resource_path=resource_path,
)


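For context on the cli.py change: the two new options follow click's usual pattern of path-valued options with defaults that are threaded straight through to the command function. A self-contained illustration of that pattern is below; the command name and its body are invented for the sketch and are not part of the real digital-land CLI.

```python
# Stand-alone illustration of the option pattern added above; "build" and
# its body are invented for this sketch, not the real CLI command.
import click


@click.command()
@click.option("--cache-dir", type=click.Path(), default="var/cache/parquet")
@click.option("--resource-path", type=click.Path(), default="collection/resource.csv")
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
def build(cache_dir, resource_path, input_paths):
    """Echo where intermediate Parquet files would be cached."""
    click.echo(f"cache_dir={cache_dir}")
    click.echo(f"resource_path={resource_path}")
    click.echo(f"{len(input_paths)} input file(s)")


if __name__ == "__main__":
    build()
```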
48 changes: 41 additions & 7 deletions digital_land/commands.py
@@ -12,6 +12,8 @@
import geojson
import shapely

import subprocess

from digital_land.package.organisation import OrganisationPackage
from digital_land.check import duplicate_reference_check
from digital_land.specification import Specification
@@ -26,6 +28,7 @@
)
from digital_land.organisation import Organisation
from digital_land.package.dataset import DatasetPackage
from digital_land.package.datasetparquet import DatasetParquetPackage
from digital_land.phase.combine import FactCombinePhase
from digital_land.phase.concat import ConcatFieldPhase
from digital_land.phase.convert import ConvertPhase, execute
@@ -357,7 +360,11 @@ def dataset_create(
issue_dir="issue",
column_field_dir="var/column-field",
dataset_resource_dir="var/dataset-resource",
cache_dir="var/cache/parquet",
resource_path="collection/resource.csv",
):
cache_dir = os.path.join(cache_dir, dataset)

if not output_path:
print("missing output path", file=sys.stderr)
sys.exit(2)
@@ -377,10 +384,8 @@
package.create()
for path in input_paths:
path_obj = Path(path)
package.load_transformed(path)
package.load_column_fields(column_field_dir / dataset / path_obj.name)
package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
package.load_entities()

old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
if os.path.exists(old_entity_path):
@@ -395,9 +400,29 @@

package.add_counts()

# Repeat for parquet
# Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

pqpackage = DatasetParquetPackage(
dataset,
organisation=organisation,
path=output_path,
cache_dir=cache_dir,
resource_path=resource_path,
specification_dir=None, # TBD: package should use this specification object
)
pqpackage.create_temp_table(input_paths)
pqpackage.load_facts()
pqpackage.load_fact_resource()
pqpackage.load_entities()
pqpackage.pq_to_sqlite()
pqpackage.close_conn()


def dataset_dump(input_path, output_path):
cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}"
cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}"
logging.info(cmd)
os.system(cmd)
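The one-line change to dataset_dump above adds "order by entity", so repeated dumps of the same database come out in a stable row order and can be compared directly. Purely for illustration (the function above shells out to the sqlite3 binary), a standard-library equivalent of the same dump could look like the sketch below; the function name is invented.

```python
# Illustrative standard-library equivalent of the dump above; not part of
# the commit, which uses the sqlite3 command-line tool via os.system.
import csv
import sqlite3


def dump_entity_csv(input_path, output_path):
    conn = sqlite3.connect(input_path)
    try:
        cursor = conn.execute("select * from entity order by entity")
        with open(output_path, "w", newline="") as f:
            writer = csv.writer(f)
            # First row: column names taken from the cursor description.
            writer.writerow([col[0] for col in cursor.description])
            writer.writerows(cursor)
    finally:
        conn.close()
```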

@@ -409,7 +434,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
elif isinstance(csv_path, Path):
dataset_name = csv_path.stem
else:
logging.error(f"Can't extract datapackage name from {csv_path}")
logging.error(f"Can't extract datapackage name from {csv_path}")
sys.exit(-1)

flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv")
@@ -456,6 +481,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
batch_size = 100000
temp_geojson_files = []
geography_entities = [e for e in entities if e["typology"] == "geography"]

for i in range(0, len(geography_entities), batch_size):
batch = geography_entities[i : i + batch_size]
feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name)
@@ -470,6 +496,13 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):

if all(os.path.isfile(path) for path in temp_geojson_files):
rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson")
env = os.environ.copy()

out, _ = subprocess.Popen(
["ogr2ogr", "--version"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).communicate()
env = (
dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0")
if get_gdal_version() >= Version("3.5.2")
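The block above decides whether to set OGR_GEOJSON_MAX_OBJ_SIZE=0 based on the installed GDAL version. The helper get_gdal_version() is not shown in this diff; as a hedged sketch only, it could parse the banner printed by "ogr2ogr --version", assuming Version here is packaging.version.Version.

```python
# Hypothetical sketch of a version probe; the real get_gdal_version() in
# this repository may work differently. The typical banner parsed here
# looks like "GDAL 3.6.2, released 2023/01/02".
import re
import subprocess

from packaging.version import Version


def get_gdal_version() -> Version:
    banner = subprocess.run(
        ["ogr2ogr", "--version"], capture_output=True, text=True, check=True
    ).stdout
    match = re.search(r"(\d+\.\d+\.\d+)", banner)
    if match is None:
        raise RuntimeError(f"could not parse GDAL version from {banner!r}")
    return Version(match.group(1))
```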
@@ -874,9 +907,10 @@ def process_data_in_batches(entities, flattened_dir, dataset_name):
logging.error(f"Error loading wkt from entity {entity['entity']}")
logging.error(e)
else:
logging.error(
f"No geometry or point data for entity {entity['entity']} with typology 'geography'"
)
pass
# logging.error(
# f"No geometry or point data for entity {entity['entity']} with typology 'geography'"
# )

if features:
feature_collection = geojson.FeatureCollection(
2 changes: 1 addition & 1 deletion digital_land/expectations/operation.py
@@ -102,7 +102,7 @@ def count_lpa_boundary(

result = comparison_rules[comparison_rule]

message = f"there were {actual} entities found"
message = f"there were {actual} enttities found"

details = {
"actual": actual,
Expand Down
(The remaining 7 changed files are not shown here.)
