From 63457cce29945a0538627ad493c36c396dabefbe Mon Sep 17 00:00:00 2001 From: Piotr Skrydalewicz Date: Wed, 21 Aug 2024 14:54:54 +0200 Subject: [PATCH] Better logging --- .../src/datahub/ingestion/source/iceberg/iceberg.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index a530246a91bec1..923f3872686b7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -6,7 +6,6 @@ from typing import Any, Dict, Iterable, List, Optional from pyiceberg.catalog import Catalog -from pyiceberg.exceptions import NoSuchIcebergTableError from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit from pyiceberg.table import Table from pyiceberg.typedef import Identifier @@ -137,9 +136,8 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]: yield from catalog.list_tables(namespace) def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - thread_local = threading.local() - def _process_dataset(dataset_path): + thread_local = threading.local() LOGGER.debug("Processing dataset for path %s", dataset_path) dataset_name = ".".join(dataset_path) if not self.config.table_pattern.allowed(dataset_name): @@ -149,19 +147,22 @@ def _process_dataset(dataset_path): try: if not hasattr(thread_local, "local_catalog"): LOGGER.debug( - "Didn't find local_catalog in thread_local (%s), initializing new catalog" + "Didn't find local_catalog in thread_local (%s), initializing new catalog", + thread_local ) thread_local.local_catalog = self.config.get_catalog() # Try to load an Iceberg table. Might not contain one, this will be caught by NoSuchIcebergTableError. start_ts = time() table = thread_local.local_catalog.load_table(dataset_path) self.report.report_table_load_time(time() - start_ts) + LOGGER.debug("Loaded table: %s", table) return [*self._create_iceberg_workunit(dataset_name, table)] - except NoSuchIcebergTableError as e: + except Exception as e: self.report.report_failure("general", f"Failed to create workunit: {e}") LOGGER.exception( f"Exception while processing table {dataset_path}, skipping it.", ) + return [] try: start = time() @@ -179,7 +180,7 @@ def _process_dataset(dataset_path): len(datasets), datasets[0] if len(datasets) > 0 else None, ) - datasets = datasets[:100] + # datasets = datasets[:100] for wu in ThreadedIteratorExecutor.process( worker_func=_process_dataset,