Skip to content

A set of Python regularly used classes/functions


Notifications You must be signed in to change notification settings



Folders and files

Last commit message
Last commit date

Latest commit


Repository files navigation


A set of Python regularly used classes/functions

Last release
Weekly checks
Python >= 3.10.1
License: Apache-2.0

Table of contents


pip install dev4py-utils

Project template

This project is based on pymsdl_template

Project links

Dev4py-utils modules


AsyncJOptional documentation

Note: AsyncJOptional class is designed in order to simplify JOptional with async mapper.

Note: AsyncJOptional support T or Awaitable[T] values. That's why some checks are done when terminal operation is called with await.


import asyncio
from dev4py.utils import AsyncJOptional

def sync_mapper(i: int) -> int:
  return i * 2

async def async_mapper(i: int) -> str:
  return f"The value is {i}"

async def async_sample() -> None:
  value: int = 1
  await AsyncJOptional.of_noneable(value) \
    .map(sync_mapper) \
    .map(async_mapper) \
    .if_present(print)  # The value is 2


Awaitables documentation

Note: awaitables module provides a set of utility functions to simplify Awaitable operations.


import asyncio
from dev4py.utils import awaitables, JOptional

# is_awaitable sample
awaitables.is_awaitable(asyncio.sleep(2))  # True
awaitables.is_awaitable(print('Hello'))  # False

# to_sync_or_async_param_function sample
def mapper(s: str) -> str:
    return s + '_suffix'

async def async_mapper(s: str) -> str:
    await asyncio.sleep(1)
    return s + '_async_suffix'

async def async_test():
    # Note: mapper parameter is str and async_mapper returns an Awaitable[str] so we have to manage it
    # Note: !WARNING! Since 3.0.0 see AsyncJOptional / JOptional to_async_joptional method
    result: str = await JOptional.of("A value") \
      .map(async_mapper) \
      .map(awaitables.to_sync_or_async_param_function(mapper)) \
    print(result)  # A value_async_suffix_suffix


Collectors documentation

Note: The collectors class is inspired by


from dev4py.utils import Stream, collectors

Stream.of('a', 'b', 'c').collect(collectors.to_list())  # ['a', 'b', 'c']


Dicts documentation

Note: dicts module provides a set of utility functions to simplify dict operations.


from dev4py.utils import dicts
from dev4py.utils.types import Supplier

# is_dict sample
dicts.is_dict("A str")  # False
dicts.is_dict({'key': 'A dict value'})  # True

# get_value sample
int_supplier: Supplier[int] = lambda: 3
dictionary: dict[str, int] = {'key_1': 1, 'key_2': 2}

dicts.get_value(dictionary, 'key_1')  # 1
dicts.get_value(dictionary, 'key_3')  # None
dicts.get_value(dictionary, 'key_3', int_supplier)  # 3

# get_value_from_path sample
str_supplier: Supplier[str] = lambda: "a3"
deep_dictionary: dict[str, dict[int, str]] = { \
  'a': {1: 'a1', 2: 'a2'}, \
  'b': {1: 'b1', 2: 'b2'} \

dicts.get_value_from_path(deep_dictionary, ["a", 1])  # 'a1'
dicts.get_value_from_path(deep_dictionary, ["c", 1])  # None
dicts.get_value_from_path(deep_dictionary, ["a", 3])  # None
dicts.get_value_from_path(deep_dictionary, ["a", 3], str_supplier)  # 'a3'


Iterables documentation

Note: The iterables module provides a set of utility functions to simplify iterables operations.


from typing import Iterator

from dev4py.utils import iterables

values: range = range(0, 10)
chunks: Iterator[list[int]] = iterables.get_chunks(values, 3)
[chunk for chunk in chunks]  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]


JOptional documentation

Note: JOptional class is inspired by java.util.Optional class with some adds (like peek method).


from dev4py.utils import JOptional

value: int = 1
JOptional.of_noneable(value) \
  .map(lambda v: f"The value is {v}") \
  .if_present(print)  # The value is 1


Lists documentation

Note: lists module provides a set of utility functions to simplify lists operations.


from dev4py.utils import lists

# empty sample
lst: list[int] = lists.empty_list()  # []

# append sample
lst: list[int] = [1, 2, 3, 4]
app_lst: list[int] = lists.append(lst, 5)  # [1, 2, 3, 4, 5]
# - Note: lst == app_lst

# extend sample
lst: list[int] = [1, 2, 3, 4]
lst2: list[int] = [5, 6, 7, 8]
ext_lst: list[int] = lists.extend(lst, lst2)  # [1, 2, 3, 4, 5, 6, 7, 8]
# - Note: lst == ext_lst


Objects documentation

Note: The objects module is inspired by java.util.Objects class.


from dev4py.utils import objects

# non_none sample
value = None
objects.non_none(value)  # False

# require_non_none sample
value = "A value"
objects.require_non_none(value)  # 'A value'

# to_string sample
value = None
default_value: str = "A default value"
objects.to_string(value, default_value)  # 'A default value'


Pipeline documentation

Note: The pipeline package provides a set of Pipeline class describing different kind of pipelines.


from dev4py.utils.pipeline import SimplePipeline, StepPipeline, StepResult

# SimplePipeline sample
pipeline: SimplePipeline[int, str] = SimplePipeline.of(lambda i: i * i) \
    .add_handler(str) \
    .add_handler(lambda s: f"Result: {s} | Type: {type(s)}")

pipeline.execute(10)  # "Result: 100 | Type: <class 'str'>"

# StepPipeline sample
# Note: StepPipeline can be stopped at each step by setting `go_next` value to False
pipeline: StepPipeline[int, str] = StepPipeline.of(lambda i: StepResult(i * i)) \
    .add_handler(lambda i: StepResult(value=str(i), go_next=i < 150)) \
    .add_handler(lambda s: StepResult(f"Result: {s} | Type: {type(s)}"))

pipeline.execute(10)  # StepResult(value="Result: 100 | Type: <class 'str'>", go_next=True)
# - Note: When the pipeline is fully completed, `go_next` is True
pipeline.execute(15)  # StepResult(value='225', go_next=False)
# - Note: Even if the pipeline is not fully completed, the last StepResult is returned with `go_next=False`


Retry documentation

Note: The retry module provides provides function to create retryable callable from simple sync or async callables using exponential backoff

Usage idea: network requests (HTTP, AMQP, MQTT, etc.) with retry on error


import asyncio
from time import time
from typing import Awaitable

from dev4py.utils.retry import RetryConfiguration, to_retryable, to_async_retryable, retryable, async_retryable
from dev4py.utils.types import BiFunction

# RetryConfiguration:
# Note: exponential backoff used formula is 'delay * (exponent^retry_number)'
#   => Example: For the following RetryConfiguration, waiting times in case of error are:
#       * first try:                    0 sec (always 0 for the first try)
#       * second try (/first retry):    1 sec ('0.5 * (2^1)')
#       * third try (/second retry):    2 sec ('0.5 * (2^2)')
#       * max_tries=3 => no fourth try (/third retry)
retry_config: RetryConfiguration = RetryConfiguration(
    delay=0.5,  # the exponential backoff delay in second (default: 0.1)
    exponent=2,  # the exponential backoff exponent to determine delay between each try (default: 2)
    max_tries=3  # max try number (first try included) (default: 3, i.e.: first try and 2 retry)

# to_retryable sample:
def callable_sample(j: int, start_time: float) -> int:
    print("callable_sample - call time: '%.2f'" % (time() - start_time))
    return j ** 2

retryable_sample: BiFunction[int, float, int] = to_retryable(sync_callable=callable_sample, retry_config=retry_config)
# Note: Since 3.5.0 you can also use `retryable(sync_callable=callable_sample, retry_config=retry_config)`

result: int = retryable_sample(3, time())  # result = 9
# outputs:
#  callable_sample - call time: '0.00'

def in_error_callable_sample(j: int, start_time: float) -> int:
    print("in_error_callable_sample - call time: '%.2f'" % (time() - start_time))
    raise ValueError(j)

in_error_retryable_sample: BiFunction[int, float, int] = \
    to_retryable(sync_callable=in_error_callable_sample, retry_config=retry_config)
# Note: Since 3.5.0 you can also use `retryable(sync_callable=in_error_callable_sample, retry_config=retry_config)`
# Note: By default the last raised exception is raised if max_tries is reach. You can change this behavior by setting
#       the `on_failure` parameter
result: int = in_error_retryable_sample(3, time())
# outputs:
#  in_error_callable_sample - call time: '0.00'
#  in_error_callable_sample - call time: '1.00'
#  in_error_callable_sample - call time: '3.00'
#  ValueError: 3
# Note: By default the last raised exception is raised if max_tries is reached. You can change this behavior by setting
#       the `on_failure` parameter

def decorated_in_error_callable_sample(j: int, start_time: float) -> int:
    print("decorated_in_error_callable_sample - call time: '%.2f'" % (time() - start_time))
    raise ValueError(j)

result: int = decorated_in_error_callable_sample(3, time())
# outputs:
#  decorated_in_error_callable_sample - call time: '0.00'
#  decorated_in_error_callable_sample - call time: '1.00'
#  decorated_in_error_callable_sample - call time: '3.00'
#  ValueError: 3
# Note: By default the last raised exception is raised if max_tries is reached. You can change this behavior by setting
#       the `on_failure` parameter

# to_async_retryable sample:
async def in_error_async_callable_sample(j: int, start_time: float) -> int:
    print("in_error_async_callable_sample - call time: '%.2f'" % (time() - start_time))
    raise ValueError(j)

async def async_retryable_sample() -> None:
    in_error_async_retryable_sample: BiFunction[int, float, Awaitable[int]] = \
        to_async_retryable(async_callable=in_error_async_callable_sample, retry_config=retry_config)
    # Note: Since 3.5.0 you can also use 
    # `async_retryable(async_callable=in_error_async_callable_sample, retry_config=retry_config)`
    result: int = await in_error_async_retryable_sample(2, time())
# outputs:
#  in_error_async_callable_sample - call time: '0.00'
#  in_error_async_callable_sample - call time: '1.00'
#  in_error_async_callable_sample - call time: '3.00'
#  ValueError: 2
# Note: By default the last raised exception is raised if max_tries is reached. You can change this behavior by setting
#       the `on_failure` parameter

async def decorated_in_error_async_callable_sample(j: int, start_time: float) -> int:
    print("decorated_in_error_async_callable_sample - call time: '%.2f'" % (time() - start_time))
    raise ValueError(j)

async def async_decorated_retryable_sample() -> None:
    result: int = await decorated_in_error_async_callable_sample(2, time())
# outputs:
#  in_error_async_callable_sample - call time: '0.00'
#  in_error_async_callable_sample - call time: '1.00'
#  in_error_async_callable_sample - call time: '3.00'
#  ValueError: 2
# Note: By default the last raised exception is raised if max_tries is reached. You can change this behavior by setting
#       the `on_failure` parameter


Stream documentation

Note: Stream class is inspired by


from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import sleep

from dev4py.utils import Stream, ParallelConfiguration

# Sequential sample
Stream.of(1, 2, 3, 4) \
    .map(str) \
    .peek(lambda s: sleep(0.5)) \
    .map(lambda s: f"Mapped value: {s}") \
    .to_list()  # ['Mapped value: 1', 'Mapped value: 2', 'Mapped value: 3', 'Mapped value: 4']
# - Note: Execution time around 2 sec due to the sleep call

# Multithreading sample
with ThreadPoolExecutor(max_workers=2) as executor:
    Stream.of(1, 2, 3, 4) \
        .parallel(parallel_config=ParallelConfiguration(executor=executor, chunksize=2)) \
        .map(str) \
        .peek(lambda s: sleep(0.5)) \
        .map(lambda s: f"Mapped value: {s}") \
        .to_list()  # ['Mapped value: 3', 'Mapped value: 4', 'Mapped value: 1', 'Mapped value: 2']
# - Note: Execution time around 1 sec due to the given ParallelConfiguration
# - Note: Since this stream is (by default) unordered, results order is random

# Multiprocessing sample
# - Note: Due to use of Multiprocessing:
#       * lambdas cannot be used since they cannot be pickled
#       * This sample should be put in a python file in order to work
def _sleep(s: str) -> None:
    # eq lambda s: sleep(0.5)

def _mapper(s: str) -> str:
    # eq lambda s: f"Mapped value: {s}"
    return f"Mapped value: {s}"

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        Stream.of(1, 2, 3, 4) \
            .parallel(parallel_config=ParallelConfiguration(executor=executor, chunksize=2)) \
            .map(str) \
            .peek(_sleep) \
            .map(_mapper) \

# - Note: Execution time around 1 sec due to the given ParallelConfiguration
#         (Reminder: Use Multiprocessing for CPU-bound tasks. In this case Multithreading is more appropriate)
# - Note: Since this stream is (by default) unordered, results order is random


Tuples documentation

Note: tuples module provides a set of utility functions to simplify tuples operations.


from dev4py.utils import tuples

# empty sample
tpl: tuple[int, ...] = tuples.empty_tuple()  # ()

# append sample
tpl: tuple[int, ...] = (1, 2, 3, 4)
app_tpl: tuple[int, ...] = tuples.append(tpl, 5)  # (1, 2, 3, 4, 5)

# extend sample
tpl: tuple[int, ...] = (1, 2, 3, 4)
tpl2: tuple[int, ...] = (5, 6, 7, 8)
ext_tpl: tuple[int, ...] = tuples.extend(tpl, tpl2)  # (1, 2, 3, 4, 5, 6, 7, 8)


Types documentation

Note: The types module is inspired by java.util.function package.


from dev4py.utils.types import Function, Predicate, Consumer

# Function sample
int_to_str: Function[int, str] = lambda i: str(i)
str_result: str = int_to_str(1)  # '1'

# Predicate sample
str_predicate: Predicate[str] = lambda s: s == "A value"
pred_result: bool = str_predicate("Value to test")  # False

# Consumer sample
def sample(consumer: Consumer[str], value: str) -> None:

def my_consumer(arg: str) -> None:

sample(my_consumer, "My value")  # My value


A set of Python regularly used classes/functions







No packages published


  • Python 99.8%
  • Other 0.2%