From 061d4bd998739e16fa704a855233e62290f6cbdb Mon Sep 17 00:00:00 2001 From: Chester Li Date: Sat, 4 Jan 2025 03:29:33 +0800 Subject: [PATCH 01/17] [k8s] Add validation for pod_config #4206 (#4466) * [k8s] Add validation for pod_config #4206 Check pod_config when run 'sky check k8s' by using k8s api * update: check pod_config when launch check merged pod_config during launch using k8s api * fix test * ignore check failed when test with dryrun if there is no kube config in env, ignore ValueError when launch with dryrun. For now, we don't support check schema offline. * use deserialize api to check pod_config schema * test * create another api_client with no kubeconfig * test * update error message * update test * test * test * Update sky/backends/backend_utils.py --------- Co-authored-by: Romil Bhardwaj --- sky/backends/backend_utils.py | 7 +++++ sky/provision/kubernetes/utils.py | 46 +++++++++++++++++++++++++++++++ tests/test_config.py | 46 +++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 1de799e7cf8..6e79469a819 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -926,6 +926,13 @@ def write_cluster_config( tmp_yaml_path, cluster_config_overrides=to_provision.cluster_config_overrides) kubernetes_utils.combine_metadata_fields(tmp_yaml_path) + yaml_obj = common_utils.read_yaml(tmp_yaml_path) + pod_config = yaml_obj['available_node_types']['ray_head_default'][ + 'node_config'] + valid, message = kubernetes_utils.check_pod_config(pod_config) + if not valid: + raise exceptions.InvalidCloudConfigs( + f'Invalid pod_config. Details: {message}') if dryrun: # If dryrun, return the unfinished tmp yaml path. diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 487868d1d9e..14b6b42aa58 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -893,6 +893,52 @@ def check_credentials(context: Optional[str], return True, None +def check_pod_config(pod_config: dict) \ + -> Tuple[bool, Optional[str]]: + """Check if the pod_config is a valid pod config + + Using deserialize api to check the pod_config is valid or not. + + Returns: + bool: True if pod_config is valid. + str: Error message about why the pod_config is invalid, None otherwise. 
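+    Example (illustrative only):
+        valid, message = check_pod_config(
+            {'spec': {'containers': [{'name': 'main', 'image': 'ubuntu'}]}})
+        # valid is True here; a malformed pod_config instead returns False
+        # together with the deserialization error text in message.
+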
+ """ + errors = [] + # This api_client won't be used to send any requests, so there is no need to + # load kubeconfig + api_client = kubernetes.kubernetes.client.ApiClient() + + # Used for kubernetes api_client deserialize function, the function will use + # data attr, the detail ref: + # https://github.com/kubernetes-client/python/blob/master/kubernetes/client/api_client.py#L244 + class InnerResponse(): + + def __init__(self, data: dict): + self.data = json.dumps(data) + + try: + # Validate metadata if present + if 'metadata' in pod_config: + try: + value = InnerResponse(pod_config['metadata']) + api_client.deserialize( + value, kubernetes.kubernetes.client.V1ObjectMeta) + except ValueError as e: + errors.append(f'Invalid metadata: {str(e)}') + # Validate spec if present + if 'spec' in pod_config: + try: + value = InnerResponse(pod_config['spec']) + api_client.deserialize(value, + kubernetes.kubernetes.client.V1PodSpec) + except ValueError as e: + errors.append(f'Invalid spec: {str(e)}') + return len(errors) == 0, '.'.join(errors) + except Exception as e: # pylint: disable=broad-except + errors.append(f'Validation error: {str(e)}') + return False, '.'.join(errors) + + def is_kubeconfig_exec_auth( context: Optional[str] = None) -> Tuple[bool, Optional[str]]: """Checks if the kubeconfig file uses exec-based authentication diff --git a/tests/test_config.py b/tests/test_config.py index 5789214dc61..d3eaeb261bc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -7,6 +7,7 @@ import sky from sky import skypilot_config +import sky.exceptions from sky.skylet import constants from sky.utils import common_utils from sky.utils import kubernetes_enums @@ -99,6 +100,29 @@ def _create_task_yaml_file(task_file_path: pathlib.Path) -> None: """)) +def _create_invalid_config_yaml_file(task_file_path: pathlib.Path) -> None: + task_file_path.write_text( + textwrap.dedent("""\ + experimental: + config_overrides: + kubernetes: + pod_config: + metadata: + labels: + test-key: test-value + annotations: + abc: def + spec: + containers: + - name: + imagePullSecrets: + - name: my-secret-2 + + setup: echo 'Setting up...' + run: echo 'Running...' 
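+
+            # NOTE: the pod_config above is intentionally malformed (e.g. the
+            # empty container name), so the dryrun launch in
+            # test_k8s_config_with_invalid_config below is expected to fail
+            # pod_config validation and raise ResourcesUnavailableError.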
+ """)) + + def test_nested_config(monkeypatch) -> None: """Test that the nested config works.""" config = skypilot_config.Config() @@ -335,6 +359,28 @@ def test_k8s_config_with_override(monkeypatch, tmp_path, assert cluster_pod_config['spec']['runtimeClassName'] == 'nvidia' +def test_k8s_config_with_invalid_config(monkeypatch, tmp_path, + enable_all_clouds) -> None: + config_path = tmp_path / 'config.yaml' + _create_config_file(config_path) + monkeypatch.setattr(skypilot_config, 'CONFIG_PATH', config_path) + + _reload_config() + task_path = tmp_path / 'task.yaml' + _create_invalid_config_yaml_file(task_path) + task = sky.Task.from_yaml(task_path) + + # Test Kubernetes pod_config invalid + cluster_name = 'test_k8s_config_with_invalid_config' + task.set_resources_override({'cloud': sky.Kubernetes()}) + exception_occurred = False + try: + sky.launch(task, cluster_name=cluster_name, dryrun=True) + except sky.exceptions.ResourcesUnavailableError: + exception_occurred = True + assert exception_occurred + + def test_gcp_config_with_override(monkeypatch, tmp_path, enable_all_clouds) -> None: config_path = tmp_path / 'config.yaml' From 4ab8e1668053fef8ae87ba9c832073c444078e49 Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Fri, 3 Jan 2025 21:03:39 -0800 Subject: [PATCH 02/17] [core] fix wheel timestamp check (#4488) Previously, we were only taking the max timestamp of all the subdirectories of the given directory. So the timestamp could be incorrect if only a file changed, and no directory changed. This fixes the issue by looking at all directories and files given by os.walk(). --- sky/backends/wheel_utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sky/backends/wheel_utils.py b/sky/backends/wheel_utils.py index ed580569e0b..805117ee2a3 100644 --- a/sky/backends/wheel_utils.py +++ b/sky/backends/wheel_utils.py @@ -153,7 +153,10 @@ def _get_latest_modification_time(path: pathlib.Path) -> float: if not path.exists(): return -1. try: - return max(os.path.getmtime(root) for root, _, _ in os.walk(path)) + return max( + os.path.getmtime(os.path.join(root, f)) + for root, dirs, files in os.walk(path) + for f in (*dirs, *files)) except ValueError: return -1. From e4939f9fafde4985836689211d9e5f67731e792a Mon Sep 17 00:00:00 2001 From: Hysun He Date: Mon, 6 Jan 2025 09:20:33 +0800 Subject: [PATCH 03/17] [docs] Add image_id doc in task YAML for OCI (#4526) * Add image_id doc for OCI * nit * Update docs/source/reference/yaml-spec.rst Co-authored-by: Tian Xia --------- Co-authored-by: Tian Xia --- docs/source/reference/yaml-spec.rst | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst index 8a490b7e817..d2f0506993a 100644 --- a/docs/source/reference/yaml-spec.rst +++ b/docs/source/reference/yaml-spec.rst @@ -176,9 +176,9 @@ Available fields: # tpu_vm: True # True to use TPU VM (the default); False to use TPU node. # Custom image id (optional, advanced). The image id used to boot the - # instances. Only supported for AWS and GCP (for non-docker image). If not - # specified, SkyPilot will use the default debian-based image suitable for - # machine learning tasks. + # instances. Only supported for AWS, GCP, OCI and IBM (for non-docker image). + # If not specified, SkyPilot will use the default debian-based image + # suitable for machine learning tasks. 
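For reference, the same image selection can be made through the Python API. This is a sketch: the image tag, cloud, and cluster name below are illustrative, not part of this diff.

.. code-block:: python

   import sky

   task = sky.Task(run='echo hello')
   task.set_resources(
       sky.Resources(cloud=sky.OCI(), image_id='skypilot:gpu-ubuntu-2204'))
   sky.launch(task, cluster_name='image-id-demo', dryrun=True)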
# # Docker support # You can specify docker image to use by setting the image_id to @@ -204,7 +204,7 @@ Available fields: # image_id: # us-east-1: ami-0729d913a335efca7 # us-west-2: ami-050814f384259894c - image_id: ami-0868a20f5a3bf9702 + # # GCP # To find GCP images: https://cloud.google.com/compute/docs/images # image_id: projects/deeplearning-platform-release/global/images/common-cpu-v20230615-debian-11-py310 @@ -215,6 +215,24 @@ Available fields: # To find Azure images: https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage # image_id: microsoft-dsvm:ubuntu-2004:2004:21.11.04 # + # OCI + # To find OCI images: https://docs.oracle.com/en-us/iaas/images + # You can choose the image with OS version from the following image tags + # provided by SkyPilot: + # image_id: skypilot:gpu-ubuntu-2204 + # image_id: skypilot:gpu-ubuntu-2004 + # image_id: skypilot:gpu-oraclelinux9 + # image_id: skypilot:gpu-oraclelinux8 + # image_id: skypilot:cpu-ubuntu-2204 + # image_id: skypilot:cpu-ubuntu-2004 + # image_id: skypilot:cpu-oraclelinux9 + # image_id: skypilot:cpu-oraclelinux8 + # + # It is also possible to specify your custom image's OCID with OS type, + # for example: + # image_id: ocid1.image.oc1.us-sanjose-1.aaaaaaaaywwfvy67wwe7f24juvjwhyjn3u7g7s3wzkhduxcbewzaeki2nt5q:oraclelinux + # image_id: ocid1.image.oc1.us-sanjose-1.aaaaaaaa5tnuiqevhoyfnaa5pqeiwjv6w5vf6w4q2hpj3atyvu3yd6rhlhyq:ubuntu + # # IBM # Create a private VPC image and paste its ID in the following format: # image_id: @@ -224,6 +242,7 @@ Available fields: # https://www.ibm.com/cloud/blog/use-ibm-packer-plugin-to-create-custom-images-on-ibm-cloud-vpc-infrastructure # To use a more limited but easier to manage tool: # https://github.com/IBM/vpc-img-inst + image_id: ami-0868a20f5a3bf9702 # Labels to apply to the instances (optional). # From 9828f6b9b3ea50a35352c2b530c6717c6eef82b4 Mon Sep 17 00:00:00 2001 From: Hong Date: Mon, 6 Jan 2025 10:26:59 +0800 Subject: [PATCH 04/17] [UX] warning before launching jobs/serve when using a reauth required credentials (#4479) * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * Update sky/backends/cloud_vm_ray_backend.py Minor fix * Update sky/clouds/aws.py Co-authored-by: Romil Bhardwaj * wip * minor changes * wip --------- Co-authored-by: hong Co-authored-by: Romil Bhardwaj --- sky/backends/backend_utils.py | 36 ++++++++++++++++++++++++++++ sky/backends/cloud_vm_ray_backend.py | 17 +++++++++++++ sky/clouds/aws.py | 24 +++++++++++++++++++ sky/clouds/cloud.py | 4 ++++ sky/clouds/gcp.py | 9 +++++++ 5 files changed, 90 insertions(+) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 6e79469a819..bf92f442d2f 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -650,6 +650,42 @@ def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]): return common_utils.dump_yaml_str(new_config) +def get_expirable_clouds( + enabled_clouds: Sequence[clouds.Cloud]) -> List[clouds.Cloud]: + """Returns a list of clouds that use local credentials and whose credentials can expire. + + This function checks each cloud in the provided sequence to determine if it uses local credentials + and if its credentials can expire. If both conditions are met, the cloud is added to the list of + expirable clouds. + + Args: + enabled_clouds (Sequence[clouds.Cloud]): A sequence of cloud objects to check. 
+ + Returns: + list[clouds.Cloud]: A list of cloud objects that use local credentials and whose credentials can expire. + """ + expirable_clouds = [] + local_credentials_value = schemas.RemoteIdentityOptions.LOCAL_CREDENTIALS.value + for cloud in enabled_clouds: + remote_identities = skypilot_config.get_nested( + (str(cloud).lower(), 'remote_identity'), None) + if remote_identities is None: + remote_identities = schemas.get_default_remote_identity( + str(cloud).lower()) + + local_credential_expiring = cloud.can_credential_expire() + if isinstance(remote_identities, str): + if remote_identities == local_credentials_value and local_credential_expiring: + expirable_clouds.append(cloud) + elif isinstance(remote_identities, list): + for profile in remote_identities: + if list(profile.values( + ))[0] == local_credentials_value and local_credential_expiring: + expirable_clouds.append(cloud) + break + return expirable_clouds + + # TODO: too many things happening here - leaky abstraction. Refactor. @timeline.event def write_cluster_config( diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 156f43181b2..c972928cd7d 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -26,6 +26,7 @@ import sky from sky import backends +from sky import check as sky_check from sky import cloud_stores from sky import clouds from sky import exceptions @@ -1996,6 +1997,22 @@ def provision_with_retries( skip_unnecessary_provisioning else None) failover_history: List[Exception] = list() + # If the user is using local credentials which may expire, the + # controller may leak resources if the credentials expire while a job + # is running. Here we check the enabled clouds and expiring credentials + # and raise a warning to the user. + if task.is_controller_task(): + enabled_clouds = sky_check.get_cached_enabled_clouds_or_refresh() + expirable_clouds = backend_utils.get_expirable_clouds( + enabled_clouds) + + if len(expirable_clouds) > 0: + warnings = (f'\033[93mWarning: Credentials used for ' + f'{expirable_clouds} may expire. Clusters may be ' + f'leaked if the credentials expire while jobs ' + f'are running. It is recommended to use credentials' + f' that never expire or a service account.\033[0m') + logger.warning(warnings) # Retrying launchable resources. while True: diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index c665263e22e..a86a87f4feb 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -103,6 +103,24 @@ class AWSIdentityType(enum.Enum): # region us-east-1 config-file ~/.aws/config SHARED_CREDENTIALS_FILE = 'shared-credentials-file' + def can_credential_expire(self) -> bool: + """Check if the AWS identity type can expire. + + SSO,IAM_ROLE and CONTAINER_ROLE are temporary credentials and refreshed + automatically. ENV and SHARED_CREDENTIALS_FILE are short-lived + credentials without refresh. 
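# A sketch (not part of the diff) of how the expiry check above is driven
# end to end: fetch the enabled clouds, then warn if any of them rely on
# local credentials that can expire. Assumes a configured SkyPilot setup.
from sky import check as sky_check
from sky.backends import backend_utils

enabled_clouds = sky_check.get_cached_enabled_clouds_or_refresh()
expirable = backend_utils.get_expirable_clouds(enabled_clouds)
if expirable:
    print(f'Credentials for {expirable} may expire; clusters may leak '
          'if they expire while jobs are running.')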
+ IAM ROLE: + https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html + SSO/Container-role refresh token: + https://docs.aws.amazon.com/solutions/latest/dea-api/auth-refreshtoken.html + """ + # TODO(hong): Add a CLI based check for the expiration of the temporary + # credentials + expirable_types = { + AWSIdentityType.ENV, AWSIdentityType.SHARED_CREDENTIALS_FILE + } + return self in expirable_types + @clouds.CLOUD_REGISTRY.register class AWS(clouds.Cloud): @@ -860,6 +878,12 @@ def get_credential_file_mounts(self) -> Dict[str, str]: if os.path.exists(os.path.expanduser(f'~/.aws/{filename}')) } + @functools.lru_cache(maxsize=1) + def can_credential_expire(self) -> bool: + identity_type = self._current_identity_type() + return identity_type is not None and identity_type.can_credential_expire( + ) + def instance_type_exists(self, instance_type): return service_catalog.instance_type_exists(instance_type, clouds='aws') diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 455baeaf5d9..2cb45ca14fc 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -536,6 +536,10 @@ def get_credential_file_mounts(self) -> Dict[str, str]: """ raise NotImplementedError + def can_credential_expire(self) -> bool: + """Returns whether the cloud credential can expire.""" + return False + @classmethod def get_image_size(cls, image_id: str, region: Optional[str]) -> float: """Check the image size from the cloud. diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index ff200f84147..3502fee8e1c 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -132,6 +132,9 @@ class GCPIdentityType(enum.Enum): SHARED_CREDENTIALS_FILE = '' + def can_credential_expire(self) -> bool: + return self == GCPIdentityType.SHARED_CREDENTIALS_FILE + @clouds.CLOUD_REGISTRY.register class GCP(clouds.Cloud): @@ -863,6 +866,12 @@ def get_credential_file_mounts(self) -> Dict[str, str]: pass return credentials + @functools.lru_cache(maxsize=1) + def can_credential_expire(self) -> bool: + identity_type = self._get_identity_type() + return identity_type is not None and identity_type.can_credential_expire( + ) + @classmethod def _get_identity_type(cls) -> Optional[GCPIdentityType]: try: From 38a822ac6b553df0e784e559715ee4269c21f780 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sun, 5 Jan 2025 22:51:04 -0800 Subject: [PATCH 05/17] [GCP] Activate service account for storage and controller (#4529) * Activate service account for storage * disable logging if not using service account * Activate for controller as well. * revert controller activate * Add comments * format * fix smoke --- sky/cloud_stores.py | 12 ++++++++++-- sky/data/data_utils.py | 12 ++++++++---- tests/smoke_tests/test_managed_job.py | 2 +- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sky/cloud_stores.py b/sky/cloud_stores.py index 108f33f2c1f..e24c4f3ad03 100644 --- a/sky/cloud_stores.py +++ b/sky/cloud_stores.py @@ -113,8 +113,16 @@ class GcsCloudStorage(CloudStorage): @property def _gsutil_command(self): gsutil_alias, alias_gen = data_utils.get_gsutil_command() - return (f'{alias_gen}; GOOGLE_APPLICATION_CREDENTIALS=' - f'{gcp.DEFAULT_GCP_APPLICATION_CREDENTIAL_PATH} {gsutil_alias}') + return ( + f'{alias_gen}; GOOGLE_APPLICATION_CREDENTIALS=' + f'{gcp.DEFAULT_GCP_APPLICATION_CREDENTIAL_PATH}; ' + # Explicitly activate service account. Unlike the gcp packages + # and other GCP commands, gsutil does not automatically pick up + # the default credential keys when it is a service account. 
+ 'gcloud auth activate-service-account ' + '--key-file=$GOOGLE_APPLICATION_CREDENTIALS ' + '2> /dev/null || true; ' + f'{gsutil_alias}') def is_directory(self, url: str) -> bool: """Returns whether 'url' is a directory. diff --git a/sky/data/data_utils.py b/sky/data/data_utils.py index 05c2b42c844..e8dcaa83017 100644 --- a/sky/data/data_utils.py +++ b/sky/data/data_utils.py @@ -523,10 +523,14 @@ def get_gsutil_command() -> Tuple[str, str]: def run_upload_cli(command: str, access_denied_message: str, bucket_name: str, log_path: str): - returncode, stdout, stderr = log_lib.run_with_log(command, - log_path, - shell=True, - require_outputs=True) + returncode, stdout, stderr = log_lib.run_with_log( + command, + log_path, + shell=True, + require_outputs=True, + # We need to use bash as some of the cloud commands uses bash syntax, + # such as [[ ... ]] + executable='/bin/bash') if access_denied_message in stderr: with ux_utils.print_exception_no_traceback(): raise PermissionError('Failed to upload files to ' diff --git a/tests/smoke_tests/test_managed_job.py b/tests/smoke_tests/test_managed_job.py index 22381fc45e3..5c930724523 100644 --- a/tests/smoke_tests/test_managed_job.py +++ b/tests/smoke_tests/test_managed_job.py @@ -365,7 +365,7 @@ def test_managed_jobs_pipeline_recovery_gcp(): # separated by `-`. (f'MANAGED_JOB_ID=`cat /tmp/{name}-run-id | rev | ' f'cut -d\'_\' -f1 | rev | cut -d\'-\' -f1`; {terminate_cmd}'), - smoke_tests_utils.zJOB_WAIT_NOT_RUNNING.format(job_name=name), + smoke_tests_utils.JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{smoke_tests_utils.GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', smoke_tests_utils. get_cmd_wait_until_managed_job_status_contains_matching_job_name( From 8952fecd776cab6326e099a2255c15dd8d385890 Mon Sep 17 00:00:00 2001 From: Hysun He Date: Mon, 6 Jan 2025 20:42:21 +0800 Subject: [PATCH 06/17] [OCI] Support reuse existing VCN for SkyServe (#4530) * Support reuse existing VCN for SkyServe * fix * remove unused import * format --- sky/clouds/utils/oci_utils.py | 9 +++++++++ sky/provision/oci/query_utils.py | 34 ++++++++++++++++---------------- sky/utils/schemas.py | 3 +++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/sky/clouds/utils/oci_utils.py b/sky/clouds/utils/oci_utils.py index 581d4d72d3c..46d4454d866 100644 --- a/sky/clouds/utils/oci_utils.py +++ b/sky/clouds/utils/oci_utils.py @@ -10,6 +10,8 @@ from ubuntu 20.04 to ubuntu 22.04, including: - GPU: skypilot:gpu-ubuntu-2004 -> skypilot:gpu-ubuntu-2204 - CPU: skypilot:cpu-ubuntu-2004 -> skypilot:cpu-ubuntu-2204 + - Hysun He (hysun.he@oracle.com) @ Jan.01, 2025: Support reuse existing + VCN for SkyServe. """ import os @@ -109,8 +111,15 @@ def get_compartment(cls, region): ('oci', region, 'compartment_ocid'), default_compartment_ocid) return compartment + @classmethod + def get_vcn_ocid(cls, region): + # Will reuse the regional VCN if specified. + vcn = skypilot_config.get_nested(('oci', region, 'vcn_ocid'), None) + return vcn + @classmethod def get_vcn_subnet(cls, region): + # Will reuse the subnet if specified. vcn = skypilot_config.get_nested(('oci', region, 'vcn_subnet'), None) return vcn diff --git a/sky/provision/oci/query_utils.py b/sky/provision/oci/query_utils.py index 3037fcc2703..3f545aca4ba 100644 --- a/sky/provision/oci/query_utils.py +++ b/sky/provision/oci/query_utils.py @@ -7,6 +7,8 @@ find_compartment: allow search subtree when find a compartment. 
- Hysun He (hysun.he@oracle.com) @ Nov.12, 2024: Add methods to Add/remove security rules: create_nsg_rules & remove_nsg + - Hysun He (hysun.he@oracle.com) @ Jan.01, 2025: Support reuse existing + VCN for SkyServe. """ from datetime import datetime import functools @@ -17,7 +19,6 @@ import typing from typing import List, Optional, Tuple -from sky import exceptions from sky import sky_logging from sky.adaptors import common as adaptors_common from sky.adaptors import oci as oci_adaptor @@ -496,26 +497,25 @@ def find_nsg(cls, region: str, nsg_name: str, compartment = cls.find_compartment(region) - list_vcns_resp = net_client.list_vcns( - compartment_id=compartment, - display_name=oci_utils.oci_config.VCN_NAME, - lifecycle_state='AVAILABLE', - ) - - if not list_vcns_resp: - raise exceptions.ResourcesUnavailableError( - 'The VCN is not available') + vcn_id = oci_utils.oci_config.get_vcn_ocid(region) + if vcn_id is None: + list_vcns_resp = net_client.list_vcns( + compartment_id=compartment, + display_name=oci_utils.oci_config.VCN_NAME, + lifecycle_state='AVAILABLE', + ) - # Get the primary vnic. The vnic might be an empty list for the - # corner case when the cluster was exited during provision. - if not list_vcns_resp.data: - return None + # Get the primary vnic. The vnic might be an empty list for the + # corner case when the cluster was exited during provision. + if not list_vcns_resp.data: + return None - vcn = list_vcns_resp.data[0] + vcn = list_vcns_resp.data[0] + vcn_id = vcn.id list_nsg_resp = net_client.list_network_security_groups( compartment_id=compartment, - vcn_id=vcn.id, + vcn_id=vcn_id, limit=1, display_name=nsg_name, ) @@ -532,7 +532,7 @@ def find_nsg(cls, region: str, nsg_name: str, create_network_security_group_details=oci_adaptor.oci.core.models. CreateNetworkSecurityGroupDetails( compartment_id=compartment, - vcn_id=vcn.id, + vcn_id=vcn_id, display_name=nsg_name, )) get_nsg_resp = net_client.get_network_security_group( diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index a424ae074b9..3194dc79da5 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -886,6 +886,9 @@ def get_config_schema(): 'image_tag_gpu': { 'type': 'string', }, + 'vcn_ocid': { + 'type': 'string', + }, 'vcn_subnet': { 'type': 'string', }, From 0e149822cc91ec57202ab071ab33d0f5d8c0a3ba Mon Sep 17 00:00:00 2001 From: Hysun He Date: Mon, 6 Jan 2025 20:43:22 +0800 Subject: [PATCH 07/17] [docs] OCI: advanced configuration & add vcn_ocid (#4531) * Add vcn_ocid configuration * Update config.rst --- docs/source/reference/config.rst | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 99bd347942a..a76dc473206 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -628,20 +628,30 @@ Available fields and semantics: # Advanced OCI configurations (optional). oci: # A dict mapping region names to region-specific configurations, or - # `default` for the default configuration. + # `default` for the default/global configuration. default: - # The OCID of the profile to use for launching instances (optional). - oci_config_profile: DEFAULT - # The OCID of the compartment to use for launching instances (optional). + # The profile name in ~/.oci/config to use for launching instances. If not + # set, the one named DEFAULT will be used (optional). + oci_config_profile: SKY_PROVISION_PROFILE + # The OCID of the compartment to use for launching instances. 
If not set, + # the root compartment will be used (optional). compartment_ocid: ocid1.compartment.oc1..aaaaaaaahr7aicqtodxmcfor6pbqn3hvsngpftozyxzqw36gj4kh3w3kkj4q - # The image tag to use for launching general instances (optional). - image_tag_general: skypilot:cpu-ubuntu-2004 - # The image tag to use for launching GPU instances (optional). - image_tag_gpu: skypilot:gpu-ubuntu-2004 - + # The default image tag to use for launching general instances (CPU) if the + # image_id parameter is not specified. If not set, the default is + # skypilot:cpu-ubuntu-2204 (optional). + image_tag_general: skypilot:cpu-oraclelinux8 + # The default image tag to use for launching GPU instances if the image_id + # parameter is not specified. If not set, the default is + # skypilot:gpu-ubuntu-2204 (optional). + image_tag_gpu: skypilot:gpu-oraclelinux8 + + # Region-specific configurations ap-seoul-1: + # The OCID of the VCN to use for instances (optional). + vcn_ocid: ocid1.vcn.oc1.ap-seoul-1.amaaaaaaak7gbriarkfs2ssus5mh347ktmi3xa72tadajep6asio3ubqgarq # The OCID of the subnet to use for instances (optional). vcn_subnet: ocid1.subnet.oc1.ap-seoul-1.aaaaaaaa5c6wndifsij6yfyfehmi3tazn6mvhhiewqmajzcrlryurnl7nuja us-ashburn-1: + vcn_ocid: ocid1.vcn.oc1.ap-seoul-1.amaaaaaaak7gbriarkfs2ssus5mh347ktmi3xa72tadajep6asio3ubqgarq vcn_subnet: ocid1.subnet.oc1.iad.aaaaaaaafbj7i3aqc4ofjaapa5edakde6g4ea2yaslcsay32cthp7qo55pxa From 59cb4e9625e98b06fd293d0dd5cea5deb89ea358 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Mon, 6 Jan 2025 14:10:55 -0800 Subject: [PATCH 08/17] [k8s] Fix `--purge` not cleaning up cluster in stale k8s context (#4514) * Fix purge not cleaning up stale k8s context cluster * update comment * Apply purge after printing warnings. * lint * Fix comments * clean up condition --- sky/backends/cloud_vm_ray_backend.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index c972928cd7d..2316888b44c 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -4216,11 +4216,20 @@ def post_teardown_cleanup(self, attempts = 0 while True: logger.debug(f'instance statuses attempt {attempts + 1}') - node_status_dict = provision_lib.query_instances( - repr(cloud), - cluster_name_on_cloud, - config['provider'], - non_terminated_only=False) + try: + node_status_dict = provision_lib.query_instances( + repr(cloud), + cluster_name_on_cloud, + config['provider'], + non_terminated_only=False) + except Exception as e: # pylint: disable=broad-except + if purge: + logger.warning( + f'Failed to query instances. Skipping since purge is ' + f'set. Details: ' + f'{common_utils.format_exception(e, use_bracket=True)}') + break + raise unexpected_node_state: Optional[Tuple[str, str]] = None for node_id, node_status in node_status_dict.items(): @@ -4239,8 +4248,13 @@ def post_teardown_cleanup(self, time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS) else: (node_id, node_status) = unexpected_node_state - raise RuntimeError(f'Instance {node_id} in unexpected state ' - f'{node_status}.') + if purge: + logger.warning(f'Instance {node_id} in unexpected ' + f'state {node_status}. 
Skipping since purge ' + 'is set.') + break + raise RuntimeError(f'Instance {node_id} in unexpected ' + f'state {node_status}.') global_user_state.remove_cluster(handle.cluster_name, terminate=terminate) From 6cf98a3cfbed39984b106f95ec7209058c73570d Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Tue, 7 Jan 2025 15:11:27 +0800 Subject: [PATCH 09/17] [Catalog] TPU V6e pricing fetcher (#4540) * [Catalog] TPU V6e pricing fetcher * Update sky/clouds/service_catalog/data_fetchers/fetch_gcp.py Co-authored-by: Zhanghao Wu * comment --------- Co-authored-by: Zhanghao Wu --- .../data_fetchers/fetch_gcp.py | 61 +++---------------- 1 file changed, 10 insertions(+), 51 deletions(-) diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py b/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py index 570bc773d2e..b3a71e9514a 100644 --- a/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py @@ -47,10 +47,6 @@ TPU_V4_ZONES = ['us-central2-b'] # TPU v3 pods are available in us-east1-d, but hidden in the skus. # We assume the TPU prices are the same as us-central1. -# TPU v6e's pricing info is not available on the SKUs. However, in -# https://cloud.google.com/tpu/pricing, it listed the price for 4 regions: -# us-east1, us-east5, europe-west4, and asia-northeast1. We hardcode them here -# and filtered out the other regions (us-central{1,2}, us-south1). HIDDEN_TPU_DF = pd.read_csv( io.StringIO( textwrap.dedent("""\ @@ -62,49 +58,10 @@ ,tpu-v3-512,1,,,tpu-v3-512,512.0,153.6,us-east1,us-east1-d ,tpu-v3-1024,1,,,tpu-v3-1024,1024.0,307.2,us-east1,us-east1-d ,tpu-v3-2048,1,,,tpu-v3-2048,2048.0,614.4,us-east1,us-east1-d - ,tpu-v6e-1,1,,,tpu-v6e-1,2.7,,us-east5,us-east5-b - ,tpu-v6e-1,1,,,tpu-v6e-1,2.7,,us-east5,us-east5-c - ,tpu-v6e-1,1,,,tpu-v6e-1,2.97,,europe-west4,europe-west4-a - ,tpu-v6e-1,1,,,tpu-v6e-1,3.24,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-1,1,,,tpu-v6e-1,2.7,,us-east1,us-east1-d - ,tpu-v6e-4,1,,,tpu-v6e-4,10.8,,us-east5,us-east5-b - ,tpu-v6e-4,1,,,tpu-v6e-4,10.8,,us-east5,us-east5-c - ,tpu-v6e-4,1,,,tpu-v6e-4,11.88,,europe-west4,europe-west4-a - ,tpu-v6e-4,1,,,tpu-v6e-4,12.96,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-4,1,,,tpu-v6e-4,10.8,,us-east1,us-east1-d - ,tpu-v6e-8,1,,,tpu-v6e-8,21.6,,us-east5,us-east5-b - ,tpu-v6e-8,1,,,tpu-v6e-8,21.6,,us-east5,us-east5-c - ,tpu-v6e-8,1,,,tpu-v6e-8,23.76,,europe-west4,europe-west4-a - ,tpu-v6e-8,1,,,tpu-v6e-8,25.92,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-8,1,,,tpu-v6e-8,21.6,,us-east1,us-east1-d - ,tpu-v6e-16,1,,,tpu-v6e-16,43.2,,us-east5,us-east5-b - ,tpu-v6e-16,1,,,tpu-v6e-16,43.2,,us-east5,us-east5-c - ,tpu-v6e-16,1,,,tpu-v6e-16,47.52,,europe-west4,europe-west4-a - ,tpu-v6e-16,1,,,tpu-v6e-16,51.84,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-16,1,,,tpu-v6e-16,43.2,,us-east1,us-east1-d - ,tpu-v6e-32,1,,,tpu-v6e-32,86.4,,us-east5,us-east5-b - ,tpu-v6e-32,1,,,tpu-v6e-32,86.4,,us-east5,us-east5-c - ,tpu-v6e-32,1,,,tpu-v6e-32,95.04,,europe-west4,europe-west4-a - ,tpu-v6e-32,1,,,tpu-v6e-32,103.68,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-32,1,,,tpu-v6e-32,86.4,,us-east1,us-east1-d - ,tpu-v6e-64,1,,,tpu-v6e-64,172.8,,us-east5,us-east5-b - ,tpu-v6e-64,1,,,tpu-v6e-64,172.8,,us-east5,us-east5-c - ,tpu-v6e-64,1,,,tpu-v6e-64,190.08,,europe-west4,europe-west4-a - ,tpu-v6e-64,1,,,tpu-v6e-64,207.36,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-64,1,,,tpu-v6e-64,172.8,,us-east1,us-east1-d - ,tpu-v6e-128,1,,,tpu-v6e-128,345.6,,us-east5,us-east5-b - 
,tpu-v6e-128,1,,,tpu-v6e-128,345.6,,us-east5,us-east5-c - ,tpu-v6e-128,1,,,tpu-v6e-128,380.16,,europe-west4,europe-west4-a - ,tpu-v6e-128,1,,,tpu-v6e-128,414.72,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-128,1,,,tpu-v6e-128,345.6,,us-east1,us-east1-d - ,tpu-v6e-256,1,,,tpu-v6e-256,691.2,,us-east5,us-east5-b - ,tpu-v6e-256,1,,,tpu-v6e-256,691.2,,us-east5,us-east5-c - ,tpu-v6e-256,1,,,tpu-v6e-256,760.32,,europe-west4,europe-west4-a - ,tpu-v6e-256,1,,,tpu-v6e-256,829.44,,asia-northeast1,asia-northeast1-b - ,tpu-v6e-256,1,,,tpu-v6e-256,691.2,,us-east1,us-east1-d """))) -TPU_V6E_MISSING_REGIONS = ['us-central1', 'us-central2', 'us-south1'] +# TPU V6e price for us-central2 is missing in the SKUs. +TPU_V6E_MISSING_REGIONS = ['us-central2'] # TPU V5 is not visible in specific zones. We hardcode the missing zones here. # NOTE(dev): Keep the zones and the df in sync. @@ -670,6 +627,8 @@ def _get_tpu_description_str(tpu_version: str) -> str: return 'TpuV5p' assert tpu_version == 'v5litepod', tpu_version return 'TpuV5e' + if tpu_version.startswith('v6e'): + return 'TpuV6e' return f'Tpu-{tpu_version}' def get_tpu_price(row: pd.Series, spot: bool) -> Optional[float]: @@ -684,10 +643,10 @@ def get_tpu_price(row: pd.Series, spot: bool) -> Optional[float]: # whether the TPU is a single device or a pod. # For TPU-v4, the pricing is uniform, and thus the pricing API # only provides the price of TPU-v4 pods. - # The price shown for v5 TPU is per chip hour, so there is no 'Pod' - # keyword in the description. + # The price shown for v5 & v6e TPU is per chip hour, so there is + # no 'Pod' keyword in the description. is_pod = ((num_cores > 8 or tpu_version == 'v4') and - not tpu_version.startswith('v5')) + not tpu_version.startswith('v5') and tpu_version != 'v6e') for sku in gce_skus + tpu_skus: if tpu_region not in sku['serviceRegions']: @@ -718,7 +677,9 @@ def get_tpu_price(row: pd.Series, spot: bool) -> Optional[float]: # for v5e. Reference here: # https://cloud.google.com/tpu/docs/v5p#using-accelerator-type # https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config - core_per_sku = (1 if tpu_version == 'v5litepod' else + # v6e is also per chip price. Reference here: + # https://cloud.google.com/tpu/docs/v6e#configurations + core_per_sku = (1 if tpu_version in ['v5litepod', 'v6e'] else 2 if tpu_version == 'v5p' else 8) tpu_core_price = tpu_device_price / core_per_sku tpu_price = num_cores * tpu_core_price @@ -738,8 +699,6 @@ def get_tpu_price(row: pd.Series, spot: bool) -> Optional[float]: spot_str = 'spot ' if spot else '' print(f'The {spot_str}price of {tpu_name} in {tpu_region} is ' 'not found in SKUs or hidden TPU price DF.') - # TODO(tian): Hack. Should investigate how to retrieve the price - # for TPU-v6e. 
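# A worked sketch of the per-SKU accounting above, using list prices from
# the (now removed) hidden TPU table: v5e and v6e SKUs are priced per chip,
# v5p per 2 cores, and older generations per 8 cores.
def tpu_price(tpu_version: str, num_cores: int, device_price: float) -> float:
    core_per_sku = (1 if tpu_version in ['v5litepod', 'v6e'] else
                    2 if tpu_version == 'v5p' else 8)
    return num_cores * (device_price / core_per_sku)

print(tpu_price('v6e', 4, 2.7))  # 10.8, matching the old tpu-v6e-4 row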
if (tpu_name.startswith('tpu-v6e') and tpu_region in TPU_V6E_MISSING_REGIONS): if not spot: From bee46471a0fe2f6df141fb30f32d167f5b7ec53b Mon Sep 17 00:00:00 2001 From: zpoint Date: Wed, 8 Jan 2025 10:13:04 +0800 Subject: [PATCH 10/17] Update peoetry-build.yml to ensure github CI pass (#4541) * change the dependencies * remove --no-update * remove poetry build --- .github/workflows/test-poetry-build.yml | 63 ------------------------- 1 file changed, 63 deletions(-) delete mode 100644 .github/workflows/test-poetry-build.yml diff --git a/.github/workflows/test-poetry-build.yml b/.github/workflows/test-poetry-build.yml deleted file mode 100644 index 4cce22809ef..00000000000 --- a/.github/workflows/test-poetry-build.yml +++ /dev/null @@ -1,63 +0,0 @@ -name: Poetry Test -on: - # Trigger the workflow on push or pull request, - # but only for the main branch - push: - branches: - - master - - 'releases/**' - pull_request: - branches: - - master - - 'releases/**' - merge_group: - -jobs: - poetry-build-test: - runs-on: ubuntu-latest - steps: - - name: Set up Python 3.10 - uses: actions/setup-python@v4 - with: - python-version: '3.10' - - name: Install Poetry - run: | - curl -sSL https://install.python-poetry.org | python - - echo "$HOME/.poetry/bin" >> $GITHUB_PATH - - name: Create foo package - run: | - mkdir foo - MASTER_REPO_URL=${{ github.server_url }}/${{ github.repository }} - REPO_URL=${{ github.event.pull_request.head.repo.html_url }} - if [ -z "$REPO_URL" ]; then - # This is a push, not a PR, so use the repo URL - REPO_URL=$MASTER_REPO_URL - fi - echo Master repo URL: $MASTER_REPO_URL - echo Using repo URL: $REPO_URL - cat < foo/pyproject.toml - [tool.poetry] - name = "foo" - version = "1.0.0" - authors = ["skypilot-bot"] - description = "" - - [tool.poetry.dependencies] - python = "3.10.x" - - [tool.poetry.group.dev.dependencies] - skypilot = {git = "${REPO_URL}.git", branch = "${{ github.head_ref }}"} - - [build-system] - requires = ["poetry-core"] - build-backend = "poetry.core.masonry.api" - - EOF - - - name: Check poetry lock time - run: | - cd foo - poetry lock --no-update - timeout-minutes: 2 - - From 2fa37ec2a68bdd1e69f3726ca46aaeebcaf07a2d Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Wed, 8 Jan 2025 11:19:40 +0800 Subject: [PATCH 11/17] [Core][Docker] Support docker login on RunPod. (#4287) * [Core][Docker] Support docker login on RunPod. * nit * works * remove unnecessary * delete template and registry after termination * move to graphql api & remove ephemeral resourcdes * nits --- docs/source/getting-started/installation.rst | 2 +- sky/provision/docker_utils.py | 11 +- sky/provision/runpod/instance.py | 11 +- sky/provision/runpod/utils.py | 183 +++++++++++++++++-- sky/setup_files/dependencies.py | 4 +- sky/skylet/providers/command_runner.py | 12 +- sky/templates/runpod-ray.yml.j2 | 13 ++ 7 files changed, 210 insertions(+), 26 deletions(-) diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst index 1d36b5ef6b8..93c730ef651 100644 --- a/docs/source/getting-started/installation.rst +++ b/docs/source/getting-started/installation.rst @@ -304,7 +304,7 @@ RunPod .. 
code-block:: shell - pip install "runpod>=1.5.1" + pip install "runpod>=1.6.1" runpod config diff --git a/sky/provision/docker_utils.py b/sky/provision/docker_utils.py index 848c7a06983..0aadcc55335 100644 --- a/sky/provision/docker_utils.py +++ b/sky/provision/docker_utils.py @@ -38,6 +38,13 @@ class DockerLoginConfig: password: str server: str + def format_image(self, image: str) -> str: + """Format the image name with the server prefix.""" + server_prefix = f'{self.server}/' + if not image.startswith(server_prefix): + return f'{server_prefix}{image}' + return image + @classmethod def from_env_vars(cls, d: Dict[str, str]) -> 'DockerLoginConfig': return cls( @@ -220,9 +227,7 @@ def initialize(self) -> str: wait_for_docker_daemon=True) # We automatically add the server prefix to the image name if # the user did not add it. - server_prefix = f'{docker_login_config.server}/' - if not specific_image.startswith(server_prefix): - specific_image = f'{server_prefix}{specific_image}' + specific_image = docker_login_config.format_image(specific_image) if self.docker_config.get('pull_before_run', True): assert specific_image, ('Image must be included in config if ' + diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index 8f992f569d9..9e57887c3f1 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -83,7 +83,8 @@ def run_instances(region: str, cluster_name_on_cloud: str, node_type = 'head' if head_instance_id is None else 'worker' try: instance_id = utils.launch( - name=f'{cluster_name_on_cloud}-{node_type}', + cluster_name=cluster_name_on_cloud, + node_type=node_type, instance_type=config.node_config['InstanceType'], region=region, disk_size=config.node_config['DiskSize'], @@ -92,6 +93,8 @@ def run_instances(region: str, cluster_name_on_cloud: str, public_key=config.node_config['PublicKey'], preemptible=config.node_config['Preemptible'], bid_per_gpu=config.node_config['BidPerGPU'], + docker_login_config=config.provider_config.get( + 'docker_login_config'), ) except Exception as e: # pylint: disable=broad-except logger.warning(f'run_instances error: {e}') @@ -145,6 +148,8 @@ def terminate_instances( """See sky/provision/__init__.py""" del provider_config # unused instances = _filter_instances(cluster_name_on_cloud, None) + template_name, registry_auth_id = utils.get_registry_auth_resources( + cluster_name_on_cloud) for inst_id, inst in instances.items(): logger.debug(f'Terminating instance {inst_id}: {inst}') if worker_only and inst['name'].endswith('-head'): @@ -157,6 +162,10 @@ def terminate_instances( f'Failed to terminate instance {inst_id}: ' f'{common_utils.format_exception(e, use_bracket=False)}' ) from e + if template_name is not None: + utils.delete_pod_template(template_name) + if registry_auth_id is not None: + utils.delete_register_auth(registry_auth_id) def get_cluster_info( diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index d0a06b026b3..6600cfd6198 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -2,10 +2,11 @@ import base64 import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from sky import sky_logging from sky.adaptors import runpod +from sky.provision import docker_utils import sky.provision.runpod.api.commands as runpod_commands from sky.skylet import constants from sky.utils import common_utils @@ -47,6 +48,11 @@ } +def _construct_docker_login_template_name(cluster_name: str) -> str: + 
"""Constructs the registry auth template name.""" + return f'{cluster_name}-docker-login-template' + + def retry(func): """Decorator to retry a function.""" @@ -66,9 +72,83 @@ def wrapper(*args, **kwargs): return wrapper +# Adapted from runpod.api.queries.pods.py::QUERY_POD. +# Adding containerRegistryAuthId to the query. +_QUERY_POD = """ +query myPods { + myself { + pods { + id + containerDiskInGb + containerRegistryAuthId + costPerHr + desiredStatus + dockerArgs + dockerId + env + gpuCount + imageName + lastStatusChange + machineId + memoryInGb + name + podType + port + ports + uptimeSeconds + vcpuCount + volumeInGb + volumeMountPath + runtime { + ports{ + ip + isIpPublic + privatePort + publicPort + type + } + } + machine { + gpuDisplayName + } + } + } +} +""" + + +def _sky_get_pods() -> dict: + """List all pods with extra registry auth information. + + Adapted from runpod.get_pods() to include containerRegistryAuthId. + """ + raw_return = runpod.runpod.api.graphql.run_graphql_query(_QUERY_POD) + cleaned_return = raw_return['data']['myself']['pods'] + return cleaned_return + + +_QUERY_POD_TEMPLATE_WITH_REGISTRY_AUTH = """ +query myself { + myself { + podTemplates { + name + containerRegistryAuthId + } + } +} +""" + + +def _list_pod_templates_with_container_registry() -> dict: + """List all pod templates.""" + raw_return = runpod.runpod.api.graphql.run_graphql_query( + _QUERY_POD_TEMPLATE_WITH_REGISTRY_AUTH) + return raw_return['data']['myself']['podTemplates'] + + def list_instances() -> Dict[str, Dict[str, Any]]: """Lists instances associated with API key.""" - instances = runpod.runpod.get_pods() + instances = _sky_get_pods() instance_dict: Dict[str, Dict[str, Any]] = {} for instance in instances: @@ -100,14 +180,75 @@ def list_instances() -> Dict[str, Dict[str, Any]]: return instance_dict -def launch(name: str, instance_type: str, region: str, disk_size: int, - image_name: str, ports: Optional[List[int]], public_key: str, - preemptible: Optional[bool], bid_per_gpu: float) -> str: +def delete_pod_template(template_name: str) -> None: + """Deletes a pod template.""" + try: + runpod.runpod.api.graphql.run_graphql_query( + f'mutation {{deleteTemplate(templateName: "{template_name}")}}') + except runpod.runpod.error.QueryError as e: + logger.warning(f'Failed to delete template {template_name}: {e}' + 'Please delete it manually.') + + +def delete_register_auth(registry_auth_id: str) -> None: + """Deletes a registry auth.""" + try: + runpod.runpod.delete_container_registry_auth(registry_auth_id) + except runpod.runpod.error.QueryError as e: + logger.warning(f'Failed to delete registry auth {registry_auth_id}: {e}' + 'Please delete it manually.') + + +def _create_template_for_docker_login( + cluster_name: str, + image_name: str, + docker_login_config: Optional[Dict[str, str]], +) -> Tuple[str, Optional[str]]: + """Creates a template for the given image with the docker login config. + + Returns: + formatted_image_name: The formatted image name. + template_id: The template ID. None for no docker login config. + """ + if docker_login_config is None: + return image_name, None + login_config = docker_utils.DockerLoginConfig(**docker_login_config) + container_registry_auth_name = f'{cluster_name}-registry-auth' + container_template_name = _construct_docker_login_template_name( + cluster_name) + # The `name` argument is only for display purpose and the registry server + # will be splitted from the docker image name (Tested with AWS ECR). 
+ # Here we only need the username and password to create the registry auth. + # TODO(tian): Now we create a template and a registry auth for each cluster. + # Consider create one for each server and reuse them. Challenges including + # calculate the reference count and delete them when no longer needed. + create_auth_resp = runpod.runpod.create_container_registry_auth( + name=container_registry_auth_name, + username=login_config.username, + password=login_config.password, + ) + registry_auth_id = create_auth_resp['id'] + create_template_resp = runpod.runpod.create_template( + name=container_template_name, + image_name=None, + registry_auth_id=registry_auth_id, + ) + return login_config.format_image(image_name), create_template_resp['id'] + + +def launch(cluster_name: str, node_type: str, instance_type: str, region: str, + disk_size: int, image_name: str, ports: Optional[List[int]], + public_key: str, preemptible: Optional[bool], bid_per_gpu: float, + docker_login_config: Optional[Dict[str, str]]) -> str: """Launches an instance with the given parameters. Converts the instance_type to the RunPod GPU name, finds the specs for the GPU, and launches the instance. + + Returns: + instance_id: The instance ID. """ + name = f'{cluster_name}-{node_type}' gpu_type = GPU_NAME_MAP[instance_type.split('_')[1]] gpu_quantity = int(instance_type.split('_')[0].replace('x', '')) cloud_type = instance_type.split('_')[2] @@ -139,21 +280,24 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, # Use base64 to deal with the tricky quoting issues caused by runpod API. encoded = base64.b64encode(setup_cmd.encode('utf-8')).decode('utf-8') + docker_args = (f'bash -c \'echo {encoded} | base64 --decode > init.sh; ' + f'bash init.sh\'') + # Port 8081 is occupied for nginx in the base image. 
custom_ports_str = '' if ports is not None: custom_ports_str = ''.join([f'{p}/tcp,' for p in ports]) + ports_str = (f'22/tcp,' + f'{custom_ports_str}' + f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' + f'{constants.SKY_REMOTE_RAY_PORT}/http') - docker_args = (f'bash -c \'echo {encoded} | base64 --decode > init.sh; ' - f'bash init.sh\'') - ports = (f'22/tcp,' - f'{custom_ports_str}' - f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' - f'{constants.SKY_REMOTE_RAY_PORT}/http') + image_name_formatted, template_id = _create_template_for_docker_login( + cluster_name, image_name, docker_login_config) params = { 'name': name, - 'image_name': image_name, + 'image_name': image_name_formatted, 'gpu_type_id': gpu_type, 'cloud_type': cloud_type, 'container_disk_in_gb': disk_size, @@ -161,9 +305,10 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, 'min_memory_in_gb': gpu_specs['memoryInGb'] * gpu_quantity, 'gpu_count': gpu_quantity, 'country_code': region, - 'ports': ports, + 'ports': ports_str, 'support_public_ip': True, 'docker_args': docker_args, + 'template_id': template_id, } if preemptible is None or not preemptible: @@ -177,6 +322,18 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, return new_instance['id'] +def get_registry_auth_resources( + cluster_name: str) -> Tuple[Optional[str], Optional[str]]: + """Gets the registry auth resources.""" + container_registry_auth_name = _construct_docker_login_template_name( + cluster_name) + for template in _list_pod_templates_with_container_registry(): + if template['name'] == container_registry_auth_name: + return container_registry_auth_name, template[ + 'containerRegistryAuthId'] + return None, None + + def remove(instance_id: str) -> None: """Terminates the given instance.""" runpod.runpod.terminate_pod(instance_id) diff --git a/sky/setup_files/dependencies.py b/sky/setup_files/dependencies.py index 16590a9fd0d..13b99770e5b 100644 --- a/sky/setup_files/dependencies.py +++ b/sky/setup_files/dependencies.py @@ -123,7 +123,9 @@ 'oci': ['oci'] + local_ray, 'kubernetes': ['kubernetes>=20.0.0'], 'remote': remote, - 'runpod': ['runpod>=1.5.1'], + # For the container registry auth api. Reference: + # https://github.com/runpod/runpod-python/releases/tag/1.6.1 + 'runpod': ['runpod>=1.6.1'], 'fluidstack': [], # No dependencies needed for fluidstack 'cudo': ['cudo-compute>=0.1.10'], 'paperspace': [], # No dependencies needed for paperspace diff --git a/sky/skylet/providers/command_runner.py b/sky/skylet/providers/command_runner.py index 4f66ef54383..16dbc4d2668 100644 --- a/sky/skylet/providers/command_runner.py +++ b/sky/skylet/providers/command_runner.py @@ -25,7 +25,7 @@ def docker_start_cmds( docker_cmd, ): """Generating docker start command without --rm. - + The code is borrowed from `ray.autoscaler._private.docker`. Changes we made: @@ -159,19 +159,17 @@ def run_init(self, *, as_head: bool, file_mounts: Dict[str, str], return True # SkyPilot: Docker login if user specified a private docker registry. - if "docker_login_config" in self.docker_config: + if 'docker_login_config' in self.docker_config: # TODO(tian): Maybe support a command to get the login password? 
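# A quick sketch of the image formatting this login block relies on; the
# registry values are made up. format_image() is the helper added to
# DockerLoginConfig in sky/provision/docker_utils.py earlier in this series.
from sky.provision.docker_utils import DockerLoginConfig

cfg = DockerLoginConfig(username='user',
                        password='token',
                        server='123456789012.dkr.ecr.us-east-1.amazonaws.com')
image = cfg.format_image('myrepo/train:latest')
print(image)  # '123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo/train:latest'
print(cfg.format_image(image) == image)  # True: the prefix is added only once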
- docker_login_config: docker_utils.DockerLoginConfig = self.docker_config[ - "docker_login_config"] + docker_login_config: docker_utils.DockerLoginConfig = ( + self.docker_config['docker_login_config']) self._run_with_retry( f'{self.docker_cmd} login --username ' f'{docker_login_config.username} --password ' f'{docker_login_config.password} {docker_login_config.server}') # We automatically add the server prefix to the image name if # the user did not add it. - server_prefix = f'{docker_login_config.server}/' - if not specific_image.startswith(server_prefix): - specific_image = f'{server_prefix}{specific_image}' + specific_image = docker_login_config.format_image(specific_image) if self.docker_config.get('pull_before_run', True): assert specific_image, ('Image must be included in config if ' diff --git a/sky/templates/runpod-ray.yml.j2 b/sky/templates/runpod-ray.yml.j2 index 853b9142037..ea57c9ac808 100644 --- a/sky/templates/runpod-ray.yml.j2 +++ b/sky/templates/runpod-ray.yml.j2 @@ -10,6 +10,19 @@ provider: module: sky.provision.runpod region: "{{region}}" disable_launch_config_check: true + # For RunPod, we directly set the image id for the docker as runtime environment + # support, thus we need to avoid the DockerInitializer detects the docker field + # and performs the initialization. Therefore we put the docker login config in + # the provider config here. + {%- if docker_login_config is not none %} + docker_login_config: + username: |- + {{docker_login_config.username}} + password: |- + {{docker_login_config.password}} + server: |- + {{docker_login_config.server}} + {%- endif %} auth: ssh_user: root From 2c5d87cc0b4280633ff2b39569fba628c3c1e890 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 8 Jan 2025 11:22:41 -0800 Subject: [PATCH 12/17] [k8s] Fix race condition in k8s secret creation (#4505) Fix secret creation --- sky/authentication.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/sky/authentication.py b/sky/authentication.py index 2eb65bd9f6f..6108073494f 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -408,14 +408,26 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]: secret = k8s.client.V1Secret( metadata=k8s.client.V1ObjectMeta(**secret_metadata), string_data={secret_field_name: public_key}) - if kubernetes_utils.check_secret_exists(secret_name, namespace, context): - logger.debug(f'Key {secret_name} exists in the cluster, patching it...') - kubernetes.core_api(context).patch_namespaced_secret( - secret_name, namespace, secret) - else: - logger.debug( - f'Key {secret_name} does not exist in the cluster, creating it...') - kubernetes.core_api(context).create_namespaced_secret(namespace, secret) + try: + if kubernetes_utils.check_secret_exists(secret_name, namespace, + context): + logger.debug(f'Key {secret_name} exists in the cluster, ' + 'patching it...') + kubernetes.core_api(context).patch_namespaced_secret( + secret_name, namespace, secret) + else: + logger.debug(f'Key {secret_name} does not exist in the cluster, ' + 'creating it...') + kubernetes.core_api(context).create_namespaced_secret( + namespace, secret) + except kubernetes.api_exception() as e: + if e.status == 409 and e.reason == 'AlreadyExists': + logger.debug(f'Key {secret_name} was created concurrently, ' + 'patching it...') + kubernetes.core_api(context).patch_namespaced_secret( + secret_name, namespace, secret) + else: + raise e private_key_path, _ = get_or_generate_keys() if network_mode == 
nodeport_mode: From 379711b13f1603fe9fb61c522e4fbf60de3f5f69 Mon Sep 17 00:00:00 2001 From: Clay Rosenthal Date: Wed, 8 Jan 2025 16:05:36 -0800 Subject: [PATCH 13/17] [AWS] Adding filter for default aws vpc if not specified (#4546) * Adding filter for default vpc if not specified * Error message, docs and comments * lint --------- Co-authored-by: Clay Rosenthal Co-authored-by: Romil Bhardwaj --- docs/source/cloud-setup/cloud-permissions/aws.rst | 2 +- sky/provision/aws/config.py | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/docs/source/cloud-setup/cloud-permissions/aws.rst b/docs/source/cloud-setup/cloud-permissions/aws.rst index 89510331988..57fc7ac9732 100644 --- a/docs/source/cloud-setup/cloud-permissions/aws.rst +++ b/docs/source/cloud-setup/cloud-permissions/aws.rst @@ -223,7 +223,7 @@ IAM Role Creation Using a specific VPC ----------------------- -By default, SkyPilot uses the "default" VPC in each region. +By default, SkyPilot uses the "default" VPC in each region. If a region does not have a `default VPC `_, SkyPilot will not be able to use the region. To instruct SkyPilot to use a specific VPC, you can use SkyPilot's global config file ``~/.sky/config.yaml`` to specify the VPC name in the ``aws.vpc_name`` diff --git a/sky/provision/aws/config.py b/sky/provision/aws/config.py index ffa87c3a011..acc6fcb0e56 100644 --- a/sky/provision/aws/config.py +++ b/sky/provision/aws/config.py @@ -383,10 +383,13 @@ def _get_pruned_subnets(current_subnets: List[Any]) -> Set[str]: raise exc if not subnets: + vpc_msg = (f'Does a default VPC exist in region ' + f'{ec2.meta.client.meta.region_name}? ') if ( + vpc_id_of_sg is None) else '' _skypilot_log_error_and_exit_for_failover( - 'No usable subnets found, try ' - 'manually creating an instance in your specified region to ' - 'populate the list of subnets and trying this again. ' + f'No usable subnets found. {vpc_msg}' + 'Try manually creating an instance in your specified region to ' + 'populate the list of subnets and try again. ' 'Note that the subnet must map public IPs ' 'on instance launch unless you set `use_internal_ips: true` in ' 'the `provider` config.') @@ -495,6 +498,11 @@ def _get_subnet_and_vpc_id(ec2, security_group_ids: Optional[List[str]], vpc_id_of_sg = None all_subnets = list(ec2.subnets.all()) + # If no VPC is specified, use the default VPC. + # We filter only for default VPCs to avoid using subnets that users may + # not want SkyPilot to use. + if vpc_id_of_sg is None: + all_subnets = [s for s in all_subnets if s.vpc.is_default] subnets, vpc_id = _usable_subnets( ec2, user_specified_subnets=None, From 9b1ec550619cf420f4349b2074b45dea1c19e631 Mon Sep 17 00:00:00 2001 From: zpoint Date: Thu, 9 Jan 2025 13:49:55 +0800 Subject: [PATCH 14/17] support filter flag for buildkite (#4534) * support filter flag * default value * support filter flag from env * change AST to pytest --collect-only * fix logic * regex fix --- .buildkite/generate_pipeline.py | 167 +++++++++++++++----------------- tests/conftest.py | 8 ++ 2 files changed, 88 insertions(+), 87 deletions(-) diff --git a/.buildkite/generate_pipeline.py b/.buildkite/generate_pipeline.py index 99f29ee258a..74480c9ee4d 100644 --- a/.buildkite/generate_pipeline.py +++ b/.buildkite/generate_pipeline.py @@ -21,11 +21,13 @@ clouds are not supported yet, smoke tests for those clouds are not generated. 
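For example, to generate pipelines for only the managed-jobs tests, the
`--filter-marks` flag (or the equivalent FILTER_MARKS environment variable)
added in this change can be used:

    python .buildkite/generate_pipeline.py --filter-marks managed_jobs
    FILTER_MARKS=managed_jobs python .buildkite/generate_pipeline.py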
""" -import ast import os import random +import re +import subprocess from typing import Any, Dict, List, Optional +import click from conftest import cloud_to_pytest_keyword from conftest import default_clouds_to_run import yaml @@ -60,18 +62,8 @@ 'edit directly.\n') -def _get_full_decorator_path(decorator: ast.AST) -> str: - """Recursively get the full path of a decorator.""" - if isinstance(decorator, ast.Attribute): - return f'{_get_full_decorator_path(decorator.value)}.{decorator.attr}' - elif isinstance(decorator, ast.Name): - return decorator.id - elif isinstance(decorator, ast.Call): - return _get_full_decorator_path(decorator.func) - raise ValueError(f'Unknown decorator type: {type(decorator)}') - - -def _extract_marked_tests(file_path: str) -> Dict[str, List[str]]: +def _extract_marked_tests(file_path: str, + filter_marks: List[str]) -> Dict[str, List[str]]: """Extract test functions and filter clouds using pytest.mark from a Python test file. @@ -85,80 +77,69 @@ def _extract_marked_tests(file_path: str) -> Dict[str, List[str]]: rerun failures. Additionally, the parallelism would be controlled by pytest instead of the buildkite job queue. """ - with open(file_path, 'r', encoding='utf-8') as file: - tree = ast.parse(file.read(), filename=file_path) - - for node in ast.walk(tree): - for child in ast.iter_child_nodes(node): - setattr(child, 'parent', node) - + cmd = f'pytest {file_path} --collect-only' + output = subprocess.run(cmd, shell=True, capture_output=True, text=True) + matches = re.findall('Collected .+?\.py::(.+?) with marks: \[(.*?)\]', + output.stdout) + function_name_marks_map = {} + for function_name, marks in matches: + function_name = re.sub(r'\[.*?\]', '', function_name) + marks = marks.replace('\'', '').split(',') + marks = [i.strip() for i in marks] + if function_name not in function_name_marks_map: + function_name_marks_map[function_name] = set(marks) + else: + function_name_marks_map[function_name].update(marks) function_cloud_map = {} - for node in ast.walk(tree): - if isinstance(node, ast.FunctionDef) and node.name.startswith('test_'): - class_name = None - if hasattr(node, 'parent') and isinstance(node.parent, - ast.ClassDef): - class_name = node.parent.name - - clouds_to_include = [] - clouds_to_exclude = [] - is_serve_test = False - for decorator in node.decorator_list: - if isinstance(decorator, ast.Call): - # We only need to consider the decorator with no arguments - # to extract clouds. + filter_marks = set(filter_marks) + for function_name, marks in function_name_marks_map.items(): + if filter_marks and not filter_marks & marks: + continue + clouds_to_include = [] + clouds_to_exclude = [] + is_serve_test = 'serve' in marks + for mark in marks: + if mark.startswith('no_'): + clouds_to_exclude.append(mark[3:]) + else: + if mark not in PYTEST_TO_CLOUD_KEYWORD: + # This mark does not specify a cloud, so we skip it. continue - full_path = _get_full_decorator_path(decorator) - if full_path.startswith('pytest.mark.'): - assert isinstance(decorator, ast.Attribute) - suffix = decorator.attr - if suffix.startswith('no_'): - clouds_to_exclude.append(suffix[3:]) - else: - if suffix == 'serve': - is_serve_test = True - continue - if suffix not in PYTEST_TO_CLOUD_KEYWORD: - # This mark does not specify a cloud, so we skip it. 
-                            continue
-                        clouds_to_include.append(
-                            PYTEST_TO_CLOUD_KEYWORD[suffix])
-            clouds_to_include = (clouds_to_include if clouds_to_include else
-                                 DEFAULT_CLOUDS_TO_RUN)
-            clouds_to_include = [
-                cloud for cloud in clouds_to_include
-                if cloud not in clouds_to_exclude
-            ]
-            cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
-            final_clouds_to_include = [
-                cloud for cloud in clouds_to_include if cloud in cloud_queue_map
-            ]
-            if clouds_to_include and not final_clouds_to_include:
-                print(f'Warning: {file_path}:{node.name} '
-                      f'is marked to run on {clouds_to_include}, '
-                      f'but we do not have credentials for those clouds. '
-                      f'Skipped.')
-                continue
-            if clouds_to_include != final_clouds_to_include:
-                excluded_clouds = set(clouds_to_include) - set(
-                    final_clouds_to_include)
-                print(
-                    f'Warning: {file_path}:{node.name} '
-                    f'is marked to run on {clouds_to_include}, '
-                    f'but we only have credentials for {final_clouds_to_include}. '
-                    f'clouds {excluded_clouds} are skipped.')
-            function_name = (f'{class_name}::{node.name}'
-                             if class_name else node.name)
-            function_cloud_map[function_name] = (final_clouds_to_include, [
-                cloud_queue_map[cloud] for cloud in final_clouds_to_include
-            ])
+    cmd = f'pytest {file_path} --collect-only'
+    output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
+    matches = re.findall(r'Collected .+?\.py::(.+?) with marks: \[(.*?)\]',
+                         output.stdout)
+    function_name_marks_map = {}
+    for function_name, marks in matches:
+        function_name = re.sub(r'\[.*?\]', '', function_name)
+        marks = marks.replace('\'', '').split(',')
+        marks = [i.strip() for i in marks]
+        if function_name not in function_name_marks_map:
+            function_name_marks_map[function_name] = set(marks)
+        else:
+            function_name_marks_map[function_name].update(marks)
+    function_cloud_map = {}
+    filter_marks = set(filter_marks)
+    for function_name, marks in function_name_marks_map.items():
+        if filter_marks and not filter_marks & marks:
+            continue
+        clouds_to_include = []
+        clouds_to_exclude = []
+        is_serve_test = 'serve' in marks
+        for mark in marks:
+            if mark.startswith('no_'):
+                clouds_to_exclude.append(mark[3:])
+            else:
+                if mark not in PYTEST_TO_CLOUD_KEYWORD:
+                    # This mark does not specify a cloud, so we skip it.
+                    continue
+                clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])
+
+        clouds_to_include = (clouds_to_include
+                             if clouds_to_include else DEFAULT_CLOUDS_TO_RUN)
+        clouds_to_include = [
+            cloud for cloud in clouds_to_include
+            if cloud not in clouds_to_exclude
+        ]
+        cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
+        final_clouds_to_include = [
+            cloud for cloud in clouds_to_include if cloud in cloud_queue_map
+        ]
+        if clouds_to_include and not final_clouds_to_include:
+            print(
+                f'Warning: {function_name} is marked to run on {clouds_to_include}, '
+                f'but we do not have credentials for those clouds. Skipped.')
+            continue
+        if clouds_to_include != final_clouds_to_include:
+            excluded_clouds = set(clouds_to_include) - set(
+                final_clouds_to_include)
+            print(
+                f'Warning: {function_name} is marked to run on {clouds_to_include}, '
+                f'but we only have credentials for {final_clouds_to_include}. '
+                f'Clouds {excluded_clouds} are skipped.')
+        function_cloud_map[function_name] = (final_clouds_to_include, [
+            cloud_queue_map[cloud] for cloud in final_clouds_to_include
+        ])
     return function_cloud_map
 
 
-def _generate_pipeline(test_file: str) -> Dict[str, Any]:
+def _generate_pipeline(test_file: str,
+                       filter_marks: List[str]) -> Dict[str, Any]:
     """Generate a Buildkite pipeline from test files."""
     steps = []
-    function_cloud_map = _extract_marked_tests(test_file)
+    function_cloud_map = _extract_marked_tests(test_file, filter_marks)
     for test_function, clouds_and_queues in function_cloud_map.items():
         for cloud, queue in zip(*clouds_and_queues):
             step = {
@@ -194,12 +175,12 @@ def _dump_pipeline_to_file(yaml_file_path: str,
         yaml.dump(final_pipeline, file, default_flow_style=False)
 
 
-def _convert_release(test_files: List[str]):
+def _convert_release(test_files: List[str], filter_marks: List[str]):
     yaml_file_path = '.buildkite/pipeline_smoke_tests_release.yaml'
     output_file_pipelines = []
     for test_file in test_files:
         print(f'Converting {test_file} to {yaml_file_path}')
-        pipeline = _generate_pipeline(test_file)
+        pipeline = _generate_pipeline(test_file, filter_marks)
         output_file_pipelines.append(pipeline)
         print(f'Converted {test_file} to {yaml_file_path}\n\n')
     # Enable all clouds by default for release pipeline.
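The new extraction flow above leans entirely on a regex parse of `pytest --collect-only` output, so it is worth seeing that parse in isolation. Below is a minimal sketch that replays the same regex and parametrize-suffix merging on canned collector output; the sample test names and the `parse_collected_marks` helper are illustrative assumptions, not part of the patch itself.

    import re
    from collections import defaultdict

    # Canned, hypothetical sample of the lines printed by the
    # pytest_collection_modifyitems hook added in tests/conftest.py.
    SAMPLE_OUTPUT = (
        "Collected tests/smoke_tests/a.py::test_job_queue[aws] with marks: ['aws', 'resource_heavy']\n"
        "Collected tests/smoke_tests/a.py::test_job_queue[gcp] with marks: ['gcp', 'resource_heavy']\n"
        "Collected tests/smoke_tests/b.py::test_managed_jobs with marks: ['managed_jobs', 'no_fluidstack']\n")

    def parse_collected_marks(stdout: str) -> dict:
        matches = re.findall(r'Collected .+?\.py::(.+?) with marks: \[(.*?)\]', stdout)
        merged = defaultdict(set)
        for function_name, marks in matches:
            # Strip the parametrize suffix so test_job_queue[aws] and
            # test_job_queue[gcp] collapse into one entry with merged marks.
            function_name = re.sub(r'\[.*?\]', '', function_name)
            merged[function_name].update(m.strip().strip("'") for m in marks.split(','))
        return dict(merged)

    print(parse_collected_marks(SAMPLE_OUTPUT))
    # {'test_job_queue': {'aws', 'gcp', 'resource_heavy'},
    #  'test_managed_jobs': {'managed_jobs', 'no_fluidstack'}}

One property worth noting: if the "Collected ... with marks: [...]" format ever drifted, re.findall would silently return nothing and the generator would emit an empty pipeline rather than fail loudly.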
@@ -208,7 +189,7 @@ def _convert_release(test_files: List[str]):
                            extra_env={cloud: '1' for cloud in CLOUD_QUEUE_MAP})
 
 
-def _convert_quick_tests_core(test_files: List[str]):
+def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
     yaml_file_path = '.buildkite/pipeline_smoke_tests_quick_tests_core.yaml'
     output_file_pipelines = []
     for test_file in test_files:
@@ -216,7 +197,7 @@ def _convert_quick_tests_core(test_files: List[str]):
         # We want to enable all clouds by default for each test function
         # for pre-merge, and let the author control which clouds
         # to run via parameter.
-        pipeline = _generate_pipeline(test_file)
+        pipeline = _generate_pipeline(test_file, filter_marks)
         pipeline['steps'].append({
             'label': 'Backward compatibility test',
             'command': 'bash tests/backward_compatibility_tests.sh',
@@ -231,7 +212,12 @@ def _convert_quick_tests_core(test_files: List[str]):
                            extra_env={'SKYPILOT_SUPPRESS_SENSITIVE_LOG': '1'})
 
 
-def main():
+@click.command()
+@click.option(
+    '--filter-marks',
+    type=str,
+    help='Filter to include only a subset of pytest marks, e.g., managed_jobs')
+def main(filter_marks):
     test_files = os.listdir('tests/smoke_tests')
     release_files = []
     quick_tests_core_files = []
@@ -244,8 +230,15 @@ def main():
         else:
             release_files.append(test_file_path)
 
-    _convert_release(release_files)
-    _convert_quick_tests_core(quick_tests_core_files)
+    filter_marks = filter_marks or os.getenv('FILTER_MARKS')
+    if filter_marks:
+        filter_marks = filter_marks.split(',')
+        print(f'Filter marks: {filter_marks}')
+    else:
+        filter_marks = []
+
+    _convert_release(release_files, filter_marks)
+    _convert_quick_tests_core(quick_tests_core_files, filter_marks)
 
 
 if __name__ == '__main__':
diff --git a/tests/conftest.py b/tests/conftest.py
index 59825385a74..cfbdf808a6b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -220,3 +220,11 @@ def aws_config_region(monkeypatch: pytest.MonkeyPatch) -> str:
         if isinstance(ssh_proxy_command, dict) and ssh_proxy_command:
             region = list(ssh_proxy_command.keys())[0]
     return region
+
+
+def pytest_collection_modifyitems(config, items):
+    if config.option.collectonly:
+        for item in items:
+            full_name = item.nodeid
+            marks = [mark.name for mark in item.iter_markers()]
+            print(f"Collected {full_name} with marks: {marks}")

From 544c34e2e7eb995530c05fe10148aad87a4d0501 Mon Sep 17 00:00:00 2001
From: Clay Rosenthal
Date: Wed, 8 Jan 2025 22:50:55 -0800
Subject: [PATCH 15/17] [AWS] Adding custom process as an option for AWS
 credentials (#4547)

* Adding custom process as an option for AWS credentials

* formatting fixes

---------

Co-authored-by: Clay Rosenthal
---
 sky/clouds/aws.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py
index a86a87f4feb..b7b532f19e6 100644
--- a/sky/clouds/aws.py
+++ b/sky/clouds/aws.py
@@ -95,6 +95,8 @@ class AWSIdentityType(enum.Enum):
 
     CONTAINER_ROLE = 'container-role'
 
+    CUSTOM_PROCESS = 'custom-process'
+
     # Name                    Value    Type    Location
     # ----                    -----    ----    --------
     # profile                 None     None
@@ -614,10 +616,16 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]:
             hints = f'AWS IAM role is set.{single_cloud_hint}'
         elif identity_type == AWSIdentityType.CONTAINER_ROLE:
             # Similar to the IAM ROLE, an ECS container may not store credentials
-            # in the~/.aws/credentials file. So we don't check for the existence of
+            # in the ~/.aws/credentials file. So we don't check for the existence of
             # the file. i.e. the container will be assigned the IAM role of the
             # task: skypilot-v1.
             hints = f'AWS container-role is set.{single_cloud_hint}'
+        elif identity_type == AWSIdentityType.CUSTOM_PROCESS:
+            # Similar to the IAM ROLE, a custom process may not store credentials
+            # in the ~/.aws/credentials file. So we don't check for the existence of
+            # the file. i.e. the credentials are provided by the user-configured
+            # external credential process.
+            hints = f'AWS custom-process is set.{single_cloud_hint}'
         else:
             # This file is required because it is required by the VMs launched on
             # other clouds to access private s3 buckets and resources like EC2.
@@ -677,6 +685,8 @@ def _is_access_key_of_type(type_str: str) -> bool:
             return AWSIdentityType.CONTAINER_ROLE
         elif _is_access_key_of_type(AWSIdentityType.ENV.value):
             return AWSIdentityType.ENV
+        elif _is_access_key_of_type(AWSIdentityType.CUSTOM_PROCESS.value):
+            return AWSIdentityType.CUSTOM_PROCESS
         else:
             return AWSIdentityType.SHARED_CREDENTIALS_FILE
 

From bc777e28d4d45d5043678b1f352f9130a38fa205 Mon Sep 17 00:00:00 2001
From: Kaiyuan Eric Chen
Date: Thu, 9 Jan 2025 10:47:17 -0800
Subject: [PATCH 16/17] [Jobs] Refactor dashboard controller launching with
 systemd (#4538)

* convert to systemd

* Refactor jobs-controller.yaml.j2 to use a systemd user service for
  skypilot-dashboard. Added user-specific environment variables and log
  redirection. Updated service management commands to operate in user mode.

* Enhance jobs-controller.yaml.j2 to check for systemd user service
  availability before executing service management commands. If systemd is
  not found, fall back to a manual setup for the SkyPilot dashboard,
  including process termination and background launch. This improves
  robustness and user experience during setup.

* streamline sky/templates/jobs-controller.yaml.j2

Co-authored-by: Christopher Cooper

* Refactor jobs-controller.yaml.j2 to remove the Flask installation check
  and update the systemd service configuration. Added Flask as a dependency
  in controller_utils.py for the dashboard. Changed the service target from
  multi-user to default for improved service management.

---------

Co-authored-by: Christopher Cooper
---
 sky/templates/jobs-controller.yaml.j2 | 38 ++++++++++++++++++++++++---
 sky/utils/controller_utils.py         |  3 +++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/sky/templates/jobs-controller.yaml.j2 b/sky/templates/jobs-controller.yaml.j2
index 45cdb5141d4..71c808fdd0f 100644
--- a/sky/templates/jobs-controller.yaml.j2
+++ b/sky/templates/jobs-controller.yaml.j2
@@ -26,10 +26,40 @@ setup: |
   echo 'export SKYPILOT_DEV=1' >> ~/.bashrc
   {% endif %}
 
-  # Dashboard.
-  ps aux | grep -v nohup | grep -v grep | grep -- "-m sky.spot.dashboard" | awk '{print $2}' | xargs kill > /dev/null 2>&1 || true
-  pip list | grep flask > /dev/null 2>&1 || pip install flask 2>&1 > /dev/null
-  ((ps aux | grep -v nohup | grep -v grep | grep -q -- "-m sky.jobs.dashboard.dashboard") || (nohup {{ sky_python_cmd }} -m sky.jobs.dashboard.dashboard >> ~/.sky/job-dashboard.log 2>&1 &));
+  # Create the systemd user service directory.
+  mkdir -p ~/.config/systemd/user/
+
+  # Create the systemd user service file.
+  cat << EOF > ~/.config/systemd/user/skypilot-dashboard.service
+  [Unit]
+  Description=SkyPilot Jobs Dashboard
+  After=network.target
+
+  [Service]
+  Environment="PATH={{ sky_python_env_path }}:\$PATH"
+  Environment="SKYPILOT_USER_ID={{controller_envs.SKYPILOT_USER_ID}}"
+  Environment="SKYPILOT_USER={{controller_envs.SKYPILOT_USER}}"
+  Restart=always
+  StandardOutput=append:/home/$USER/.sky/job-dashboard.log
+  StandardError=append:/home/$USER/.sky/job-dashboard.log
+  ExecStart={{ sky_python_cmd }} -m sky.jobs.dashboard.dashboard
+
+  [Install]
+  WantedBy=default.target
+  EOF
+
+  if command -v systemctl &>/dev/null && systemctl --user show &>/dev/null; then
+    systemctl --user daemon-reload
+    systemctl --user enable --now skypilot-dashboard
+  else
+    echo "Systemd user services not found. Setting up the SkyPilot dashboard manually."
+    # Kill any old dashboard processes.
+    ps aux | grep -v nohup | grep -v grep | grep -- '-m sky.jobs.dashboard.dashboard' \
+      | awk '{print $2}' | xargs kill > /dev/null 2>&1 || true
+    # Launch the dashboard in the background if it is not already running.
+    (ps aux | grep -v nohup | grep -v grep | grep -q -- '-m sky.jobs.dashboard.dashboard') || \
+      (nohup {{ sky_python_cmd }} -m sky.jobs.dashboard.dashboard >> ~/.sky/job-dashboard.log 2>&1 &)
+  fi
 
 run: |
   {{ sky_activate_python_env }}
diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py
index 39623085bbb..acb636893a5 100644
--- a/sky/utils/controller_utils.py
+++ b/sky/utils/controller_utils.py
@@ -206,6 +206,9 @@ def _get_cloud_dependencies_installation_commands(
     # installed, so we don't check that.
     python_packages: Set[str] = set()
 
+    # Add flask to the controller dependencies for the dashboard.
+    python_packages.add('flask')
+
     step_prefix = prefix_str.replace('<step>', str(len(commands) + 1))
     commands.append(f'echo -en "\\r{step_prefix}uv{empty_str}" &&'
                     f'{constants.SKY_UV_INSTALL_CMD} >/dev/null 2>&1')

From 1578108f8706e45b2b06527429a54b658106c6df Mon Sep 17 00:00:00 2001
From: Kaiyuan Eric Chen
Date: Thu, 9 Jan 2025 11:12:30 -0800
Subject: [PATCH 17/17] [Jobs] --sync-down execution log support for managed
 jobs (#4527)

* add support for managed jobs

* support --name to download logs

* format

* fix job_id

* add support for downloading controller logs separately from job logs

* remove the interface for getting complete job_ids, parse table instead

* code formatting

* documentation for sync down logs

* resolve job id to timestamp via new interface

* make the logs work

* revert managed job yaml

* code formatting and cleanup

* Fix formatting in managed_job.yaml by removing unnecessary whitespace

* Remove unnecessary whitespace in managed_job.yaml for improved formatting

* add sync down smoke tests

* Enhance smoke tests for managed jobs by adding a command to wait for job
  status. This update improves the reliability of the test by ensuring the
  job is in a RUNNING state before proceeding with log synchronization.
* refactor sync down logic to remove the dependency on getting job status

* formatting

* fix rsync bug due to refactor

* clean up printing

* Update sky/jobs/core.py

Co-authored-by: Christopher Cooper

* Refactor job ID retrieval and update CLI option flag

- Changed the method name from `get_job_ids_by_name` to
  `get_all_job_ids_by_name` for clarity in `ManagedJobCodeGen`.
- Updated the CLI option flag for `sync-down` from `-d` to `-s` for better
  alignment with common conventions.

* format & linting

---------

Co-authored-by: Christopher Cooper
---
 sky/backends/cloud_vm_ray_backend.py  | 146 ++++++++++++++++++++++++++
 sky/cli.py                            |  26 +++--
 sky/jobs/__init__.py                  |   2 +
 sky/jobs/core.py                      |  46 ++++++++
 sky/jobs/state.py                     |  27 +++++
 sky/jobs/utils.py                     |   9 ++
 tests/smoke_tests/test_managed_job.py |  23 ++++
 7 files changed, 272 insertions(+), 7 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 2316888b44c..128c2acafe5 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -3891,6 +3891,152 @@ def tail_managed_job_logs(self,
             stdin=subprocess.DEVNULL,
         )
 
+    def sync_down_managed_job_logs(
+            self,
+            handle: CloudVmRayResourceHandle,
+            job_id: Optional[int] = None,
+            job_name: Optional[str] = None,
+            controller: bool = False,
+            local_dir: str = constants.SKY_LOGS_DIRECTORY) -> Dict[str, str]:
+        """Sync down logs for a managed job.
+
+        Args:
+            handle: The handle to the cluster.
+            job_id: The job ID to sync down logs for.
+            job_name: The job name to sync down logs for.
+            controller: Whether to sync down logs for the controller.
+            local_dir: The local directory to sync down logs to.
+
+        Returns:
+            A dictionary mapping job_id to log path.
+        """
+        # At most one of job_name or job_id can be set.
+        assert job_name is None or job_id is None, (job_name, job_id)
+        if job_id is None and job_name is not None:
+            # Generate code to get the job_id.
+            code = managed_jobs.ManagedJobCodeGen.get_all_job_ids_by_name(
+                job_name=job_name)
+            returncode, run_timestamps, stderr = self.run_on_head(
+                handle,
+                code,
+                stream_logs=False,
+                require_outputs=True,
+                separate_stderr=True)
+            subprocess_utils.handle_returncode(returncode, code,
+                                               'Failed to sync down logs.',
+                                               stderr)
+            job_ids = common_utils.decode_payload(run_timestamps)
+            if not job_ids:
+                logger.info(f'{colorama.Fore.YELLOW}'
+                            'No matching job found'
+                            f'{colorama.Style.RESET_ALL}')
+                return {}
+            elif len(job_ids) > 1:
+                logger.info(
+                    f'{colorama.Fore.YELLOW}'
+                    f'Multiple job IDs found under the name {job_name}. '
+                    'Downloading the latest job logs.'
+                    f'{colorama.Style.RESET_ALL}')
+                job_ids = [job_ids[0]]  # Descending order, so [0] is the latest.
+        else:
+            job_ids = [job_id]
+
+        # Get the run_timestamp; the code-gen function takes a list of job IDs.
+        code = job_lib.JobLibCodeGen.get_run_timestamp_with_globbing(job_ids)
+        returncode, run_timestamps, stderr = self.run_on_head(
+            handle,
+            code,
+            stream_logs=False,
+            require_outputs=True,
+            separate_stderr=True)
+        subprocess_utils.handle_returncode(returncode, code,
+                                           'Failed to sync logs.', stderr)
+        # Returns a dict of {job_id: run_timestamp}.
+        run_timestamps = common_utils.decode_payload(run_timestamps)
+        if not run_timestamps:
+            logger.info(f'{colorama.Fore.YELLOW}'
+                        'No matching log directories found'
+                        f'{colorama.Style.RESET_ALL}')
+            return {}
+
+        run_timestamp = list(run_timestamps.values())[0]
+        job_id = list(run_timestamps.keys())[0]
+        local_log_dir = ''
+        if controller:  # Download controller logs.
+            remote_log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY,
+                                          run_timestamp)
+            local_log_dir = os.path.expanduser(
+                os.path.join(local_dir, run_timestamp))
+
+            logger.info(f'{colorama.Fore.CYAN}'
+                        f'Job {job_ids} local logs: {local_log_dir}'
+                        f'{colorama.Style.RESET_ALL}')
+
+            runners = handle.get_command_runners()
+
+            def _rsync_down(args) -> None:
+                """Rsync down logs from remote nodes.
+
+                Args:
+                    args: A tuple of (runner, local_log_dir, remote_log_dir)
+                """
+                (runner, local_log_dir, remote_log_dir) = args
+                try:
+                    os.makedirs(local_log_dir, exist_ok=True)
+                    runner.rsync(
+                        source=f'{remote_log_dir}/',
+                        target=local_log_dir,
+                        up=False,
+                        stream_logs=False,
+                    )
+                except exceptions.CommandError as e:
+                    if e.returncode == exceptions.RSYNC_FILE_NOT_FOUND_CODE:
+                        # Raised by rsync_down. The remote log dir may not
+                        # exist, since the job may run on only a subset of
+                        # the nodes.
+                        logger.debug(
+                            f'{runner.node_id} does not have the tasks/*.')
+                    else:
+                        raise
+
+            parallel_args = [[runner, *item]
+                             for item in zip([local_log_dir], [remote_log_dir])
+                             for runner in runners]
+            subprocess_utils.run_in_parallel(_rsync_down, parallel_args)
+        else:  # Download job logs.
+            local_log_dir = os.path.expanduser(
+                os.path.join(local_dir, 'managed_jobs', run_timestamp))
+            os.makedirs(os.path.dirname(local_log_dir), exist_ok=True)
+            log_file = os.path.join(local_log_dir, 'run.log')
+
+            code = managed_jobs.ManagedJobCodeGen.stream_logs(job_name=None,
+                                                              job_id=job_id,
+                                                              follow=False,
+                                                              controller=False)
+
+            # With stdin=subprocess.DEVNULL, ctrl-c will not kill the process,
+            # so we need to handle it manually here.
+            if threading.current_thread() is threading.main_thread():
+                signal.signal(signal.SIGINT, backend_utils.interrupt_handler)
+                signal.signal(signal.SIGTSTP, backend_utils.stop_handler)
+
+            # We redirect the output to the log file and disable STDOUT and
+            # STDERR.
+            self.run_on_head(
+                handle,
+                code,
+                log_path=log_file,
+                stream_logs=False,
+                process_stream=False,
+                ssh_mode=command_runner.SshMode.INTERACTIVE,
+                stdin=subprocess.DEVNULL,
+            )
+
+        logger.info(f'{colorama.Fore.CYAN}'
+                    f'Job {job_id} logs: {local_log_dir}'
+                    f'{colorama.Style.RESET_ALL}')
+        return {str(job_id): local_log_dir}
+
     def tail_serve_logs(self, handle: CloudVmRayResourceHandle,
                         service_name: str, target: serve_lib.ServiceComponent,
                         replica_id: Optional[int], follow: bool) -> None:
diff --git a/sky/cli.py b/sky/cli.py
index d00aae9b646..27948f9ec85 100644
--- a/sky/cli.py
+++ b/sky/cli.py
@@ -3933,17 +3933,29 @@ def jobs_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool):
     required=False,
     help='Query the latest job logs, restarting the jobs controller if stopped.'
 )
+@click.option('--sync-down',
+              '-s',
+              default=False,
+              is_flag=True,
+              required=False,
+              help='Download logs for all jobs shown in the queue.')
 @click.argument('job_id', required=False, type=int)
 @usage_lib.entrypoint
 def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool,
-              controller: bool, refresh: bool):
-    """Tail the log of a managed job."""
+              controller: bool, refresh: bool, sync_down: bool):
+    """Tail or sync down the log of a managed job."""
     try:
-        managed_jobs.tail_logs(name=name,
-                               job_id=job_id,
-                               follow=follow,
-                               controller=controller,
-                               refresh=refresh)
+        if sync_down:
+            managed_jobs.sync_down_logs(name=name,
+                                        job_id=job_id,
+                                        controller=controller,
+                                        refresh=refresh)
+        else:
+            managed_jobs.tail_logs(name=name,
+                                   job_id=job_id,
+                                   follow=follow,
+                                   controller=controller,
+                                   refresh=refresh)
     except exceptions.ClusterNotUpError:
         with ux_utils.print_exception_no_traceback():
             raise
diff --git a/sky/jobs/__init__.py b/sky/jobs/__init__.py
index 5688ca7c7a2..5f52a863e36 100644
--- a/sky/jobs/__init__.py
+++ b/sky/jobs/__init__.py
@@ -9,6 +9,7 @@ from sky.jobs.core import launch
 from sky.jobs.core import queue
 from sky.jobs.core import queue_from_kubernetes_pod
+from sky.jobs.core import sync_down_logs
 from sky.jobs.core import tail_logs
 from sky.jobs.recovery_strategy import DEFAULT_RECOVERY_STRATEGY
 from sky.jobs.recovery_strategy import RECOVERY_STRATEGIES
@@ -37,6 +38,7 @@
     'queue',
     'queue_from_kubernetes_pod',
     'tail_logs',
+    'sync_down_logs',
     # utils
     'ManagedJobCodeGen',
     'format_job_table',
diff --git a/sky/jobs/core.py b/sky/jobs/core.py
index 3718d0ac67c..3cb67daba94 100644
--- a/sky/jobs/core.py
+++ b/sky/jobs/core.py
@@ -427,6 +427,52 @@ def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool,
                              controller=controller)
 
 
+@usage_lib.entrypoint
+def sync_down_logs(
+        name: Optional[str],
+        job_id: Optional[int],
+        refresh: bool,
+        controller: bool,
+        local_dir: str = skylet_constants.SKY_LOGS_DIRECTORY) -> None:
+    """Sync down logs of managed jobs.
+
+    Please refer to sky.cli.jobs_logs for documentation.
+
+    Raises:
+        ValueError: invalid arguments.
+        sky.exceptions.ClusterNotUpError: the jobs controller is not up.
+    """
+    # TODO(zhwu): Automatically restart the jobs controller
+    if name is not None and job_id is not None:
+        with ux_utils.print_exception_no_traceback():
+            raise ValueError('Cannot specify both name and job_id.')
+
+    jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER
+    job_name_or_id_str = ''
+    if job_id is not None:
+        job_name_or_id_str = str(job_id)
+    elif name is not None:
+        job_name_or_id_str = f'-n {name}'
+    else:
+        job_name_or_id_str = ''
+    handle = _maybe_restart_controller(
+        refresh,
+        stopped_message=(
+            f'{jobs_controller_type.value.name.capitalize()} is stopped. To '
+            f'get the logs, run: {colorama.Style.BRIGHT}sky jobs logs '
+            f'-r --sync-down {job_name_or_id_str}{colorama.Style.RESET_ALL}'),
+        spinner_message='Retrieving job logs')
+
+    backend = backend_utils.get_backend_from_handle(handle)
+    assert isinstance(backend, backends.CloudVmRayBackend), backend
+
+    backend.sync_down_managed_job_logs(handle,
+                                       job_id=job_id,
+                                       job_name=name,
+                                       controller=controller,
+                                       local_dir=local_dir)
+
+
 spot_launch = common_utils.deprecated_function(
     launch,
     name='sky.jobs.launch',
diff --git a/sky/jobs/state.py b/sky/jobs/state.py
index 31dcfcfd5eb..5da807b8bbb 100644
--- a/sky/jobs/state.py
+++ b/sky/jobs/state.py
@@ -564,6 +564,33 @@ def get_nonterminal_job_ids_by_name(name: Optional[str]) -> List[int]:
     return job_ids
 
 
+def get_all_job_ids_by_name(name: Optional[str]) -> List[int]:
+    """Get all job ids by name."""
+    name_filter = ''
+    field_values = []
+    if name is not None:
+        # We match the job name from `job_info` for the jobs submitted after
+        # #1982, and from `spot` for the jobs submitted before #1982, whose
+        # job_info is not available.
+        name_filter = ('WHERE (job_info.name=(?) OR '
+                       '(job_info.name IS NULL AND spot.task_name=(?)))')
+        field_values = [name, name]
+
+    # Left outer join is used here instead of join, because the job_info does
+    # not contain the managed jobs submitted before #1982.
+    with db_utils.safe_cursor(_DB_PATH) as cursor:
+        rows = cursor.execute(
+            f"""\
+            SELECT DISTINCT spot.spot_job_id
+            FROM spot
+            LEFT OUTER JOIN job_info
+            ON spot.spot_job_id=job_info.spot_job_id
+            {name_filter}
+            ORDER BY spot.spot_job_id DESC""", field_values).fetchall()
+        job_ids = [row[0] for row in rows if row[0] is not None]
+        return job_ids
+
+
 def _get_all_task_ids_statuses(
         job_id: int) -> List[Tuple[int, ManagedJobStatus]]:
     with db_utils.safe_cursor(_DB_PATH) as cursor:
diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py
index e5bbced997c..b044e31bda6 100644
--- a/sky/jobs/utils.py
+++ b/sky/jobs/utils.py
@@ -855,6 +855,15 @@ def cancel_job_by_name(cls, job_name: str) -> str:
         """)
         return cls._build(code)
 
+    @classmethod
+    def get_all_job_ids_by_name(cls, job_name: str) -> str:
+        code = textwrap.dedent(f"""\
+        from sky.utils import common_utils
+        job_id = managed_job_state.get_all_job_ids_by_name({job_name!r})
+        print(common_utils.encode_payload(job_id), end="", flush=True)
+        """)
+        return cls._build(code)
+
     @classmethod
     def stream_logs(cls,
                     job_name: Optional[str],
diff --git a/tests/smoke_tests/test_managed_job.py b/tests/smoke_tests/test_managed_job.py
index 5c930724523..4a16b469e5a 100644
--- a/tests/smoke_tests/test_managed_job.py
+++ b/tests/smoke_tests/test_managed_job.py
@@ -871,3 +871,26 @@ def test_managed_jobs_inline_env(generic_cloud: str):
         timeout=20 * 60,
     )
     smoke_tests_utils.run_one_test(test)
+
+
+@pytest.mark.managed_jobs
+def test_managed_jobs_logs_sync_down():
+    name = smoke_tests_utils.get_cluster_name()
+    test = smoke_tests_utils.Test(
+        'test-managed-jobs-logs-sync-down',
+        [
+            f'sky jobs launch -n {name} -y examples/managed_job.yaml -d',
+            smoke_tests_utils.
+            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+                job_name=f'{name}',
+                job_status=[sky.ManagedJobStatus.RUNNING],
+                timeout=300 + smoke_tests_utils.BUMP_UP_SECONDS),
+            f'sky jobs logs --controller 1 --sync-down',
+            f'sky jobs logs 1 --sync-down',
+            f'sky jobs logs --controller --name minimal --sync-down',
+            f'sky jobs logs --name minimal --sync-down',
+        ],
+        f'sky jobs cancel -y -n {name}',
+        timeout=20 * 60,
+    )
+    smoke_tests_utils.run_one_test(test)
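A closing note on the codegen pattern that PATCH 17 relies on in several places: `ManagedJobCodeGen.get_all_job_ids_by_name` ships a small Python snippet to the controller, and the caller trusts only the encoded payload printed to stdout, decoding it with `common_utils.decode_payload`. The sketch below imitates that round trip with simplified stand-ins; the `<payload>` marker format, the `build_get_job_ids_code` helper, and the fake lookup table are assumptions for illustration, not SkyPilot's actual implementation.

    import json
    import re
    import subprocess
    import sys
    import textwrap

    # Simplified stand-ins for the encode/decode helpers in
    # sky.utils.common_utils; the real marker format may differ.
    PAYLOAD_RE = re.compile(r'<payload>(.*)</payload>', re.DOTALL)

    def encode_payload(obj) -> str:
        return f'<payload>{json.dumps(obj)}</payload>'

    def decode_payload(raw: str):
        match = PAYLOAD_RE.search(raw)
        if match is None:
            raise ValueError(f'No payload found in: {raw!r}')
        return json.loads(match.group(1))

    def build_get_job_ids_code(job_name: str) -> str:
        # Mirrors the ManagedJobCodeGen idea: emit a self-contained snippet
        # whose only meaningful stdout is the encoded payload, so stray
        # prints on the remote side cannot corrupt the JSON.
        return textwrap.dedent(f"""\
            import json
            # Hypothetical stand-in for the managed-jobs state lookup.
            job_ids = {{'minimal': [3, 2, 1]}}.get({job_name!r}, [])
            print('<payload>' + json.dumps(job_ids) + '</payload>', end='', flush=True)
            """)

    assert decode_payload(encode_payload([1, 2])) == [1, 2]  # Round-trip sanity check.

    # Stand-in for run_on_head(): execute the generated code in a separate
    # process and decode whatever lands on stdout.
    proc = subprocess.run([sys.executable, '-c', build_get_job_ids_code('minimal')],
                          capture_output=True, text=True, check=True)
    print(decode_payload(proc.stdout))  # [3, 2, 1], descending, latest first

Wrapping the JSON in searchable markers is the load-bearing design choice here: remote stdout can interleave ssh banners or warnings, and a marker regex keeps the payload recoverable regardless.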