Skip to content

Commit

Permalink
Revert "partition updates"
Browse files Browse the repository at this point in the history
This reverts commit 05dc0bb.
  • Loading branch information
acrylJonny committed Jan 17, 2025
1 parent 05dc0bb commit a653212
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,9 @@ class BigqueryTableConstraint:

@dataclass
class PartitionInfo:
partition_field: Optional[str] = None
fields: List[str] = field(default_factory=list)
partition_column: Optional[BigqueryColumn] = None
columns: List[BigqueryColumn] = field(default_factory=list)
field: str
# Data type is optional as we do not have it when we set it from TimePartitioning
column: Optional[BigqueryColumn] = None
type: str = TimePartitioningType.DAY
expiration_ms: Optional[int] = None
require_partition_filter: bool = False
Expand All @@ -76,13 +74,8 @@ class PartitionInfo:
def from_time_partitioning(
cls, time_partitioning: TimePartitioning
) -> "PartitionInfo":
"""
Create PartitionInfo from a time-based partitioning configuration.
See https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables
"""
return cls(
partition_field=time_partitioning.field or "_PARTITIONTIME",
fields=[time_partitioning.field or "_PARTITIONTIME"],
field=time_partitioning.field or "_PARTITIONTIME",
type=time_partitioning.type_,
expiration_ms=time_partitioning.expiration_ms,
require_partition_filter=time_partitioning.require_partition_filter,
Expand All @@ -92,40 +85,27 @@ def from_time_partitioning(
def from_range_partitioning(
cls, range_partitioning: Dict[str, Any]
) -> Optional["PartitionInfo"]:
"""
Create PartitionInfo from a range-based partitioning configuration.
See https://cloud.google.com/bigquery/docs/partitioned-tables#integer_range
"""
partition_field: Optional[str] = range_partitioning.get("field")
if not partition_field:
field: Optional[str] = range_partitioning.get("field")
if not field:
return None

return cls(
fields=[partition_field],
field=field,
type=RANGE_PARTITION_NAME,
)

@classmethod
def from_table_info(cls, table_info: TableListItem) -> Optional["PartitionInfo"]:
# Handle existing time partitioning
RANGE_PARTITIONING_KEY: str = "rangePartitioning"

if table_info.time_partitioning:
return cls.from_time_partitioning(table_info.time_partitioning)
elif "rangePartitioning" in table_info._properties:
return cls.from_range_partitioning(
table_info._properties["rangePartitioning"]
return PartitionInfo.from_time_partitioning(table_info.time_partitioning)
elif RANGE_PARTITIONING_KEY in table_info._properties:
return PartitionInfo.from_range_partitioning(
table_info._properties[RANGE_PARTITIONING_KEY]
)
# Add support for multiple partition columns
elif "partitioning" in table_info._properties:
fields = [
field["name"]
for field in table_info._properties.get("partitioning", {}).get(
"fields", []
)
]
if fields:
return cls(partition_field=fields[0], fields=fields)
else:
return None
return None


@dataclass
Expand All @@ -139,51 +119,10 @@ class BigqueryTable(BaseTable):
active_billable_bytes: Optional[int] = None
long_term_billable_bytes: Optional[int] = None
partition_info: Optional[PartitionInfo] = None
partition_details: Optional[Dict[str, Any]] = None
external: bool = False
constraints: List[BigqueryTableConstraint] = field(default_factory=list)
table_type: Optional[str] = None

@staticmethod
def _make_bigquery_table(
table: bigquery.Row, table_basic: Optional[TableListItem]
) -> "BigqueryTable":
try:
expiration = table_basic.expires if table_basic else None
except OverflowError:
logger.info(f"Invalid expiration time for table {table.table_name}.")
expiration = None

_, shard = BigqueryTableIdentifier.get_table_and_shard(table.table_name)

partition_info = None
partition_details = None
if table_basic:
partition_info = PartitionInfo.from_table_info(table_basic)
partition_details = table_basic._properties.get("partitioning")

return BigqueryTable(
name=table.table_name,
created=table.created,
table_type=table.table_type,
last_altered=parse_ts_millis(table.get("last_altered")),
size_in_bytes=table.get("bytes"),
rows_count=table.get("row_count"),
comment=table.comment,
ddl=table.ddl,
expires=expiration,
labels=table_basic.labels if table_basic else None,
partition_info=partition_info,
partition_details=partition_details,
clustering_fields=table_basic.clustering_fields if table_basic else None,
max_partition_id=table.get("max_partition_id"),
max_shard_id=shard,
num_partitions=table.get("num_partitions"),
active_billable_bytes=table.get("active_billable_bytes"),
long_term_billable_bytes=table.get("long_term_billable_bytes"),
external=(table.table_type == BigqueryTableType.EXTERNAL),
)


@dataclass
class BigqueryView(BaseView):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,11 @@ def _process_table(

# If table has time partitioning, set the data type of the partitioning field
if table.partition_info:
table.partition_info.partition_column = next(
table.partition_info.column = next(
(
column
for column in columns
if column.name == table.partition_info.partition_field
if column.name == table.partition_info.field
),
None,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from datetime import datetime, timezone
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple, cast

from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -75,107 +75,81 @@ def generate_partition_profiler_query(
"""
Method returns the partition id if the table is partitioned or sharded, and generates a custom partition query for
partitioned tables.
Supports both legacy single-column partitioning and multi-column partitioning schemes.
See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
"""
logger.debug(
f"generate partition profiler query for project: {project} schema: {schema} "
f"and table {table.name}, partition_datetime: {partition_datetime}"
f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}"
)
partition = table.max_partition_id
if table.partition_info and partition:
partition_where_clauses: List[str] = []
partition_where_clause: str

# Handle legacy single column partitioning
if isinstance(table.partition_info.partition_field, str):
if table.partition_info.type == RANGE_PARTITION_NAME:
if table.partition_info.partition_column:
partition_where_clauses.append(
f"{table.partition_info.partition_column.name} >= {partition}"
)
else:
logger.warning(
f"Partitioned table {table.name} without partition column"
)
self.report.profiling_skipped_invalid_partition_ids[
f"{project}.{schema}.{table.name}"
] = partition
return None, None
if table.partition_info.type == RANGE_PARTITION_NAME:
if table.partition_info.column:
partition_where_clause = (
f"{table.partition_info.column.name} >= {partition}"
)
else:
logger.debug(
f"{table.name} is partitioned and partition column is {partition}"
logger.warning(
f"Partitioned table {table.name} without partition column"
)
try:
(
partition_datetime,
upper_bound_partition_datetime,
) = self.get_partition_range_from_partition_id(
partition, partition_datetime
)
except ValueError as e:
logger.error(
f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
)
self.report.profiling_skipped_invalid_partition_ids[
f"{project}.{schema}.{table.name}"
] = partition
return None, None

partition_data_type: str = "TIMESTAMP"
partition_column_name = "_PARTITIONTIME"
if table.partition_info.partition_column:
partition_column_name = (
table.partition_info.partition_column.name
)
partition_data_type = (
table.partition_info.partition_column.data_type
)
if table.partition_info.type in ("HOUR", "DAY", "MONTH", "YEAR"):
partition_where_clauses.append(
f"`{partition_column_name}` BETWEEN {partition_data_type}('{partition_datetime}') "
f"AND {partition_data_type}('{upper_bound_partition_datetime}')"
)
else:
logger.warning(
f"Not supported partition type {table.partition_info.type}"
)
self.report.profiling_skipped_invalid_partition_type[
f"{project}.{schema}.{table.name}"
] = table.partition_info.type
return None, None
# Handle multiple partition columns
elif isinstance(table.partition_info.fields, list):
for field, column in zip(
table.partition_info.fields, table.partition_info.columns or []
):
if not column:
logger.warning(
f"Partitioned table {table.name} missing column info for {field}"
)
self.report.profiling_skipped_invalid_partition_ids[
f"{project}.{schema}.{table.name}"
] = field
return None, None
# For each partition column, add a filter condition using the current date
partition_datetime_value = partition_datetime or datetime.now(
timezone.utc
self.report.profiling_skipped_invalid_partition_ids[
f"{project}.{schema}.{table.name}"
] = partition
return None, None
else:
logger.debug(
f"{table.name} is partitioned and partition column is {partition}"
)
try:
(
partition_datetime,
upper_bound_partition_datetime,
) = self.get_partition_range_from_partition_id(
partition, partition_datetime
)
partition_where_clauses.append(
f"`{column.name}` = {column.data_type}('{partition_datetime_value}')"
except ValueError as e:
logger.error(
f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
)
self.report.profiling_skipped_invalid_partition_ids[
f"{project}.{schema}.{table.name}"
] = partition
return None, None

if partition_where_clauses:
where_clause = " AND ".join(partition_where_clauses)
custom_sql = f"""
partition_data_type: str = "TIMESTAMP"
# Ingestion time partitioned tables have a pseudo column called _PARTITIONTIME
# See more about this at
# https://cloud.google.com/bigquery/docs/partitioned-tables#ingestion_time
partition_column_name = "_PARTITIONTIME"
if table.partition_info.column:
partition_column_name = table.partition_info.column.name
partition_data_type = table.partition_info.column.data_type
if table.partition_info.type in ("HOUR", "DAY", "MONTH", "YEAR"):
partition_where_clause = f"`{partition_column_name}` BETWEEN {partition_data_type}('{partition_datetime}') AND {partition_data_type}('{upper_bound_partition_datetime}')"
else:
logger.warning(
f"Not supported partition type {table.partition_info.type}"
)
self.report.profiling_skipped_invalid_partition_type[
f"{project}.{schema}.{table.name}"
] = table.partition_info.type
return None, None
custom_sql = """
SELECT
*
FROM
`{project}.{schema}.{table.name}`
`{table_catalog}.{table_schema}.{table_name}`
WHERE
{where_clause}
"""
return (partition, custom_sql)
{partition_where_clause}
""".format(
table_catalog=project,
table_schema=schema,
table_name=table.name,
partition_where_clause=partition_where_clause,
)

return (partition, custom_sql)
elif table.max_shard_id:
# For sharded tables we want to get the partition id, but there is no need to generate a custom query
return table.max_shard_id, None
Expand Down
Loading

0 comments on commit a653212

Please sign in to comment.