From 6c9e6a946c210d869d3932f34d0919064ea51af4 Mon Sep 17 00:00:00 2001 From: Iskandar Sitdikov Date: Thu, 13 Jun 2024 15:53:50 -0400 Subject: [PATCH] Client: Runtime service wrapper to track runtime jobs (#1358) --- client/qiskit_serverless/__init__.py | 1 + client/qiskit_serverless/utils/__init__.py | 1 + .../utils/runtime_service_client.py | 119 ++++++++++++++++++ .../migrations/0024_runtimejob_session_id.py | 18 +++ ...e_session_id_runtimejob_runtime_session.py | 18 +++ .../migrations/0026_merge_20240613_1848.py | 13 ++ gateway/api/models.py | 3 + gateway/api/views.py | 6 + 8 files changed, 179 insertions(+) create mode 100644 client/qiskit_serverless/utils/runtime_service_client.py create mode 100644 gateway/api/migrations/0024_runtimejob_session_id.py create mode 100644 gateway/api/migrations/0025_rename_session_id_runtimejob_runtime_session.py create mode 100644 gateway/api/migrations/0026_merge_20240613_1848.py diff --git a/client/qiskit_serverless/__init__.py b/client/qiskit_serverless/__init__.py index 112764e64..df7d1bb88 100644 --- a/client/qiskit_serverless/__init__.py +++ b/client/qiskit_serverless/__init__.py @@ -43,6 +43,7 @@ from .exception import QiskitServerlessException from .core.function import QiskitPattern, QiskitFunction from .serializers import get_arguments +from .utils import ServerlessRuntimeService try: __version__ = metadata_version("qiskit_serverless") diff --git a/client/qiskit_serverless/utils/__init__.py b/client/qiskit_serverless/utils/__init__.py index 57572acfb..ebeb13dac 100644 --- a/client/qiskit_serverless/utils/__init__.py +++ b/client/qiskit_serverless/utils/__init__.py @@ -32,3 +32,4 @@ from .json import JsonSerializable from .errors import ErrorCodes from .storage import S3Storage, BaseStorage +from .runtime_service_client import ServerlessRuntimeService diff --git a/client/qiskit_serverless/utils/runtime_service_client.py b/client/qiskit_serverless/utils/runtime_service_client.py new file mode 100644 index 000000000..31d7b53c7 --- /dev/null +++ b/client/qiskit_serverless/utils/runtime_service_client.py @@ -0,0 +1,119 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +====================================================================== +Json utilities (:mod:`qiskit_serverless.utils.runtime_service_client`) +====================================================================== + +.. currentmodule:: qiskit_serverless.utils.runtime_service_client + +Qiskit Serverless runtime client wrapper +======================================== + +.. autosummary:: + :toctree: ../stubs/ + + ServerlessRuntimeService +""" +import os +import logging +from typing import Callable, Dict, Sequence, Type, Union, Optional + +import requests +from qiskit_ibm_runtime import QiskitRuntimeService +from qiskit_ibm_runtime.runtime_job import RuntimeJob +from qiskit_ibm_runtime.runtime_job_v2 import RuntimeJobV2 +from qiskit_ibm_runtime.runtime_options import RuntimeOptions +from qiskit_ibm_runtime.utils.result_decoder import ResultDecoder + +from qiskit_serverless.core.constants import ( + REQUESTS_TIMEOUT, + ENV_JOB_GATEWAY_TOKEN, + ENV_JOB_GATEWAY_HOST, + ENV_JOB_ID_GATEWAY, + ENV_GATEWAY_PROVIDER_VERSION, + GATEWAY_PROVIDER_VERSION_DEFAULT, +) + + +def associate_runtime_job_with_serverless_job( + runtime_job_id: str, session_id: Optional[str] = None +) -> bool: + """Make a request to gateway to associate runtime job id with serverless job id. + + Args: + runtime_job_id (str): job id for runtime primitive + session_id (str): session/batch id + + Returns: + bool: if request was ok + """ + version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) + if version is None: + version = GATEWAY_PROVIDER_VERSION_DEFAULT + + token = os.environ.get(ENV_JOB_GATEWAY_TOKEN) + if token is None: + logging.warning("Runtime job will not be associated with serverless job.") + return False + + url = ( + f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/" + f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/add_runtimejob/" + ) + response = requests.post( + url, + json={"runtime_job": runtime_job_id, "runtime_session": session_id}, + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ) + if not response.ok: + logging.warning("Something went wrong: %s", response.text) + + return response.ok + + +class ServerlessRuntimeService(QiskitRuntimeService): + """Serverless wrapper for QiskitRuntimeService. + + Used for associating runtime jobs with serverless jobs. + + Args: + QiskitRuntimeService (QiskitRuntimeService): Qiskit runtime service object. + """ + + def run( + self, + program_id: str, + inputs: Dict, + options: Optional[Union[RuntimeOptions, Dict]] = None, + callback: Optional[Callable] = None, + result_decoder: Optional[ + Union[Type[ResultDecoder], Sequence[Type[ResultDecoder]]] + ] = None, + session_id: Optional[str] = None, + start_session: Optional[bool] = False, + ) -> Union[RuntimeJob, RuntimeJobV2]: + runtime_job = super().run( + program_id, + inputs, + options, + callback, + result_decoder, + session_id, + start_session, + ) + associate_runtime_job_with_serverless_job( + runtime_job.job_id(), runtime_job.session_id + ) + return runtime_job diff --git a/gateway/api/migrations/0024_runtimejob_session_id.py b/gateway/api/migrations/0024_runtimejob_session_id.py new file mode 100644 index 000000000..cf99533a6 --- /dev/null +++ b/gateway/api/migrations/0024_runtimejob_session_id.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0.6 on 2024-06-06 14:19 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0023_provider_program_provider"), + ] + + operations = [ + migrations.AddField( + model_name="runtimejob", + name="session_id", + field=models.CharField(blank=True, default=None, max_length=100, null=True), + ), + ] diff --git a/gateway/api/migrations/0025_rename_session_id_runtimejob_runtime_session.py b/gateway/api/migrations/0025_rename_session_id_runtimejob_runtime_session.py new file mode 100644 index 000000000..8551e9118 --- /dev/null +++ b/gateway/api/migrations/0025_rename_session_id_runtimejob_runtime_session.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0.6 on 2024-06-11 18:49 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0024_runtimejob_session_id"), + ] + + operations = [ + migrations.RenameField( + model_name="runtimejob", + old_name="session_id", + new_name="runtime_session", + ), + ] diff --git a/gateway/api/migrations/0026_merge_20240613_1848.py b/gateway/api/migrations/0026_merge_20240613_1848.py new file mode 100644 index 000000000..178bfd1dc --- /dev/null +++ b/gateway/api/migrations/0026_merge_20240613_1848.py @@ -0,0 +1,13 @@ +# Generated by Django 5.0.6 on 2024-06-13 18:48 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0024_program_description"), + ("api", "0025_rename_session_id_runtimejob_runtime_session"), + ] + + operations = [] diff --git a/gateway/api/models.py b/gateway/api/models.py index 3f7d52740..23d672b1a 100644 --- a/gateway/api/models.py +++ b/gateway/api/models.py @@ -219,3 +219,6 @@ class RuntimeJob(models.Model): runtime_job = models.CharField( primary_key=True, max_length=100, blank=False, null=False ) + runtime_session = models.CharField( + max_length=100, blank=True, null=True, default=None + ) diff --git a/gateway/api/views.py b/gateway/api/views.py index 34556cbf6..98782cfc5 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -429,6 +429,11 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen except RuntimeInvalidStateError: logger.warning("cancel failed") + if jobinstance.session_id: + service._api_client.cancel_session( # pylint: disable=protected-access + jobinstance.session_id + ) + if job.compute_resource: if job.compute_resource.active: job_handler = get_job_handler(job.compute_resource.host) @@ -462,6 +467,7 @@ def add_runtimejob( runtimejob = RuntimeJob( job=job, runtime_job=request.data.get("runtime_job"), + runtime_session=request.data.get("runtime_session"), ) runtimejob.save() message = "RuntimeJob is added."