Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
skrydal committed Aug 21, 2024
1 parent b04324d commit a7c842f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
25 changes: 19 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ def _process_dataset(dataset_path):
self.report.report_dropped(dataset_name)
return
try:
if not hasattr(thread_local, 'local_catalog'):
LOGGER.debug("Didn't find local_catalog in thread_local (%s), initializing new catalog")
if not hasattr(thread_local, "local_catalog"):
LOGGER.debug(
"Didn't find local_catalog in thread_local (%s), initializing new catalog"
)
thread_local.local_catalog = self.config.get_catalog()
# Try to load an Iceberg table. Might not contain one, this will be caught by NoSuchIcebergTableError.
start_ts = time()
Expand All @@ -172,8 +174,11 @@ def _process_dataset(dataset_path):

LOGGER.debug("Retrieving list of datasets in the catalog")
datasets = [*self._get_datasets(catalog)]
LOGGER.debug("Retrieved %s datasets, starting with: %s", len(datasets),
datasets[0] if len(datasets) > 0 else None)
LOGGER.debug(
"Retrieved %s datasets, starting with: %s",
len(datasets),
datasets[0] if len(datasets) > 0 else None,
)
datasets = datasets[:100]

for wu in ThreadedIteratorExecutor.process(
Expand Down Expand Up @@ -219,13 +224,21 @@ def _create_iceberg_workunit(
# Dataset ownership aspect.
dataset_ownership = self._get_ownership_aspect(table)
if dataset_ownership:
LOGGER.debug("Adding ownership: %s to the dataset %s", dataset_ownership, dataset_name)
LOGGER.debug(
"Adding ownership: %s to the dataset %s",
dataset_ownership,
dataset_name,
)
dataset_snapshot.aspects.append(dataset_ownership)

LOGGER.debug("Attempting to process schema of dataset %s", dataset_name)
schema_metadata = self._create_schema_metadata(dataset_name, table)
dataset_snapshot.aspects.append(schema_metadata)
LOGGER.debug("Processed schema of dataset %s, number of fields: %s", dataset_name, len(schema_metadata.fields))
LOGGER.debug(
"Processed schema of dataset %s, number of fields: %s",
dataset_name,
len(schema_metadata.fields),
)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
self.report.report_table_processing_time(time() - start_ts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ def get_catalog(self) -> Catalog:

# Retrieve the dict associated with the one catalog entry
catalog_name, catalog_config = next(iter(self.catalog.items()))
logger.debug("Initializing the catalog %s with config: %s", catalog_name, catalog_config)
logger.debug(
"Initializing the catalog %s with config: %s", catalog_name, catalog_config
)
return load_catalog(name=catalog_name, **catalog_config)


Expand Down

0 comments on commit a7c842f

Please sign in to comment.