diff --git a/lamini/__init__.py b/lamini/__init__.py index 075699a..b0b701f 100644 --- a/lamini/__init__.py +++ b/lamini/__init__.py
@@ -3,10 +3,6 @@ # isort: off from lamini.error import error -from lamini.runners.llama_v2_runner import LlamaV2Runner -from lamini.runners.llama_v3_runner import LlamaV3Runner -from lamini.runners.basic_model_runner import BasicModelRunner -from lamini.runners.mistral_runner import MistralRunner from lamini.api.lamini import Lamini from lamini.api.classifier import Classifier from lamini.api.embedding import Embedding
@@ -32,3 +28,4 @@ max_workers = int(os.environ.get("LAMINI_MAX_WORKERS", 4)) batch_size = int(os.environ.get("LAMINI_BATCH_SIZE", 5)) static_batching = bool(os.environ.get("LAMINI_STATIC_BATCHING", False)) +bypass_reservation = bool(os.environ.get("LAMINI_BYPASS_RESERVATION", False))
diff --git a/lamini/api/lamini.py b/lamini/api/lamini.py index b6ada85..2b02a93 100644 --- a/lamini/api/lamini.py +++ b/lamini/api/lamini.py
@@ -10,7 +10,7 @@ from lamini.api.rest_requests import get_version from lamini.api.train import Train from lamini.api.utils.completion import Completion -from lamini.api.utils.upload_client import get_dataset_name, upload_to_blob +from lamini.api.utils.upload_client import upload_to_blob logger = logging.getLogger(__name__)
@@ -98,37 +98,36 @@ def get_data_str(d): output = self.trainer.get_upload_base_path() self.upload_base_path = output["upload_base_path"] - dataset_id = get_dataset_name() - try: if self.upload_base_path == "azure": data_str = get_data_str(data) - output = self.trainer.create_blob_dataset_location( - self.upload_base_path, dataset_id, is_public + response = self.trainer.create_blob_dataset_location( + self.upload_base_path, is_public ) - self.upload_file_path = output["dataset_location"] + self.upload_file_path = response["dataset_location"] upload_to_blob(data_str, self.upload_file_path) self.trainer.update_blob_dataset_num_datapoints( - dataset_id, num_datapoints + response["dataset_id"], num_datapoints ) print("Data pairs uploaded to blob.") else: - output = self.trainer.upload_dataset_locally( - self.upload_base_path, dataset_id, is_public, data + response = self.trainer.upload_dataset_locally( + self.upload_base_path, is_public, data ) - self.upload_file_path = output["dataset_location"] + self.upload_file_path = response["dataset_location"] print("Data pairs uploaded to local.") + print(response) print( - f"\nYour dataset id is: {dataset_id} . Consider using this in the future to train using the same data. \nEg: " - f"llm.train(dataset_id='{dataset_id}')" + f"\nYour dataset id is: {response['dataset_id']} . Consider using this in the future to train using the same data. \nEg: " + f"llm.train(data_or_dataset_id='{response['dataset_id']}')" ) except Exception as e: print(f"Error uploading data pairs: {e}") raise e - return dataset_id + return response["dataset_id"] def upload_file( self, file_path: str, input_key: str = "input", output_key: str = "output"
@@ -186,10 +185,8 @@ def train( ], finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, - peft_args: Optional[dict] = None, is_public: Optional[bool] = None, - use_cached_model: Optional[bool] = None, - multi_node: Optional[bool] = None, + **kwargs, ): if isinstance(data_or_dataset_id, str): dataset_id = data_or_dataset_id
@@ -199,7 +196,7 @@ def train( base_path = self.trainer.get_upload_base_path() self.upload_base_path = base_path["upload_base_path"] existing_dataset = self.trainer.get_existing_dataset( - dataset_id, self.upload_base_path, is_public + dataset_id, self.upload_base_path ) self.upload_file_path = existing_dataset["dataset_location"]
@@ -209,10 +206,7 @@ def train( upload_file_path=self.upload_file_path, finetune_args=finetune_args, gpu_config=gpu_config, - peft_args=peft_args, is_public=is_public, - use_cached_model=use_cached_model, - multi_node=multi_node, ) job["dataset_id"] = dataset_id return job
@@ -228,20 +222,14 @@ def train_and_wait( ], finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, - peft_args: Optional[dict] = None, is_public: Optional[bool] = None, - use_cached_model: Optional[bool] = None, - multi_node: Optional[bool] = None, **kwargs, ): job = self.train( data_or_dataset_id, finetune_args=finetune_args, gpu_config=gpu_config, - peft_args=peft_args, is_public=is_public, - use_cached_model=use_cached_model, - multi_node=multi_node, ) try:
diff --git a/lamini/api/train.py b/lamini/api/train.py index 2a3e53d..80bafe1 100644 --- a/lamini/api/train.py +++ b/lamini/api/train.py
@@ -28,10 +28,7 @@ def train( upload_file_path: Optional[str] = None, finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, - peft_args: Optional[dict] = None, is_public: Optional[bool] = None, - use_cached_model: Optional[bool] = None, - multi_node: Optional[bool] = None, ): req_data = {"model_name": model_name} req_data["dataset_id"] = dataset_id
@@ -41,14 +38,8 @@ def train( req_data["finetune_args"] = finetune_args if gpu_config is not None: req_data["gpu_config"] = gpu_config - if peft_args is not None: - req_data["peft_args"] = peft_args if is_public is not None: req_data["is_public"] = is_public - if use_cached_model is not None: - req_data["use_cached_model"] = use_cached_model - if multi_node is not None: - req_data["multi_node"] = multi_node url = self.api_prefix + "train" job = make_web_request(self.api_key, url, "post", req_data)
@@ -102,21 +93,15 @@ def evaluate(self, job_id=None): return make_web_request(self.api_key, url, "get") - def create_blob_dataset_location( - self, upload_base_path, dataset_id, is_public, data=None - ): + def create_blob_dataset_location(self, upload_base_path, is_public): url = self.api_prefix + "data" req_data = { "upload_base_path": upload_base_path, - "dataset_id": dataset_id, } if is_public is not None: req_data["is_public"] = is_public - if data is not None: - req_data["data"] = data - return make_web_request( self.api_key, url,
@@ -142,11 +127,10 @@ def get_upload_base_path(self): url = self.api_prefix + "get-upload-base-path" return make_web_request(self.api_key, url, "get") - def upload_dataset_locally(self, upload_base_path, dataset_id, is_public, data): + def upload_dataset_locally(self,
upload_base_path, is_public, data): url = self.api_prefix + "local-data" req_data = {} req_data["upload_base_path"] = upload_base_path - req_data["dataset_id"] = dataset_id req_data["data"] = SerializableGenerator(data) if is_public is not None: req_data["is_public"] = is_public
@@ -157,12 +141,10 @@ def upload_dataset_locally(self, upload_base_path, dataset_id, is_public, data): req_data, ) - def get_existing_dataset(self, dataset_id, upload_base_path, is_public): + def get_existing_dataset(self, dataset_id, upload_base_path): url = self.api_prefix + "existing-data" req_data = {"dataset_id": dataset_id} req_data["upload_base_path"] = upload_base_path - if is_public is not None: - req_data["is_public"] = is_public return make_web_request( self.api_key, url,
diff --git a/lamini/api/utils/completion.py b/lamini/api/utils/completion.py index 5f8cc1c..629a1b2 100644 --- a/lamini/api/utils/completion.py +++ b/lamini/api/utils/completion.py
@@ -1,17 +1,55 @@ -import logging -from typing import List, Optional, Union +from typing import List, Optional, Union, Dict, Any import aiohttp import lamini from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url from lamini.api.rest_requests import make_async_web_request, make_web_request -logger = logging.getLogger(__name__) - class Completion: - def __init__(self, api_key, api_url): + """ Handler for formatting and POST requests for the completions + and streaming_completions API endpoints. + + + Parameters + ---------- + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~/.lamini/configure.yaml will be used. If neither + exists then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + ones defined here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be used for the url before any of the above defaults + + """ + + def __init__(self, api_key, api_url) -> None: + + """ + Configuration dictionary for platform metadata provided by the following function: + https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py + Configurations currently hold the following keys and data as a yaml format: + local: + url: + staging: + url: + production: + url: + + local: + key: + staging: + key: + production: + key: + + """ self.config = get_config() + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/"
@@ -23,7 +61,33 @@ def generate( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - ): + ) -> Dict[str, Any]: + """Handles construction of the POST request headers and body, then + a web request is made with the response returned.
+ + Parameters + ---------- + prompt: Union[str, List[str]] + Input prompt for the LLM + + model_name: str + LLM model name from HuggingFace + + output_type: Optional[dict] = None + Json format for the LLM output + + max_tokens: Optional[int] = None + Upper limit on total tokens + + max_new_tokens: Optional[int] = None + Upper limit for newly generated tokens + + Returns + ------- + resp: Dict[str, Any] + Json data returned from POST request + """ + req_data = self.make_llm_req_map( prompt=prompt, model_name=model_name,
@@ -36,7 +100,23 @@ def generate( ) return resp - async def async_generate(self, params, client: aiohttp.ClientSession = None): + async def async_generate(self, params: Dict[str, Any], client: aiohttp.ClientSession = None) -> Dict[str, Any]: + """ + + Parameters + ---------- + params: Dict[str, Any] + POST Request input parameters + + client: aiohttp.ClientSession = None + ClientSession handler + + Returns + ------- + resp: Dict[str, Any] + Json data returned from POST request + """ + if client is not None: assert isinstance(client, aiohttp.ClientSession) resp = await make_async_web_request(
@@ -61,14 +141,38 @@ def make_llm_req_map( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - ): + ) -> Dict[str, Any]: """Returns a dict of parameters for calling the remote LLM inference API. NOTE: Copied from lamini.py. TODO: Create a helper function that accepts all values and returns a dict. And replace callers of self.make_llm_req_map() with the calling of the free function. + + Parameters + ---------- + model_name: str + LLM model name from HuggingFace + + prompt: Union[str, List[str]] + Input prompt for the LLM + + output_type: Optional[dict] = None + Json format for the LLM output + + max_tokens: Optional[int] = None + Upper limit on total tokens + + max_new_tokens: Optional[int] = None + Upper limit for newly generated tokens + + Returns + ------- + req_data: Dict[str, Any] + Constructed dictionary with the provided parameters mapped to the correctly + specified keys for a REST request. """ + req_data = {} req_data["model_name"] = model_name # TODO: prompt should be named prompt to signal it's a batch.
diff --git a/lamini/api/utils/iterators.py b/lamini/api/utils/iterators.py index 1e9fdaa..2ba917b 100644 --- a/lamini/api/utils/iterators.py +++ b/lamini/api/utils/iterators.py
@@ -1,7 +1,19 @@ -from typing import Iterator +from typing import Iterator, AsyncGenerator, Any -async def async_iter(normal_iter: Iterator): - """Adapt an normal iterator to an async iterator""" +async def async_iter(normal_iter: Iterator) -> AsyncGenerator[Any, None]: + """Adapt a normal iterator to an async iterator + + Parameters + ---------- + normal_iter: Iterator + Iterator to wrap with a yield generator + + Yields + ------- + item: Any + Items within the provided normal iterator + """ + for item in normal_iter: yield item
diff --git a/lamini/api/utils/reservations.py b/lamini/api/utils/reservations.py index 72523f7..8c23f63 100644 --- a/lamini/api/utils/reservations.py +++ b/lamini/api/utils/reservations.py
@@ -4,6 +4,7 @@ import time from typing import Optional +import aiohttp import lamini from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url from lamini.api.rest_requests import make_async_web_request, make_web_request
@@ -12,12 +13,33 @@ class Reservations: + """ Handler for the API reservations endpoint.
+ + + Parameters + ---------- + api_key: Optional[str] = None + Lamini platform API key, if not provided the key stored + within ~/.lamini/configure.yaml will be used. If neither + exists then an error is raised. + + api_url: Optional[str] = None + Lamini platform api url, only needed if a different url is needed outside of the + ones defined here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be used for the url before any of the above defaults + + variable_capacity: Optional[bool] = False + If True, the needed reservation capacity is scaled to the dynamic batch size times the configured number of workers + + """ + def __init__( self, - api_key: str = None, - api_url: str = None, - variable_capacity=False, - ): + api_key: Optional[str] = None, + api_url: Optional[str] = None, + variable_capacity: Optional[bool] = False, + ) -> None: self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config)
@@ -35,7 +57,35 @@ def __init__( def initialize_reservation( self, capacity: int, model_name: str, batch_size: int, max_tokens: Optional[int] - ): + ) -> None: + """Submit a POST request to the reservations endpoint and store the + reservation metadata within this object. + + Parameters + ---------- + capacity: int + Reservation capacity + + model_name: str + Model to use for the reserved request + + batch_size: int + Batch size for the inference call + + max_tokens: Optional[int] + Max tokens for the inference call + + Returns + ------- + None + + Raises + ------ + Exception + General exception for reservation issues. The exception is logged + but execution continues. + """ + try: logger.info( f"Attempt reservation {capacity} {model_name} {batch_size} {max_tokens}" )
@@ -74,7 +124,18 @@ def initialize_reservation( self.model_name = model_name self.max_tokens = None - def pause_for_reservation_start(self): + def pause_for_reservation_start(self) -> None: + """ Barrier until the specified start time for the reservation + + Parameters + ---------- + None + + Returns + ------- + None + """ + if self.current_reservation is None: return current_time = datetime.datetime.utcnow()
@@ -85,7 +146,21 @@ def pause_for_reservation_start(self): if sleep_time.total_seconds() > 0: time.sleep(sleep_time.total_seconds()) - async def wait_and_poll_for_reservation(self, client): + async def wait_and_poll_for_reservation(self, client: aiohttp.ClientSession) -> None: + """Wait for the current reservation to finish and then make a new reservation. If + this reservation is working (indicated by the self.is_working flag), then + start the kickoff and timer-based polling jobs.
+ + Parameters + ---------- + client: aiohttp.ClientSession + Http Client Handler + + Returns + ------- + None + """ + await self.poll_for_reservation.wait() self.is_polling = True self.poll_for_reservation.clear()
@@ -98,7 +173,7 @@ async def wait_and_poll_for_reservation(self, client): "capacity": max(self.capacity_needed, self.batch_size), "model_name": self.model_name, "max_tokens": self.max_tokens, - "batch_size": self.get_dynamic_max_batch_size(), + "batch_size": self.batch_size, }, ) logger.info("Made reservation " + str(reservation))
@@ -110,11 +185,43 @@ async def wait_and_poll_for_reservation(self, client): async with self.condition: self.condition.notify(len(self.condition._waiters)) self.is_polling = False - self.polling_task = asyncio.create_task( - self.kickoff_reservation_polling(client) - ) + if self.is_working: + self.polling_task = asyncio.create_task( + self.kickoff_reservation_polling(client) + ) + logger.info("Made reservation " + str(reservation)) + if "dynamic_max_batch_size" not in reservation: + reservation["dynamic_max_batch_size"] = lamini.batch_size + self.current_reservation = reservation + self.capacity_remaining = reservation["capacity_remaining"] + self.dynamic_max_batch_size = reservation["dynamic_max_batch_size"] + if self.variable_capacity: + self.capacity_needed = self.dynamic_max_batch_size * lamini.max_workers + async with self.condition: + self.condition.notify(len(self.condition._waiters)) + self.is_polling = False + if self.is_working: + self.polling_task = asyncio.create_task( + self.kickoff_reservation_polling(client) + ) + _ = asyncio.create_task( + self.timer_based_polling(reservation["end_time"]) + ) + + async def timer_based_polling(self, wakeup_time: int) -> None: + """Wait until the provided wakeup_time to run the polling for the + current reservation. + + Parameters + ---------- + wakeup_time: int + ISO format datetime + + Returns + ------- + None + """ - async def timer_based_polling(self, wakeup_time): try: current_time = datetime.datetime.utcnow() end_time = datetime.datetime.fromisoformat(wakeup_time)
@@ -126,7 +233,21 @@ async def timer_based_polling(self, wakeup_time): except asyncio.CancelledError: logger.debug("Task was cancelled") - async def kickoff_reservation_polling(self, client): + async def kickoff_reservation_polling(self, client: aiohttp.ClientSession) -> None: + """If a current reservation is present, then kick off the polling for this + reservation. If an error occurs, the reservation is set to None and the + polling task is cancelled.
+ + Parameters + ---------- + client: aiohttp.ClientSession + Http Session handler + + Returns + ------- + None + """ + if self.current_reservation is None: return None try:
@@ -137,7 +258,18 @@ async def kickoff_reservation_polling(self, client): self.polling_task.cancel() return None - async def async_pause_for_reservation_start(self): + async def async_pause_for_reservation_start(self) -> None: + """Sleep until the start of the current reservation + + Parameters + ---------- + None + + Returns + ------- + None + """ + if self.current_reservation is None: return current_time = datetime.datetime.utcnow()
@@ -148,22 +280,41 @@ async def async_pause_for_reservation_start(self): if sleep_time.total_seconds() > 0: await asyncio.sleep(sleep_time.total_seconds()) - def update_capacity_use(self, queries: int): + def update_capacity_use(self, queries: int) -> None: + """Decrease the self.capacity_remaining value by queries + + Parameters + ---------- + queries: int + Quantity of queries to subtract from self.capacity_remaining + + Returns + ------- + None + """ + if self.current_reservation is None: return self.capacity_remaining -= queries - def update_capacity_needed(self, queries: int): + def update_capacity_needed(self, queries: int) -> None: + """Decrease the self.capacity_needed value by queries + + Parameters + ---------- + queries: int + Quantity of queries to subtract from self.capacity_needed + + Returns + ------- + None + """ + if self.current_reservation is None: return self.capacity_needed -= queries - def get_dynamic_max_batch_size(self): - if lamini.static_batching: - return self.batch_size - - return self.dynamic_max_batch_size - - def __del__(self): + def __del__(self) -> None: + """Handler for object deletion; polling jobs are cancelled when __del__ is called""" if self.polling_task is not None: self.polling_task.cancel()
diff --git a/lamini/api/utils/shutdown.py b/lamini/api/utils/shutdown.py index 39f2f27..56bd333 100644 --- a/lamini/api/utils/shutdown.py +++ b/lamini/api/utils/shutdown.py
@@ -2,8 +2,20 @@ import logging -async def shutdown(signal, loop): - """Cleanup tasks tied to the service's shutdown.""" +async def shutdown(signal, loop) -> None: + """ Cleanup tasks tied to the service's shutdown.
+ + Parameters + ---------- + signal: signal object containing signal source + + loop: current asyncio loop + + Returns + ------- + None + """ + logging.info(f"Received exit signal {signal.name}...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
diff --git a/lamini/api/utils/upload_client.py b/lamini/api/utils/upload_client.py index a109a7e..bc2f879 100644 --- a/lamini/api/utils/upload_client.py +++ b/lamini/api/utils/upload_client.py
@@ -1,14 +1,27 @@ -import hashlib import itertools import os -import time -from typing import Dict, Iterable, List, Union +from typing import Iterable, Generator, Any import jsonlines from azure.storage.blob import BlobClient -def upload_to_blob(data: Iterable[str], sas_url: str): +def upload_to_blob(data: Iterable[str], sas_url: str) -> None: + """Upload the provided data to the sas_url + + Parameters + ---------- + data: Iterable[str] + Data to upload + + sas_url: str + Location to upload to + + Returns + ------- + None + """ + blob_client_sas = BlobClient.from_blob_url(blob_url=sas_url) if blob_client_sas.exists():
@@ -20,7 +33,22 @@ def upload_to_blob(data: Iterable[str], sas_url: str): print(f"Upload to blob completed for data.") -def upload_to_local(data, dataset_location): +def upload_to_local(data: Iterable[str], dataset_location: str) -> None: + """Upload provided data to local storage + + Parameters + ---------- + data: Iterable[str] + Data to upload + + dataset_location: str + Local location to store data + + Returns + ------- + None + """ + if os.path.exists(dataset_location): print(f"File/data already exists") else:
@@ -31,16 +59,10 @@ def upload_to_local(data, dataset_location): print(f"Upload completed for data.") -def get_dataset_name(): - m = hashlib.sha256() - m.update(str(time.time()).encode("utf-8")) - return m.hexdigest() - - class SerializableGenerator(list): """Generator that is serializable by JSON to send uploaded data over http requests""" - def __init__(self, iterable): + def __init__(self, iterable) -> None: tmp_body = iter(iterable) try: self._head = iter([next(tmp_body)])
@@ -48,5 +70,5 @@ def __init__(self, iterable): except StopIteration: self._head = [] - def __iter__(self): + def __iter__(self) -> Generator[Any, None, None]: return itertools.chain(self._head, *self[:1])
diff --git a/lamini/generation/process_generation_batch.py b/lamini/generation/process_generation_batch.py index 217cd05..8f44a2c 100644 --- a/lamini/generation/process_generation_batch.py +++ b/lamini/generation/process_generation_batch.py
@@ -37,8 +37,8 @@ def can_submit_query(): if reservation_api.current_reservation is not None: reservation_id = reservation_api.current_reservation["reservation_id"] json = get_body_from_args(batch, reservation_id) - logger.debug(f"Sending batch with {len(batch['prompt'])}") - result = await query_api(client, key, url, json, batch) + logger.info(f"Sending batch with {len(batch['prompt'])}") + result = await query_api(client, key, url, json, batch["type"]) except Exception as e: logger.debug( f"Error in process_generation_batch, type: {type(e)}, message: {e}" )
@@ -60,8 +60,8 @@ def can_submit_query(): prompt_obj.response = result[i] -async def query_api(client, key, url, json, batch): - if batch["type"] == "embedding": +async def query_api(client, key, url, json, type): + if type == "embedding": # TODO: Replace make_async_web_request() with Completion.generate() result = await make_async_web_request(client, key, url, "post", json) result = result["embedding"]
diff --git a/pyproject.toml b/pyproject.toml index 5828669..a47ddf1 100644 --- a/pyproject.toml +++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "lamini" -version = "3.0.4" +version = "3.0.5" authors = [ { name="PowerML", email="info@powerml.co" }, ]
@@ -39,7 +39,6 @@ packages = [ "lamini.error", "lamini.api", "lamini.api.utils", - "lamini.runners", "lamini.generation", "lamini.evaluators", "lamini.evaluators.custom",
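Reviewer note: a minimal usage sketch of the SDK surface after this change, assuming a valid Lamini API key and that the Lamini constructor and upload_data() signatures are otherwise unchanged; the key, model name, and data pair below are illustrative placeholders, not values taken from this diff.

import lamini
from lamini import Lamini

# Placeholder key; normally read from ~/.lamini/configure.yaml.
lamini.api_key = "<YOUR-LAMINI-API-KEY>"

# New module-level flag in this release, driven by the LAMINI_BYPASS_RESERVATION env var.
print(lamini.bypass_reservation)

llm = Lamini(model_name="meta-llama/Meta-Llama-3-8B-Instruct")  # placeholder model

# upload_data() now returns the server-assigned dataset id rather than a
# client-side hash from the removed get_dataset_name() helper.
dataset_id = llm.upload_data([{"input": "What is Lamini?", "output": "An LLM platform."}])

# train() accepts the data or an existing dataset id via data_or_dataset_id;
# peft_args, use_cached_model, and multi_node are no longer accepted.
job = llm.train(data_or_dataset_id=dataset_id)
print(job["dataset_id"])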