Skip to content

Commit

Permalink
Add dataflow to SQL test
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Jan 23, 2025
1 parent e8a3d58 commit 0938ede
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 2 deletions.
5 changes: 3 additions & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1900,7 +1900,7 @@ def _build_time_spine_node(
should_dedupe = False
filter_to_specs = tuple(queried_time_spine_specs)
if offset_window and not offset_window.is_standard_granularity:
time_spine_node = self._build_custom_offset_time_spine_node(
time_spine_node = self.build_custom_offset_time_spine_node(
offset_window=offset_window, required_time_spine_specs=required_time_spine_specs
)
filter_to_specs = self._node_data_set_resolver.get_output_data_set(
Expand Down Expand Up @@ -1939,9 +1939,10 @@ def _build_time_spine_node(
distinct=should_dedupe,
)

def _build_custom_offset_time_spine_node(
def build_custom_offset_time_spine_node(
self, offset_window: MetricTimeWindow, required_time_spine_specs: Tuple[TimeDimensionSpec, ...]
) -> DataflowPlanNode:
"""Builds an OffsetByCustomGranularityNode used for custom offset windows."""
# Build time spine node that offsets agg time dimensions by a custom grain.
custom_grain = self._semantic_model_lookup._custom_granularities[offset_window.granularity]
time_spine_source = self._choose_time_spine_source((DataSet.metric_time_dimension_spec(custom_grain),))
Expand Down
28 changes: 28 additions & 0 deletions tests_metricflow/plan_conversion/test_dataflow_to_sql_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from _pytest.fixtures import FixtureRequest
from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow
from dbt_semantic_interfaces.references import EntityReference, SemanticModelReference, TimeDimensionReference
from dbt_semantic_interfaces.test_utils import as_datetime
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType
Expand Down Expand Up @@ -47,6 +48,7 @@
from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.dataflow.nodes.write_to_data_table import WriteToResultDataTableNode
from metricflow.dataset.dataset_classes import DataSet
from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlPlanConverter
from metricflow.protocols.sql_client import SqlClient
from metricflow.sql.optimizer.optimization_levels import SqlOptimizationLevel
Expand Down Expand Up @@ -707,6 +709,32 @@ def test_order_by_node(
)


@pytest.mark.sql_engine_snapshot
@pytest.mark.duckdb_only
def test_offset_by_custom_granularity_node( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
mf_engine_test_fixture_mapping: Mapping[SemanticManifestSetup, MetricFlowEngineTestFixture],
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
dataflow_plan_builder: DataflowPlanBuilder,
sql_client: SqlClient,
) -> None:
offset_by_custom_granularity_node = dataflow_plan_builder.build_custom_offset_time_spine_node(
offset_window=PydanticMetricTimeWindow(count=3, granularity="martian_day"),
required_time_spine_specs=(
DataSet.metric_time_dimension_spec(ExpandedTimeGranularity.from_time_granularity(TimeGranularity.MONTH)),
),
)

convert_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=offset_by_custom_granularity_node,
)


@pytest.mark.sql_engine_snapshot
def test_semi_additive_join_node(
request: FixtureRequest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
test_name: test_offset_by_custom_granularity_node
test_filename: test_dataflow_to_sql_plan.py
sql_engine: DuckDB
---
-- Apply Requested Granularities
SELECT
subq_3.ds__day
, DATE_TRUNC('month', subq_3.ds__day__lead) AS metric_time__month
FROM (
-- Offset Base Granularity By Custom Granularity Period(s)
WITH cte_0 AS (
-- Get Custom Granularity Bounds
SELECT
time_spine_src_28006.ds AS ds__day
, DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week
, DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month
, DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter
, DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year
, EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year
, EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter
, EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month
, EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day
, EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow
, EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy
, time_spine_src_28006.martian_day AS ds__martian_day
, FIRST_VALUE(subq_0.ds__day) OVER (
PARTITION BY subq_0.ds__martian_day
ORDER BY subq_0.ds__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__first_value
, LAST_VALUE(subq_0.ds__day) OVER (
PARTITION BY subq_0.ds__martian_day
ORDER BY subq_0.ds__day
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__last_value
, ROW_NUMBER() OVER (
PARTITION BY subq_0.ds__martian_day
ORDER BY subq_0.ds__day
) AS ds__day__row_number
FROM (
-- Read From Time Spine 'mf_time_spine'
SELECT
time_spine_src_28006.ds AS ds__day
, DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week
, DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month
, DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter
, DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year
, EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year
, EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter
, EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month
, EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day
, EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow
, EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy
, time_spine_src_28006.martian_day AS ds__martian_day
FROM ***************************.mf_time_spine time_spine_src_28006
) subq_0
)

SELECT
cte_0.ds__day AS ds__day
, CASE
WHEN subq_2.ds__martian_day__first_value__lead + INTERVAL (cte_0.ds__day__row_number - 1) day <= subq_2.ds__martian_day__last_value__lead
THEN subq_2.ds__martian_day__first_value__lead + INTERVAL (cte_0.ds__day__row_number - 1) day
ELSE NULL
END AS ds__day__lead
FROM cte_0 cte_0
INNER JOIN (
-- Offset Custom Granularity Bounds
SELECT
subq_1.ds__martian_day
, LEAD(subq_1.ds__martian_day__first_value, 3) OVER (ORDER BY subq_1.ds__martian_day) AS ds__martian_day__first_value__lead
, LEAD(subq_1.ds__martian_day__last_value, 3) OVER (ORDER BY subq_1.ds__martian_day) AS ds__martian_day__last_value__lead
FROM (
-- Get Unique Rows for Custom Granularity Bounds
SELECT
cte_0.ds__martian_day
, cte_0.ds__martian_day__first_value
, cte_0.ds__martian_day__last_value
FROM cte_0 cte_0
GROUP BY
cte_0.ds__martian_day
, cte_0.ds__martian_day__first_value
, cte_0.ds__martian_day__last_value
) subq_1
) subq_2
ON
cte_0.ds__martian_day = subq_2.ds__martian_day
) subq_3
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
test_name: test_offset_by_custom_granularity_node
test_filename: test_dataflow_to_sql_plan.py
sql_engine: DuckDB
---
-- Apply Requested Granularities
SELECT
ds__day
, DATE_TRUNC('month', ds__day__lead) AS metric_time__month
FROM (
-- Offset Base Granularity By Custom Granularity Period(s)
WITH cte_2 AS (
-- Read From Time Spine 'mf_time_spine'
-- Get Custom Granularity Bounds
SELECT
ds AS ds__day
, martian_day AS ds__martian_day
, FIRST_VALUE(ds) OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__first_value
, LAST_VALUE(ds) OVER (
PARTITION BY martian_day
ORDER BY ds
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS ds__martian_day__last_value
, ROW_NUMBER() OVER (
PARTITION BY martian_day
ORDER BY ds
) AS ds__day__row_number
FROM ***************************.mf_time_spine time_spine_src_28006
)

SELECT
cte_2.ds__day AS ds__day
, CASE
WHEN LEAD(subq_5.ds__martian_day__first_value, 3) OVER (ORDER BY subq_5.ds__martian_day) + INTERVAL (cte_2.ds__day__row_number - 1) day <= LEAD(subq_5.ds__martian_day__last_value, 3) OVER (ORDER BY subq_5.ds__martian_day)
THEN LEAD(subq_5.ds__martian_day__first_value, 3) OVER (ORDER BY subq_5.ds__martian_day) + INTERVAL (cte_2.ds__day__row_number - 1) day
ELSE NULL
END AS ds__day__lead
FROM cte_2 cte_2
INNER JOIN (
-- Get Unique Rows for Custom Granularity Bounds
SELECT
ds__martian_day
, ds__martian_day__first_value
, ds__martian_day__last_value
FROM cte_2 cte_2
GROUP BY
ds__martian_day
, ds__martian_day__first_value
, ds__martian_day__last_value
) subq_5
ON
cte_2.ds__martian_day = subq_5.ds__martian_day
) subq_7
Loading

0 comments on commit 0938ede

Please sign in to comment.