Better logging
skrydal committed Aug 21, 2024
1 parent 0320a23 · commit 63457cc
Showing 1 changed file with 7 additions and 6 deletions.
@@ -6,7 +6,6 @@
 from typing import Any, Dict, Iterable, List, Optional
 
 from pyiceberg.catalog import Catalog
-from pyiceberg.exceptions import NoSuchIcebergTableError
 from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
 from pyiceberg.table import Table
 from pyiceberg.typedef import Identifier
@@ -137,9 +136,8 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
             yield from catalog.list_tables(namespace)
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
-        thread_local = threading.local()
-
        def _process_dataset(dataset_path):
+            thread_local = threading.local()
             LOGGER.debug("Processing dataset for path %s", dataset_path)
             dataset_name = ".".join(dataset_path)
             if not self.config.table_pattern.allowed(dataset_name):
@@ -149,19 +147,22 @@ def _process_dataset(dataset_path):
             try:
                 if not hasattr(thread_local, "local_catalog"):
                     LOGGER.debug(
-                        "Didn't find local_catalog in thread_local (%s), initializing new catalog"
+                        "Didn't find local_catalog in thread_local (%s), initializing new catalog",
+                        thread_local
                     )
                     thread_local.local_catalog = self.config.get_catalog()
                 # Try to load an Iceberg table. Might not contain one, this will be caught by NoSuchIcebergTableError.
+                start_ts = time()
                 table = thread_local.local_catalog.load_table(dataset_path)
+                self.report.report_table_load_time(time() - start_ts)
                 LOGGER.debug("Loaded table: %s", table)
                 return [*self._create_iceberg_workunit(dataset_name, table)]
-            except NoSuchIcebergTableError as e:
+            except Exception as e:
                 self.report.report_failure("general", f"Failed to create workunit: {e}")
                 LOGGER.exception(
                     f"Exception while processing table {dataset_path}, skipping it.",
                 )
                 return []
 
         try:
             start = time()
Expand All @@ -179,7 +180,7 @@ def _process_dataset(dataset_path):
len(datasets),
datasets[0] if len(datasets) > 0 else None,
)
datasets = datasets[:100]
# datasets = datasets[:100]

for wu in ThreadedIteratorExecutor.process(
worker_func=_process_dataset,
Expand Down
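For reference, below is a minimal, self-contained sketch (not DataHub's actual code) of the pattern this worker function relies on: an expensive client cached per worker thread via threading.local(), a timed load, and a broad except that records the traceback with LOGGER.exception. Here make_catalog, the tuple dataset paths, and ThreadPoolExecutor are stand-ins for the source's config.get_catalog(), catalog.load_table(), and ThreadedIteratorExecutor.

import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import time

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)

# Created once at module level, so it outlives individual tasks: each worker
# thread sees its own attribute namespace and keeps its catalog across calls.
thread_local = threading.local()

def make_catalog():
    # Stand-in for an expensive-to-construct client (config.get_catalog()).
    return object()

def process_dataset(dataset_path):
    if not hasattr(thread_local, "local_catalog"):
        LOGGER.debug(
            "Didn't find local_catalog in thread_local (%s), initializing new catalog",
            thread_local,
        )
        thread_local.local_catalog = make_catalog()
    try:
        start_ts = time()
        table = f"table@{'.'.join(dataset_path)}"  # stand-in for load_table()
        LOGGER.debug("Table load took %.6f seconds", time() - start_ts)
        return [table]
    except Exception:
        # LOGGER.exception logs at ERROR level and appends the traceback.
        LOGGER.exception("Exception while processing table %s, skipping it.", dataset_path)
        return []

with ThreadPoolExecutor(max_workers=4) as pool:
    for results in pool.map(process_dataset, [("db", f"t{i}") for i in range(8)]):
        print(results)

One behavioral detail worth noting: a threading.local() instantiated inside the worker function is a fresh object on every call, so the hasattr cache only pays off when the object is created outside the per-task scope, as in this sketch.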
