Skip to content

Commit

Permalink
[k8s] Add retries for pod and node fetching to handle transient errors (
Browse files Browse the repository at this point in the history
#4543)

* wip

* add debug logging

* lint

* Add backoff

* lint
  • Loading branch information
romilbhardwaj authored Jan 23, 2025
1 parent 763e963 commit 97b8e8f
Showing 1 changed file with 76 additions and 16 deletions.
92 changes: 76 additions & 16 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import re
import shutil
import subprocess
import time
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse
Expand Down Expand Up @@ -105,6 +106,75 @@

logger = sky_logging.init_logger(__name__)

# Default retry settings for Kubernetes API calls
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_INTERVAL_SECONDS = 1


def _retry_on_error(max_retries=DEFAULT_MAX_RETRIES,
retry_interval=DEFAULT_RETRY_INTERVAL_SECONDS,
resource_type: Optional[str] = None):
"""Decorator to retry Kubernetes API calls on transient failures.
Args:
max_retries: Maximum number of retry attempts
retry_interval: Initial seconds to wait between retries
resource_type: Type of resource being accessed (e.g. 'node', 'pod').
Used to provide more specific error messages.
"""

def decorator(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
backoff = common_utils.Backoff(initial_backoff=retry_interval,
max_backoff_factor=3)

for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (kubernetes.max_retry_error(),
kubernetes.api_exception(),
kubernetes.config_exception()) as e:
last_exception = e
# Don't retry on permanent errors like 401 (Unauthorized)
# or 403 (Forbidden)
if (isinstance(e, kubernetes.api_exception()) and
e.status in (401, 403)):
raise
if attempt < max_retries - 1:
sleep_time = backoff.current_backoff()
logger.debug(f'Kubernetes API call {func.__name__} '
f'failed with {str(e)}. Retrying in '
f'{sleep_time:.1f}s...')
time.sleep(sleep_time)
continue

# Format error message based on the type of exception
resource_msg = f' when trying to get {resource_type} info' \
if resource_type else ''
debug_cmd = f' To debug, run: kubectl get {resource_type}s' \
if resource_type else ''

if isinstance(last_exception, kubernetes.max_retry_error()):
error_msg = f'Timed out{resource_msg} from Kubernetes cluster.'
elif isinstance(last_exception, kubernetes.api_exception()):
error_msg = (f'Kubernetes API error{resource_msg}: '
f'{str(last_exception)}')
else:
error_msg = (f'Kubernetes configuration error{resource_msg}: '
f'{str(last_exception)}')

raise exceptions.ResourcesUnavailableError(
f'{error_msg}'
f' Please check if the cluster is healthy and retry.'
f'{debug_cmd}') from last_exception

return wrapper

return decorator


class GPULabelFormatter:
"""Base class to define a GPU label formatter for a Kubernetes cluster
Expand Down Expand Up @@ -446,6 +516,7 @@ def detect_accelerator_resource(


@functools.lru_cache(maxsize=10)
@_retry_on_error(resource_type='node')
def get_kubernetes_nodes(context: Optional[str] = None) -> List[Any]:
"""Gets the kubernetes nodes in the context.
Expand All @@ -454,17 +525,12 @@ def get_kubernetes_nodes(context: Optional[str] = None) -> List[Any]:
if context is None:
context = get_current_kube_config_context_name()

try:
nodes = kubernetes.core_api(context).list_node(
_request_timeout=kubernetes.API_TIMEOUT).items
except kubernetes.max_retry_error():
raise exceptions.ResourcesUnavailableError(
'Timed out when trying to get node info from Kubernetes cluster. '
'Please check if the cluster is healthy and retry. To debug, run: '
'kubectl get nodes') from None
nodes = kubernetes.core_api(context).list_node(
_request_timeout=kubernetes.API_TIMEOUT).items
return nodes


@_retry_on_error(resource_type='pod')
def get_all_pods_in_kubernetes_cluster(
context: Optional[str] = None) -> List[Any]:
"""Gets pods in all namespaces in kubernetes cluster indicated by context.
Expand All @@ -474,14 +540,8 @@ def get_all_pods_in_kubernetes_cluster(
if context is None:
context = get_current_kube_config_context_name()

try:
pods = kubernetes.core_api(context).list_pod_for_all_namespaces(
_request_timeout=kubernetes.API_TIMEOUT).items
except kubernetes.max_retry_error():
raise exceptions.ResourcesUnavailableError(
'Timed out when trying to get pod info from Kubernetes cluster. '
'Please check if the cluster is healthy and retry. To debug, run: '
'kubectl get pods') from None
pods = kubernetes.core_api(context).list_pod_for_all_namespaces(
_request_timeout=kubernetes.API_TIMEOUT).items
return pods


Expand Down

0 comments on commit 97b8e8f

Please sign in to comment.