diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index 3e5c2e2934e7d7..cbe1f6eb978247 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -62,11 +62,9 @@ class BigqueryTableConstraint:
 
 
 @dataclass
 class PartitionInfo:
-    partition_field: Optional[str] = None
-    fields: List[str] = field(default_factory=list)
-    partition_column: Optional[BigqueryColumn] = None
-    columns: List[BigqueryColumn] = field(default_factory=list)
+    field: str  # Data type is optional as we do not have it when we set it from TimePartitioning
+    column: Optional[BigqueryColumn] = None
     type: str = TimePartitioningType.DAY
     expiration_ms: Optional[int] = None
     require_partition_filter: bool = False
@@ -76,13 +74,8 @@ class PartitionInfo:
     def from_time_partitioning(
         cls, time_partitioning: TimePartitioning
     ) -> "PartitionInfo":
-        """
-        Create PartitionInfo from a time-based partitioning configuration.
-        See https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables
-        """
         return cls(
-            partition_field=time_partitioning.field or "_PARTITIONTIME",
-            fields=[time_partitioning.field or "_PARTITIONTIME"],
+            field=time_partitioning.field or "_PARTITIONTIME",
             type=time_partitioning.type_,
             expiration_ms=time_partitioning.expiration_ms,
             require_partition_filter=time_partitioning.require_partition_filter,
@@ -92,40 +85,27 @@ def from_time_partitioning(
     @classmethod
     def from_range_partitioning(
         cls, range_partitioning: Dict[str, Any]
     ) -> Optional["PartitionInfo"]:
-        """
-        Create PartitionInfo from a range-based partitioning configuration.
-        See https://cloud.google.com/bigquery/docs/partitioned-tables#integer_range
-        """
-        partition_field: Optional[str] = range_partitioning.get("field")
-        if not partition_field:
+        field: Optional[str] = range_partitioning.get("field")
+        if not field:
             return None
 
         return cls(
-            fields=[partition_field],
+            field=field,
             type=RANGE_PARTITION_NAME,
         )
 
     @classmethod
     def from_table_info(cls, table_info: TableListItem) -> Optional["PartitionInfo"]:
-        # Handle existing time partitioning
+        RANGE_PARTITIONING_KEY: str = "rangePartitioning"
+
         if table_info.time_partitioning:
-            return cls.from_time_partitioning(table_info.time_partitioning)
-        elif "rangePartitioning" in table_info._properties:
-            return cls.from_range_partitioning(
-                table_info._properties["rangePartitioning"]
+            return PartitionInfo.from_time_partitioning(table_info.time_partitioning)
+        elif RANGE_PARTITIONING_KEY in table_info._properties:
+            return PartitionInfo.from_range_partitioning(
+                table_info._properties[RANGE_PARTITIONING_KEY]
             )
-        # Add support for multiple partition columns
-        elif "partitioning" in table_info._properties:
-            fields = [
-                field["name"]
-                for field in table_info._properties.get("partitioning", {}).get(
-                    "fields", []
-                )
-            ]
-            if fields:
-                return cls(partition_field=fields[0], fields=fields)
+        else:
             return None
-        return None
 
 
 @dataclass
@@ -139,51 +119,10 @@ class BigqueryTable(BaseTable):
     active_billable_bytes: Optional[int] = None
     long_term_billable_bytes: Optional[int] = None
     partition_info: Optional[PartitionInfo] = None
-    partition_details: Optional[Dict[str, Any]] = None
     external: bool = False
     constraints: List[BigqueryTableConstraint] = field(default_factory=list)
     table_type: Optional[str] = None
 
-    @staticmethod
-    def _make_bigquery_table(
-        table: bigquery.Row, table_basic: Optional[TableListItem]
-    ) -> "BigqueryTable":
-        try:
-            expiration = table_basic.expires if table_basic else None
-        except OverflowError:
-            logger.info(f"Invalid expiration time for table {table.table_name}.")
-            expiration = None
-
-        _, shard = BigqueryTableIdentifier.get_table_and_shard(table.table_name)
-
-        partition_info = None
-        partition_details = None
-        if table_basic:
-            partition_info = PartitionInfo.from_table_info(table_basic)
-            partition_details = table_basic._properties.get("partitioning")
-
-        return BigqueryTable(
-            name=table.table_name,
-            created=table.created,
-            table_type=table.table_type,
-            last_altered=parse_ts_millis(table.get("last_altered")),
-            size_in_bytes=table.get("bytes"),
-            rows_count=table.get("row_count"),
-            comment=table.comment,
-            ddl=table.ddl,
-            expires=expiration,
-            labels=table_basic.labels if table_basic else None,
-            partition_info=partition_info,
-            partition_details=partition_details,
-            clustering_fields=table_basic.clustering_fields if table_basic else None,
-            max_partition_id=table.get("max_partition_id"),
-            max_shard_id=shard,
-            num_partitions=table.get("num_partitions"),
-            active_billable_bytes=table.get("active_billable_bytes"),
-            long_term_billable_bytes=table.get("long_term_billable_bytes"),
-            external=(table.table_type == BigqueryTableType.EXTERNAL),
-        )
-
 
 @dataclass
 class BigqueryView(BaseView):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
index ae06ebddfde2c3..56e930dfb811f1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
@@ -626,11 +626,11 @@ def _process_table(
 
         # If table has time partitioning, set the data type of the partitioning field
         if table.partition_info:
-            table.partition_info.partition_column = next(
+            table.partition_info.column = next(
                 (
                     column
                     for column in columns
-                    if column.name == table.partition_info.partition_field
+                    if column.name == table.partition_info.field
                 ),
                 None,
             )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index 31976692fe8fb9..182ae2265cb162 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -1,5 +1,5 @@
 import logging
-from datetime import datetime, timezone
+from datetime import datetime
 from typing import Dict, Iterable, List, Optional, Tuple, cast
 
 from dateutil.relativedelta import relativedelta
@@ -75,107 +75,81 @@ def generate_partition_profiler_query(
         """
         Method returns partition id if table is partitioned or sharded and generate
         custom partition query for partitioned table.
-        Supports both legacy single-column partitioning and multi-column partitioning schemes.
         See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
         """
         logger.debug(
-            f"generate partition profiler query for project: {project} schema: {schema} "
-            f"and table {table.name}, partition_datetime: {partition_datetime}"
+            f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}"
         )
         partition = table.max_partition_id
         if table.partition_info and partition:
-            partition_where_clauses: List[str] = []
+            partition_where_clause: str
 
-            # Handle legacy single column partitioning
-            if isinstance(table.partition_info.partition_field, str):
-                if table.partition_info.type == RANGE_PARTITION_NAME:
-                    if table.partition_info.partition_column:
-                        partition_where_clauses.append(
-                            f"{table.partition_info.partition_column.name} >= {partition}"
-                        )
-                    else:
-                        logger.warning(
-                            f"Partitioned table {table.name} without partition column"
-                        )
-                        self.report.profiling_skipped_invalid_partition_ids[
-                            f"{project}.{schema}.{table.name}"
-                        ] = partition
-                        return None, None
+            if table.partition_info.type == RANGE_PARTITION_NAME:
+                if table.partition_info.column:
+                    partition_where_clause = (
+                        f"{table.partition_info.column.name} >= {partition}"
+                    )
                 else:
-                    logger.debug(
-                        f"{table.name} is partitioned and partition column is {partition}"
+                    logger.warning(
+                        f"Partitioned table {table.name} without partition column"
                     )
-                    try:
-                        (
-                            partition_datetime,
-                            upper_bound_partition_datetime,
-                        ) = self.get_partition_range_from_partition_id(
-                            partition, partition_datetime
-                        )
-                    except ValueError as e:
-                        logger.error(
-                            f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
-                        )
-                        self.report.profiling_skipped_invalid_partition_ids[
-                            f"{project}.{schema}.{table.name}"
-                        ] = partition
-                        return None, None
-
-                    partition_data_type: str = "TIMESTAMP"
-                    partition_column_name = "_PARTITIONTIME"
-                    if table.partition_info.partition_column:
-                        partition_column_name = (
-                            table.partition_info.partition_column.name
-                        )
-                        partition_data_type = (
-                            table.partition_info.partition_column.data_type
-                        )
-                    if table.partition_info.type in ("HOUR", "DAY", "MONTH", "YEAR"):
-                        partition_where_clauses.append(
-                            f"`{partition_column_name}` BETWEEN {partition_data_type}('{partition_datetime}') "
f"AND {partition_data_type}('{upper_bound_partition_datetime}')" - ) - else: - logger.warning( - f"Not supported partition type {table.partition_info.type}" - ) - self.report.profiling_skipped_invalid_partition_type[ - f"{project}.{schema}.{table.name}" - ] = table.partition_info.type - return None, None - # Handle multiple partition columns - elif isinstance(table.partition_info.fields, list): - for field, column in zip( - table.partition_info.fields, table.partition_info.columns or [] - ): - if not column: - logger.warning( - f"Partitioned table {table.name} missing column info for {field}" - ) - self.report.profiling_skipped_invalid_partition_ids[ - f"{project}.{schema}.{table.name}" - ] = field - return None, None - # For each partition column, add a filter condition using the current date - partition_datetime_value = partition_datetime or datetime.now( - timezone.utc + self.report.profiling_skipped_invalid_partition_ids[ + f"{project}.{schema}.{table.name}" + ] = partition + return None, None + else: + logger.debug( + f"{table.name} is partitioned and partition column is {partition}" + ) + try: + ( + partition_datetime, + upper_bound_partition_datetime, + ) = self.get_partition_range_from_partition_id( + partition, partition_datetime ) - partition_where_clauses.append( - f"`{column.name}` = {column.data_type}('{partition_datetime_value}')" + except ValueError as e: + logger.error( + f"Unable to get partition range for partition id: {partition} it failed with exception {e}" ) + self.report.profiling_skipped_invalid_partition_ids[ + f"{project}.{schema}.{table.name}" + ] = partition + return None, None - if partition_where_clauses: - where_clause = " AND ".join(partition_where_clauses) - custom_sql = f""" + partition_data_type: str = "TIMESTAMP" + # Ingestion time partitioned tables has a pseudo column called _PARTITIONTIME + # See more about this at + # https://cloud.google.com/bigquery/docs/partitioned-tables#ingestion_time + partition_column_name = "_PARTITIONTIME" + if table.partition_info.column: + partition_column_name = table.partition_info.column.name + partition_data_type = table.partition_info.column.data_type + if table.partition_info.type in ("HOUR", "DAY", "MONTH", "YEAR"): + partition_where_clause = f"`{partition_column_name}` BETWEEN {partition_data_type}('{partition_datetime}') AND {partition_data_type}('{upper_bound_partition_datetime}')" + else: + logger.warning( + f"Not supported partition type {table.partition_info.type}" + ) + self.report.profiling_skipped_invalid_partition_type[ + f"{project}.{schema}.{table.name}" + ] = table.partition_info.type + return None, None + custom_sql = """ SELECT * FROM - `{project}.{schema}.{table.name}` + `{table_catalog}.{table_schema}.{table_name}` WHERE - {where_clause} - """ - return (partition, custom_sql) + {partition_where_clause} + """.format( + table_catalog=project, + table_schema=schema, + table_name=table.name, + partition_where_clause=partition_where_clause, + ) + return (partition, custom_sql) elif table.max_shard_id: # For sharded table we want to get the partition id but not needed to generate custom query return table.max_shard_id, None diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigquery_profiler.py b/metadata-ingestion/tests/unit/bigquery/test_bigquery_profiler.py index 1ac62306b0feda..fb5133b24474c2 100644 --- a/metadata-ingestion/tests/unit/bigquery/test_bigquery_profiler.py +++ b/metadata-ingestion/tests/unit/bigquery/test_bigquery_profiler.py @@ -41,9 +41,7 @@ def 
@@ -41,9 +41,7 @@ def test_generate_day_partitioned_partition_profiler_query():
         comment=None,
         is_nullable=False,
     )
-    partition_info = PartitionInfo(
-        type="DAY", partition_field="date", partition_column=column
-    )
+    partition_info = PartitionInfo(type="DAY", field="date", column=column)
     profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
     test_table = BigqueryTable(
         name="test_table",
@@ -86,9 +84,7 @@ def test_generate_day_partitioned_partition_profiler_query_with_set_partition_ti
         comment=None,
         is_nullable=False,
     )
-    partition_info = PartitionInfo(
-        type="DAY", partition_field="date", partition_column=column
-    )
+    partition_info = PartitionInfo(type="DAY", field="date", column=column)
     profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
     test_table = BigqueryTable(
         name="test_table",
@@ -130,9 +126,7 @@ def test_generate_hour_partitioned_partition_profiler_query():
         comment=None,
         is_nullable=False,
     )
-    partition_info = PartitionInfo(
-        type="DAY", partition_field="date", partition_column=column
-    )
+    partition_info = PartitionInfo(type="DAY", field="date", column=column)
     profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
     test_table = BigqueryTable(
         name="test_table",
@@ -166,7 +160,7 @@ def test_generate_hour_partitioned_partition_profiler_query():
 
 # Ingestion partitioned tables do not have a partition column in the schema, as they use a pseudo column _PARTITIONTIME to partition
 def test_generate_ingestion_partitioned_partition_profiler_query():
-    partition_info = PartitionInfo(type="DAY", partition_field="date")
+    partition_info = PartitionInfo(type="DAY", field="date")
     profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
     test_table = BigqueryTable(
         name="test_table",
@@ -216,87 +210,3 @@ def test_generate_sharded_table_profiler_query():
 
     assert "20200101" == query[0]
     assert query[1] is None
-
-
-def test_generate_multiple_partition_columns_profiler_query():
-    columns = [
-        BigqueryColumn(
-            name="year",
-            field_path="year",
-            ordinal_position=1,
-            data_type="INT64",
-            is_partition_column=True,
-            cluster_column_position=None,
-            comment=None,
-            is_nullable=False,
-        ),
-        BigqueryColumn(
-            name="month",
-            field_path="month",
-            ordinal_position=2,
-            data_type="INT64",
-            is_partition_column=True,
-            cluster_column_position=None,
-            comment=None,
-            is_nullable=False,
-        ),
-        BigqueryColumn(
-            name="feedhandler",
-            field_path="feedhandler",
-            ordinal_position=3,
-            data_type="STRING",
-            is_partition_column=True,
-            cluster_column_position=None,
-            comment=None,
-            is_nullable=False,
-        ),
-        BigqueryColumn(
-            name="region",
-            field_path="region",
-            ordinal_position=4,
-            data_type="STRING",
-            is_partition_column=True,
-            cluster_column_position=None,
-            comment=None,
-            is_nullable=False,
-        ),
-    ]
-
-    partition_info = PartitionInfo(
-        partition_field=None,  # This ensures we use the multi-column path
-        fields=["year", "month", "feedhandler", "region"],
-        columns=columns,
-    )
-
-    profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())
-    test_table = BigqueryTable(
-        name="test_multi_partition_table",
-        comment="test_comment",
-        rows_count=1,
-        size_in_bytes=1,
-        last_altered=datetime.now(timezone.utc),
-        created=datetime.now(timezone.utc),
-        partition_info=partition_info,
-        max_partition_id="20200101",
-    )
-
-    test_datetime = datetime(2020, 1, 1, tzinfo=timezone.utc)
-    query = profiler.generate_partition_profiler_query(
-        project="test_project",
-        schema="test_dataset",
-        table=test_table,
-        partition_datetime=test_datetime,
-    )
-
-    expected_query = """
-SELECT
-    *
-FROM
-    `test_project.test_dataset.test_multi_partition_table`
-WHERE
-    `year` = INT64('2020-01-01 00:00:00+00:00') AND `month` = INT64('2020-01-01 00:00:00+00:00') AND `feedhandler` = STRING('2020-01-01 00:00:00+00:00') AND `region` = STRING('2020-01-01 00:00:00+00:00')
-""".strip()
-
-    assert "20200101" == query[0]
-    assert query[1] is not None
-    assert expected_query == query[1].strip()
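
Reviewer note (not part of the patch): a minimal sketch for sanity-checking the simplified single-field PartitionInfo locally. It assumes a DataHub development install plus the google-cloud-bigquery client; every identifier other than those two imports comes from the diff above.

    from google.cloud.bigquery import TimePartitioning, TimePartitioningType

    from datahub.ingestion.source.bigquery_v2.bigquery_schema import PartitionInfo

    # Day partitioning on an explicit "date" column: PartitionInfo now carries a
    # single `field`, and `column` stays None until schema processing resolves it.
    info = PartitionInfo.from_time_partitioning(
        TimePartitioning(type_=TimePartitioningType.DAY, field="date")
    )
    assert info.field == "date" and info.type == "DAY" and info.column is None

    # Ingestion-time partitioning has no explicit field, so the _PARTITIONTIME
    # pseudo column is used as the fallback.
    assert PartitionInfo.from_time_partitioning(TimePartitioning()).field == "_PARTITIONTIME"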