Skip to content

Commit

Permalink
Make _T in Observable[_T] covariant
Browse files Browse the repository at this point in the history
  • Loading branch information
Azureblade3808 authored and dbrattli committed Jul 2, 2022
1 parent 5d8b06c commit 9ecf95a
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions reactivex/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@
_F = TypeVar("_F")
_G = TypeVar("_G")

_T = TypeVar("_T")
_T_out = TypeVar("_T_out", covariant=True)


class Observable(abc.ObservableBase[_T]):
class Observable(abc.ObservableBase[_T_out]):
"""Observable base class.
Represents a push-style collection, which you can :func:`pipe <pipe>` into
:mod:`operators <reactivex.operators>`."""

def __init__(self, subscribe: Optional[abc.Subscription[_T]] = None) -> None:
def __init__(self, subscribe: Optional[abc.Subscription[_T_out]] = None) -> None:
"""Creates an observable sequence object from the specified
subscription function.
Expand All @@ -43,14 +43,14 @@ def __init__(self, subscribe: Optional[abc.Subscription[_T]] = None) -> None:

def _subscribe_core(
self,
observer: abc.ObserverBase[_T],
observer: abc.ObserverBase[_T_out],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
return self._subscribe(observer, scheduler) if self._subscribe else Disposable()

def subscribe(
self,
on_next: Optional[Union[abc.ObserverBase[_T], abc.OnNext[_T], None]] = None,
on_next: Optional[Union[abc.ObserverBase[_T_out], abc.OnNext[_T_out], None]] = None,
on_error: Optional[abc.OnError] = None,
on_completed: Optional[abc.OnCompleted] = None,
*,
Expand Down Expand Up @@ -93,12 +93,12 @@ def subscribe(
or hasattr(on_next, "on_next")
and callable(getattr(on_next, "on_next"))
):
obv = cast(abc.ObserverBase[_T], on_next)
obv = cast(abc.ObserverBase[_T_out], on_next)
on_next = obv.on_next
on_error = obv.on_error
on_completed = obv.on_completed

auto_detach_observer: AutoDetachObserver[_T] = AutoDetachObserver(
auto_detach_observer: AutoDetachObserver[_T_out] = AutoDetachObserver(
on_next, on_error, on_completed
)

Expand Down Expand Up @@ -145,21 +145,21 @@ def set_disposable(
return Disposable(auto_detach_observer.dispose)

@overload
def pipe(self, __op1: Callable[[Observable[_T]], _A]) -> _A:
def pipe(self, __op1: Callable[[Observable[_T_out]], _A]) -> _A:
...

@overload
def pipe(
self,
__op1: Callable[[Observable[_T]], _A],
__op1: Callable[[Observable[_T_out]], _A],
__op2: Callable[[_A], _B],
) -> _B:
...

@overload
def pipe(
self,
__op1: Callable[[Observable[_T]], _A],
__op1: Callable[[Observable[_T_out]], _A],
__op2: Callable[[_A], _B],
__op3: Callable[[_B], _C],
) -> _C:
Expand All @@ -168,7 +168,7 @@ def pipe(
@overload
def pipe(
self,
__op1: Callable[[Observable[_T]], _A],
__op1: Callable[[Observable[_T_out]], _A],
__op2: Callable[[_A], _B],
__op3: Callable[[_B], _C],
__op4: Callable[[_C], _D],
Expand All @@ -178,7 +178,7 @@ def pipe(
@overload
def pipe(
self,
__op1: Callable[[Observable[_T]], _A],
__op1: Callable[[Observable[_T_out]], _A],
__op2: Callable[[_A], _B],
__op3: Callable[[_B], _C],
__op4: Callable[[_C], _D],
Expand All @@ -189,7 +189,7 @@ def pipe(
@overload
def pipe(
self,
__op1: Callable[[Observable[_T]], _A],
__op1: Callable[[Observable[_T_out]], _A],
__op2: Callable[[_A], _B],
__op3: Callable[[_B], _C],
__op4: Callable[[_C], _D],
Expand All @@ -201,7 +201,7 @@ def pipe(
@overload
def pipe(
self,
__op1: Callable[[Observable[_T]], _A],
__op1: Callable[[Observable[_T_out]], _A],
__op2: Callable[[_A], _B],
__op3: Callable[[_B], _C],
__op4: Callable[[_C], _D],
Expand Down Expand Up @@ -256,7 +256,7 @@ def run(self) -> Any:

return run(self)

def __await__(self) -> Generator[Any, None, _T]:
def __await__(self) -> Generator[Any, None, _T_out]:
"""Awaits the given observable.
Returns:
Expand All @@ -265,12 +265,12 @@ def __await__(self) -> Generator[Any, None, _T]:
from ..operators._tofuture import to_future_

loop = asyncio.get_event_loop()
future: asyncio.Future[_T] = self.pipe(
future: asyncio.Future[_T_out] = self.pipe(
to_future_(scheduler=AsyncIOScheduler(loop=loop))
)
return future.__await__()

def __add__(self, other: Observable[_T]) -> Observable[_T]:
def __add__(self, other: Observable[_T_out]) -> Observable[_T_out]:
"""Pythonic version of :func:`concat <reactivex.concat>`.
Example:
Expand All @@ -286,7 +286,7 @@ def __add__(self, other: Observable[_T]) -> Observable[_T]:

return concat(self, other)

def __iadd__(self, other: Observable[_T]) -> "Observable[_T]":
def __iadd__(self, other: Observable[_T_out]) -> "Observable[_T_out]":
"""Pythonic use of :func:`concat <reactivex.concat>`.
Example:
Expand All @@ -302,7 +302,7 @@ def __iadd__(self, other: Observable[_T]) -> "Observable[_T]":

return concat(self, other)

def __getitem__(self, key: Union[slice, int]) -> Observable[_T]:
def __getitem__(self, key: Union[slice, int]) -> Observable[_T_out]:
"""
Pythonic version of :func:`slice <reactivex.operators.slice>`.
Expand Down

0 comments on commit 9ecf95a

Please sign in to comment.