Skip to content

Commit

Permalink
Client: Runtime service wrapper to track runtime jobs (#1358)
Browse files Browse the repository at this point in the history
  • Loading branch information
IceKhan13 authored Jun 13, 2024
1 parent 4bda5e3 commit 6c9e6a9
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 0 deletions.
1 change: 1 addition & 0 deletions client/qiskit_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from .exception import QiskitServerlessException
from .core.function import QiskitPattern, QiskitFunction
from .serializers import get_arguments
from .utils import ServerlessRuntimeService

try:
__version__ = metadata_version("qiskit_serverless")
Expand Down
1 change: 1 addition & 0 deletions client/qiskit_serverless/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@
from .json import JsonSerializable
from .errors import ErrorCodes
from .storage import S3Storage, BaseStorage
from .runtime_service_client import ServerlessRuntimeService
119 changes: 119 additions & 0 deletions client/qiskit_serverless/utils/runtime_service_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# This code is a Qiskit project.
#
# (C) Copyright IBM 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
======================================================================
Json utilities (:mod:`qiskit_serverless.utils.runtime_service_client`)
======================================================================
.. currentmodule:: qiskit_serverless.utils.runtime_service_client
Qiskit Serverless runtime client wrapper
========================================
.. autosummary::
:toctree: ../stubs/
ServerlessRuntimeService
"""
import os
import logging
from typing import Callable, Dict, Sequence, Type, Union, Optional

import requests
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_ibm_runtime.runtime_job import RuntimeJob
from qiskit_ibm_runtime.runtime_job_v2 import RuntimeJobV2
from qiskit_ibm_runtime.runtime_options import RuntimeOptions
from qiskit_ibm_runtime.utils.result_decoder import ResultDecoder

from qiskit_serverless.core.constants import (
REQUESTS_TIMEOUT,
ENV_JOB_GATEWAY_TOKEN,
ENV_JOB_GATEWAY_HOST,
ENV_JOB_ID_GATEWAY,
ENV_GATEWAY_PROVIDER_VERSION,
GATEWAY_PROVIDER_VERSION_DEFAULT,
)


def associate_runtime_job_with_serverless_job(
runtime_job_id: str, session_id: Optional[str] = None
) -> bool:
"""Make a request to gateway to associate runtime job id with serverless job id.
Args:
runtime_job_id (str): job id for runtime primitive
session_id (str): session/batch id
Returns:
bool: if request was ok
"""
version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION)
if version is None:
version = GATEWAY_PROVIDER_VERSION_DEFAULT

token = os.environ.get(ENV_JOB_GATEWAY_TOKEN)
if token is None:
logging.warning("Runtime job will not be associated with serverless job.")
return False

url = (
f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/"
f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/add_runtimejob/"
)
response = requests.post(
url,
json={"runtime_job": runtime_job_id, "runtime_session": session_id},
headers={"Authorization": f"Bearer {token}"},
timeout=REQUESTS_TIMEOUT,
)
if not response.ok:
logging.warning("Something went wrong: %s", response.text)

return response.ok


class ServerlessRuntimeService(QiskitRuntimeService):
"""Serverless wrapper for QiskitRuntimeService.
Used for associating runtime jobs with serverless jobs.
Args:
QiskitRuntimeService (QiskitRuntimeService): Qiskit runtime service object.
"""

def run(
self,
program_id: str,
inputs: Dict,
options: Optional[Union[RuntimeOptions, Dict]] = None,
callback: Optional[Callable] = None,
result_decoder: Optional[
Union[Type[ResultDecoder], Sequence[Type[ResultDecoder]]]
] = None,
session_id: Optional[str] = None,
start_session: Optional[bool] = False,
) -> Union[RuntimeJob, RuntimeJobV2]:
runtime_job = super().run(
program_id,
inputs,
options,
callback,
result_decoder,
session_id,
start_session,
)
associate_runtime_job_with_serverless_job(
runtime_job.job_id(), runtime_job.session_id
)
return runtime_job
18 changes: 18 additions & 0 deletions gateway/api/migrations/0024_runtimejob_session_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-06-06 14:19

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("api", "0023_provider_program_provider"),
]

operations = [
migrations.AddField(
model_name="runtimejob",
name="session_id",
field=models.CharField(blank=True, default=None, max_length=100, null=True),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-06-11 18:49

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("api", "0024_runtimejob_session_id"),
]

operations = [
migrations.RenameField(
model_name="runtimejob",
old_name="session_id",
new_name="runtime_session",
),
]
13 changes: 13 additions & 0 deletions gateway/api/migrations/0026_merge_20240613_1848.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Generated by Django 5.0.6 on 2024-06-13 18:48

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("api", "0024_program_description"),
("api", "0025_rename_session_id_runtimejob_runtime_session"),
]

operations = []
3 changes: 3 additions & 0 deletions gateway/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,6 @@ class RuntimeJob(models.Model):
runtime_job = models.CharField(
primary_key=True, max_length=100, blank=False, null=False
)
runtime_session = models.CharField(
max_length=100, blank=True, null=True, default=None
)
6 changes: 6 additions & 0 deletions gateway/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen
except RuntimeInvalidStateError:
logger.warning("cancel failed")

if jobinstance.session_id:
service._api_client.cancel_session( # pylint: disable=protected-access
jobinstance.session_id
)

if job.compute_resource:
if job.compute_resource.active:
job_handler = get_job_handler(job.compute_resource.host)
Expand Down Expand Up @@ -462,6 +467,7 @@ def add_runtimejob(
runtimejob = RuntimeJob(
job=job,
runtime_job=request.data.get("runtime_job"),
runtime_session=request.data.get("runtime_session"),
)
runtimejob.save()
message = "RuntimeJob is added."
Expand Down

0 comments on commit 6c9e6a9

Please sign in to comment.