Skip to content

Commit

Permalink
feat(ingest): pydantic v2 compatibility (datahub-project#9434)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Dec 18, 2023
1 parent 193d146 commit ecda3e6
Show file tree
Hide file tree
Showing 27 changed files with 209 additions and 81 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,21 @@ jobs:
strategy:
matrix:
include:
# Note: this should be kept in sync with tox.ini.
- python-version: "3.8"
extra_pip_requirements: "apache-airflow~=2.1.4"
extra_pip_extras: plugin-v1
- python-version: "3.8"
extra_pip_requirements: "apache-airflow~=2.2.4"
extra_pip_extras: plugin-v1
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.4.0"
extra_pip_requirements: 'apache-airflow~=2.4.0 pluggy==1.0.0 "pendulum<3.0"'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.6.0"
extra_pip_requirements: 'apache-airflow~=2.6.0 "pendulum<3.0"'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow>=2.7.0"
extra_pip_requirements: "apache-airflow>=2.7.0 pydantic==2.4.2"
extra_pip_extras: plugin-v2
fail-fast: false
steps:
Expand Down
9 changes: 9 additions & 0 deletions metadata-ingestion-modules/airflow-plugin/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py31
use_develop = true
extras = dev,integration-tests,plugin-v1
deps =
# This should be kept in sync with the Github Actions matrix.
-e ../../metadata-ingestion/
# Airflow version
airflow21: apache-airflow~=2.1.0
Expand All @@ -20,7 +21,15 @@ deps =
# See https://github.com/datahub-project/datahub/pull/9365
airflow24: apache-airflow~=2.4.0,pluggy==1.0.0
airflow26: apache-airflow~=2.6.0
# Respect the constraints file on pendulum.
# See https://github.com/apache/airflow/issues/36274
airflow24,airflow26: pendulum>=2.0,<3.0
# The Airflow 2.7 constraints file points at pydantic v2, so we match that here.
# https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt
# Note that Airflow is actually compatible with both pydantic v1 and v2, and the
# constraints file is overly restrictive.
airflow27: apache-airflow~=2.7.0
airflow27: pydantic==2.4.2
commands =
pytest --cov-append {posargs}

Expand Down
39 changes: 33 additions & 6 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
"mypy_extensions>=0.4.3",
# Actual dependencies.
"typing-inspect",
# pydantic 1.8.2 is incompatible with mypy 0.910.
# See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
# pydantic 1.10.3 is incompatible with typing-extensions 4.1.1 - https://github.com/pydantic/pydantic/issues/4885
# pydantic 2 makes major, backwards-incompatible changes - https://github.com/pydantic/pydantic/issues/4887
"pydantic>=1.5.1,!=1.10.3,<2",
"pydantic>=1.10.0,!=1.10.3",
"mixpanel>=4.9.0",
"sentry-sdk",
}
Expand Down Expand Up @@ -53,6 +54,18 @@
"ruamel.yaml",
}

pydantic_no_v2 = {
# pydantic 2 makes major, backwards-incompatible changes - https://github.com/pydantic/pydantic/issues/4887
# Tags sources that require the pydantic v2 API.
"pydantic<2",
}

plugin_common = {
# While pydantic v2 support is experimental, require that all plugins
# continue to use v1. This will ensure that no ingestion recipes break.
*pydantic_no_v2,
}

rest_common = {"requests", "requests_file"}

kafka_common = {
Expand Down Expand Up @@ -118,6 +131,7 @@
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
*pydantic_no_v2, # because of great-expectations
# scipy version restricted to reduce backtracking, used by great-expectations,
"scipy>=1.7.2",
# GE added handling for higher version of jinja2
Expand Down Expand Up @@ -229,6 +243,7 @@
iceberg_common = {
# Iceberg Python SDK
"pyiceberg",
*pydantic_no_v2, # because of pyiceberg
"pyarrow>=9.0.0, <13.0.0",
}

Expand Down Expand Up @@ -477,9 +492,6 @@
"flake8-bugbear==23.3.12",
"isort>=5.7.0",
"mypy==1.0.0",
# pydantic 1.8.2 is incompatible with mypy 0.910.
# See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
"pydantic>=1.10.0",
*test_api_requirements,
pytest_dep,
"pytest-asyncio>=0.16.0",
Expand Down Expand Up @@ -740,7 +752,22 @@
extras_require={
"base": list(framework_common),
**{
plugin: list(framework_common | dependencies)
plugin: list(
framework_common
| (
plugin_common
if plugin
not in {
"airflow",
"datahub-rest",
"datahub-kafka",
"sync-file-emitter",
"sql-parser",
}
else set()
)
| dependencies
)
for (plugin, dependencies) in plugins.items()
},
"all": list(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional

from datahub.configuration import ConfigModel
from datahub.configuration.pydantic_migration_helpers import v1_ConfigModel


class BaseAssertion(ConfigModel):
class BaseAssertion(v1_ConfigModel):
description: Optional[str] = None
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing_extensions import Literal, Protocol

from datahub.configuration import ConfigModel
from datahub.configuration.pydantic_migration_helpers import v1_ConfigModel
from datahub.metadata.schema_classes import (
AssertionStdOperatorClass,
AssertionStdParameterClass,
Expand Down Expand Up @@ -58,7 +58,7 @@ def _generate_assertion_std_parameters(
)


class EqualToOperator(ConfigModel):
class EqualToOperator(v1_ConfigModel):
type: Literal["equal_to"]
value: Union[str, int, float]

Expand All @@ -71,7 +71,7 @@ def generate_parameters(self) -> AssertionStdParametersClass:
return _generate_assertion_std_parameters(value=self.value)


class BetweenOperator(ConfigModel):
class BetweenOperator(v1_ConfigModel):
type: Literal["between"]
min: Union[int, float]
max: Union[int, float]
Expand All @@ -87,7 +87,7 @@ def generate_parameters(self) -> AssertionStdParametersClass:
)


class LessThanOperator(ConfigModel):
class LessThanOperator(v1_ConfigModel):
type: Literal["less_than"]
value: Union[int, float]

Expand All @@ -100,7 +100,7 @@ def generate_parameters(self) -> AssertionStdParametersClass:
return _generate_assertion_std_parameters(value=self.value)


class GreaterThanOperator(ConfigModel):
class GreaterThanOperator(v1_ConfigModel):
type: Literal["greater_than"]
value: Union[int, float]

Expand All @@ -113,7 +113,7 @@ def generate_parameters(self) -> AssertionStdParametersClass:
return _generate_assertion_std_parameters(value=self.value)


class LessThanOrEqualToOperator(ConfigModel):
class LessThanOrEqualToOperator(v1_ConfigModel):
type: Literal["less_than_or_equal_to"]
value: Union[int, float]

Expand All @@ -126,7 +126,7 @@ def generate_parameters(self) -> AssertionStdParametersClass:
return _generate_assertion_std_parameters(value=self.value)


class GreaterThanOrEqualToOperator(ConfigModel):
class GreaterThanOrEqualToOperator(v1_ConfigModel):
type: Literal["greater_than_or_equal_to"]
value: Union[int, float]

Expand All @@ -139,7 +139,7 @@ def generate_parameters(self) -> AssertionStdParametersClass:
return _generate_assertion_std_parameters(value=self.value)


class NotNullOperator(ConfigModel):
class NotNullOperator(v1_ConfigModel):
type: Literal["not_null"]

operator: str = AssertionStdOperatorClass.NOT_NULL
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from typing import List, Optional, Union

import pydantic
from typing_extensions import Literal

import datahub.emitter.mce_builder as builder
from datahub.api.entities.datacontract.assertion import BaseAssertion
from datahub.api.entities.datacontract.assertion_operator import Operators
from datahub.configuration.common import ConfigModel
from datahub.configuration.pydantic_migration_helpers import v1_ConfigModel, v1_Field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
AssertionInfoClass,
Expand All @@ -25,7 +24,7 @@


class IdConfigMixin(BaseAssertion):
id_raw: Optional[str] = pydantic.Field(
id_raw: Optional[str] = v1_Field(
default=None,
alias="id",
description="The id of the assertion. If not provided, one will be generated using the type.",
Expand All @@ -38,7 +37,7 @@ def generate_default_id(self) -> str:
class CustomSQLAssertion(IdConfigMixin, BaseAssertion):
type: Literal["custom_sql"]
sql: str
operator: Operators = pydantic.Field(discriminator="type")
operator: Operators = v1_Field(discriminator="type")

def generate_default_id(self) -> str:
return f"{self.type}-{self.sql}-{self.operator.id()}"
Expand Down Expand Up @@ -89,11 +88,11 @@ def generate_assertion_info(self, entity_urn: str) -> AssertionInfoClass:
)


class DataQualityAssertion(ConfigModel):
class DataQualityAssertion(v1_ConfigModel):
__root__: Union[
CustomSQLAssertion,
ColumnUniqueAssertion,
] = pydantic.Field(discriminator="type")
] = v1_Field(discriminator="type")

@property
def id(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
from typing import Iterable, List, Optional, Tuple

import pydantic
from ruamel.yaml import YAML
from typing_extensions import Literal

Expand All @@ -11,7 +10,11 @@
)
from datahub.api.entities.datacontract.freshness_assertion import FreshnessAssertion
from datahub.api.entities.datacontract.schema_assertion import SchemaAssertion
from datahub.configuration.common import ConfigModel
from datahub.configuration.pydantic_migration_helpers import (
v1_ConfigModel,
v1_Field,
v1_validator,
)
from datahub.emitter.mce_builder import datahub_guid, make_assertion_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
Expand All @@ -26,7 +29,7 @@
from datahub.utilities.urns.urn import guess_entity_type


class DataContract(ConfigModel):
class DataContract(v1_ConfigModel):
"""A yml representation of a Data Contract.
This model is used as a simpler, Python-native representation of a DataHub data contract.
Expand All @@ -36,29 +39,27 @@ class DataContract(ConfigModel):

version: Literal[1]

id: Optional[str] = pydantic.Field(
id: Optional[str] = v1_Field(
default=None,
alias="urn",
description="The data contract urn. If not provided, one will be generated.",
)
entity: str = pydantic.Field(
entity: str = v1_Field(
description="The entity urn that the Data Contract is associated with"
)
# TODO: add support for properties
# properties: Optional[Dict[str, str]] = None

schema_field: Optional[SchemaAssertion] = pydantic.Field(
default=None, alias="schema"
)
schema_field: Optional[SchemaAssertion] = v1_Field(default=None, alias="schema")

freshness: Optional[FreshnessAssertion] = pydantic.Field(default=None)
freshness: Optional[FreshnessAssertion] = v1_Field(default=None)

# TODO: Add a validator to ensure that ids are unique
data_quality: Optional[List[DataQualityAssertion]] = pydantic.Field(default=None)
data_quality: Optional[List[DataQualityAssertion]] = v1_Field(default=None)

_original_yaml_dict: Optional[dict] = None

@pydantic.validator("data_quality")
@v1_validator("data_quality") # type: ignore
def validate_data_quality(
cls, data_quality: Optional[List[DataQualityAssertion]]
) -> Optional[List[DataQualityAssertion]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
from datetime import timedelta
from typing import List, Union

import pydantic
from typing_extensions import Literal

from datahub.api.entities.datacontract.assertion import BaseAssertion
from datahub.configuration.common import ConfigModel
from datahub.configuration.pydantic_migration_helpers import v1_ConfigModel, v1_Field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
AssertionInfoClass,
Expand All @@ -25,10 +24,10 @@
class CronFreshnessAssertion(BaseAssertion):
type: Literal["cron"]

cron: str = pydantic.Field(
cron: str = v1_Field(
description="The cron expression to use. See https://crontab.guru/ for help."
)
timezone: str = pydantic.Field(
timezone: str = v1_Field(
"UTC",
description="The timezone to use for the cron schedule. Defaults to UTC.",
)
Expand Down Expand Up @@ -58,10 +57,10 @@ def generate_freshness_assertion_schedule(self) -> FreshnessAssertionScheduleCla
)


class FreshnessAssertion(ConfigModel):
__root__: Union[
CronFreshnessAssertion, FixedIntervalFreshnessAssertion
] = pydantic.Field(discriminator="type")
class FreshnessAssertion(v1_ConfigModel):
__root__: Union[CronFreshnessAssertion, FixedIntervalFreshnessAssertion] = v1_Field(
discriminator="type"
)

@property
def id(self):
Expand Down
Loading

0 comments on commit ecda3e6

Please sign in to comment.