[feature] adding a random jitter to the delay (#18)
* [feature] adding a random jitter to the delay

Also, exposing delay on the command line for run-job and watch-job
nh13 authored Jul 16, 2020
1 parent 7f58892 commit 2298f0a
Showing 6 changed files with 135 additions and 9 deletions.
58 changes: 52 additions & 6 deletions pyfgaws/batch/api.py
@@ -14,9 +14,9 @@
from typing import Union

import botocore
import mypy_boto3_batch as batch
import namegenerator
from botocore.waiter import Waiter as BotoWaiter
import mypy_boto3_batch as batch
from mypy_boto3_batch.type_defs import ArrayPropertiesTypeDef # noqa
from mypy_boto3_batch.type_defs import ContainerDetailTypeDef # noqa
from mypy_boto3_batch.type_defs import ContainerOverridesTypeDef # noqa
@@ -31,6 +31,10 @@
from mypy_boto3_batch.type_defs import RetryStrategyTypeDef # noqa
from mypy_boto3_batch.type_defs import SubmitJobResponseTypeDef # noqa

from pyfgaws.core import MINIMUM_DELAY
from pyfgaws.core import DEFAULT_JITTER_WIDTH
from pyfgaws.core import add_jitter

# The possible values of Status, for type checking
StatusValue = Union[
Literal["SUBMITTED"],
@@ -166,6 +170,7 @@ def __init__(
)
if logger is not None:
logger.info(f"Retrieved latest job definition '{job_definition}'")
self._logger: Optional[logging.Logger] = logger

# Main arguments
self.name: str = namegenerator.gen() if name is None else name
@@ -340,18 +345,28 @@ def wait_on(
self,
status_to_state: Dict[Status, bool],
max_attempts: Optional[int] = None,
delay: Optional[int] = None,
delay: Optional[Union[int, float]] = None,
minimum_delay: Optional[Union[int, float]] = None,
delay_width: Optional[Union[int, float]] = None,
after_success: bool = False,
) -> batch.type_defs.JobDetailTypeDef:
"""Waits for the given states with associated success or failure.
If some states are missing from the input mapping, then all statuses after the last
successful input status are treated as success or failure based on `after_success`
successful input status are treated as success or failure based on `after_success`.
This method first enforces a minimum delay of 1 second, then adds a small random jitter
(+/- 2 seconds) to the delay, to help avoid AWS Batch API rate limits when monitoring many
concurrent jobs.
Args:
status_to_state: mapping of status to success (true) or failure (false) state
max_attempts: the maximum # of attempts until reaching the given state.
delay: the delay in seconds between polling attempts
minimum_delay: override the minimum delay (defaults to 1 second)
delay_width: the width of the random jitter to apply to the delay
after_success: true to treat statuses "after" the last successful input status as
success, false to treat them as failure
"""
assert len(status_to_state) > 0, "No statuses given"
assert any(value for value in status_to_state.values()), "No statuses with success set."
@@ -373,8 +388,15 @@ def wait_on(

name = "Waiter for statues: [" + ",".join(s.status for s in _status_to_state) + "]"
config: Dict[str, Any] = {"version": 2}

minimum_delay = MINIMUM_DELAY if minimum_delay is None else minimum_delay
delay_width = DEFAULT_JITTER_WIDTH if delay_width is None else delay_width
actual_delay = add_jitter(
    delay=0 if delay is None else delay, width=delay_width, minima=minimum_delay
)
if self._logger is not None:
    self._logger.debug(f"Changing delay from {delay} to {actual_delay}")
delay = actual_delay

waiter_body: Dict[str, Any] = {
"delay": 1 if delay is None else delay,
"delay": delay,
"operation": "DescribeJobs",
"maxAttempts": sys.maxsize if max_attempts is None else max_attempts,
"acceptors": [
@@ -394,33 +416,57 @@ def wait_on(
return self.describe_job()

def wait_on_running(
self, max_attempts: Optional[int] = None, delay: Optional[int] = None
self,
max_attempts: Optional[int] = None,
delay: Optional[Union[int, float]] = None,
minimum_delay: Optional[Union[int, float]] = None,
delay_width: Optional[Union[int, float]] = None,
) -> batch.type_defs.JobDetailTypeDef:
"""Waits for the given states with associated success or failure.
This method first enforces a minimum delay of 1 second, then adds a small random jitter
(+/-2 seconds) to the delay to help avoid AWS batch API limits for monitoring batch jobs
in the cases of many requests across concurrent jobs.
Args:
max_attempts: the maximum # of attempts until reaching the given state.
delay: the delay in seconds between polling attempts
minimum_delay: override the minimum delay (defaults to 1 second)
delay_width: the width of the random jitter to apply to the delay
"""
return self.wait_on(
status_to_state={Status.Running: True},
max_attempts=max_attempts,
delay=delay,
minimum_delay=minimum_delay,
delay_width=delay_width,
after_success=True,
)

def wait_on_complete(
self, max_attempts: Optional[int] = None, delay: Optional[int] = None
self,
max_attempts: Optional[int] = None,
delay: Optional[Union[int, float]] = None,
minimum_delay: Optional[Union[int, float]] = None,
delay_width: Optional[Union[int, float]] = None,
) -> batch.type_defs.JobDetailTypeDef:
"""Waits for the given states with associated success or failure.
This method first enforces a minimum delay of 1 second, then adds a small random jitter
(+/-2 seconds) to the delay to help avoid AWS batch API limits for monitoring batch jobs
in the cases of many requests across concurrent jobs.
Args:
max_attempts: the maximum # of attempts until reaching the given state.
delay: the delay in seconds between polling attempts
minimum_delay: override the minimum delay (defaults to 1 second)
delay_width: the width of the random jitter to apply to the delay
"""
return self.wait_on(
status_to_state={Status.Succeeded: True, Status.Failed: True},
max_attempts=max_attempts,
delay=delay,
minimum_delay=minimum_delay,
delay_width=delay_width,
after_success=False,
)
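For orientation, a minimal usage sketch of the new jitter-aware waiters follows; the client configuration, region, job id, and polling interval are illustrative placeholders, not part of this change.

import boto3

from pyfgaws.batch.api import BatchJob

# Assumes valid AWS credentials and an existing batch job in the chosen region.
client = boto3.client("batch", region_name="us-east-1")
job = BatchJob.from_id(client=client, job_id="example-job-id")

# Poll roughly every 30 seconds; each poll's delay is jittered by +/- 2 seconds
# and never drops below the 1-second minimum.
job.wait_on_running(delay=30)
detail = job.wait_on_complete(delay=30, minimum_delay=1, delay_width=2)
print(detail["status"])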
2 changes: 1 addition & 1 deletion pyfgaws/batch/tests/test_api.py
@@ -157,6 +157,6 @@ def test_wait_for_job(statuses: List[Status]) -> None:

job: BatchJob = BatchJob.from_id(client=client, job_id="job-id")
assert job.job_id is not None, str(job)
response = job.wait_on_complete(delay=0)
response = job.wait_on_complete(delay=0, minimum_delay=0, delay_width=0.0001)

assert response == service_responses[-1]["jobs"][0], str(response)
24 changes: 22 additions & 2 deletions pyfgaws/batch/tools.py
@@ -41,13 +41,25 @@ def _log_it(region_name: str, job: BatchJob, logger: logging.Logger) -> None:
logs_thread.start()


def watch_job(*, job_id: str, region_name: Optional[str] = None, print_logs: bool = True,) -> None:
def watch_job(
*,
job_id: str,
region_name: Optional[str] = None,
print_logs: bool = True,
delay: Optional[int] = None,
) -> None:
"""Watches an AWS batch job.
This tool adds a small random jitter (+/- 2 seconds) to the delay to help avoid AWS Batch
API rate limits when monitoring many concurrent jobs. A minimum delay of 1 second is also
enforced.
Args:
job_id: the AWS batch job identifier
region_name: the AWS region
print_logs: true to print CloudWatch logs, false otherwise
delay: the number of seconds to wait between polls for the job status
"""
logger = logging.getLogger(__name__)

@@ -62,7 +74,7 @@ def watch_job(*, job_id: str, region_name: Optional[str] = None, print_logs: boo
if print_logs:
_log_it(region_name=region_name, job=job, logger=logger)

job.wait_on_complete()
job.wait_on_complete(delay=delay)
end_status = job.get_status()
logger.info(
f"Job completed with name '{job.name}', id '{job.job_id}', and status '{end_status}'"
@@ -83,9 +95,14 @@ def run_job(
environment: Optional[KeyValuePairTypeDef] = None,
watch_until: List[Status] = [],
after_success: bool = False,
delay: Optional[int] = None,
) -> None:
"""Submits a batch job and optionally waits for it to reach one of the given states.
This tool adds a small random jitter (+/- 2 seconds) to the delay to help avoid AWS Batch
API rate limits when monitoring many concurrent jobs. A minimum delay of 1 second is also
enforced.
Args:
job_definition: the ARN for the AWS batch job definition, or the name of the job definition
to get the latest revision
@@ -105,6 +122,8 @@ def run_job(
See the `--after-success` option to control this behavior.
after_success: true to treat states after the `watch_until` states as success, otherwise
failure.
delay: the number of seconds to wait between polls for the job status. Only used when
`--watch-until` is given.
"""
logger = logging.getLogger(__name__)

@@ -141,6 +160,7 @@
job.wait_on(
status_to_state=dict((status, True) for status in watch_until),
after_success=after_success,
delay=delay,
)
end_status: Status = job.get_status()

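Similarly, a sketch of calling the updated watch-job tool directly from Python with the newly exposed delay; the job id and region below are placeholders, and the command-line options referenced in the docstrings presumably map onto these keyword arguments.

from pyfgaws.batch.tools import watch_job

# Waits for the job to complete, polling about once a minute with a
# +/- 2 second jitter applied to each poll (minimum 1 second).
watch_job(
    job_id="example-job-id",
    region_name="us-west-2",
    print_logs=True,
    delay=60,
)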
33 changes: 33 additions & 0 deletions pyfgaws/core/__init__.py
@@ -0,0 +1,33 @@
from random import uniform
from typing import Union

# The minimum delay in seconds between batch job API requests
MINIMUM_DELAY: int = 1

# The default jitter width for adding jitter to the delay
DEFAULT_JITTER_WIDTH: int = 2


def add_jitter(
delay: Union[int, float] = 0,
width: Union[int, float] = DEFAULT_JITTER_WIDTH,
minima: Union[int, float] = 0,
) -> float:
"""Apply a jitter to the delay, to help avoid AWS batch API limits for monitoring batch
jobs in the cases of many requests across concurrent jobs.
Args:
delay: the number of seconds to wait upon making a subsequent request
width: the width for the random jitter, centered around delay, must be > 0
minima: the minimum delay allowed, must be >= 0
Returns:
the new delay with the jitter applied (`uniform(delay - width, delay + width)`)
"""
assert width > 0, f"Width must be > 0: {width}"
assert minima >= 0, f"Minima must be >= 0: {minima}"
delay = max(0, delay)
lower = max(minima, delay - width)
upper = lower + (2 * width)
assert upper >= lower
return uniform(lower, upper)
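As a quick illustration of the bounds add_jitter produces (the printed values are random draws; the commented ranges follow from the formula in the docstring above):

from pyfgaws.core import add_jitter

# delay=10, width=2, minima=1 -> uniform over [8, 12]
print(add_jitter(delay=10, width=2, minima=1))
# delay=0, width=2, minima=1 -> lower bound clamped to the minimum -> uniform over [1, 5]
print(add_jitter(delay=0, width=2, minima=1))
# negative delays are clamped to 0 first -> uniform over [0, 4]
print(add_jitter(delay=-5, width=2, minima=0))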
Empty file added pyfgaws/core/tests/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions pyfgaws/core/tests/test_core.py
@@ -0,0 +1,27 @@
"""Tests for :module:`~pyfgaws.core`"""

from pyfgaws.core import add_jitter
import pytest


def test_add_jitter() -> None:
for delay in [-10, 0, 1, 5, 10, 5.5]:
for width in [1, 5, 10, 2.5]:
for minima in [0, 1, 5, 10, 0.5]:
for _ in range(15):
jittered = add_jitter(delay=delay, width=width, minima=minima)
label: str = f"add_jitter(delay={delay}, width={width}, minima={minima})"
assert jittered >= minima, label
assert jittered >= delay - width, label
assert jittered <= max(minima, delay - width) + (2 * width), label


def test_add_jitter_bad_args() -> None:
with pytest.raises(AssertionError):
add_jitter(width=0)
with pytest.raises(AssertionError):
add_jitter(width=-10)
with pytest.raises(AssertionError):
add_jitter(minima=-0.001)
with pytest.raises(AssertionError):
add_jitter(minima=-10)
