Merge branch 'master' of github.com:skypilot-org/skypilot into restapi (#62)

* [perf] use uv for venv creation and pip install (#4414)

* Revert "remove `uv` from runtime setup due to azure installation issue (#4401)"

This reverts commit 0b20d56.

* on azure, use --prerelease=allow to install azure-cli

* use uv venv --seed

* fix backwards compatibility

* really fix backwards compatibility

* use uv to set up controller dependencies

* fix python 3.8

* lint

* add missing file

* update comment

* split out azure-cli dep

* fix lint for dependencies

* use runpy.run_path rather than modifying sys.path

* fix cloud dependency installation commands

* lint

* Update sky/utils/controller_utils.py

Co-authored-by: Zhanghao Wu <[email protected]>

---------

Co-authored-by: Zhanghao Wu <[email protected]>

* [Minor] README updates. (#4436)

* [Minor] README touches.

* update

* update

* make --fast robust against credential or wheel updates (#4289)

* add config_dict['config_hash'] output to write_cluster_config

* fix docstring for write_cluster_config

This used to be true, but since #2943, 'ray' is the only provisioner.
Add other keys that are now present instead.

* when using --fast, check if config_hash matches, and if not, provision

* mock hashing method in unit test

This is needed since some files in the fake file mounts don't actually exist,
like the wheel path.

* check config hash within provision with lock held

* address other PR review comments

* rename to skip_if_no_cluster_updates

Co-authored-by: Zhanghao Wu <[email protected]>

* add assert details

Co-authored-by: Zhanghao Wu <[email protected]>

* address PR comments and update docstrings

* fix test

* update docstrings

Co-authored-by: Zhanghao Wu <[email protected]>

* address PR comments

* fix lint and tests

* Update sky/backends/cloud_vm_ray_backend.py

Co-authored-by: Zhanghao Wu <[email protected]>

* refactor skip_if_no_cluster_update var

* clarify comment

* format exception

---------

Co-authored-by: Zhanghao Wu <[email protected]>
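
The config-hash idea in the PR above can be sketched in a few lines — a hypothetical illustration only (the helper name and hashing details are assumptions; the real logic lives in write_cluster_config and also hashes referenced files such as the wheel, per the unit-test note above):

import hashlib
import json

def compute_config_hash(cluster_config: dict) -> str:
    # Hypothetical sketch: hash a canonical serialization of the cluster
    # config. `sky launch --fast` can then compare this against the hash
    # stored for the cluster and re-provision only when something (e.g.
    # credentials or the wheel) actually changed.
    canonical = json.dumps(cluster_config, sort_keys=True).encode('utf-8')
    return hashlib.sha256(canonical).hexdigest()

As the bullets note, the hash comparison happens inside provisioning with the cluster lock held, so concurrent launches cannot race on a stale hash.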

* [k8s] Add resource limits only if they exist (#4440)

Add limits only if they exist

* [robustness] cover some potential resource leakage cases (#4443)

* if a newly-created cluster is missing from the cloud, wait before deleting

Addresses #4431.

* confirm cluster actually terminates before deleting from the db

* avoid deleting cluster data outside the primary provision loop

* tweaks

* Apply suggestions from code review

Co-authored-by: Zhanghao Wu <[email protected]>

* use usage_intervals for new cluster detection

get_cluster_duration will include the total duration of the cluster since its
initial launch, while launched_at may be reset by sky launch on an existing
cluster. So this is a more accurate method to check.

* fix terminating/stopping state for Lambda and Paperspace

* Revert "use usage_intervals for new cluster detection"

This reverts commit aa6d2e9.

* check cloud.STATUS_VERSION before calling query_instances

* avoid try/catch when querying instances

* update comments

---------

Co-authored-by: Zhanghao Wu <[email protected]>

* smoke tests support storage mount only (#4446)

* smoke tests support storage mount only

* fix verify command

* rename to only_mount

* [Feature] support spot pod on RunPod (#4447)

* wip

* wip

* wip

* wip

* wip

* wip

* resolve comments

* wip

* wip

* wip

* wip

* wip

* wip

---------

Co-authored-by: hwei <[email protected]>

* use lazy import for runpod (#4451)

Fixes runpod import issues introduced in #4447.
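
The fix can be pictured with a minimal lazy-import sketch (illustrative only; not the exact SkyPilot adaptor code):

import importlib

class LazyImport:
    # Defer the real import until first attribute access, so `import sky`
    # succeeds even when the runpod SDK is not installed.
    def __init__(self, module_name: str):
        self._module_name = module_name
        self._module = None

    def __getattr__(self, name: str):
        if self._module is None:
            self._module = importlib.import_module(self._module_name)
        return getattr(self._module, name)

runpod = LazyImport('runpod')  # resolved only when runpod.<attr> is used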

* [k8s] Fix show-gpus when running with incluster auth (#4452)

* Add limits only if they exist

* Fix incluster auth handling

* Merge branch 'master' of github.com:skypilot-org/skypilot into restapi

* Add comment

* Not mutate azure dep list at runtime (#4457)

* format

---------

Co-authored-by: Christopher Cooper <[email protected]>
Co-authored-by: Zongheng Yang <[email protected]>
Co-authored-by: Romil Bhardwaj <[email protected]>
Co-authored-by: zpoint <[email protected]>
Co-authored-by: Hong <[email protected]>
Co-authored-by: hwei <[email protected]>
Co-authored-by: Yika <[email protected]>
8 people authored Dec 12, 2024
1 parent 250fe42 commit 839db19
Showing 18 changed files with 511 additions and 125 deletions.
49 changes: 40 additions & 9 deletions sky/backends/backend_utils.py
@@ -114,6 +114,16 @@
_ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, '
'please retry after a while.')

# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't
# see any instances in the cloud, the instances might be in the process of
# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double
# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY
# should be set longer than the delay between (sending the create instance
# request) and (the instances appearing on the cloud).
# See https://github.com/skypilot-org/skypilot/issues/4431.
_LAUNCH_DOUBLE_CHECK_WINDOW = 60
_LAUNCH_DOUBLE_CHECK_DELAY = 1

# Include the fields that will be used for generating tags that distinguish
# the cluster in ray, to avoid the stopped cluster being discarded due to
# updates in the yaml template.
@@ -1761,13 +1771,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
logger.debug(
f'Refreshing status ({cluster_name!r}) failed to get IPs.')
except RuntimeError as e:
logger.debug(str(e))
logger.debug(common_utils.format_exception(e))
except Exception as e: # pylint: disable=broad-except
# This can be raised by `external_ssh_ports()`, due to the
# underlying call to kubernetes API.
logger.debug(
f'Refreshing status ({cluster_name!r}) failed: '
f'{common_utils.format_exception(e, use_bracket=True)}')
logger.debug(f'Refreshing status ({cluster_name!r}) failed: ',
exc_info=e)
return False

# Determining if the cluster is healthy (UP):
@@ -1794,6 +1803,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return global_user_state.get_cluster_from_name(cluster_name)

# All cases below are transitioning the cluster to non-UP states.

if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >=
clouds.StatusVersion.SKYPILOT):
# Note: launched_at is set during sky launch, even on an existing
# cluster. This will catch the case where the cluster was terminated on
# the cloud and restarted by sky launch.
time_since_launch = time.time() - record['launched_at']
if (record['status'] == status_lib.ClusterStatus.INIT and
time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW):
# It's possible the instances for this cluster were just created,
# and haven't appeared yet in the cloud API/console. Wait for a bit
# and check again. This is a best-effort leak prevention check.
# See https://github.com/skypilot-org/skypilot/issues/4431.
time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY)
node_statuses = _query_cluster_status_via_cloud_api(handle)
# Note: even if all the node_statuses are UP now, we will still
# consider this cluster abnormal, and its status will be INIT.

if len(node_statuses) > handle.launched_nodes:
# Unexpected: in the queried region more than 1 cluster with the same
# constructed name tag returned. This will typically not happen unless
@@ -1822,13 +1849,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
f'{colorama.Style.RESET_ALL}')
assert len(node_statuses) <= handle.launched_nodes

# If the node_statuses is empty, all the nodes are terminated. We can
# safely set the cluster status to TERMINATED. This handles the edge case
# where the cluster is terminated by the user manually through the UI.
# If the node_statuses is empty, it should mean that all the nodes are
# terminated and we can set the cluster status to TERMINATED. This handles
# the edge case where the cluster is terminated by the user manually through
# the UI.
to_terminate = not node_statuses

# A cluster is considered "abnormal", if not all nodes are TERMINATED or
# not all nodes are STOPPED. We check that with the following logic:
# A cluster is considered "abnormal", if some (but not all) nodes are
# TERMINATED, or not all nodes are STOPPED. We check that with the following
# logic:
# * Not all nodes are terminated and there's at least one node
# terminated; or
# * Any of the non-TERMINATED nodes is in a non-STOPPED status.
@@ -1840,6 +1869,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
# cluster is probably down.
# * The cluster is partially terminated or stopped should be considered
# abnormal.
# * The cluster is partially or completely in the INIT state, which means
# that provisioning was interrupted. This is considered abnormal.
#
# An abnormal cluster will transition to INIT and have any autostop setting
# reset (unless it's autostopping/autodowning).
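
A minimal sketch of the abnormality predicate described in the comments above (simplified, assumed names; terminated nodes are simply absent from node_statuses):

from enum import Enum
from typing import List

class Status(Enum):  # stand-in for status_lib.ClusterStatus
    INIT = 'INIT'
    UP = 'UP'
    STOPPED = 'STOPPED'

def is_abnormal(node_statuses: List[Status], launched_nodes: int) -> bool:
    # Some, but not all, nodes terminated: terminated nodes do not appear
    # in node_statuses, so a short (but non-empty) list means partial
    # termination.
    partially_terminated = 0 < len(node_statuses) < launched_nodes
    # Any remaining node that is not STOPPED (e.g. INIT after an
    # interrupted provision) also makes the cluster abnormal.
    any_non_stopped = any(s is not Status.STOPPED for s in node_statuses)
    return partially_terminated or any_non_stopped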
69 changes: 64 additions & 5 deletions sky/backends/cloud_vm_ray_backend.py
@@ -101,6 +101,11 @@
# The maximum retry count for fetching IP address.
_FETCH_IP_MAX_ATTEMPTS = 3

# How many times to query the cloud provider to make sure instances are
# stopping/terminating, and how long to wait between each query.
_TEARDOWN_WAIT_MAX_ATTEMPTS = 10
_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1

_TEARDOWN_FAILURE_MESSAGE = (
f'\n{colorama.Fore.RED}Failed to terminate '
'{cluster_name}. {extra_reason}'
@@ -2411,15 +2416,17 @@ def get_command_runners(self,
zip(ip_list, port_list), **ssh_credentials)
return runners
if self.cached_cluster_info is None:
# We have `or self.cached_external_ips is None` here, because
# We have `and self.cached_external_ips is None` here, because
# when a cluster's cloud is just upgraded to the new provisioner,
# although it has the cached_external_ips, the cached_cluster_info
# can be None. We need to update it here, even when force_cached is
# set to True.
# TODO: We can remove `self.cached_external_ips is None` after
# version 0.8.0.
assert not force_cached or self.cached_external_ips is not None, (
force_cached, self.cached_external_ips)
if force_cached and self.cached_external_ips is None:
raise RuntimeError(
'Tried to use cached cluster info, but it\'s missing for '
f'cluster "{self.cluster_name}"')
self._update_cluster_info()
assert self.cached_cluster_info is not None, self
runners = provision_lib.get_command_runners(
@@ -4009,7 +4016,6 @@ def teardown_no_lock(self,
limit=1000).get_result()['items']
vpc_id = None
try:
# pylint: disable=line-too-long
vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit(
':', 1)[-1]
vpc_found = True
@@ -4018,7 +4024,6 @@
returncode = -1

if vpc_found:
# pylint: disable=line-too-long E1136
# Delete VPC and its associated resources
vpc_provider = IBMVPCProvider(
config_provider['resource_group_id'], region,
@@ -4121,6 +4126,7 @@ def post_teardown_cleanup(self,
* Removing the terminated cluster's scripts and ray yaml files.
"""
cluster_name_on_cloud = handle.cluster_name_on_cloud
cloud = handle.launched_resources.cloud

if (terminate and handle.launched_resources.is_image_managed is True):
# Delete the image when terminating a "cloned" cluster, i.e.,
@@ -4173,6 +4179,59 @@

sky.utils.cluster_utils.SSHConfigHelper.remove_cluster(
handle.cluster_name)

def _detect_abnormal_non_terminated_nodes(
handle: CloudVmRayResourceHandle) -> None:
# Confirm that instances have actually transitioned state before
# updating the state database. We do this immediately before
# removing the state from the database, so that we can guarantee
# that this is always called before the state is removed. We
# considered running this check as part of
# provisioner.teardown_cluster or provision.terminate_instances, but
# it would open the door to code paths that successfully call this
# function but do not first call teardown_cluster or
# terminate_instances. See
# https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032
attempts = 0
while True:
config = common_utils.read_yaml(handle.cluster_yaml)

logger.debug(f'instance statuses attempt {attempts + 1}')
node_status_dict = provision_lib.query_instances(
repr(cloud),
cluster_name_on_cloud,
config['provider'],
non_terminated_only=False)

unexpected_node_state: Optional[Tuple[str, str]] = None
for node_id, node_status in node_status_dict.items():
logger.debug(f'{node_id} status: {node_status}')
# FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish
# between "stopping/stopped" and "terminating/terminated",
# so we allow for either status instead of casing on
# `terminate`.
if node_status not in [
None, status_lib.ClusterStatus.STOPPED
]:
unexpected_node_state = (node_id, node_status)
break

if unexpected_node_state is None:
break

attempts += 1
if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS:
time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS)
else:
(node_id, node_status) = unexpected_node_state
raise RuntimeError(f'Instance {node_id} in unexpected '
f'state {node_status}.')

# If cluster_yaml is None, the cluster is already guaranteed to be
# terminated, so we don't need to do the double check.
if handle.cluster_yaml is not None:
_detect_abnormal_non_terminated_nodes(handle)

if not terminate or remove_from_db:
global_user_state.remove_cluster(handle.cluster_name,
terminate=terminate)
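With the constants introduced at the top of this file, the double check polls query_instances at most _TEARDOWN_WAIT_MAX_ATTEMPTS (10) times, _TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS (1) second apart, so an instance stuck in an unexpected state is detected within roughly ten seconds and raises a RuntimeError before the cluster record is removed from the database.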
19 changes: 11 additions & 8 deletions sky/clouds/runpod.py
@@ -25,8 +25,6 @@ class RunPod(clouds.Cloud):
_REPR = 'RunPod'
_CLOUD_UNSUPPORTED_FEATURES = {
clouds.CloudImplementationFeatures.STOP: 'Stopping not supported.',
clouds.CloudImplementationFeatures.SPOT_INSTANCE:
('Spot is not supported, as runpod API does not implement spot.'),
clouds.CloudImplementationFeatures.MULTI_NODE:
('Multi-node not supported yet, as the interconnection among nodes '
'is non-trivial on RunPod.'),
@@ -71,11 +69,8 @@ def regions_with_offering(cls, instance_type: str,
zone: Optional[str]) -> List[clouds.Region]:
assert zone is None, 'RunPod does not support zones.'
del accelerators, zone # unused
if use_spot:
return []
else:
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, 'runpod')
regions = service_catalog.get_region_zones_for_instance_type(
instance_type, use_spot, 'runpod')

if region is not None:
regions = [r for r in regions if r.name == region]
@@ -177,11 +172,19 @@ def make_deploy_resources_variables(
else:
image_id = r.image_id[r.region]

instance_type = resources.instance_type
use_spot = resources.use_spot

hourly_cost = self.instance_type_to_hourly_cost(
instance_type=instance_type, use_spot=use_spot)

return {
'instance_type': resources.instance_type,
'instance_type': instance_type,
'custom_resources': custom_resources,
'region': region.name,
'image_id': image_id,
'use_spot': use_spot,
'bid_per_gpu': str(hourly_cost),
}

def _get_feasible_launchable_resources(
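Note the new bid_per_gpu template variable: it reuses instance_type_to_hourly_cost, so a spot pod's bid defaults to the catalog price for the instance type. It is presumably consumed by the RunPod provisioner together with the create_spot_pod API added later in this diff.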
4 changes: 4 additions & 0 deletions sky/clouds/service_catalog/kubernetes_catalog.py
@@ -123,6 +123,10 @@ def _list_accelerators(
# clusters defined by allowed_contexts.
if region_filter is None:
context = kubernetes_utils.get_current_kube_config_context_name()
if context is None and kubernetes_utils.is_incluster_config_available():
# If context is None and we are running in a kubernetes pod, use the
# in-cluster context as the current context.
context = kubernetes.in_cluster_context_name()
else:
context = region_filter
if context is None:
4 changes: 4 additions & 0 deletions sky/global_user_state.py
@@ -155,6 +155,10 @@ def create_table(cursor, conn):
value_to_replace_existing_entries=common_utils.get_user_hash())
db_utils.add_column_to_table(cursor, conn, 'clusters', 'config_hash',
'TEXT DEFAULT null')

db_utils.add_column_to_table(cursor, conn, 'clusters', 'config_hash',
'TEXT DEFAULT null')

db_utils.add_column_to_table(cursor, conn, 'cluster_history', 'user_hash',
'TEXT DEFAULT null')
conn.commit()
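For context, add_column_to_table is assumed here to be an idempotent ALTER TABLE helper; a hypothetical sketch (not SkyPilot's actual db_utils implementation, which also supports value_to_replace_existing_entries):

import sqlite3

def add_column_to_table(cursor: sqlite3.Cursor, conn: sqlite3.Connection,
                        table_name: str, column_name: str,
                        column_type: str) -> None:
    # Hypothetical: add the column only if it is missing, so running the
    # migration against an existing database is a no-op.
    existing = [row[1] for row in
                cursor.execute(f'PRAGMA table_info({table_name})')]
    if column_name not in existing:
        cursor.execute(f'ALTER TABLE {table_name} '
                       f'ADD COLUMN {column_name} {column_type}')
    conn.commit()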
2 changes: 1 addition & 1 deletion sky/provision/azure/instance.py
@@ -101,8 +101,8 @@ def cluster_status_map(
) -> Dict['AzureInstanceStatus', Optional[status_lib.ClusterStatus]]:
return {
cls.PENDING: status_lib.ClusterStatus.INIT,
cls.STOPPING: status_lib.ClusterStatus.INIT,
cls.RUNNING: status_lib.ClusterStatus.UP,
cls.STOPPING: status_lib.ClusterStatus.STOPPED,
cls.STOPPED: status_lib.ClusterStatus.STOPPED,
cls.DELETING: None,
}
2 changes: 2 additions & 0 deletions sky/provision/gcp/instance.py
@@ -52,6 +52,8 @@ def _filter_instances(
# non_terminated_only=True?
# Will there be callers who would want this to be False?
# stop() and terminate() for example already implicitly assume non-terminated.
# Currently, even with non_terminated_only=False, we may not have a dict entry
# for terminated instances, if they have already been fully deleted.
@common_utils.retry
def query_instances(
cluster_name_on_cloud: str,
2 changes: 1 addition & 1 deletion sky/provision/lambda_cloud/instance.py
@@ -221,7 +221,7 @@ def query_instances(
'booting': status_lib.ClusterStatus.INIT,
'active': status_lib.ClusterStatus.UP,
'unhealthy': status_lib.ClusterStatus.INIT,
'terminating': status_lib.ClusterStatus.INIT,
'terminating': None,
}
statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
for instance_id, instance in instances.items():
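Mapping 'terminating' to None (rather than INIT) matters for the teardown double check added in cloud_vm_ray_backend.py above: that check treats only None (already gone) and STOPPED as expected post-teardown states, so instances still terminating no longer read as leaked. The Azure STOPPING -> STOPPED change earlier in this diff serves the same purpose for stopping clusters.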
3 changes: 2 additions & 1 deletion sky/provision/paperspace/instance.py
@@ -286,12 +286,13 @@ def query_instances(
assert provider_config is not None, (cluster_name_on_cloud, provider_config)
instances = _filter_instances(cluster_name_on_cloud, None)

# https://docs.digitalocean.com/reference/paperspace/core/commands/machines/#show
status_map = {
'starting': status_lib.ClusterStatus.INIT,
'restarting': status_lib.ClusterStatus.INIT,
'upgrading': status_lib.ClusterStatus.INIT,
'provisioning': status_lib.ClusterStatus.INIT,
'stopping': status_lib.ClusterStatus.INIT,
'stopping': status_lib.ClusterStatus.STOPPED,
'serviceready': status_lib.ClusterStatus.INIT,
'ready': status_lib.ClusterStatus.UP,
'off': status_lib.ClusterStatus.STOPPED,
3 changes: 3 additions & 0 deletions sky/provision/runpod/api/__init__.py
@@ -0,0 +1,3 @@
"""RunPod low level API support for spot pod."""

from sky.provision.runpod.api.commands import create_spot_pod
