Build DataflowPlan for custom offset window (#1584)
This is the dataflow plan that will be used when the custom grain is queried with
any grains other than the one used in the offset window. A simpler dataflow plan
will be used for queries that request only the grain used in the offset window.
courtneyholcomb authored Jan 23, 2025
1 parent 6aad757 commit 2c3a85f
Showing 236 changed files with 1,081 additions and 618 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241218-133743.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Support for custom offset windows.
time: 2024-12-18T13:37:43.23915-08:00
custom:
Author: courtneyholcomb
Issue: "1584"
@@ -91,3 +91,10 @@ class JoinToTimeSpineDescription:
join_type: SqlJoinType
offset_window: Optional[MetricTimeWindow]
offset_to_grain: Optional[TimeGranularity]

@property
def standard_offset_window(self) -> Optional[MetricTimeWindow]:
"""Return the standard offset window if it is a standard granularity."""
if self.offset_window and self.offset_window.is_standard_granularity:
return self.offset_window
return None
@@ -4,6 +4,7 @@
from typing import Optional

from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow
from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow
from dbt_semantic_interfaces.references import MetricReference
from dbt_semantic_interfaces.type_enums import TimeGranularity

@@ -67,3 +68,10 @@ def without_filter_specs(self) -> MetricSpec: # noqa: D102
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
)

@property
def standard_offset_window(self) -> Optional[MetricTimeWindow]:
"""Return the offset window if it exists and uses a standard granularity."""
if self.offset_window and self.offset_window.is_standard_granularity:
return self.offset_window
return None
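
Both new standard_offset_window properties (on JoinToTimeSpineDescription above and on MetricSpec here) follow the same pattern: the offset window is surfaced only when its granularity is a standard one, so callers that can only handle standard grains see None for custom-grain windows. A minimal standalone sketch of that behavior, using a hypothetical Window stand-in rather than the real MetricTimeWindow protocol:

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Window:
    """Hypothetical stand-in for MetricTimeWindow."""

    count: int
    granularity: str
    is_standard_granularity: bool


@dataclass(frozen=True)
class SpecWithOffset:
    """Toy spec carrying an optional offset window."""

    offset_window: Optional[Window] = None

    @property
    def standard_offset_window(self) -> Optional[Window]:
        # Mirror of the new property: only expose standard-grain windows.
        if self.offset_window and self.offset_window.is_standard_granularity:
            return self.offset_window
        return None


assert SpecWithOffset(Window(1, "month", True)).standard_offset_window is not None
assert SpecWithOffset(Window(1, "martian_day", False)).standard_offset_window is None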
@@ -27,12 +27,7 @@ def test_min_queryable_time_granularity_for_different_agg_time_grains( # noqa:
def test_custom_offset_window_for_metric(
simple_semantic_manifest_lookup: SemanticManifestLookup,
) -> None:
"""Test offset window with custom grain supplied.
TODO: As of now, the functionality of an offset window with a custom grain is not supported in MF.
This test is added to show that at least the parsing is successful using a custom grain offset window.
Once support for that is added in MF + relevant tests, this test can be removed.
"""
"""Test offset window with custom grain supplied."""
metric = simple_semantic_manifest_lookup.metric_lookup.get_metric(MetricReference("bookings_offset_martian_day"))

assert len(metric.input_metrics) == 1
92 changes: 63 additions & 29 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -92,6 +92,7 @@
from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode
from metricflow.dataflow.nodes.metric_time_transform import MetricTimeDimensionTransformNode
from metricflow.dataflow.nodes.min_max import MinMaxNode
from metricflow.dataflow.nodes.offset_by_custom_granularity import OffsetByCustomGranularityNode
from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
@@ -661,13 +662,16 @@ def _build_derived_metric_output_node(
)
if metric_spec.has_time_offset and queried_agg_time_dimension_specs:
# TODO: move this to a helper method
time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs)
time_spine_node = self._build_time_spine_node(
queried_time_spine_specs=queried_agg_time_dimension_specs,
offset_window=metric_spec.offset_window,
)
output_node = JoinToTimeSpineNode.create(
metric_source_node=output_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0],
offset_window=metric_spec.offset_window,
standard_offset_window=metric_spec.standard_offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
@@ -1668,13 +1672,16 @@ def _build_aggregated_measure_from_measure_source_node(
required_time_spine_specs = base_queried_agg_time_dimension_specs
if join_on_time_dimension_spec not in required_time_spine_specs:
required_time_spine_specs = (join_on_time_dimension_spec,) + required_time_spine_specs
time_spine_node = self._build_time_spine_node(required_time_spine_specs)
time_spine_node = self._build_time_spine_node(
queried_time_spine_specs=required_time_spine_specs,
offset_window=before_aggregation_time_spine_join_description.offset_window,
)
unaggregated_measure_node = JoinToTimeSpineNode.create(
metric_source_node=unaggregated_measure_node,
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs,
join_on_time_dimension_spec=join_on_time_dimension_spec,
offset_window=before_aggregation_time_spine_join_description.offset_window,
standard_offset_window=(before_aggregation_time_spine_join_description.standard_offset_window),
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
join_type=before_aggregation_time_spine_join_description.join_type,
)
@@ -1881,6 +1888,7 @@ def _build_time_spine_node(
queried_time_spine_specs: Sequence[TimeDimensionSpec],
where_filter_specs: Sequence[WhereFilterSpec] = (),
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
) -> DataflowPlanNode:
"""Return the time spine node needed to satisfy the specs."""
required_time_spine_spec_set = self.__get_required_linkable_specs(
@@ -1889,39 +1897,65 @@
)
required_time_spine_specs = required_time_spine_spec_set.time_dimension_specs

# TODO: support multiple time spines here. Build node on the one with the smallest base grain.
# Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine.
time_spine_source = self._choose_time_spine_source(required_time_spine_specs)
read_node = self._choose_time_spine_read_node(time_spine_source)
time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node)

# Change the column aliases to match the specs that were requested in the query.
time_spine_node = AliasSpecsNode.create(
parent_node=read_node,
change_specs=tuple(
SpecToAlias(
input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part(
time_granularity_name=required_spec.time_granularity_name, date_part=required_spec.date_part
).spec,
output_spec=required_spec,
)
for required_spec in required_time_spine_specs
),
)

# If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping.
should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in {
spec.time_granularity for spec in queried_time_spine_specs
}
should_dedupe = False
filter_to_specs = tuple(queried_time_spine_specs)
if offset_window and not offset_window.is_standard_granularity:
time_spine_node = self.build_custom_offset_time_spine_node(
offset_window=offset_window, required_time_spine_specs=required_time_spine_specs
)
filter_to_specs = self._node_data_set_resolver.get_output_data_set(
time_spine_node
).instance_set.spec_set.time_dimension_specs
else:
# For simpler time spine queries, choose the appropriate time spine node and apply requested aliases.
time_spine_source = self._choose_time_spine_source(required_time_spine_specs)
# TODO: support multiple time spines here. Build node on the one with the smallest base grain.
# Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine.
read_node = self._choose_time_spine_read_node(time_spine_source)
time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node)
# Change the column aliases to match the specs that were requested in the query.
time_spine_node = AliasSpecsNode.create(
parent_node=read_node,
change_specs=tuple(
SpecToAlias(
input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part(
time_granularity_name=required_spec.time_granularity_name, date_part=required_spec.date_part
).spec,
output_spec=required_spec,
)
for required_spec in required_time_spine_specs
),
)
# If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping.
should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in {
spec.time_granularity for spec in queried_time_spine_specs
}

return self._build_pre_aggregation_plan(
source_node=time_spine_node,
filter_to_specs=InstanceSpecSet(time_dimension_specs=tuple(queried_time_spine_specs)),
filter_to_specs=InstanceSpecSet(time_dimension_specs=filter_to_specs),
time_range_constraint=time_range_constraint,
where_filter_specs=where_filter_specs,
distinct=should_dedupe,
)

def build_custom_offset_time_spine_node(
self, offset_window: MetricTimeWindow, required_time_spine_specs: Tuple[TimeDimensionSpec, ...]
) -> DataflowPlanNode:
"""Builds an OffsetByCustomGranularityNode used for custom offset windows."""
# Build time spine node that offsets agg time dimensions by a custom grain.
custom_grain = self._semantic_model_lookup._custom_granularities[offset_window.granularity]
time_spine_source = self._choose_time_spine_source((DataSet.metric_time_dimension_spec(custom_grain),))
time_spine_read_node = self._choose_time_spine_read_node(time_spine_source)
if {spec.time_granularity for spec in required_time_spine_specs} == {custom_grain}:
# TODO: If querying with only the same grain as is used in the offset_window, can use a simpler plan.
pass
return OffsetByCustomGranularityNode.create(
time_spine_node=time_spine_read_node,
offset_window=offset_window,
required_time_spine_specs=required_time_spine_specs,
)

def _sort_by_base_granularity(self, time_dimension_specs: Sequence[TimeDimensionSpec]) -> List[TimeDimensionSpec]:
"""Sort the time dimensions by their base granularity.
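The key change in _build_time_spine_node above is the new branch: when the offset window uses a custom granularity, the time spine is built through build_custom_offset_time_spine_node (which creates an OffsetByCustomGranularityNode), while standard granularities keep the existing alias-and-dedupe path. A minimal standalone sketch of that decision, using hypothetical stand-ins rather than MetricFlow's real classes:

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OffsetWindow:
    """Hypothetical stand-in for MetricTimeWindow."""

    count: int
    granularity: str
    is_standard_granularity: bool


def choose_time_spine_strategy(offset_window: Optional[OffsetWindow]) -> str:
    """Mirror of the new branch: custom grains take the OffsetByCustomGranularityNode path."""
    if offset_window and not offset_window.is_standard_granularity:
        return "offset_by_custom_granularity_node"
    return "alias_and_dedupe_standard_time_spine"


print(choose_time_spine_strategy(OffsetWindow(1, "martian_day", False)))  # custom path
print(choose_time_spine_strategy(OffsetWindow(1, "month", True)))         # standard path

In the real builder, the custom-grain branch also derives filter_to_specs from the offset node's output data set rather than from the queried time spine specs, as shown in the diff above.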
25 changes: 15 additions & 10 deletions metricflow/dataflow/nodes/join_to_time_spine.py
@@ -24,7 +24,8 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
requested_agg_time_dimension_specs: Time dimensions requested in the query.
join_type: Join type to use when joining to time spine.
join_on_time_dimension_spec: The time dimension to use in the join ON condition.
offset_window: Time window to offset the parent dataset by when joining to time spine.
standard_offset_window: Time window to offset the parent dataset by when joining to time spine.
Only standard granularities are accepted for standard_offset_window in this node.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
"""

@@ -33,18 +34,22 @@ class JoinToTimeSpineNode(DataflowPlanNode, ABC):
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec]
join_on_time_dimension_spec: TimeDimensionSpec
join_type: SqlJoinType
offset_window: Optional[MetricTimeWindow]
standard_offset_window: Optional[MetricTimeWindow]
offset_to_grain: Optional[TimeGranularity]

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()

assert not (
self.offset_window and self.offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
self.standard_offset_window and self.offset_to_grain
), "Can't set both standard_offset_window and offset_to_grain when joining to time spine. Choose one or the other."
assert (
len(self.requested_agg_time_dimension_specs) > 0
), "Must have at least one value in requested_agg_time_dimension_specs for JoinToTimeSpineNode."
if self.standard_offset_window and not self.standard_offset_window.is_standard_granularity:
raise RuntimeError(
f"JoinToTimeSpineNode should not accept a custom standard_offset_window. Got: {self.standard_offset_window}"
)

@staticmethod
def create( # noqa: D102
@@ -53,7 +58,7 @@ def create( # noqa: D102
requested_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
join_on_time_dimension_spec: TimeDimensionSpec,
join_type: SqlJoinType,
offset_window: Optional[MetricTimeWindow] = None,
standard_offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
) -> JoinToTimeSpineNode:
return JoinToTimeSpineNode(
@@ -63,7 +68,7 @@ def create( # noqa: D102
requested_agg_time_dimension_specs=tuple(requested_agg_time_dimension_specs),
join_on_time_dimension_spec=join_on_time_dimension_spec,
join_type=join_type,
offset_window=offset_window,
standard_offset_window=standard_offset_window,
offset_to_grain=offset_to_grain,
)

@@ -85,16 +90,16 @@ def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
DisplayedProperty("join_on_time_dimension_spec", self.join_on_time_dimension_spec),
DisplayedProperty("join_type", self.join_type),
)
if self.offset_window:
props += (DisplayedProperty("offset_window", self.offset_window),)
if self.standard_offset_window:
props += (DisplayedProperty("standard_offset_window", self.standard_offset_window),)
if self.offset_to_grain:
props += (DisplayedProperty("offset_to_grain", self.offset_to_grain),)
return props

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
and other_node.offset_window == self.offset_window
and other_node.standard_offset_window == self.standard_offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.requested_agg_time_dimension_specs == self.requested_agg_time_dimension_specs
and other_node.join_on_time_dimension_spec == self.join_on_time_dimension_spec
@@ -107,7 +112,7 @@ def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> Join
metric_source_node=self.metric_source_node,
time_spine_node=self.time_spine_node,
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
offset_window=self.offset_window,
standard_offset_window=self.standard_offset_window,
offset_to_grain=self.offset_to_grain,
join_type=self.join_type,
join_on_time_dimension_spec=self.join_on_time_dimension_spec,
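With the rename from offset_window to standard_offset_window, JoinToTimeSpineNode now validates in __post_init__ that it never receives a custom-grain window (those go through OffsetByCustomGranularityNode instead) and still rejects setting both an offset window and offset_to_grain. A minimal standalone sketch of those checks, using hypothetical stand-in classes rather than the real node:

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Window:
    """Hypothetical stand-in for MetricTimeWindow."""

    count: int
    granularity: str
    is_standard_granularity: bool


@dataclass(frozen=True)
class JoinToTimeSpineConfig:
    """Toy config mirroring the validation added in JoinToTimeSpineNode.__post_init__."""

    standard_offset_window: Optional[Window] = None
    offset_to_grain: Optional[str] = None

    def __post_init__(self) -> None:
        # Can't set both an offset window and offset_to_grain; choose one or the other.
        assert not (self.standard_offset_window and self.offset_to_grain)
        # Custom-grain windows are rejected here; they require the custom-offset plan.
        if self.standard_offset_window and not self.standard_offset_window.is_standard_granularity:
            raise RuntimeError(
                f"Custom-grain offset windows are not accepted here. Got: {self.standard_offset_window}"
            )


JoinToTimeSpineConfig(standard_offset_window=Window(1, "month", True))  # accepted
try:
    JoinToTimeSpineConfig(standard_offset_window=Window(1, "martian_day", False))
except RuntimeError as error:
    print(error)  # custom grains must be handled by OffsetByCustomGranularityNode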