From 2298f0adcfce8d01e52f18c207091f3943dd7dd0 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Thu, 16 Jul 2020 11:27:05 -0700 Subject: [PATCH] [feature] adding a random jitter to the delay (#18) * [feature] adding a random jitter to the delay Also, exposing delay on the command line for run-job and watch-job --- pyfgaws/batch/api.py | 58 +++++++++++++++++++++++++++++---- pyfgaws/batch/tests/test_api.py | 2 +- pyfgaws/batch/tools.py | 24 ++++++++++++-- pyfgaws/core/__init__.py | 33 +++++++++++++++++++ pyfgaws/core/tests/__init__.py | 0 pyfgaws/core/tests/test_core.py | 27 +++++++++++++++ 6 files changed, 135 insertions(+), 9 deletions(-) create mode 100644 pyfgaws/core/tests/__init__.py create mode 100644 pyfgaws/core/tests/test_core.py diff --git a/pyfgaws/batch/api.py b/pyfgaws/batch/api.py index ff4b994..91d0504 100644 --- a/pyfgaws/batch/api.py +++ b/pyfgaws/batch/api.py @@ -14,9 +14,9 @@ from typing import Union import botocore +import mypy_boto3_batch as batch import namegenerator from botocore.waiter import Waiter as BotoWaiter -import mypy_boto3_batch as batch from mypy_boto3_batch.type_defs import ArrayPropertiesTypeDef # noqa from mypy_boto3_batch.type_defs import ContainerDetailTypeDef # noqa from mypy_boto3_batch.type_defs import ContainerOverridesTypeDef # noqa @@ -31,6 +31,10 @@ from mypy_boto3_batch.type_defs import RetryStrategyTypeDef # noqa from mypy_boto3_batch.type_defs import SubmitJobResponseTypeDef # noqa +from pyfgaws.core import MINIMUM_DELAY +from pyfgaws.core import DEFAULT_JITTER_WIDTH +from pyfgaws.core import add_jitter + # The possible values of Status, for type checking StatusValue = Union[ Literal["SUBMITTED"], @@ -166,6 +170,7 @@ def __init__( ) if logger is not None: logger.info(f"Retrieved latest job definition '{job_definition}'") + self._logger: Optional[logging.Logger] = logger # Main arguments self.name: str = namegenerator.gen() if name is None else name @@ -340,18 +345,28 @@ def wait_on( self, status_to_state: Dict[Status, 
bool], max_attempts: Optional[int] = None, - delay: Optional[int] = None, + delay: Optional[Union[int, float]] = None, + minimum_delay: Optional[Union[int, float]] = None, + delay_width: Optional[Union[int, float]] = None, after_success: bool = False, ) -> batch.type_defs.JobDetailTypeDef: """Waits for the given states with associated success or failure. If some states are missing from the input mapping, then all statuses after the last - successful input status are treated as success or failure based on `after_success` + successful input status are treated as success or failure based on `after_success`. + + This method first enforces a minimum delay of 1 second, then adds a small random jitter + (+/-2 seconds) to the delay to help avoid AWS batch API limits for monitoring batch jobs + in the cases of many requests across concurrent jobs. Args: status_to_state: mapping of status to success (true) or failure (false) state max_attempts: the maximum # of attempts until reaching the given state. delay: the delay before waiting + minimum_delay: overrides the minimum delay + delay_width: the width of the jitter to apply to the delay + after_success: true to treat statuses "after" the last successful input status as + success, false to treat them as failure """ assert len(status_to_state) > 0, "No statuses given" assert any(value for value in status_to_state.values()), "No statuses with success set." 
@@ -373,8 +388,15 @@ def wait_on( name = "Waiter for statues: [" + ",".join(s.status for s in _status_to_state) + "]" config: Dict[str, Any] = {"version": 2} + + minimum_delay = MINIMUM_DELAY if minimum_delay is None else minimum_delay + delay_width = DEFAULT_JITTER_WIDTH if delay_width is None else delay_width + requested_delay = 1 if delay is None else delay + delay = add_jitter(delay=requested_delay, width=delay_width, minima=minimum_delay) + if self._logger is not None: + self._logger.debug(f"Changing delay from {requested_delay} to {delay}") waiter_body: Dict[str, Any] = { - "delay": 1 if delay is None else delay, + "delay": delay, "operation": "DescribeJobs", "maxAttempts": sys.maxsize if max_attempts is None else max_attempts, "acceptors": [ @@ -394,33 +416,57 @@ def wait_on( return self.describe_job() def wait_on_running( - self, max_attempts: Optional[int] = None, delay: Optional[int] = None + self, + max_attempts: Optional[int] = None, + delay: Optional[Union[int, float]] = None, + minimum_delay: Optional[Union[int, float]] = None, + delay_width: Optional[Union[int, float]] = None, ) -> batch.type_defs.JobDetailTypeDef: """Waits for the given states with associated success or failure. + This method first enforces a minimum delay of 1 second, then adds a small random jitter + (+/-2 seconds) to the delay to help avoid AWS batch API limits for monitoring batch jobs + in the cases of many requests across concurrent jobs. + Args: max_attempts: the maximum # of attempts until reaching the given state. 
delay: the delay before waiting + minimum_delay: overrides the minimum delay + delay_width: the width of the jitter to apply to the delay """ return self.wait_on( status_to_state={Status.Running: True}, max_attempts=max_attempts, delay=delay, + minimum_delay=minimum_delay, + delay_width=delay_width, after_success=True, ) def wait_on_complete( - self, max_attempts: Optional[int] = None, delay: Optional[int] = None + self, + max_attempts: Optional[int] = None, + delay: Optional[Union[int, float]] = None, + minimum_delay: Optional[Union[int, float]] = None, + delay_width: Optional[Union[int, float]] = None, ) -> batch.type_defs.JobDetailTypeDef: """Waits for the given states with associated success or failure. + This method first enforces a minimum delay of 1 second, then adds a small random jitter + (+/-2 seconds) to the delay to help avoid AWS batch API limits for monitoring batch jobs + in the cases of many requests across concurrent jobs. + Args: max_attempts: the maximum # of attempts until reaching the given state. 
delay: the delay before waiting + minimum_delay: overrides the minimum delay + delay_width: the width of the jitter to apply to the delay """ return self.wait_on( status_to_state={Status.Succeeded: True, Status.Failed: True}, max_attempts=max_attempts, delay=delay, + minimum_delay=minimum_delay, + delay_width=delay_width, after_success=False, ) diff --git a/pyfgaws/batch/tests/test_api.py b/pyfgaws/batch/tests/test_api.py index e886587..6d390cf 100644 --- a/pyfgaws/batch/tests/test_api.py +++ b/pyfgaws/batch/tests/test_api.py @@ -157,6 +157,6 @@ def test_wait_for_job(statuses: List[Status]) -> None: job: BatchJob = BatchJob.from_id(client=client, job_id="job-id") assert job.job_id is not None, str(job) - response = job.wait_on_complete(delay=0) + response = job.wait_on_complete(delay=0, minimum_delay=0, delay_width=0.0001) assert response == service_responses[-1]["jobs"][0], str(response) diff --git a/pyfgaws/batch/tools.py b/pyfgaws/batch/tools.py index f2ae67d..dd68d4c 100644 --- a/pyfgaws/batch/tools.py +++ b/pyfgaws/batch/tools.py @@ -41,13 +41,25 @@ def _log_it(region_name: str, job: BatchJob, logger: logging.Logger) -> None: logs_thread.start() -def watch_job(*, job_id: str, region_name: Optional[str] = None, print_logs: bool = True,) -> None: +def watch_job( + *, + job_id: str, + region_name: Optional[str] = None, + print_logs: bool = True, + delay: Optional[int] = None, +) -> None: """Watches an AWS batch job. + This tool adds a small random jitter (+/-2 seconds) to the delay to help avoid AWS batch API + limits for monitoring batch jobs in the cases of many requests across concurrent jobs. A + minimum delay of 1 second is subsequently applied. + Args: job_id: the AWS batch job identifier region_name: the AWS region print_logs: true to print CloudWatch logs, false otherwise + delay: the number of seconds to wait after polling for status, before the jitter + and minimum delay are applied. 
""" logger = logging.getLogger(__name__) @@ -62,7 +74,7 @@ def watch_job(*, job_id: str, region_name: Optional[str] = None, print_logs: boo if print_logs: _log_it(region_name=region_name, job=job, logger=logger) - job.wait_on_complete() + job.wait_on_complete(delay=delay) end_status = job.get_status() logger.info( f"Job completed with name '{job.name}', id '{job.job_id}', and status '{end_status}'" @@ -83,9 +95,14 @@ def run_job( environment: Optional[KeyValuePairTypeDef] = None, watch_until: List[Status] = [], after_success: bool = False, + delay: Optional[int] = None, ) -> None: """Submits a batch job and optionally waits for it to reach one of the given states. + This tool a small random jitter (+/-2 seconds) to the delay to help avoid AWS batch API + limits for monitoring batch jobs in the cases of many requests across concurrent jobs. A + minimum delay of 1 second is subsequently applied. + Args: job_definition: the ARN for the AWS batch job definition, or the name of the job definition to get the latest revision @@ -105,6 +122,8 @@ def run_job( See the `--after-success` option to control this behavior. after_success: true to treat states after the `watch_until` states as success, otherwise failure. + delay: the number of seconds to wait after polling for status. Only used when + `--watch-until` is `true`. 
""" logger = logging.getLogger(__name__) @@ -141,6 +160,7 @@ def run_job( job.wait_on( status_to_state=dict((status, True) for status in watch_until), after_success=after_success, + delay=delay, ) end_status: Status = job.get_status() diff --git a/pyfgaws/core/__init__.py b/pyfgaws/core/__init__.py index e69de29..190592d 100644 --- a/pyfgaws/core/__init__.py +++ b/pyfgaws/core/__init__.py @@ -0,0 +1,33 @@ +from random import uniform +from typing import Union + +# The minimum delay in seconds between batch job API requests +MINIMUM_DELAY: int = 1 + +# The default jitter width for adding jitter to the delay +DEFAULT_JITTER_WIDTH: int = 2 + + +def add_jitter( + delay: Union[int, float] = 0, + width: Union[int, float] = DEFAULT_JITTER_WIDTH, + minima: Union[int, float] = 0, +) -> float: + """Apply a jitter to the delay, to help avoid AWS batch API limits for monitoring batch + jobs in the cases of many requests across concurrent jobs. + + Args: + delay: the number of seconds to wait upon making a subsequent request + width: the width for the random jitter, centered around delay, must be > 0 + minima: the minimum delay allowed, must be >= 0 + + Returns: + the new delay with the jitter applied (`uniform(delay - width, delay + width)`) + """ + assert width > 0, f"Width must be > 0: {width}" + assert minima >= 0, f"Minima must be >= 0: {minima}" + delay = max(0, delay) + lower = max(minima, delay - width) + upper = lower + (2 * width) + assert upper >= lower + return uniform(lower, upper) diff --git a/pyfgaws/core/tests/__init__.py b/pyfgaws/core/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyfgaws/core/tests/test_core.py b/pyfgaws/core/tests/test_core.py new file mode 100644 index 0000000..41155c7 --- /dev/null +++ b/pyfgaws/core/tests/test_core.py @@ -0,0 +1,27 @@ +"""Tests for :module:`~pyfgaws.core`""" + +from pyfgaws.core import add_jitter +import pytest + + +def test_add_jitter() -> None: + for delay in [-10, 0, 1, 5, 10, 5.5]: + for 
width in [1, 5, 10, 2.5]: + for minima in [0, 1, 5, 10, 0.5]: + for _ in range(15): + jittered = add_jitter(delay=delay, width=width, minima=minima) + label: str = f"add_jitter(delay={delay}, width={width}, minima={minima})" + assert jittered >= minima, label + assert jittered >= delay - width, label + assert jittered <= max(minima, delay - width) + (2 * width), label + + +def test_add_jitter_bad_args() -> None: + with pytest.raises(AssertionError): + add_jitter(width=0) + with pytest.raises(AssertionError): + add_jitter(width=-10) + with pytest.raises(AssertionError): + add_jitter(minima=-0.001) + with pytest.raises(AssertionError): + add_jitter(minima=-10)