Skip to content

Commit

Permalink
Move node to data set cache to DataflowNodeToSqlSubqueryVisitor and u…
Browse files Browse the repository at this point in the history
…se in SQL plan resolution
  • Loading branch information
courtneyholcomb committed Jan 25, 2025
1 parent 2ec9368 commit 30786c9
Show file tree
Hide file tree
Showing 177 changed files with 26,141 additions and 26,211 deletions.
4 changes: 2 additions & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
FindSourceNodeRecipeResult,
)
from metricflow.dataflow.builder.measure_spec_properties import MeasureSpecProperties
from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.builder.node_evaluator import (
LinkableInstanceSatisfiabilityEvaluation,
NodeEvaluatorForLinkableInstances,
Expand Down Expand Up @@ -109,6 +108,7 @@
PredicatePushdownState,
PreJoinNodeProcessor,
)
from metricflow.plan_conversion.to_sql_plan.dataflow_to_subquery import DataflowNodeToSqlSubqueryVisitor

logger = logging.getLogger(__name__)

Expand All @@ -120,7 +120,7 @@ def __init__( # noqa: D107
self,
source_node_set: SourceNodeSet,
semantic_manifest_lookup: SemanticManifestLookup,
node_output_resolver: DataflowPlanNodeOutputDataSetResolver,
node_output_resolver: DataflowNodeToSqlSubqueryVisitor,
column_association_resolver: ColumnAssociationResolver,
source_node_builder: SourceNodeBuilder,
dataflow_plan_builder_cache: Optional[DataflowPlanBuilderCache] = None,
Expand Down
110 changes: 0 additions & 110 deletions metricflow/dataflow/builder/node_data_set.py

This file was deleted.

4 changes: 2 additions & 2 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from metricflow_semantics.specs.spec_set import group_specs_by_type
from metricflow_semantics.sql.sql_join_type import SqlJoinType

from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.builder.partitions import (
PartitionDimensionJoinDescription,
PartitionJoinResolver,
Expand All @@ -45,6 +44,7 @@
from metricflow.dataset.dataset_classes import DataSet
from metricflow.dataset.sql_dataset import SqlDataSet
from metricflow.plan_conversion.instance_converters import CreateValidityWindowJoinDescription
from metricflow.plan_conversion.to_sql_plan.dataflow_to_subquery import DataflowNodeToSqlSubqueryVisitor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -168,7 +168,7 @@ def __init__(
self,
semantic_model_lookup: SemanticModelLookup,
nodes_available_for_joins: Sequence[DataflowPlanNode],
node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver,
node_data_set_resolver: DataflowNodeToSqlSubqueryVisitor,
time_spine_metric_time_nodes: Sequence[MetricTimeDimensionTransformNode],
) -> None:
"""Initializer.
Expand Down
4 changes: 2 additions & 2 deletions metricflow/dataflow/optimizer/dataflow_optimizer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted

from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
from metricflow.dataflow.optimizer.predicate_pushdown_optimizer import PredicatePushdownOptimizer
from metricflow.dataflow.optimizer.source_scan.source_scan_optimizer import SourceScanOptimizer
from metricflow.plan_conversion.to_sql_plan.dataflow_to_subquery import DataflowNodeToSqlSubqueryVisitor


class DataflowPlanOptimization(Enum):
Expand Down Expand Up @@ -45,7 +45,7 @@ class DataflowPlanOptimizerFactory:
processing between the DataflowPlanBuilder and the optimizer instances requiring that functionality.
"""

def __init__(self, node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver) -> None:
def __init__(self, node_data_set_resolver: DataflowNodeToSqlSubqueryVisitor) -> None:
"""Initializer.
This collects all of the initialization requirements for the optimizers it manages.
Expand Down
4 changes: 2 additions & 2 deletions metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec
from metricflow_semantics.sql.sql_join_type import SqlJoinType

from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.dataflow_plan import (
DataflowPlan,
DataflowPlanNode,
Expand Down Expand Up @@ -42,6 +41,7 @@
from metricflow.dataflow.optimizer.dataflow_plan_optimizer import DataflowPlanOptimizer
from metricflow.dataflow.optimizer.source_scan.source_scan_optimizer import OptimizeBranchResult
from metricflow.plan_conversion.node_processor import PredicateInputType, PredicatePushdownState
from metricflow.plan_conversion.to_sql_plan.dataflow_to_subquery import DataflowNodeToSqlSubqueryVisitor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -160,7 +160,7 @@ class PredicatePushdownOptimizer(
we encounter gets applied exactly once per nested subquery branch encapsulated by a given constraint node.
"""

def __init__(self, node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver) -> None:
def __init__(self, node_data_set_resolver: DataflowNodeToSqlSubqueryVisitor) -> None:
"""Initializer.
Initializes predicate pushdown state with all optimizer-managed pushdown types enabled, but nothing to
Expand Down
4 changes: 2 additions & 2 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from metricflow.data_table.mf_table import MetricFlowDataTable
from metricflow.dataflow.builder.builder_cache import DataflowPlanBuilderCache
from metricflow.dataflow.builder.dataflow_plan_builder import DataflowPlanBuilder
from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.builder.source_node import SourceNodeBuilder
from metricflow.dataflow.dataflow_plan import DataflowPlan
from metricflow.dataflow.optimizer.dataflow_optimizer_factory import DataflowPlanOptimization
Expand All @@ -54,6 +53,7 @@
from metricflow.execution.execution_plan import ExecutionPlan, SqlStatement
from metricflow.execution.executor import SequentialPlanExecutor
from metricflow.plan_conversion.to_sql_plan.dataflow_to_sql import DataflowToSqlPlanConverter
from metricflow.plan_conversion.to_sql_plan.dataflow_to_subquery import DataflowNodeToSqlSubqueryVisitor
from metricflow.protocols.sql_client import SqlClient
from metricflow.sql.optimizer.optimization_levels import SqlOptimizationLevel
from metricflow.telemetry.models import TelemetryLevel
Expand Down Expand Up @@ -384,7 +384,7 @@ def __init__(
)
source_node_set = source_node_builder.create_from_data_sets(self._source_data_sets)

node_output_resolver = DataflowPlanNodeOutputDataSetResolver(
node_output_resolver = DataflowNodeToSqlSubqueryVisitor(
column_association_resolver=self._column_association_resolver,
semantic_manifest_lookup=self._semantic_manifest_lookup,
)
Expand Down
4 changes: 2 additions & 2 deletions metricflow/plan_conversion/node_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec
from metricflow_semantics.sql.sql_join_type import SqlJoinType

from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.builder.partitions import PartitionJoinResolver
from metricflow.dataflow.dataflow_plan import (
DataflowPlanNode,
Expand All @@ -30,6 +29,7 @@
from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinOnEntitiesNode
from metricflow.dataflow.nodes.metric_time_transform import MetricTimeDimensionTransformNode
from metricflow.dataflow.nodes.where_filter import WhereConstraintNode
from metricflow.plan_conversion.to_sql_plan.dataflow_to_subquery import DataflowNodeToSqlSubqueryVisitor
from metricflow.validation.dataflow_join_validator import JoinDataflowOutputValidator

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -330,7 +330,7 @@ class PreJoinNodeProcessor:
def __init__( # noqa: D107
self,
semantic_model_lookup: SemanticModelLookup,
node_data_set_resolver: DataflowPlanNodeOutputDataSetResolver,
node_data_set_resolver: DataflowNodeToSqlSubqueryVisitor,
):
self._node_data_set_resolver = node_data_set_resolver
self._partition_resolver = PartitionJoinResolver(semantic_model_lookup)
Expand Down
Loading

0 comments on commit 30786c9

Please sign in to comment.