diff --git a/README.md b/README.md index 17b4c6f..01df875 100644 --- a/README.md +++ b/README.md @@ -21,10 +21,10 @@ production: key: "" ``` -# Documentation +## Documentation Package documentation here: [https://lamini-ai.github.io/](https://lamini-ai.github.io/) -# Github +## Github Package source code here: [https://github.com/lamini-ai/lamini](https://github.com/lamini-ai/lamini) diff --git a/lamini/__init__.py b/lamini/__init__.py index b99df08..cff7699 100644 --- a/lamini/__init__.py +++ b/lamini/__init__.py @@ -1,18 +1,5 @@ # Turn of isort, because alphabetic order for the following imports causes circular dependency issues -# isort: off -from lamini.error import error - -from lamini.api.lamini import Lamini -from lamini.api.classifier import Classifier -from lamini.api.embedding import Embedding -from lamini.classify.lamini_classifier import LaminiClassifier -from lamini.generation.generation_node import GenerationNode -from lamini.generation.generation_pipeline import GenerationPipeline -from lamini.generation.base_prompt_object import PromptObject -from lamini.generation.split_response_node import SplitResponseNode -from lamini.api.streaming_completion import StreamingCompletion - import os api_key = os.environ.get("LAMINI_API_KEY", None) @@ -29,5 +16,23 @@ batch_size = int(os.environ.get("LAMINI_BATCH_SIZE", 5)) static_batching = bool(os.environ.get("LAMINI_STATIC_BATCHING", False)) bypass_reservation = bool(os.environ.get("LAMINI_BYPASS_RESERVATION", False)) +gate_pipeline_batch_completions = bool( + os.environ.get("GATE_PIPELINE_BATCH_COMPLETIONS", False) +) -__version__ = "3.1.0" +__version__ = "3.1.3" + +# isort: off + +from lamini.api.lamini import Lamini +from lamini.api.classifier import Classifier +from lamini.api.embedding import Embedding +from lamini.api.model_downloader import ModelDownloader +from lamini.api.model_downloader import ModelType +from lamini.api.model_downloader import DownloadedModel +from lamini.classify.lamini_classifier import LaminiClassifier +from lamini.generation.generation_node import GenerationNode +from lamini.generation.generation_pipeline import GenerationPipeline +from lamini.generation.base_prompt_object import PromptObject +from lamini.generation.split_response_node import SplitResponseNode +from lamini.api.streaming_completion import StreamingCompletion diff --git a/lamini/api/classifier.py b/lamini/api/classifier.py index 7cd1617..f0046da 100644 --- a/lamini/api/classifier.py +++ b/lamini/api/classifier.py @@ -1,5 +1,4 @@ -import time -from typing import List, Union, Optional +from typing import List, Optional, Union import lamini import requests diff --git a/lamini/api/lamini.py b/lamini/api/lamini.py index d307d81..82e30e2 100644 --- a/lamini/api/lamini.py +++ b/lamini/api/lamini.py @@ -1,19 +1,20 @@ +import enum import json -import jsonlines import logging import os -import pandas as pd import time +from typing import Any, Dict, Generator, Iterable, List, Optional, Union -from lamini.api.lamini_config import get_config -from lamini.api.rest_requests import get_version +import jsonlines +import pandas as pd +import lamini +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.model_downloader import ModelDownloader, ModelType, DownloadedModel +from lamini.api.rest_requests import get_version, make_web_request from lamini.api.train import Train from lamini.api.utils.completion import Completion from lamini.api.utils.upload_client import upload_to_blob -from lamini.error.error import ( 
- DownloadingModelError, -) -from typing import Dict, Iterable, List, Optional, Union, Any, Generator +from lamini.error.error import DownloadingModelError logger = logging.getLogger(__name__) @@ -48,8 +49,12 @@ def __init__( model_name: str, api_key: Optional[str] = None, api_url: Optional[str] = None, + model_type: ModelType = ModelType.transformer, ): self.config = get_config() + api_key = api_key or lamini.api_key or get_configured_key(self.config) + api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.model_name = model_name self.api_key = api_key self.api_url = api_url @@ -57,20 +62,19 @@ def __init__( self.trainer = Train(api_key, api_url) self.upload_file_path = None self.upload_base_path = None + self.model_downloader = ModelDownloader(api_key, api_url) + self.model_type = model_type def version(self) -> str: """Get the version of the Lamini platform - Parameters ---------- None - Returns ------- str Returned version fo the platform """ - return get_version(self.api_key, self.api_url, self.config) def generate( @@ -117,17 +121,13 @@ def generate( specified, otherwise a dictionary matching the output_type is returned. """ - result = None - try: - result = self.completion.generate( - prompt=prompt, - model_name=model_name or self.model_name, - output_type=output_type, - max_tokens=max_tokens, - max_new_tokens=max_new_tokens, - ) - except DownloadingModelError as e: - return e + result = self.completion.generate( + prompt=prompt, + model_name=model_name or self.model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, + ) if output_type is None: if isinstance(prompt, list): result = [single_result["output"] for single_result in result] @@ -370,6 +370,63 @@ def _upload_file_impl( ) return items + def download_model( + self, + model_name: Optional[str] = None, + model_type: Optional[ModelType] = None, + wait: bool = False, + wait_time_seconds: int = 60, + ) -> DownloadedModel: + """Request Lamini Platform to download and cache the specified hugging face model. + So that the model can be immediately loaded to GPU memory afterwards. + Right now, only support downloading models from Hugging Face. + + Parameters + ---------- + hf_model_name: str + The full name of a hugging face model. Like meta-llama/Llama-3.2-11B-Vision-Instruct + in https://huggingface.co/meta-llama/Llama-3.2-11B-Vision-Instruct + + model_type: ModelType + The type of the requested model. 
+ + Raises + ------ + Exception + Raised if there is an issue with upload + + Returns + ------- + DownloadedModel + """ + model_name_to_download = self.model_name if model_name is None else model_name + model_type_to_download = self.model_type if model_type is None else model_type + if not wait: + return self.model_downloader.download( + model_name_to_download, model_type_to_download + ) + + start_time = time.time() + + while True: + result = self.model_downloader.download( + model_name_to_download, model_type_to_download + ) + + # Check the status of foo()'s result + if result.status == "available": + return result + + # Check if the specified timeout has been exceeded + elapsed_time = time.time() - start_time + if elapsed_time > wait_time_seconds: + return result + INTERVAL_SECONDS = 1 + time.sleep(INTERVAL_SECONDS) + + def list_models(self) -> List[DownloadedModel]: + return self.model_downloader.list() + def train( self, data_or_dataset_id: Union[ @@ -378,6 +435,7 @@ def train( finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, is_public: Optional[bool] = None, + custom_model_name: Optional[str] = None, ) -> str: """Handler for training jobs through the Trainer object. This submits a training job request to the platform using the provided data. @@ -398,6 +456,9 @@ def train( is_public: Optional[bool] = None Allow public access to the model and dataset + custom_model_name: Optional[str] = None + A human-readable name for the model. + Raises ------ AssertionError @@ -429,6 +490,7 @@ def train( finetune_args=finetune_args, gpu_config=gpu_config, is_public=is_public, + custom_model_name=custom_model_name, ) job["dataset_id"] = dataset_id return job @@ -445,6 +507,7 @@ def train_and_wait( finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, is_public: Optional[bool] = None, + custom_model_name: Optional[str] = None, **kwargs, ) -> str: """Handler for training jobs through the Trainer object. This submits a training @@ -467,6 +530,9 @@ def train_and_wait( is_public: Optional[bool] = None Allow public access to the model and dataset + custom_model_name: Optional[str] = None + A human-readable name for the model. 
+ kwargs: Dict[str, Any] Key word arguments verbose @@ -488,6 +554,7 @@ def train_and_wait( finetune_args=finetune_args, gpu_config=gpu_config, is_public=is_public, + custom_model_name=custom_model_name, ) try: diff --git a/lamini/api/lamini_config.py b/lamini/api/lamini_config.py index e97d76e..bfd65e0 100644 --- a/lamini/api/lamini_config.py +++ b/lamini/api/lamini_config.py @@ -176,13 +176,21 @@ def get_configured_url(config: config.Configuration) -> str: Extracted platform url """ - environment = os.environ.get("LLAMA_ENVIRONMENT") + LAMINI_ENV_ENV_VAR_NAME = "LLAMA_ENVIRONMENT" + LOCAL_URL_KEY = "local.url" + LOCAL_URL_DEFAULT_VALUE = "http://localhost:5001" + STAGING_URL_KEY = "staging.url" + STAGING_URL_DEFAULT_VALUE = "https://staging.lamini.ai" + PUBLIC_URL_DEFAULT_VALUE = "https://api.lamini.ai" + PUBLIC_URL_KEY = "production.url" + + environment = os.environ.get(LAMINI_ENV_ENV_VAR_NAME) if environment == "LOCAL": - url = config.get("local.url", "http://localhost:5001") + url = config.get(LOCAL_URL_KEY, LOCAL_URL_DEFAULT_VALUE) elif environment == "STAGING": - url = config.get("staging.url", "https://staging.lamini.ai") + url = config.get(STAGING_URL_KEY, STAGING_URL_DEFAULT_VALUE) else: - url = config.get("production.url", "https://api.lamini.ai") + url = config.get(PUBLIC_URL_KEY, PUBLIC_URL_DEFAULT_VALUE) return url diff --git a/lamini/api/model_downloader.py b/lamini/api/model_downloader.py new file mode 100644 index 0000000..31f2a36 --- /dev/null +++ b/lamini/api/model_downloader.py @@ -0,0 +1,95 @@ +import enum +from typing import List, Union + +import lamini +import numpy as np +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_web_request + + +class DownloadedModel: + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + model_id = None + model_name = None + model_type = None + user_id = None + is_public = None + creation_ts = None + prev_download_ts = None + prev_download_error = None + download_attempts = None + status = None + + def __repr__(self): + return f"" + + +class ModelType(enum.Enum): + """This must be consistent with the db/migrations table definition's MODEL_TYPE type.""" + + transformer = "transformer" + embedding = "embedding" + + +class ModelDownloader: + """Handler for requesting Lamini Platform to download a hugging face model. + + Parameters + ---------- + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + """ + + def __init__( + self, + api_key: str, + api_url: str, + ): + self.api_key = api_key + self.api_endpoint = api_url + "/v1/downloaded_models/" + + def download(self, hf_model_name: str, model_type: ModelType) -> DownloadedModel: + """Request to Lamini platform for an embedding encoding of the provided + prompt + + + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to encoding into an embedding + + Returns + ------- + DownloadedModel: + A object describing the state of the model. 
+ """ + + params = {"hf_model_name": hf_model_name, "model_type": model_type.value} + resp = make_web_request(self.api_key, self.api_endpoint, "post", params) + return DownloadedModel(**resp) + + def list(self) -> List[DownloadedModel]: + """List all models on the Lamini Platform. + + Returns + ------- + List[DownloadedModel]: + A object describing the state of the model. + """ + resp = make_web_request(self.api_key, self.api_endpoint, "get") + res = [] + for model in resp: + res.append(DownloadedModel(**model)) + return res diff --git a/lamini/api/pipeline_client.py b/lamini/api/pipeline_client.py new file mode 100644 index 0000000..f95f939 --- /dev/null +++ b/lamini/api/pipeline_client.py @@ -0,0 +1,54 @@ +import asyncio +from typing import Any, Dict + +from lamini.api.rest_requests import make_async_web_request +from lamini.api.utils.batch_completions import BatchCompletions +from lamini.api.utils.batch_embeddings import BatchEmbeddings + + +class PipelineClient: + + async def embedding(self, client, key, url, json): + result = await make_async_web_request(client, key, url, "post", json) + result = result["embedding"] + return result + + async def completions(self, client, key, url, json: dict) -> Dict[str, Any]: + result = await make_async_web_request(client, key, url, "post", json) + return result + + async def batch_completions( + self, + client, + json: dict, + ) -> Dict[str, Any]: + batch_api = BatchCompletions() + submit_response = await batch_api.async_submit( + prompt=json["prompt"], + model_name=json["model_name"], + output_type=json["output_type"], + max_new_tokens=json["max_new_tokens"], + ) + while True: + await asyncio.sleep(5) + result = await batch_api.async_check_result(submit_response["id"]) + if result and all(result["finish_reason"]): + break + return result + + async def batch_embeddings( + self, + client, + json: dict, + ) -> Dict[str, Any]: + batch_api = BatchEmbeddings() + submit_response = await batch_api.async_submit( + prompt=json["prompt"], model_name=json["model_name"] + ) + while True: + await asyncio.sleep(1) + result = await batch_api.async_check_result(submit_response["id"]) + if result: + break + result = result["embedding"] + return result diff --git a/lamini/api/rest_requests.py b/lamini/api/rest_requests.py index 2e3c715..ad0dd52 100644 --- a/lamini/api/rest_requests.py +++ b/lamini/api/rest_requests.py @@ -1,8 +1,7 @@ -from typing import Optional, Dict, Any - import asyncio import importlib.metadata import logging +from typing import Any, Dict, Optional import aiohttp import requests @@ -24,60 +23,55 @@ warn_once = False +def check_version(resp: Dict[str, Any]) -> None: + """If the flag of warn_once is not set then print the X-warning + from the post request response and set the flag to true. + + Parameters + ---------- + resp: Dict[str, Any] + Request response dictionary + + Returns + ------- + None + """ + + global warn_once + if not warn_once: + if resp.headers is not None and "X-Warning" in resp.headers: + warn_once = True + print(resp.headers["X-Warning"]) + + def get_version( key: Optional[str], url: Optional[str], config: Optional[Dict[str, Any]] ) -> str: """Getter for the Lamini Platform version - Parameters ---------- key: Optional[str] Lamini platform API key, if not provided the key stored within ~.lamini/configure.yaml will be used. If either don't exist then an error is raised. 
- url: Optional[str] Lamini platform api url, only needed if a different url is needed outside of the defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 i.e. localhost, staging.lamini.ai, or api.lamini.ai Additionally, LLAMA_ENVIRONMENT can be set as an environment variable that will be grabbed for the url before any of the above defaults - config: Dict[str, Any] Configuration storing the key and url - Returns ------- str Version of the Lamini Platform """ - api_key = key or get_configured_key(config) api_url = url or get_configured_url(config) return make_web_request(api_key, api_url + "/v1/version", "get", None) -def check_version(resp: Dict[str, Any]) -> None: - """If the flag of warn_once is not set then print the X-warning - from the post request response and set the flag to true. - - Parameters - ---------- - resp: Dict[str, Any] - Request response dictionary - - Returns - ------- - None - """ - - global warn_once - if not warn_once: - if resp.headers is not None and "X-Warning" in resp.headers: - warn_once = True - print(resp.headers["X-Warning"]) - - async def make_async_web_request( client: requests.Session, key: str, @@ -160,7 +154,7 @@ async def make_async_web_request( if resp.status == 200: json_response = await resp.json() else: - handle_error(resp) + await handle_error(resp) except asyncio.TimeoutError: raise APIError( "Request Timeout: The server did not respond in time.", diff --git a/lamini/api/streaming_completion.py b/lamini/api/streaming_completion.py index 6d1428a..760a9d3 100644 --- a/lamini/api/streaming_completion.py +++ b/lamini/api/streaming_completion.py @@ -1,6 +1,6 @@ import asyncio import time -from typing import List, Optional, Union, Dict, Any +from typing import Any, Dict, List, Optional, Union import aiohttp import lamini @@ -13,9 +13,6 @@ class StreamingCompletionObject: Parameters ---------- - request_params: Dict[str, Any] - Parameters to pass into the request - api_key: Optional[str] Lamini platform API key, if not provided the key stored within ~.lamini/configure.yaml will be used. If either @@ -37,17 +34,15 @@ class StreamingCompletionObject: def __init__( self, - request_params: Dict[str, Any], api_url: str, api_key: str, + id: str, polling_interval: int, max_errors: int = 0, ): - self.request_params = request_params - self.api_url = api_url + self.api_url = api_url + f"/{id}/result" self.api_key = api_key self.done_streaming = False - self.server = None self.polling_interval = polling_interval self.current_result = None self.error_count = 0 @@ -98,20 +93,19 @@ def next(self) -> str: if self.done_streaming: raise StopIteration() time.sleep(self.polling_interval) - if self.server is not None: - self.request_params["server"] = self.server try: resp = make_web_request( self.api_key, self.api_url, - "post", - self.request_params, + "get", ) + if len(resp) == 0: + self.current_result = None + return self.current_result - self.server = resp["server"] - if resp["status"][0]: + if all(r is not None for r in resp["finish_reason"]): self.done_streaming = True - self.current_result = resp["data"][0] + self.current_result = resp except Exception as e: self.error_count += 1 if self.error_count > self.max_errors: @@ -124,9 +118,6 @@ class AsyncStreamingCompletionObject: Parameters ---------- - request_params: Dict[str, Any] - Parameters to pass into the request - api_key: Optional[str] Lamini platform API key, if not provided the key stored within ~.lamini/configure.yaml will be used. 
If either @@ -148,17 +139,15 @@ class AsyncStreamingCompletionObject: def __init__( self, - request_params: Dict[str, Any], api_url: str, api_key: str, + id: str, polling_interval: int, max_errors: int = 5, ): - self.request_params = request_params - self.api_url = api_url + self.api_url = api_url + f"/{id}/result" self.api_key = api_key self.done_streaming = False - self.server = None self.polling_interval = polling_interval self.current_result = None self.error_count = 0 @@ -209,21 +198,20 @@ async def next(self): if self.done_streaming: raise StopAsyncIteration() await asyncio.sleep(self.polling_interval) - if self.server is not None: - self.request_params["server"] = self.server try: async with aiohttp.ClientSession() as client: resp = await make_async_web_request( client, self.api_key, self.api_url, - "post", - self.request_params, + "get", ) - self.server = resp["server"] - if resp["status"][0]: + if len(resp) == 0: + self.current_result = None + return self.current_result + if all(r is not None for r in resp["finish_reason"]): self.done_streaming = True - self.current_result = resp["data"][0] + self.current_result = resp except Exception as e: self.error_count += 1 if self.error_count > self.max_errors: @@ -257,23 +245,19 @@ def __init__( self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) - self.api_prefix = self.api_url + "/v1/" + self.api_prefix = self.api_url + "/v3/" self.streaming_completions_url = self.api_prefix + "streaming_completions" def submit( self, prompt: Union[str, List[str]], - model_name: Optional[str] = None, - output_type: Optional[dict] = None, - max_tokens: Optional[int] = None, + model_name: str, max_new_tokens: Optional[int] = None, ) -> Dict[str, Any]: """Conduct a web request to the streaming completions api endpoint with the - provided prompt to the model_name if provided. Output_type handles the formatting - of the output into a structure from this provided output_type. - max_tokens and max_new_tokens are related to the total amount of tokens - the model can use and generate. max_new_tokens is recommended to be used - over max_tokens to adjust model output. + provided prompt to the model_name if provided. + max_new_tokens are related to the total amount of tokens + the model can use and generate. 
Parameters ---------- @@ -283,12 +267,6 @@ def submit( model_name: Optional[str] = None Which model to use from hugging face - output_type: Optional[dict] = None - Structured output format - - max_tokens: Optional[int] = None - Max number of tokens for the model's generation - max_new_tokens: Optional[int] = None Max number of new tokens from the model's generation @@ -301,33 +279,23 @@ def submit( req_data = self.make_llm_req_map( prompt=prompt, model_name=model_name, - output_type=output_type, - max_tokens=max_tokens, max_new_tokens=max_new_tokens, - server=None, ) resp = make_web_request( self.api_key, self.streaming_completions_url, "post", req_data ) - return { - "url": self.streaming_completions_url, - "params": {**req_data, "server": resp["server"]}, - } + return resp async def async_submit( self, prompt: Union[str, List[str]], - model_name: Optional[str] = None, - output_type: Optional[dict] = None, - max_tokens: Optional[int] = None, + model_name: str, max_new_tokens: Optional[int] = None, ) -> Dict[str, Any]: """Asynchronously send a web request to the streaming completions api endpoint with the - provided prompt to the model_name if provided. Output_type handles the formatting - of the output into a structure from this provided output_type. - max_tokens and max_new_tokens are related to the total amount of tokens - the model can use and generate. max_new_tokens is recommended to be used - over max_tokens to adjust model output. + provided prompt to the model_name if provided. + max_new_tokens are related to the total amount of tokens + the model can use and generate. Parameters ---------- @@ -337,12 +305,6 @@ async def async_submit( model_name: Optional[str] = None Which model to use from hugging face - output_type: Optional[dict] = None - Structured output format - - max_tokens: Optional[int] = None - Max number of tokens for the model's generation - max_new_tokens: Optional[int] = None Max number of new tokens from the model's generation @@ -355,26 +317,18 @@ async def async_submit( req_data = self.make_llm_req_map( prompt=prompt, model_name=model_name, - output_type=output_type, - max_tokens=max_tokens, max_new_tokens=max_new_tokens, - server=None, ) async with aiohttp.ClientSession() as client: resp = await make_async_web_request( client, self.api_key, self.streaming_completions_url, "post", req_data ) - return { - "url": self.streaming_completions_url, - "params": {**req_data, "server": resp["server"]}, - } + return resp def create( self, prompt: Union[str, List[str]], - model_name: Optional[str] = None, - output_type: Optional[dict] = None, - max_tokens: Optional[int] = None, + model_name: str, max_new_tokens: Optional[int] = None, polling_interval: Optional[float] = 1, ) -> object: @@ -388,12 +342,6 @@ def create( model_name: Optional[str] = None Which model to use from hugging face - output_type: Optional[dict] = None - Structured output format - - max_tokens: Optional[int] = None - Max number of tokens for the model's generation - max_new_tokens: Optional[int] = None Max number of new tokens from the model's generation @@ -406,34 +354,20 @@ def create( Newly instantiated object """ - self.done_streaming = False - self.server = None - self.prompt = prompt - self.model_name = model_name - self.output_type = output_type - self.max_tokens = max_tokens - self.max_new_tokens = max_new_tokens - req_data = self.make_llm_req_map( - prompt=prompt, - model_name=model_name, - output_type=output_type, - max_tokens=max_tokens, - max_new_tokens=max_new_tokens, - server=None, + 
req_data = self.submit( + prompt=prompt, model_name=model_name, max_new_tokens=max_new_tokens ) return StreamingCompletionObject( - req_data, api_key=self.api_key, api_url=self.streaming_completions_url, + id=req_data["id"], polling_interval=polling_interval, ) - def async_create( + async def async_create( self, prompt: Union[str, List[str]], - model_name: Optional[str] = None, - output_type: Optional[dict] = None, - max_tokens: Optional[int] = None, + model_name: str, max_new_tokens: Optional[int] = None, polling_interval: Optional[float] = 1, ) -> object: @@ -447,12 +381,6 @@ def async_create( model_name: Optional[str] = None Which model to use from hugging face - output_type: Optional[dict] = None - Structured output format - - max_tokens: Optional[int] = None - Max number of tokens for the model's generation - max_new_tokens: Optional[int] = None Max number of new tokens from the model's generation @@ -465,36 +393,21 @@ def async_create( Newly instantiated object """ - self.done_streaming = False - self.server = None - self.prompt = prompt - self.model_name = model_name - self.output_type = output_type - self.max_tokens = max_tokens - self.max_new_tokens = max_new_tokens - req_data = self.make_llm_req_map( - prompt=prompt, - model_name=model_name, - output_type=output_type, - max_tokens=max_tokens, - max_new_tokens=max_new_tokens, - server=None, + req_data = await self.async_submit( + prompt=prompt, model_name=model_name, max_new_tokens=max_new_tokens ) return AsyncStreamingCompletionObject( - req_data, api_key=self.api_key, api_url=self.streaming_completions_url, polling_interval=polling_interval, + id=req_data["id"], ) def make_llm_req_map( self, model_name: Optional[str], prompt: Union[str, List[str]], - output_type: Optional[dict], - max_tokens: Optional[int], max_new_tokens: Optional[int], - server: Optional[str], ) -> Dict[str, Any]: """Make a web request to the Lamini Platform @@ -506,18 +419,10 @@ def make_llm_req_map( prompt: Union[str, List[str]] Prompt to send to LLM - output_type: Optional[dict] = None - Structured output format - - max_tokens: Optional[int] = None - Max number of tokens for the model's generation max_new_tokens: Optional[int] = None Max number of new tokens from the model's generation - server: Optional[str] - Which Lamini Platform to make the request out to - Returns ------- req_data: Dict[str, Any] @@ -527,10 +432,6 @@ def make_llm_req_map( req_data = {} req_data["model_name"] = model_name req_data["prompt"] = prompt - req_data["output_type"] = output_type - req_data["max_tokens"] = max_tokens if max_new_tokens is not None: req_data["max_new_tokens"] = max_new_tokens - if server is not None: - req_data["server"] = server return req_data diff --git a/lamini/api/train.py b/lamini/api/train.py index e3ab143..063b82e 100644 --- a/lamini/api/train.py +++ b/lamini/api/train.py @@ -45,6 +45,7 @@ def train( finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, is_public: Optional[bool] = None, + custom_model_name: Optional[str] = None, ) -> str: """Make a web request to start a training job using the dataset ID provided @@ -67,6 +68,9 @@ def train( is_public: Optional[bool] = None Allow public access to the model and dataset + custom_model_name: Optional[str] = None + A human-readable name for the model. 
+ Returns ------- job: str @@ -83,6 +87,8 @@ def train( req_data["gpu_config"] = gpu_config if is_public is not None: req_data["is_public"] = is_public + if custom_model_name is not None: + req_data["custom_model_name"] = custom_model_name url = self.api_prefix + "train" job = make_web_request(self.api_key, url, "post", req_data) @@ -351,7 +357,6 @@ def get_existing_dataset( url = self.api_prefix + "existing-data" req_data = {"dataset_id": dataset_id} req_data["upload_base_path"] = upload_base_path - print(f"EXISTING DATA {self.api_key}") return make_web_request( self.api_key, url, diff --git a/lamini/api/utils/batch_completions.py b/lamini/api/utils/batch_completions.py new file mode 100644 index 0000000..64aa39f --- /dev/null +++ b/lamini/api/utils/batch_completions.py @@ -0,0 +1,363 @@ +import asyncio +import time +from typing import Any, Dict, List, Optional, Union + +import aiohttp + +import lamini +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_async_web_request, make_web_request + + +class BatchStreamingCompletionObject: + + def __init__( + self, + api_url: str, + api_key: str, + id: str, + polling_interval: int, + max_errors: int = 5, + ): + self.api_url = api_url + f"/{id}/result" + self.api_key = api_key + self.done_streaming = False + self.polling_interval = polling_interval + self.current_result = None + self.error_count = 0 + self.max_errors = max_errors + self.current_index = 0 + self.available_results = 0 + + def __iter__(self) -> object: + """Iteration definition + + Parameters + ---------- + None + + Returns + ------- + Reference to self + """ + + return self + + def __next__(self) -> str: + """Iterator next step definition + + Parameters + ---------- + None + + Returns + ------- + str + Streamed next result + """ + + return self.next() + + def next(self): + """Retrieve the next iteration of the response stream + + Parameters + ---------- + None + + Returns + ------- + self.current_result: str + Streamed result from the web request + """ + if self.done_streaming: + raise StopIteration() + + self.wait_for_results() + result = { + "output": self.current_result["outputs"][self.current_index], + "finish_reason": self.current_result["finish_reason"][self.current_index], + } + self.current_index += 1 + if self.current_index >= len(self.current_result["finish_reason"]): + self.done_streaming = True + return result + + def wait_for_results(self): + # Poll for results until more work is available + while self.available_results <= self.current_index: + time.sleep(self.polling_interval) + try: + self.current_result = make_web_request( + self.api_key, + self.api_url, + "get", + ) + if self.current_result == {}: + continue + else: + self.available_results = len( + self.current_result["finish_reason"] + ) - self.current_result["finish_reason"].count(None) + + except Exception as e: + self.error_count += 1 + if self.error_count > self.max_errors: + raise e + + +class AsyncBatchStreamingCompletionObject: + + def __init__( + self, + api_url: str, + api_key: str, + id: str, + polling_interval: int, + max_errors: int = 5, + ): + self.api_url = api_url + f"/{id}/result" + self.api_key = api_key + self.done_streaming = False + self.polling_interval = polling_interval + self.current_result = None + self.error_count = 0 + self.max_errors = max_errors + self.current_index = 0 + self.available_results = 0 + + def __aiter__(self) -> object: + """Asychronous iteration definition + + Parameters + ---------- + None + + 
Returns + ------- + Reference to this instance of AsyncBatchStreamingCompletionObject + """ + + return self + + async def __anext__(self): + """Asynchronous next definition + + Parameters + ---------- + None + + Returns + ------- + str + Current streaming result from the web request + """ + + return await self.next() + + async def next(self): + """Retrieve the next iteration of the response stream + + Parameters + ---------- + None + + Returns + ------- + self.current_result: str + Streamed result from the web request + """ + if self.done_streaming: + raise StopAsyncIteration() + + await self.wait_for_results() + # print(self.current_result) + result = { + "output": self.current_result["outputs"][self.current_index], + "finish_reason": self.current_result["finish_reason"][self.current_index], + } + self.current_index += 1 + if self.current_index >= len(self.current_result["finish_reason"]): + self.done_streaming = True + return result + + async def wait_for_results(self): + # Poll for results until more work is available + while self.available_results <= self.current_index: + await asyncio.sleep(self.polling_interval) + try: + async with aiohttp.ClientSession() as client: + self.current_result = await make_async_web_request( + client, + self.api_key, + self.api_url, + "get", + ) + # print(self.current_result) + if self.current_result == {}: + continue + else: + self.available_results = len( + self.current_result["finish_reason"] + ) - self.current_result["finish_reason"].count(None) + + except Exception as e: + self.error_count += 1 + if self.error_count > self.max_errors: + raise e + + +class BatchCompletions: + """Handler for formatting and POST request for the batch submission API""" + + def __init__( + self, + api_key: Optional[str] = None, + api_url: Optional[str] = None, + ) -> None: + self.config = get_config() + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) + self.api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.api_prefix = self.api_url + "/v1/" + + def submit( + self, + prompt: Union[str, List[str]], + model_name: str, + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + ) -> Dict[str, Any]: + + req_data = self.make_llm_req_map( + prompt=prompt, + model_name=model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, + ) + resp = make_web_request( + self.api_key, + self.api_prefix + "batch_completions", + "post", + req_data, + ) + return resp + + async def async_submit( + self, + prompt: Union[str, List[str]], + model_name: str, + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + ) -> Dict[str, Any]: + + req_data = self.make_llm_req_map( + prompt=prompt, + model_name=model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, + ) + async with aiohttp.ClientSession() as client: + resp = await make_async_web_request( + client, + self.api_key, + self.api_prefix + "batch_completions", + "post", + req_data, + ) + return resp + + def streaming_generate( + self, + prompt: Union[str, List[str]], + model_name: str, + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + polling_interval: Optional[float] = 1, + ) -> Dict[str, Any]: + + resp = self.submit( + prompt=prompt, + model_name=model_name, + output_type=output_type, + max_tokens=max_tokens, + 
max_new_tokens=max_new_tokens, + ) + return BatchStreamingCompletionObject( + api_key=self.api_key, + api_url=self.api_prefix + f"batch_completions", + polling_interval=polling_interval, + id=resp["id"], + ) + + async def async_streaming_generate( + self, + prompt: Union[str, List[str]], + model_name: str, + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + polling_interval: Optional[float] = 1, + ) -> Dict[str, Any]: + + resp = await self.async_submit( + prompt=prompt, + model_name=model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, + ) + return AsyncBatchStreamingCompletionObject( + api_key=self.api_key, + api_url=self.api_prefix + f"batch_completions", + polling_interval=polling_interval, + id=resp["id"], + ) + + async def async_check_result( + self, + id: str, + ) -> Dict[str, Any]: + """Check for the result of a batch request with the appropriate batch id.""" + async with aiohttp.ClientSession() as client: + resp = await make_async_web_request( + client, + self.api_key, + self.api_prefix + f"batch_completions/{id}/result", + "get", + ) + return resp + + def check_result( + self, + id: str, + ) -> Dict[str, Any]: + """Check for the result of a batch request with the appropriate batch id.""" + resp = make_web_request( + self.api_key, + self.api_prefix + f"batch_completions/{id}/result", + "get", + ) + return resp + + def make_llm_req_map( + self, + model_name: str, + prompt: Union[str, List[str]], + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + ) -> Dict[str, Any]: + + req_data = {} + req_data["model_name"] = model_name + req_data["prompt"] = prompt + req_data["output_type"] = output_type + req_data["max_tokens"] = max_tokens + if max_new_tokens is not None: + req_data["max_new_tokens"] = max_new_tokens + return req_data diff --git a/lamini/api/utils/batch_embeddings.py b/lamini/api/utils/batch_embeddings.py new file mode 100644 index 0000000..deaed7b --- /dev/null +++ b/lamini/api/utils/batch_embeddings.py @@ -0,0 +1,96 @@ +from typing import Any, Dict, List, Optional, Union + +import aiohttp + +import lamini +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_async_web_request, make_web_request + + +class BatchEmbeddings: + + def __init__( + self, + api_key: Optional[str] = None, + api_url: Optional[str] = None, + ) -> None: + self.config = get_config() + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) + self.api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.api_prefix = self.api_url + "/v1/" + + def submit( + self, + prompt: Union[str, List[str]], + model_name: Optional[str] = None, + ) -> Dict[str, Any]: + + req_data = self.make_llm_req_map( + prompt=prompt, + model_name=model_name, + ) + resp = make_web_request( + self.api_key, + self.api_prefix + "batch_embeddings", + "post", + req_data, + ) + return resp + + async def async_submit( + self, + prompt: Union[str, List[str]], + model_name: Optional[str] = None, + ) -> Dict[str, Any]: + req_data = self.make_llm_req_map( + prompt=prompt, + model_name=model_name, + ) + async with aiohttp.ClientSession() as client: + + resp = await make_async_web_request( + client, + self.api_key, + self.api_prefix + "batch_embeddings", + "post", + req_data, + ) + return resp + + def check_result( + self, + id: str, + ) -> Dict[str, Any]: + 
"""Check for the result of a batch request with the appropriate batch id.""" + resp = make_web_request( + self.api_key, + self.api_prefix + f"batch_embeddings/{id}/result", + "get", + ) + return resp + + async def async_check_result( + self, + id: str, + ) -> Dict[str, Any]: + """Check for the result of a batch request with the appropriate batch id.""" + async with aiohttp.ClientSession() as client: + resp = await make_async_web_request( + client, + self.api_key, + self.api_prefix + f"batch_embeddings/{id}/result", + "get", + ) + return resp + + def make_llm_req_map( + self, + model_name: Optional[str], + prompt: Union[str, List[str]], + ) -> Dict[str, Any]: + + req_data = {} + if model_name is not None: + req_data["model_name"] = model_name + req_data["prompt"] = prompt + return req_data diff --git a/lamini/api/utils/supported_models.py b/lamini/api/utils/supported_models.py new file mode 100644 index 0000000..88d4cd8 --- /dev/null +++ b/lamini/api/utils/supported_models.py @@ -0,0 +1,31 @@ +# This file list all of tested huggingface models on Lamini Platform. +# The variable names are shortened for easier recognition. + + +LLAMA_32_1B_INST = "meta-llama/Llama-3.2-1B-Instruct" +LLAMA_32_3B_INST = "meta-llama/Llama-3.2-3B-Instruct" +TS_1M = "roneneldan/TinyStories-1M" +TINY_GPT2 = "hf-internal-testing/tiny-random-gpt2" +FALCON_7B = "tiiuae/falcon-7b" +FALCON_7B_INST = "tiiuae/falcon-7b-instruct" +FALCON_11B = "tiiuae/falcon-11B" +STAR_CODER_2_7B = "bigcode/starcoder2-7b" +LLAMA_31_8B_INST = "meta-llama/Meta-Llama-3.1-8B-Instruct" +MISTRAL_7B_INST_V03 = "mistralai/Mistral-7B-Instruct-v0.3" +TINY_MISTRAL = "hf-internal-testing/tiny-random-MistralForCausalLM" + +ALL = [ + LLAMA_31_8B_INST, + LLAMA_32_1B_INST, + LLAMA_32_3B_INST, + TS_1M, + TINY_GPT2, + FALCON_7B, + FALCON_7B_INST, + FALCON_11B, + STAR_CODER_2_7B, + MISTRAL_7B_INST_V03, + TINY_MISTRAL, +] + +PROD = [LLAMA_31_8B_INST, LLAMA_32_3B_INST, MISTRAL_7B_INST_V03, TINY_MISTRAL] diff --git a/lamini/api/versions.py b/lamini/api/versions.py new file mode 100644 index 0000000..73e7afb --- /dev/null +++ b/lamini/api/versions.py @@ -0,0 +1,72 @@ +import re + +import lamini +from lamini import __version__ +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_web_request + + +class APIVersion: + """Handler for embedding requests to the Lamini Platform + + + Parameters + ---------- + + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. 
localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + """ + + def __init__( + self, + api_key: str = None, + api_url: str = None, + ): + self.config = get_config() + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) + self.api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.api_prefix = self.api_url + "/v1/" + self.server_version = None + self.client_version = __version__ + self.api_endpoints = None + self.features_to_api_versions = {} + try: + self.get_versions() + except: + pass + + def __path_to_version_number(self, path: str) -> int: + # Regular expression to match the version number in the format /v{number} + match = re.search(r"/v(\d+)/", path) + + # If a version is found, return it as an integer, otherwise return None + return int(match.group(1)) if match else None + + def _populate_features_to_api_versions(self): + for endpoint in self.api_endpoints: + if endpoint["name"] not in self.features_to_api_versions: + self.features_to_api_versions[endpoint["name"]] = set() + version = self.__path_to_version_number(endpoint["path"]) + if version is not None: + self.features_to_api_versions[endpoint["name"]].add(version) + + def get_versions(self) -> dict: + """Request to Lamini platform for an embedding encoding of the provided + prompt + """ + resp = make_web_request(self.api_key, self.api_prefix + "version", "get") + self.api_endpoints = resp["api"] + self.server_version = resp["server"] + self.client_version = resp["client"] + + self._populate_features_to_api_versions() + return self.features_to_api_versions diff --git a/lamini/classify/lamini_classifier.py b/lamini/classify/lamini_classifier.py index 1c4a1d8..98a950b 100644 --- a/lamini/classify/lamini_classifier.py +++ b/lamini/classify/lamini_classifier.py @@ -1,565 +1,105 @@ -import asyncio import logging -import os -import pickle -import random -from itertools import chain -from typing import List +from typing import List, Optional, Union -import warnings +import lamini +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_web_request +from lamini.api.utils.supported_models import LLAMA_31_8B_INST -import jsonlines -from lamini import Lamini -from lamini.api.embedding import Embedding -from sklearn.linear_model import LogisticRegression -from tqdm import tqdm logger = logging.getLogger(__name__) class LaminiClassifier: - """A zero shot classifier that uses the Lamini LlamaV2Runner to generate - examples from prompts and then trains a final logistic regression on top - of an LLM to classify the examples. 
- """ - def __init__( self, - model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct", - augmented_example_count: int = 10, - batch_size: int = 10, - threads: int = 1, - saved_examples_path: str = "saved_examples.jsonl", - generator_from_prompt=None, - example_modifier=None, - example_expander=None, + classifier_name: str, + model_name: str = LLAMA_31_8B_INST, + api_key: Optional[str] = None, + api_url: Optional[str] = None, ): - warnings.warn( - "LaminiClassifer will be removed in a future version and will be replaced with the API endpoint /v2/classifier", - DeprecationWarning, - stacklevel=2, - ) + self.config = get_config() + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) + self.api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.api_prefix = self.api_url + "/v2/classifier" + self.classifier_name = classifier_name self.model_name = model_name - self.augmented_example_count = augmented_example_count - self.batch_size = batch_size - self.threads = threads - self.saved_examples_path = saved_examples_path - - if generator_from_prompt is None: - generator_from_prompt = DefaultExampleGenerator - self.generator_from_prompt = generator_from_prompt - - if example_modifier is None: - example_modifier = DefaultExampleModifier - self.example_modifier = example_modifier - - if example_expander is None: - example_expander = DefaultExampleExpander - self.example_expander = example_expander - - self.class_ids_to_metadata = {} - self.class_names_to_ids = {} - - # Examples is a dict of examples, where each row is a different - # example class, followed by examples of that class - self.examples = self.load_examples() - - def prompt_train(self, prompts: dict): - """Trains the classifier using prompts for each class. - - First, augment the examples for each class using the prompts. - """ - try: - for class_name, prompt in prompts.items(): - logger.info( - f"Generating examples for class '{class_name}' from prompt {prompt}" - ) - self.add_class(class_name) - - result = self.generate_examples_from_prompt( - class_name, prompt, self.examples.get(class_name, []) - ) - - self.examples[class_name] = result - - # Save partial progress - self.save_examples() - - self.train() - except Exception as e: - logger.error(f"Failed to generate examples for class {class_name}") - logger.error(e) - logger.error( - "Consider rerunning the generation task if the error is transient, e.g. 
500" - ) - return False - return True - - def train(self): - # Form the embeddings - X = [] - y = [] - - for class_name, examples in tqdm(self.examples.items()): - index = self.class_names_to_ids[class_name] - y += [index] * len(examples) - class_embeddings = self.get_embeddings(examples) - X += class_embeddings - - # Train the classifier - self.logistic_regression = LogisticRegression(random_state=0).fit(X, y) - - # Add alias for tune - tune = train - - def add_data_to_class(self, class_name, examples): - if not isinstance(examples, list): - examples = [examples] - - self.add_class(class_name) - - if not class_name in self.examples: - self.examples[class_name] = [] - self.examples[class_name] += examples - - def add_class(self, class_name): - if not class_name in self.class_names_to_ids: - class_id = len(self.class_names_to_ids) - self.class_names_to_ids[class_name] = class_id - self.class_ids_to_metadata[class_id] = {"class_name": class_name} - - def add_metadata_to_class(self, class_name, metadata): - self.class_ids_to_metadata[self.class_names_to_ids[class_name]][ - "metadata" - ] = metadata - - def get_data(self): - return self.examples - - def get_embeddings(self, examples): - if isinstance(examples, str): - examples = [examples] - - embed = Embedding() - embeddings = embed.generate(examples) - return [embedding[0] for embedding in embeddings] - - def predict_proba(self, text): - return self.logistic_regression.predict_proba(self.get_embeddings(text)) - - def predict_proba_from_embedding(self, embeddings): - return self.logistic_regression.predict_proba(embeddings) - - def predict(self, text): - if not isinstance(text, list): - raise Exception("Text to predict must be a list of string(s)") - - probs = self.predict_proba(text) - - # select the class with the highest probability, note that text and - # probs are lists of arbitrary length - winning_classes = [ - max(enumerate(prob), key=lambda x: x[1])[0] for prob in probs - ] - - # convert the class ids to class names - return [ - list(self.class_names_to_ids.keys())[class_id] - for class_id in winning_classes - ] - - def classify_from_embedding( - self, embedding, top_n=None, threshold=None, metadata=False - ): - is_singleton = True if len(embedding) == 1 else False - - batch_probs = self.predict_proba_from_embedding(embedding) - - return self._classify_impl( - batch_probs, is_singleton, top_n, threshold, metadata + self.classifier_id = None + self.initialize_job_id = None + self.train_job_id = None + + def initialize(self, classes: dict): + resp = make_web_request( + self.api_key, + self.api_prefix + f"/initialize", + "post", + { + "classes": classes, + "name": self.classifier_name, + "model_name": self.model_name, + }, ) - - def classify(self, text, top_n=None, threshold=None, metadata=False): - is_singleton = True if isinstance(text, str) else False - - batch_probs = self.predict_proba(text) - - return self._classify_impl( - batch_probs, is_singleton, top_n, threshold, metadata + self.initialize_job_id = resp["job_id"] + return resp + + def initialize_status(self): + resp = make_web_request( + self.api_key, + self.api_url + f"/v1/data_generation/{self.initialize_job_id}/status", + "get", ) + return resp - def _classify_impl( - self, batch_probs, is_singleton, top_n=None, threshold=None, metadata=False - ): - batch_final_probs = [] - for probs in batch_probs: - final_probs = [] - for class_id, prob in enumerate(probs): - if threshold is None or prob > threshold: - # Include the metadata if requested - class_name = 
self.class_ids_to_metadata[class_id]["class_name"] - final_prob = { - "class_id": class_id, - "class_name": class_name, - "prob": prob, - } - if metadata: - metadata = self.class_ids_to_metadata[class_id] - final_prob["metadata"] = metadata - final_probs.append(final_prob) - - # Sort the final_probs, a list of dicts each with a key "prob" - sorted_probs = sorted(final_probs, key=lambda x: x["prob"], reverse=True) - - if top_n is not None: - sorted_probs = sorted_probs[:top_n] - batch_final_probs.append(sorted_probs) - - return batch_final_probs if not is_singleton else batch_final_probs[0] - - def dumps(self): - return pickle.dumps(self) - - @staticmethod - def loads(data): - return pickle.loads(data) - - def save(self, filename): - obj = SavedLaminiClassifier( - self.logistic_regression, - self.class_names_to_ids, - self.class_ids_to_metadata, - ) - with open(filename, "wb") as f: - pickle.dump(obj, f) - - def save_local(self, filename): - with open(filename, "wb") as f: - pickle.dump(self, f) - - @staticmethod - def load(filename): - with open(filename, "rb") as f: - return LaminiClassifier.loads(f.read()) - - def create_new_example_generator(self, prompt, original_examples): - example_generator = self.generator_from_prompt( - prompt, - model_name=self.model_name, - batch_size=self.batch_size // 5, + def train(self): + resp = make_web_request( + self.api_key, + self.api_prefix + f"/train", + "post", + {"name": self.classifier_name}, ) - example_modifier = self.example_modifier( - model_name=self.model_name, - batch_size=self.batch_size // 5, + self.train_job_id = resp["job_id"] + return resp + + def train_status(self): + resp = make_web_request( + self.api_key, + self.api_prefix + f"/{self.train_job_id}/status", + "get", ) - example_expander = self.example_expander(prompt, model_name=self.model_name) - - examples = original_examples.copy() - - index = len(examples) - - while True: - # Phase 1: Generate example types from prompt - compressed_example_features = example_generator.generate_examples( - seed=index, examples=examples - ) - - # Phase 2: Modify the features to be more diverse - different_example_features = example_modifier.modify_examples( - compressed_example_features - ) - - different_example_features = chain( - different_example_features, compressed_example_features - ) - - different_example_features_batches = self.batchify( - different_example_features - ) - - # Phase 3: Expand examples from features - for features_batches in different_example_features_batches: - expanded_example_batch = example_expander.expand_example( - features_batches - ) - - for expanded_example in expanded_example_batch: - logger.debug( - f"Generated example number {index} out of {self.augmented_example_count}" - ) - - index += 1 - examples.append(expanded_example) - yield expanded_example - - if index >= self.augmented_example_count: - return - - def batchify(self, examples): - batches = [] - # handle batches that are smaller than batch_size - for example in examples: - if len(batches) == 0 or len(batches[-1]) == self.batch_size: - batches.append([]) - batches[-1].append(example) - - return batches - - def generate_examples_from_prompt(self, class_name, prompt, original_examples): - examples = [] - if isinstance(original_examples, str): - original_examples = [original_examples] - - # No need to generate more examples if we already have enough - if len(original_examples) >= self.augmented_example_count: - logger.debug( - f"Already have enough examples ({len(original_examples)}) for class '{class_name}', 
not generating more" - ) - return original_examples - - for example in tqdm( - self.create_new_example_generator(prompt, original_examples), - total=self.augmented_example_count, - ): - examples.append(example) - - if len(examples) >= self.augmented_example_count: - break + self.classifier_id = resp["model_id"] + return resp - return examples + original_examples - - def load_examples(self): - filename = self.saved_examples_path - if not os.path.exists(filename): - return {} - - # load the examples from the jsonl file using the jsonlines library - with jsonlines.open(filename) as reader: - examples = {} - for row in reader: - class_name = row["class_name"] - example = row["examples"] - self.add_class(class_name) - examples[class_name] = example - - return examples - - def save_examples(self): - filename = self.saved_examples_path - - # save the examples as a jsonl file using the jsonlines library - with jsonlines.open(filename, "w") as writer: - for class_name, example in self.examples.items(): - row = { - "class_name": class_name, - "examples": example, - } - writer.write(row) - - -class SavedLaminiClassifier: - def __init__( - self, - logistic_regression: LogisticRegression, - class_names_to_ids: dict, - class_ids_to_metadata: dict, - ): - self.logistic_regression = logistic_regression - self.class_names_to_ids = class_names_to_ids - self.class_ids_to_metadata = class_ids_to_metadata - - @staticmethod - def loads(data): - return pickle.loads(data) - - -class DefaultExampleGenerator: - def __init__( - self, - prompt, - model_name="meta-llama/Meta-Llama-3.1-8B-Instruct", - batch_size=10, - ): - self.prompt = prompt - self.example_count = 5 - self.model_name = model_name - self.batch_size = batch_size - - self.max_history = 2 - - def generate_examples(self, seed, examples): - prompt_batch = self.get_prompts(seed=seed, examples=examples) - - runner = Lamini(model_name=self.model_name) - - results = runner.generate( - prompt=prompt_batch, - output_type={ - "example_1": "str", - "example_2": "str", - "example_3": "str", + # Add alias for tune + tune = train + prompt_train = initialize + create = initialize + + def add(self, dataset_name: str, data: dict): + resp = make_web_request( + self.api_key, + self.api_prefix + f"/add", + "post", + { + "data": data, + "dataset_name": dataset_name, + "project_name": self.classifier_name, }, ) + return resp - logger.debug("+++++++ Default Example Generator Result ++++++++") - logger.debug(results) - logger.debug("+++++++++++++++++++++++++++++++++++++++++++++++++++++") - - examples = self.parse_result(results) - - return examples - - def get_prompts(self, seed, examples): - prompt = "You are a domain expert who is able to generate many different examples given a description." - - # Randomly shuffle the examples - random.seed(seed) - random.shuffle(examples) - - # Include examples if they are available - if len(examples) > 0: - selected_example_count = min(self.max_history, len(examples)) - - prompt += "Consider the following examples:\n" - - for i in range(selected_example_count): - prompt += "----------------------------------------\n" - prompt += f"{examples[i]}" - prompt += "\n----------------------------------------\n" - - prompt += "Read the following description carefully:\n" - prompt += "----------------------------------------\n" - prompt += self.prompt - prompt += "\n----------------------------------------\n" - - prompt += f"Generate {self.example_count} different example summaries following this description. 
Each example summary should be as specific as possible using at most 10 words.\n" - - return prompt - - def parse_result(self, result): - return [ - result["example_1"], - result["example_2"], - result["example_3"], - ] - - -class DefaultExampleModifier: - def __init__( + def classify( self, - model_name="meta-llama/Meta-Llama-3.1-8B-Instruct", - batch_size=10, + prompt: Union[str, List[str]], ): - self.model_name = model_name - self.required_examples = 5 - self.batch_size = batch_size - - def modify_examples(self, examples): - prompts = self.get_prompts(examples) - - runner = Lamini(model_name=self.model_name) - - results = runner.generate( - prompt=prompts, - output_type={ - "example_1": "str", - "example_2": "str", - "example_3": "str", - }, + if self.classifier_id is None: + raise Exception( + "LaminiClassifier.classifier_id must be set in order to classify. Manually set this or train a new classifier." + ) + params = {"prompt": prompt} + resp = make_web_request( + self.api_key, + self.api_prefix + f"/{self.classifier_id}/classify", + "post", + params, ) - - logger.debug("+++++++ Default Example Modifier Result ++++++++") - logger.debug(results) - logger.debug("+++++++++++++++++++++++++++++++++++++++++++++++++++++") - - examples = self.parse_result(results) - - return examples - - def get_prompts(self, existing_examples): - - examples = existing_examples.copy() - random.seed(42) - - prompts = [] - - for batch_example in range(self.batch_size): - # Randomly shuffle the examples - random.shuffle(examples) - - example_count = min(5, len(examples)) - - prompt = "You are a domain expert who is able to clearly understand these descriptions and modify them to be more diverse." - prompt += "Read the following descriptions carefully:\n" - prompt += "----------------------------------------\n" - for index, example in enumerate(examples[:example_count]): - prompt += f"{index + 1}. {example}\n" - prompt += "\n----------------------------------------\n" - - prompt += "Generate 3 more examples that are similar, but substantially different from those above. Each example should be as specific as possible using at most 10 words.\n" - - logger.debug("+++++++ Default Example Modifier Prompt ++++++++") - logger.debug(prompt) - logger.debug("+++++++++++++++++++++++++++++++++++++++++++++++++++++") - - prompts.append(prompt) - - return prompts - - def parse_result(self, results): - all_examples = [] - for result in results: - all_examples += [ - result["example_1"], - result["example_2"], - result["example_3"], - ] - - return all_examples - - -class DefaultExampleExpander: - def __init__(self, prompt, model_name="meta-llama/Meta-Llama-3.1-8B-Instruct"): - self.prompt = prompt - self.model_name = model_name - - def expand_example(self, example_batch): - runner = Lamini(model_name=self.model_name) - - prompts = self.get_prompts(example_batch) - - results = runner.generate(prompt=prompts) - - for result in results: - logger.debug("+++++++ Default Example Expander Result ++++++++") - logger.debug(result) - logger.debug("+++++++++++++++++++++++++++++++++++++++++++++++++++++") - - return results - - def get_prompts(self, example_batch): - - prompts = [] - - for example in example_batch: - - prompt = "You are a domain expert who is able to clearly understand this description and expand to a complete example from a short summary." 
- - prompt += "Read the following description carefully:\n" - prompt += "----------------------------------------\n" - prompt += self.prompt - prompt += "\n----------------------------------------\n" - prompt += "Now read the following summary of an example matching this description carefully:\n" - prompt += "----------------------------------------\n" - prompt += example - prompt += "\n----------------------------------------\n" - - prompt += "Expand the summary to a complete example with about 3 sentences. Be consistent with both the summary and the description. Get straight to the point.\n" - - logger.debug("+++++++ Default Example Expander Prompt ++++++++") - logger.debug(prompt) - logger.debug("+++++++++++++++++++++++++++++++++++++++++++++++++++++") - - prompts.append(prompt) - - return prompts + return resp diff --git a/lamini/error/error.py b/lamini/error/error.py index ac8ca5b..aedcd17 100644 --- a/lamini/error/error.py +++ b/lamini/error/error.py @@ -1,46 +1,50 @@ -class LlamaError(Exception): +class LaminiError(Exception): def __init__( self, message=None, ): - super(LlamaError, self).__init__(message) + super(LaminiError, self).__init__(message) -class ModelNotFound(LlamaError): +class ModelNotFound(LaminiError): """The model name is invalid. Make sure it's a valid model in Huggingface or a finetuned model""" -class APIError(LlamaError): +class APIError(LaminiError): """There is an internal error in the Lamini API""" -class AuthenticationError(LlamaError): +class AuthenticationError(LaminiError): """The Lamini API key is invalid""" -class RateLimitError(LlamaError): +class RateLimitError(LaminiError): """The QPS of requests to the API is too high""" -class UserError(LlamaError): +class UserError(LaminiError): """The user has made an invalid request""" -class APIUnprocessableContentError(LlamaError): +class APIUnprocessableContentError(LaminiError): """Invalid request format. Consider upgrading lamini library version""" -class UnavailableResourceError(LlamaError): +class UnavailableResourceError(LaminiError): """Model is still downloading""" -class ServerTimeoutError(LlamaError): +class ServerTimeoutError(LaminiError): """Model is still downloading""" -class DownloadingModelError(LlamaError): +class DownloadingModelError(LaminiError): """Downloading model""" -class RequestTimeoutError(LlamaError): +class RequestTimeoutError(LaminiError): """Request Timeout. 
Please try again.""" + + +class OutdatedServerError(LaminiError): + """Outdated Server Version""" diff --git a/lamini/evaluators/custom/earnings_call_evaluator.py b/lamini/evaluators/custom/earnings_call_evaluator.py index b982eaf..ffd9993 100644 --- a/lamini/evaluators/custom/earnings_call_evaluator.py +++ b/lamini/evaluators/custom/earnings_call_evaluator.py @@ -1,12 +1,11 @@ import logging -from typing import Union, Iterator, AsyncIterator +from typing import AsyncIterator, Iterator, Union import jsonlines +from lamini.evaluators.utils.utils import save_results from lamini.generation.base_prompt_object import PromptObject from lamini.generation.generation_node import GenerationNode from lamini.generation.generation_pipeline import GenerationPipeline -from lamini.evaluators.utils.utils import save_results, format_results - logger = logging.getLogger(__name__) diff --git a/lamini/evaluators/helm/harness_evaluator.py b/lamini/evaluators/helm/harness_evaluator.py index 638e194..731dae3 100644 --- a/lamini/evaluators/helm/harness_evaluator.py +++ b/lamini/evaluators/helm/harness_evaluator.py @@ -1,14 +1,15 @@ -from lm_eval import tasks, evaluator, utils -from lm_eval.api.model import LM -from datetime import datetime -from tqdm import tqdm +import logging import os -import jsonlines +from datetime import datetime from typing import List + +import jsonlines from lamini.api.lamini import Lamini from lamini.evaluators.helm.mmlu_evaluator import MMLUEvaluator from lamini.evaluators.helm.truthfulqa_evaluator import TruthfulQAEvaluator -import logging +from lm_eval import evaluator +from lm_eval.api.model import LM +from tqdm import tqdm logger = logging.getLogger(__name__) diff --git a/lamini/generation/base_generation_queue.py b/lamini/generation/base_generation_queue.py index f09f928..cabd59d 100644 --- a/lamini/generation/base_generation_queue.py +++ b/lamini/generation/base_generation_queue.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Dict, Optional +from typing import Optional import aiohttp import lamini diff --git a/lamini/generation/base_prompt_object.py b/lamini/generation/base_prompt_object.py index 649dd22..045b06e 100644 --- a/lamini/generation/base_prompt_object.py +++ b/lamini/generation/base_prompt_object.py @@ -1,6 +1,3 @@ -from typing import Any - - class PromptObject: def __init__(self, prompt: str, response: str = None, data: dict = {}) -> None: assert isinstance(prompt, str) diff --git a/lamini/generation/embedding_node.py b/lamini/generation/embedding_node.py index 0c49e2e..794a890 100644 --- a/lamini/generation/embedding_node.py +++ b/lamini/generation/embedding_node.py @@ -1,5 +1,4 @@ import logging -import sys from typing import AsyncIterator, Iterator, Optional, Union from lamini.generation.base_prompt_object import PromptObject diff --git a/lamini/generation/generation_node.py b/lamini/generation/generation_node.py index 75ead26..a485563 100644 --- a/lamini/generation/generation_node.py +++ b/lamini/generation/generation_node.py @@ -1,8 +1,6 @@ import logging -import sys from typing import AsyncIterator, Generator, Iterator, Optional, Union -from lamini.api.lamini_config import get_config from lamini.api.utils.iterators import async_iter from lamini.generation.base_node_object import BaseGenerationNode from lamini.generation.base_prompt_object import PromptObject diff --git a/lamini/generation/generation_pipeline.py b/lamini/generation/generation_pipeline.py index 2fbbdfd..7457832 100644 --- a/lamini/generation/generation_pipeline.py +++ 
b/lamini/generation/generation_pipeline.py @@ -30,7 +30,6 @@ def forward(...): define pipeline nodes. The input and output of pipeline nodes should all be async iterators. """ - pass async def __call( self, diff --git a/lamini/generation/llm_stream.py b/lamini/generation/llm_stream.py new file mode 100644 index 0000000..34ed237 --- /dev/null +++ b/lamini/generation/llm_stream.py @@ -0,0 +1,152 @@ +import time +from typing import Any, Dict, Iterator, List, Optional, TypeVar, Union + + +import lamini +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_web_request + +T = TypeVar("T") + + +class LLMStream: + """Handler for formatting and POST request for the batch submission API""" + + def __init__( + self, + api_key: Optional[str] = None, + api_url: Optional[str] = None, + ) -> None: + self.config = get_config() + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) + self.api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.api_prefix = self.api_url + "/v1/" + self.current_minibatch_result = None + self.current_minibatch_index = 0 + self.polling_interval = 1 + self.error_count = 0 + self.max_errors = 3 + + def generate( + self, + prompts: Union[Iterator, List], + model_name: str, + output_type: Optional[dict] = None, + max_new_tokens: Optional[int] = None, + ): + if isinstance(prompts, list): + prompts = iter(prompts) + minibatch_stream = self.minibatch(prompts, lambda: lamini.batch_size) + for minibatch in minibatch_stream: + # print("MINIBATCH", minibatch) + assert isinstance(minibatch, list) + req_data = self.make_llm_req_map( + prompt=minibatch, + model_name=model_name, + output_type=output_type, + max_new_tokens=max_new_tokens, + ) + resp = make_web_request( + self.api_key, + self.api_prefix + "batch_completions", + "post", + req_data, + ) + # print("rrrrrr", resp) + result_stream = self.get_minibatch_result_stream(resp["id"]) + # print("streammmm", result_stream) + for result in result_stream: + yield result + + self.current_minibatch_result = None + self.current_minibatch_index = 0 + + def get_minibatch_result_stream(self, id: str): + # Keep polling until results are yielded + while ( + self.current_minibatch_result is None + or self.current_minibatch_result == {} + or not all(self.current_minibatch_result["finish_reason"]) + ): + time.sleep(self.polling_interval) + try: + self.current_minibatch_result = make_web_request( + self.api_key, + self.api_prefix + f"batch_completions/{id}/result", + "get", + ) + if self.current_minibatch_result == {}: + continue + + # Yield all most recently available results + available_results = len( + self.current_minibatch_result["finish_reason"] + ) - self.current_minibatch_result["finish_reason"].count(None) + for i in range(self.current_minibatch_index, available_results): + result = { + "output": self.current_minibatch_result["outputs"][i], + "finish_reason": self.current_minibatch_result["finish_reason"][ + i + ], + } + yield result + + self.current_minibatch_index = available_results + + except Exception as e: + self.error_count += 1 + if self.error_count > self.max_errors: + raise e + + def check_result( + self, + id: str, + ) -> Dict[str, Any]: + """Check for the result of a batch request with the appropriate batch id.""" + resp = make_web_request( + self.api_key, + self.api_prefix + f"batch_completions/{id}/result", + "get", + ) + return resp + + def make_llm_req_map( + self, + model_name: str, + prompt: Union[str, 
List[str]], + output_type: Optional[dict] = None, + max_new_tokens: Optional[int] = None, + ) -> Dict[str, Any]: + + req_data = {} + req_data["model_name"] = model_name + req_data["prompt"] = prompt + req_data["output_type"] = output_type + if max_new_tokens is not None: + req_data["max_new_tokens"] = max_new_tokens + return req_data + + def minibatch( + self, + iterator: Iterator[T], + size_fn, + ) -> Iterator[list[T]]: + """Yield successive n-sized chunks from lst.""" + finished = False + + while not finished: + results: list[T] = [] + size = size_fn() + + for _ in range(size): + try: + result = None + while result is None: + result = next(iterator) + except StopIteration: + finished = True + else: + results.append(result) + + if results: + yield results diff --git a/lamini/generation/modify_node.py b/lamini/generation/modify_node.py index cde873b..a49ee50 100644 --- a/lamini/generation/modify_node.py +++ b/lamini/generation/modify_node.py @@ -1,7 +1,6 @@ import logging from typing import AsyncIterator, Callable, Iterator, Optional, Union -from lamini.api.lamini_config import get_config from lamini.generation.base_node_object import BaseGenerationNode from lamini.generation.base_prompt_object import PromptObject diff --git a/lamini/generation/process_generation_batch.py b/lamini/generation/process_generation_batch.py index 284d0b5..9fb5d9a 100644 --- a/lamini/generation/process_generation_batch.py +++ b/lamini/generation/process_generation_batch.py @@ -1,8 +1,7 @@ -import asyncio import logging -from lamini.api.rest_requests import make_async_web_request -from lamini.api.utils.batch import Batch +import lamini +from lamini.api.pipeline_client import PipelineClient logger = logging.getLogger(__name__) @@ -42,8 +41,10 @@ def can_submit_query(): logger.info(f"Sending batch with {len(batch['prompt'])}") result = await query_api(client, key, url, json, batch["type"]) except Exception as e: - logger.debug( - f"Error in process_generation_batch, type: {type(e)}, message: {e}" + logger.error( + f"Error in process_generation_batch, type: {type(e)}, message: {e}", + stack_info=True, + exc_info=True, ) for prompt_obj in batch["prompt"]: prompt_obj.error.append(e) @@ -63,28 +64,26 @@ def can_submit_query(): prompt_obj.response = result[i] else: for i, prompt_obj in enumerate(batch["prompt"]): - prompt_obj.response = result["outputs"][i] - prompt_obj.finish_reason = result["finish_reason"][i] + if lamini.gate_pipeline_batch_completions: + prompt_obj.response = result["outputs"][i] + prompt_obj.finish_reason = result["finish_reason"][i] + else: + prompt_obj.response = result[i] async def query_api(client, key, url, json, type): + pipeline_client = PipelineClient() if type == "embedding": - # TODO: Replace make_async_web_request() with Completion.generate() - result = await make_async_web_request(client, key, url, "post", json) - result = result["embedding"] + if lamini.gate_pipeline_batch_completions: + result = await pipeline_client.batch_embeddings(client, json) + else: + result = await pipeline_client.embedding(client, key, url, json) else: - batch_api = Batch() - submit_response = batch_api.submit( - prompt=json["prompt"], - model_name=json["model_name"], - output_type=json["output_type"], - max_new_tokens=json["max_new_tokens"], - ) # TODO: Don't resubmit work if an error is thrown in the while loop - while True: - await asyncio.sleep(5) - result = batch_api.check_result(submit_response["id"]) - if result: - break + if lamini.gate_pipeline_batch_completions: + result = await 
pipeline_client.batch_completions(client, json) + else: + result = await pipeline_client.completions(client, key, url, json) + return result diff --git a/lamini/generation/split_response_node.py b/lamini/generation/split_response_node.py index a273e1a..cee9ee6 100644 --- a/lamini/generation/split_response_node.py +++ b/lamini/generation/split_response_node.py @@ -1,7 +1,6 @@ import logging from typing import AsyncIterator, Callable, Iterator, Optional, Union -from lamini.api.lamini_config import get_config from lamini.generation.base_node_object import BaseGenerationNode from lamini.generation.base_prompt_object import PromptObject diff --git a/pyproject.toml b/pyproject.toml index b7d868f..b8b2d7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "lamini" -version = "3.1.0" +version = "3.1.3" authors = [ { name="Lamini", email="info@lamini.ai" }, ]
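The reworked `LaminiClassifier` delegates training to the hosted classifier API: `add()` stages labelled data under a dataset name, `train()` (aliased as `tune`) starts a remote job and records `train_job_id`, `train_status()` polls that job and captures the returned `model_id` as `classifier_id`, and `classify()` accepts a single prompt or a list once that id is set. A minimal sketch of the flow; the constructor argument and the shape of the `data` payload are assumptions, not shown in this diff:

```python
from lamini.classify.lamini_classifier import LaminiClassifier

classifier = LaminiClassifier("product-feedback")  # assumed: project name passed positionally

# Stage labelled examples under a named dataset for this project.
classifier.add(
    dataset_name="reviews-v1",
    data={"text": "Arrived broken", "label": "negative"},  # assumed payload shape
)

# Start a remote training job; in practice, poll train_status() until the job
# finishes -- it stores the returned model_id as classifier_id.
classifier.train()
status = classifier.train_status()

# classify() raises unless classifier_id is set, and accepts a string or a list.
print(classifier.classify("The battery lasts forever"))
print(classifier.classify(["Love it", "Would not buy again"]))
```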
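With the exception base class renamed from `LlamaError` to `LaminiError` (and `OutdatedServerError` added), downstream handlers that catch `LlamaError` need the new name. An illustrative pattern, assuming the caller wants to treat only rate limiting as retryable:

```python
from lamini.error.error import LaminiError, RateLimitError


def classify_safely(classifier, prompt):
    """Call classify(), swallowing rate limits and surfacing other Lamini errors."""
    try:
        return classifier.classify(prompt)
    except RateLimitError:
        return None  # caller backs off and retries later
    except LaminiError as e:  # previously `except LlamaError`
        raise RuntimeError(f"Lamini request failed: {e}") from e
```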
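The new `lamini/generation/llm_stream.py` wraps the batch submission API: `LLMStream.generate()` splits the prompt iterator into minibatches of `lamini.batch_size`, POSTs each one to `/v1/batch_completions`, then polls the `/result` endpoint and yields every completion as soon as its `finish_reason` is populated. A usage sketch with placeholder prompts and API key:

```python
from lamini.generation.llm_stream import LLMStream

stream = LLMStream(api_key="<YOUR-API-KEY>")

prompts = [
    "Write a haiku about the ocean.",
    "Summarize photosynthesis in one sentence.",
]

# Results are yielded incrementally as {"output": ..., "finish_reason": ...}
# dicts, in prompt order within each minibatch.
for result in stream.generate(
    prompts, model_name="meta-llama/Meta-Llama-3.1-8B-Instruct"
):
    print(result["finish_reason"], "->", result["output"])
```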
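`process_generation_batch.query_api` now goes through a `PipelineClient` and checks `lamini.gate_pipeline_batch_completions` to choose between the gated batch endpoints and the per-request completion/embedding calls; when the gate is on, each `PromptObject` also receives a per-prompt `finish_reason`. The flag can be flipped on the module before running a pipeline (sketch only; the batch path requires a server that exposes the batch endpoints):

```python
import lamini

# Route pipeline batches through batch_completions / batch_embeddings instead
# of the per-request endpoints; PromptObject.finish_reason is then populated.
lamini.gate_pipeline_batch_completions = True
```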