Use pd.read_csv for border collection
alexiglaser committed Dec 3, 2024
1 parent 6ef7811 commit 8b468fc
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions digital_land/package/datasetparquet.py
--- a/digital_land/package/datasetparquet.py
+++ b/digital_land/package/datasetparquet.py
@@ -2,7 +2,7 @@
 import logging
 import duckdb
 from .package import Package
-import resource
+import pandas as pd
 
 logger = logging.getLogger(__name__)
 
@@ -71,35 +71,44 @@ def create_temp_table(self, input_paths):
         # max_limit = 200000000  # Maximum allowable line size to attempt
 
         # increment = False
-        while True:
-            try:
-                self.conn.execute("DROP TABLE IF EXISTS temp_table")
-                query = f"""
-                    CREATE TEMPORARY TABLE temp_table AS
-                    SELECT *
-                    FROM read_csv(
-                        [{input_paths_str}],
-                        columns = {self.schema},
-                        header = true,
-                        force_not_null = {[field for field in self.schema.keys()]},
-                        max_line_size={max_size}
-                    )
-                """
-                self.conn.execute(query)
-                break
-            except duckdb.Error as e:  # Catch specific DuckDB error
-                if "Value with unterminated quote" in str(e):
-                    hard_limit = int(resource.getrlimit(resource.RLIMIT_AS)[1])
-                    if max_size < hard_limit / 3:
-                        logging.info(
-                            f"Initial max_size did not work, setting it to {hard_limit / 2}"
-                        )
-                        max_size = hard_limit / 2
-                    else:
-                        raise
-                else:
-                    logging.info(f"Failed to read in when max_size = {max_size}")
-                    raise
+        try:
+            self.conn.execute("DROP TABLE IF EXISTS temp_table")
+            query = f"""
+                CREATE TEMPORARY TABLE temp_table AS
+                SELECT *
+                FROM read_csv(
+                    [{input_paths_str}],
+                    columns = {self.schema},
+                    header = true,
+                    force_not_null = {[field for field in self.schema.keys()]},
+                    max_line_size={max_size}
+                )
+            """
+            self.conn.execute(query)
+        except duckdb.Error as e:  # Catch the specific DuckDB error seen when running the border collection
+            if "Value with unterminated quote" in str(e):
+                dataframes = []
+                for file_path in input_paths:
+                    df = pd.read_csv(file_path, lineterminator="\n")
+                    dataframes.append(df)
+                combined_df = pd.concat(dataframes, ignore_index=True)
+
+                # Register the combined DataFrame with DuckDB as temp_table
+                self.conn.register("temp_table", combined_df)
+
+                del combined_df
+
+                # hard_limit = resource.getrlimit(resource.RLIMIT_AS)[1]
+                # if max_size < hard_limit / 3:
+                #     logging.info(
+                #         f"Initial max_size did not work, setting it to {hard_limit / 2}"
+                #     )
+                #     max_size = hard_limit / 2
+                # else:
+                #     raise
+            else:
+                logging.info(f"Failed to read in when max_size = {max_size}")
+                raise

     def load_facts(self):
         logging.info("loading facts from temp table")
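For context, a minimal standalone sketch of the fallback pattern this commit introduces: read the CSVs with pandas and expose the combined DataFrame to DuckDB via register(), so later SQL against temp_table keeps working. The paths and the in-memory connection below are illustrative, not repository code.

import duckdb
import pandas as pd

input_paths = ["a.csv", "b.csv"]  # hypothetical file paths

conn = duckdb.connect()

# Read each file the way the commit does, with an explicit "\n" terminator,
# then stack the per-file frames into one DataFrame.
frames = [pd.read_csv(path, lineterminator="\n") for path in input_paths]
combined = pd.concat(frames, ignore_index=True)

# register() exposes the DataFrame to SQL as a view named temp_table,
# so downstream queries are unchanged whichever load path succeeded.
conn.register("temp_table", combined)
print(conn.execute("SELECT COUNT(*) FROM temp_table").fetchone())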
