Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace convert_to_typing_type with PEP 585 equivalent #33538

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def convert_to_typing_type(type_):
if isinstance(type_, row_type.RowTypeConstraint):
return named_tuple_from_schema(named_fields_to_schema(type_._fields))
else:
return native_type_compatibility.convert_to_typing_type(type_)
return native_type_compatibility.convert_to_python_type(type_)


def _is_optional_or_none(typehint):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from apache_beam.portability.api import external_transforms_pb2
from apache_beam.pvalue import Row
from apache_beam.transforms import ptransform
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import convert_to_python_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.utils import python_callable
Expand Down Expand Up @@ -100,11 +100,11 @@ def _resolve(cls, fully_qualified_name):

def to_runner_api_parameter(self, unused_context):
_args_schema = named_fields_to_schema([
(f'arg{ix}', convert_to_typing_type(instance_to_type(value)))
(f'arg{ix}', convert_to_python_type(instance_to_type(value)))
for (ix, value) in enumerate(self._args)
])
_kwargs_schema = named_fields_to_schema([
(key, convert_to_typing_type(instance_to_type(value)))
(key, convert_to_python_type(instance_to_type(value)))
for (key, value) in self._kwargs.items()
])
payload_schema = named_fields_to_schema({
Expand Down
32 changes: 16 additions & 16 deletions sdks/python/apache_beam/typehints/native_type_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ def convert_to_beam_types(args):
return [convert_to_beam_type(v) for v in args]


def convert_to_typing_type(typ):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this api considered public or not? Wondering if it is called externally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is 100% not intended to be used externally, nothing in our type hinting module is

"""Converts a given Beam type to a typing type.
def convert_to_python_type(typ):
"""Converts a given Beam type to a python type.

This is the reverse of convert_to_beam_type.

Expand Down Expand Up @@ -482,33 +482,33 @@ def convert_to_typing_type(typ):
if isinstance(typ, typehints.AnyTypeConstraint):
return typing.Any
if isinstance(typ, typehints.DictConstraint):
return typing.Dict[convert_to_typing_type(typ.key_type),
convert_to_typing_type(typ.value_type)]
return dict[convert_to_python_type(typ.key_type),
convert_to_python_type(typ.value_type)]
if isinstance(typ, typehints.ListConstraint):
return typing.List[convert_to_typing_type(typ.inner_type)]
return list[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.IterableTypeConstraint):
return typing.Iterable[convert_to_typing_type(typ.inner_type)]
return collections.abc.Iterable[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.UnionConstraint):
if not typ.union_types:
# Gracefully handle the empty union type.
return typing.Any
return typing.Union[tuple(convert_to_typing_types(typ.union_types))]
return typing.Union[tuple(convert_to_python_types(typ.union_types))]
if isinstance(typ, typehints.SetTypeConstraint):
return typing.Set[convert_to_typing_type(typ.inner_type)]
return set[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.FrozenSetTypeConstraint):
return typing.FrozenSet[convert_to_typing_type(typ.inner_type)]
return frozenset[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.TupleConstraint):
return typing.Tuple[tuple(convert_to_typing_types(typ.tuple_types))]
return tuple[tuple(convert_to_python_types(typ.tuple_types))]
if isinstance(typ, typehints.TupleSequenceConstraint):
return typing.Tuple[convert_to_typing_type(typ.inner_type), ...]
return tuple[convert_to_python_type(typ.inner_type), ...]
if isinstance(typ, typehints.IteratorTypeConstraint):
return typing.Iterator[convert_to_typing_type(typ.yielded_type)]
return collections.abc.Iterator[convert_to_python_type(typ.yielded_type)]

raise ValueError('Failed to convert Beam type: %s' % typ)


def convert_to_typing_types(args):
"""Convert the given list or dictionary of args to typing types.
def convert_to_python_types(args):
"""Convert the given list or dictionary of args to python types.

Args:
args: Either an iterable of types, or a dictionary where the values are
Expand All @@ -519,6 +519,6 @@ def convert_to_typing_types(args):
a dictionary with the same keys, and values which have been converted.
"""
if isinstance(args, dict):
return {k: convert_to_typing_type(v) for k, v in args.items()}
return {k: convert_to_python_type(v) for k, v in args.items()}
else:
return [convert_to_typing_type(v) for v in args]
return [convert_to_python_type(v) for v in args]
100 changes: 76 additions & 24 deletions sdks/python/apache_beam/typehints/native_type_compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
from apache_beam.typehints.native_type_compatibility import convert_builtin_to_typing
from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
from apache_beam.typehints.native_type_compatibility import convert_to_beam_types
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import convert_to_typing_types
from apache_beam.typehints.native_type_compatibility import convert_to_python_type
from apache_beam.typehints.native_type_compatibility import convert_to_python_types
from apache_beam.typehints.native_type_compatibility import convert_typing_to_builtin
from apache_beam.typehints.native_type_compatibility import is_any

_TestNamedTuple = typing.NamedTuple(
'_TestNamedTuple', [('age', int), ('name', bytes)])
_TestFlatAlias = typing.Tuple[bytes, float]
_TestNestedAlias = typing.List[_TestFlatAlias]
_TestFlatAlias = tuple[bytes, float]
_TestNestedAlias = list[_TestFlatAlias]
_TestFlatAliasTyping = typing.Tuple[bytes, float]
_TestNestedAliasTyping = typing.List[_TestFlatAliasTyping]


class _TestClass(object):
Expand Down Expand Up @@ -68,50 +70,51 @@ def test_convert_to_beam_type(self):
('raw int', int, int),
('raw float', float, float),
('any', typing.Any, typehints.Any),
('simple dict', typing.Dict[bytes, int],
('simple dict', dict[bytes, int],
typehints.Dict[bytes, int]),
('simple list', typing.List[int], typehints.List[int]),
('simple iterable', typing.Iterable[int], typehints.Iterable[int]),
('simple list', list[int], typehints.List[int]),
('simple iterable', collections.abc.Iterable[int],
typehints.Iterable[int]),
('simple optional', typing.Optional[int], typehints.Optional[int]),
('simple set', typing.Set[float], typehints.Set[float]),
('simple set', set[float], typehints.Set[float]),
('simple frozenset',
typing.FrozenSet[float],
frozenset[float],
typehints.FrozenSet[float]),
('simple unary tuple', typing.Tuple[bytes],
('simple unary tuple', tuple[bytes],
typehints.Tuple[bytes]),
('simple union', typing.Union[int, bytes, float],
typehints.Union[int, bytes, float]),
('namedtuple', _TestNamedTuple, _TestNamedTuple),
('test class', _TestClass, _TestClass),
('test class in list', typing.List[_TestClass],
('test class in list', list[_TestClass],
typehints.List[_TestClass]),
('generic bare', _TestGeneric, _TestGeneric),
('generic subscripted', _TestGeneric[int], _TestGeneric[int]),
('complex tuple', typing.Tuple[bytes, typing.List[typing.Tuple[
('complex tuple', tuple[bytes, list[tuple[
bytes, typing.Union[int, bytes, float]]]],
typehints.Tuple[bytes, typehints.List[typehints.Tuple[
bytes, typehints.Union[int, bytes, float]]]]),
('arbitrary-length tuple', typing.Tuple[int, ...],
('arbitrary-length tuple', tuple[int, ...],
typehints.Tuple[int, ...]),
('flat alias', _TestFlatAlias, typehints.Tuple[bytes, float]), # type: ignore[misc]
('nested alias', _TestNestedAlias,
typehints.List[typehints.Tuple[bytes, float]]),
('complex dict',
typing.Dict[bytes, typing.List[typing.Tuple[bytes, _TestClass]]],
dict[bytes, list[tuple[bytes, _TestClass]]],
typehints.Dict[bytes, typehints.List[typehints.Tuple[
bytes, _TestClass]]]),
('type var', typing.TypeVar('T'), typehints.TypeVariable('T')),
('nested type var',
typing.Tuple[typing.TypeVar('K'), typing.TypeVar('V')],
tuple[typing.TypeVar('K'), typing.TypeVar('V')],
typehints.Tuple[typehints.TypeVariable('K'),
typehints.TypeVariable('V')]),
('iterator', typing.Iterator[typing.Any],
('iterator', collections.abc.Iterator[typing.Any],
typehints.Iterator[typehints.Any]),
('nested generic bare', typing.List[_TestGeneric],
('nested generic bare', list[_TestGeneric],
typehints.List[_TestGeneric]),
('nested generic subscripted', typing.List[_TestGeneric[int]],
('nested generic subscripted', list[_TestGeneric[int]],
typehints.List[_TestGeneric[int]]),
('nested generic with any', typing.List[_TestPair[typing.Any]],
('nested generic with any', list[_TestPair[typing.Any]],
typehints.List[_TestPair[typing.Any]]),
('raw enum', _TestEnum, _TestEnum),
]
Expand All @@ -125,9 +128,58 @@ def test_convert_to_beam_type(self):
expected_beam_type = test_case[2]
converted_beam_type = convert_to_beam_type(typing_type)
self.assertEqual(converted_beam_type, expected_beam_type, description)
converted_typing_type = convert_to_typing_type(converted_beam_type)
converted_typing_type = convert_to_python_type(converted_beam_type)
self.assertEqual(converted_typing_type, typing_type, description)

def test_convert_to_beam_type_with_typing_types(self):
test_cases = [
('simple dict', typing.Dict[bytes, int],
typehints.Dict[bytes, int]),
('simple list', typing.List[int], typehints.List[int]),
('simple iterable', typing.Iterable[int], typehints.Iterable[int]),
('simple optional', typing.Optional[int], typehints.Optional[int]),
('simple set', typing.Set[float], typehints.Set[float]),
('simple frozenset',
typing.FrozenSet[float],
typehints.FrozenSet[float]),
('simple unary tuple', typing.Tuple[bytes],
typehints.Tuple[bytes]),
('test class in list', typing.List[_TestClass],
typehints.List[_TestClass]),
('complex tuple', typing.Tuple[bytes, typing.List[typing.Tuple[
bytes, typing.Union[int, bytes, float]]]],
typehints.Tuple[bytes, typehints.List[typehints.Tuple[
bytes, typehints.Union[int, bytes, float]]]]),
('arbitrary-length tuple', typing.Tuple[int, ...],
typehints.Tuple[int, ...]),
('flat alias', _TestFlatAliasTyping, typehints.Tuple[bytes, float]), # type: ignore[misc]
('nested alias', _TestNestedAliasTyping,
typehints.List[typehints.Tuple[bytes, float]]),
('complex dict',
typing.Dict[bytes, typing.List[typing.Tuple[bytes, _TestClass]]],
typehints.Dict[bytes, typehints.List[typehints.Tuple[
bytes, _TestClass]]]),
('nested type var',
typing.Tuple[typing.TypeVar('K'), typing.TypeVar('V')],
typehints.Tuple[typehints.TypeVariable('K'),
typehints.TypeVariable('V')]),
('iterator', typing.Iterator[typing.Any],
typehints.Iterator[typehints.Any]),
('nested generic bare', typing.List[_TestGeneric],
typehints.List[_TestGeneric]),
('nested generic subscripted', typing.List[_TestGeneric[int]],
typehints.List[_TestGeneric[int]]),
('nested generic with any', typing.List[_TestPair[typing.Any]],
typehints.List[_TestPair[typing.Any]]),
]

for test_case in test_cases:
description = test_case[0]
builtins_type = test_case[1]
expected_beam_type = test_case[2]
converted_beam_type = convert_to_beam_type(builtins_type)
self.assertEqual(converted_beam_type, expected_beam_type, description)

def test_convert_to_beam_type_with_builtin_types(self):
test_cases = [
('builtin dict', dict[str, int], typehints.Dict[str, int]),
Expand Down Expand Up @@ -312,9 +364,9 @@ def test_convert_bare_types_fail(self):
def test_convert_to_beam_types(self):
typing_types = [
bytes,
typing.List[bytes],
typing.List[typing.Tuple[bytes, int]],
typing.Union[int, typing.List[int]]
list[bytes],
list[tuple[bytes, int]],
typing.Union[int, list[int]]
]
beam_types = [
bytes,
Expand All @@ -324,7 +376,7 @@ def test_convert_to_beam_types(self):
]
converted_beam_types = convert_to_beam_types(typing_types)
self.assertEqual(converted_beam_types, beam_types)
converted_typing_types = convert_to_typing_types(converted_beam_types)
converted_typing_types = convert_to_python_types(converted_beam_types)
self.assertEqual(converted_typing_types, typing_types)

def test_is_any(self):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
from apache_beam.typehints.native_type_compatibility import _match_is_optional
from apache_beam.typehints.native_type_compatibility import _safe_issubclass
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import convert_to_python_type
from apache_beam.typehints.native_type_compatibility import extract_optional_type
from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
Expand Down Expand Up @@ -293,7 +293,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
return self.typing_to_runner_api(row_type_constraint)

if isinstance(type_, typehints.TypeConstraint):
type_ = convert_to_typing_type(type_)
type_ = convert_to_python_type(type_)

# All concrete types (other than NamedTuple sub-classes) should map to
# a supported primitive type.
Expand Down
Loading