Commit
Revert "Use parquet to speed up dataset building. (#290)"
This reverts commit 93a97bd.
cjohns-scottlogic authored Dec 4, 2024
1 parent 42d033e commit ebce134
Showing 11 changed files with 6 additions and 1,848 deletions.
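In effect, the revert drops the DatasetParquetPackage step and the --cache-dir / --resource-path plumbing, leaving dataset_create to build the SQLite package directly from the transformed CSVs. Below is a minimal sketch of the restored flow, pieced together from the commands.py hunks further down; the wrapper function name, the DatasetPackage constructor arguments, and anything not visible in the diff context are assumptions, not lines from the repository.

# Sketch only: reconstructed from the restored lines in digital_land/commands.py below.
# The function name and the DatasetPackage constructor arguments are assumptions.
from pathlib import Path

from digital_land.package.dataset import DatasetPackage  # import kept by the revert


def build_sqlite_dataset(dataset, organisation, output_path, input_paths,
                         column_field_dir, dataset_resource_dir):
    package = DatasetPackage(dataset, organisation=organisation, path=output_path)  # assumed signature
    package.create()
    for path in input_paths:
        path_obj = Path(path)
        package.load_transformed(path)  # facts straight from the transformed CSV
        package.load_column_fields(column_field_dir / dataset / path_obj.name)
        package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
    package.load_entities()  # entity table assembled from the loaded facts
    package.add_counts()     # summary counts written into the SQLite file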
4 changes: 2 additions & 2 deletions .gitignore
@@ -15,11 +15,11 @@ demodata/
 *.gfs
 .venv
 .direnv
-/var/cache
+var/cache
 /collection
 /specification
 ssl.pem
-/var
+var
 .junitxml
 pyrightconfig.json
 .idea
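(On .gitignore semantics: a bare var pattern matches a directory of that name at any depth, whereas /var only matches at the repository root; var/cache and /var/cache behave the same, since any pattern with a separator before its final component is anchored to the .gitignore location.)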
6 changes: 0 additions & 6 deletions digital_land/cli.py
@@ -140,8 +140,6 @@ def convert_cmd(input_path, output_path):
 @column_field_dir
 @dataset_resource_dir
 @issue_dir
-@click.option("--cache-dir", type=click.Path(), default="var/cache/parquet")
-@click.option("--resource-path", type=click.Path(), default="collection/resource.csv")
 @click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
 @click.pass_context
 def dataset_create_cmd(
@@ -152,8 +150,6 @@ def dataset_create_cmd(
     column_field_dir,
     dataset_resource_dir,
     issue_dir,
-    cache_dir,
-    resource_path,
 ):
     return dataset_create(
         input_paths=input_paths,
@@ -165,8 +161,6 @@ def dataset_create_cmd(
         column_field_dir=column_field_dir,
         dataset_resource_dir=dataset_resource_dir,
         issue_dir=issue_dir,
-        cache_dir=cache_dir,
-        resource_path=resource_path,
     )
 
 
41 changes: 4 additions & 37 deletions digital_land/commands.py
@@ -12,8 +12,6 @@
 import geojson
 import shapely
 
-import subprocess
-
 from digital_land.package.organisation import OrganisationPackage
 from digital_land.check import duplicate_reference_check
 from digital_land.specification import Specification
@@ -28,7 +26,6 @@
 )
 from digital_land.organisation import Organisation
 from digital_land.package.dataset import DatasetPackage
-from digital_land.package.datasetparquet import DatasetParquetPackage
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
@@ -360,11 +357,7 @@ def dataset_create(
     issue_dir="issue",
     column_field_dir="var/column-field",
     dataset_resource_dir="var/dataset-resource",
-    cache_dir="var/cache/parquet",
-    resource_path="collection/resource.csv",
 ):
-    cache_dir = os.path.join(cache_dir, dataset)
-
     if not output_path:
         print("missing output path", file=sys.stderr)
         sys.exit(2)
@@ -384,8 +377,10 @@ package.create()
     package.create()
     for path in input_paths:
         path_obj = Path(path)
+        package.load_transformed(path)
         package.load_column_fields(column_field_dir / dataset / path_obj.name)
         package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
+    package.load_entities()
 
     old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
     if os.path.exists(old_entity_path):
@@ -400,29 +395,9 @@
 
     package.add_counts()
 
-    # Repeat for parquet
-    # Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
-    if not os.path.exists(cache_dir):
-        os.makedirs(cache_dir)
-
-    pqpackage = DatasetParquetPackage(
-        dataset,
-        organisation=organisation,
-        path=output_path,
-        cache_dir=cache_dir,
-        resource_path=resource_path,
-        specification_dir=None, # TBD: package should use this specification object
-    )
-    pqpackage.create_temp_table(input_paths)
-    pqpackage.load_facts()
-    pqpackage.load_fact_resource()
-    pqpackage.load_entities()
-    pqpackage.pq_to_sqlite()
-    pqpackage.close_conn()
-
 
 def dataset_dump(input_path, output_path):
-    cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}"
+    cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}"
     logging.info(cmd)
     os.system(cmd)
 
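The dump above shells out to the sqlite3 CLI, and after the revert the query is a plain select with no ORDER BY. Purely as an illustration (not code from this repository), the same export can be written with Python's sqlite3 and csv modules:

# Illustrative sketch only: dump the entity table of a SQLite dataset file to CSV
# without invoking the sqlite3 command-line tool.
import csv
import sqlite3


def dump_entity_table(input_path, output_path):
    with sqlite3.connect(input_path) as conn, open(output_path, "w", newline="") as f:
        cursor = conn.execute("select * from entity")
        writer = csv.writer(f)
        writer.writerow([col[0] for col in cursor.description])  # header row
        writer.writerows(cursor)                                  # one row per entity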
@@ -434,7 +409,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
     elif isinstance(csv_path, Path):
         dataset_name = csv_path.stem
     else:
-        logging.error(f"Can't extract datapackage name from {csv_path}")
+        logging.error(f"Can't extract datapackage name from {csv_path}")
         sys.exit(-1)
 
     flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv")
@@ -481,7 +456,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
     batch_size = 100000
     temp_geojson_files = []
     geography_entities = [e for e in entities if e["typology"] == "geography"]
-
     for i in range(0, len(geography_entities), batch_size):
         batch = geography_entities[i : i + batch_size]
         feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name)
@@ -496,13 +470,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset):
 
     if all(os.path.isfile(path) for path in temp_geojson_files):
         rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson")
-        env = os.environ.copy()
-
-        out, _ = subprocess.Popen(
-            ["ogr2ogr", "--version"],
-            stdout=subprocess.PIPE,
-            stderr=subprocess.DEVNULL,
-        ).communicate()
         env = (
             dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0")
             if get_gdal_version() >= Version("3.5.2")
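The retained lines only build the environment; the ogr2ogr call that consumes it sits below the visible diff context. A hedged sketch of such a call follows (the function name, the flags, and the source path argument are assumptions, not taken from this diff):

# Illustrative sketch only: convert a merged GeoJSON file to RFC 7946 output with
# ogr2ogr, passing the env built above so that on GDAL >= 3.5.2 the
# OGR_GEOJSON_MAX_OBJ_SIZE=0 setting lifts the per-object size limit.
import subprocess


def to_rfc7946(src_geojson_path, rfc7946_geojson_path, env):
    subprocess.run(
        ["ogr2ogr", "-f", "GeoJSON", "-lco", "RFC7946=YES",
         rfc7946_geojson_path, src_geojson_path],
        env=env,
        check=True,
    )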
(Diffs for the remaining 8 changed files are not shown here.)
