diff --git a/lamini/__init__.py b/lamini/__init__.py
index 3fee44b..075699a 100644
--- a/lamini/__init__.py
+++ b/lamini/__init__.py
@@ -8,9 +8,9 @@
 from lamini.runners.basic_model_runner import BasicModelRunner
 from lamini.runners.mistral_runner import MistralRunner
 from lamini.api.lamini import Lamini
-from lamini.classify.lamini_classifier import LaminiClassifier
 from lamini.api.classifier import Classifier
 from lamini.api.embedding import Embedding
+from lamini.classify.lamini_classifier import LaminiClassifier
 from lamini.generation.generation_node import GenerationNode
 from lamini.generation.generation_pipeline import GenerationPipeline
 from lamini.generation.base_prompt_object import PromptObject
@@ -27,7 +27,8 @@
 Find your LAMINI_API_KEY at https://app.lamini.ai/account"""

 # When inference call failed, how much retry should we perform.
-retry_limit = os.environ.get("LAMINI_RETRY_LIMIT", 3)
+retry_limit = int(os.environ.get("LAMINI_RETRY_LIMIT", 3))

-max_workers = os.environ.get("LAMINI_MAX_WORKERS", 10)
-batch_size = os.environ.get("LAMINI_BATCH_SIZE", 5)
+max_workers = int(os.environ.get("LAMINI_MAX_WORKERS", 4))
+batch_size = int(os.environ.get("LAMINI_BATCH_SIZE", 5))
+static_batching = bool(os.environ.get("LAMINI_STATIC_BATCHING", False))
diff --git a/lamini/api/classifier.py b/lamini/api/classifier.py
index 7836f37..ddf522e 100644
--- a/lamini/api/classifier.py
+++ b/lamini/api/classifier.py
@@ -8,11 +8,9 @@ class Classifier:
-    def __init__(
-        self, model_id: int = None, api_key: str = None, api_url: str = None, config={}
-    ):
+    def __init__(self, model_id: int = None, api_key: str = None, api_url: str = None):
         self.model_id = model_id
-        self.config = get_config(config)
+        self.config = get_config()
         self.api_key = api_key or lamini.api_key or get_configured_key(self.config)
         self.api_url = api_url or lamini.api_url or get_configured_url(self.config)
         self.api_prefix = self.api_url + "/v1/classifier"
diff --git a/lamini/api/embedding.py b/lamini/api/embedding.py
index 6c07241..5c17cd2 100644
--- a/lamini/api/embedding.py
+++ b/lamini/api/embedding.py
@@ -1,4 +1,4 @@
-from typing import List, Optional, Union
+from typing import List, Union

 import lamini
 import numpy as np
@@ -9,15 +9,14 @@ class Embedding:
     def __init__(
         self,
+        model_name: str = None,
         api_key: str = None,
         api_url: str = None,
-        model_name: str = None,
-        config={},
     ):
-        self.config = get_config(config)
+        self.config = get_config()
         self.api_key = api_key or lamini.api_key or get_configured_key(self.config)
         self.api_url = api_url or lamini.api_url or get_configured_url(self.config)
-        self.api_prefix = self.api_url + "/v1/inference/"
+        self.api_prefix = self.api_url + "/v1/"
         self.model_name = model_name

     def generate(self, prompt: Union[str, List[str]]):
diff --git a/lamini/api/lamini.py b/lamini/api/lamini.py
index f975668..b6ada85 100644
--- a/lamini/api/lamini.py
+++ b/lamini/api/lamini.py
@@ -1,20 +1,16 @@
 import json
 import logging
 import os
-import sys
 import time
-from typing import Callable, Dict, Iterable, List, Optional, Union
+from typing import Dict, Iterable, List, Optional, Union

 import jsonlines
 import pandas as pd
 from lamini.api.lamini_config import get_config
 from lamini.api.rest_requests import get_version
-from lamini.api.synchronize import sync
 from lamini.api.train import Train
-from lamini.api.utils.async_inference_queue import AsyncInferenceQueue
 from lamini.api.utils.completion import Completion
 from lamini.api.utils.upload_client import get_dataset_name, upload_to_blob
-from
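# A minimal sketch of how the settings above behave after this change: the
# environment variables are read once at import time and cast to int, so any
# override must be set before the first `import lamini`. Values here are
# illustrative.
import os

os.environ["LAMINI_MAX_WORKERS"] = "8"
os.environ["LAMINI_BATCH_SIZE"] = "16"

import lamini

assert isinstance(lamini.max_workers, int) and lamini.max_workers == 8
assert lamini.batch_size == 16

# The new flag can also be flipped directly on the module after import:
lamini.static_batching = True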
lamini.generation.token_optimizer import TokenOptimizer logger = logging.getLogger(__name__) @@ -25,33 +21,15 @@ def __init__( model_name: str, api_key: Optional[str] = None, api_url: Optional[str] = None, - local_cache_file: Optional[str] = None, - config: dict = {}, ): - self.config = get_config(config) + self.config = get_config() self.model_name = model_name self.api_key = api_key self.api_url = api_url - if sys.version_info >= (3, 10): - logger.info("Using 3.10 InferenceQueue Interface") - from lamini.api.utils.async_inference_queue_3_10 import ( - AsyncInferenceQueue as AsyncInferenceQueue310, - ) - - self.async_inference_queue = AsyncInferenceQueue310( - api_key, api_url, config=config - ) - else: - self.async_inference_queue = AsyncInferenceQueue( - api_key, api_url, config=config - ) - - self.completion = Completion(api_key, api_url, config=config) - self.trainer = Train(api_key, api_url, config=config) + self.completion = Completion(api_key, api_url) + self.trainer = Train(api_key, api_url) self.upload_file_path = None self.upload_base_path = None - self.local_cache_file = local_cache_file - self.model_config = self.config.get("model_config", None) def version(self): return get_version(self.api_key, self.api_url, self.config) @@ -63,36 +41,20 @@ def generate( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - callback: Optional[Callable] = None, - metadata: Optional[List] = None, ): - if isinstance(prompt, str) or (isinstance(prompt, list) and len(prompt) == 1): - result = self.completion.generate( - prompt=prompt, - model_name=model_name or self.model_name, - output_type=output_type, - max_tokens=max_tokens, - max_new_tokens=max_new_tokens, - ) - if output_type is None: - if isinstance(prompt, list) and len(prompt) == 1: - result = [single_result["output"] for single_result in result] - else: - result = result["output"] - return result - - assert isinstance(prompt, list) - return sync( - self.async_generate( - prompt=prompt, - model_name=model_name, - output_type=output_type, - max_tokens=max_tokens, - max_new_tokens=max_new_tokens, - callback=callback, - metadata=metadata, - ) + result = self.completion.generate( + prompt=prompt, + model_name=model_name or self.model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, ) + if output_type is None: + if isinstance(prompt, list): + result = [single_result["output"] for single_result in result] + else: + result = result["output"] + return result async def async_generate( self, @@ -101,8 +63,6 @@ async def async_generate( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - callback: Optional[Callable] = None, - metadata: Optional[List] = None, ): req_data = self.completion.make_llm_req_map( prompt=prompt, @@ -111,32 +71,13 @@ async def async_generate( max_tokens=max_tokens, max_new_tokens=max_new_tokens, ) - - if isinstance(prompt, str) or (isinstance(prompt, list) and len(prompt) == 1): - result = await self.completion.async_generate(req_data) - if output_type is None: - if isinstance(prompt, list) and len(prompt) == 1: - result = [single_result["output"] for single_result in result] - else: - result = result["output"] - return result - - assert isinstance(prompt, list) - if metadata is not None: - assert isinstance(metadata, list) - assert len(metadata) == len(prompt) - results = await self.async_inference_queue.submit( - req_data, - self.local_cache_file, - callback, - 
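# With the branching above removed, generate() always goes through a single
# Completion call for both string and list prompts. A usage sketch; the model
# name and prompts are illustrative and an API key is assumed to be configured.
from lamini import Lamini

llm = Lamini(model_name="meta-llama/Meta-Llama-3.1-8B-Instruct")

answer = llm.generate("How are you?")                         # str in, str out
answers = llm.generate(["How are you?", "Tell me a joke."])   # list in, list of str out

# With an output_type, the structured result is returned without unwrapping:
profile = llm.generate("How old are you?", output_type={"age": "int"})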
metadata, - token_optimizer=TokenOptimizer(model_name or self.model_name), - ) - + result = await self.completion.async_generate(req_data) if output_type is None: - results = [single_result["output"] for single_result in results] - - return results + if isinstance(prompt, list): + result = [single_result["output"] for single_result in result] + else: + result = result["output"] + return result def upload_data( self, @@ -245,7 +186,6 @@ def train( ], finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, - enable_peft: Optional[bool] = None, peft_args: Optional[dict] = None, is_public: Optional[bool] = None, use_cached_model: Optional[bool] = None, @@ -269,7 +209,6 @@ def train( upload_file_path=self.upload_file_path, finetune_args=finetune_args, gpu_config=gpu_config, - enable_peft=enable_peft, peft_args=peft_args, is_public=is_public, use_cached_model=use_cached_model, @@ -289,7 +228,6 @@ def train_and_wait( ], finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, - enable_peft: Optional[bool] = None, peft_args: Optional[dict] = None, is_public: Optional[bool] = None, use_cached_model: Optional[bool] = None, @@ -300,7 +238,6 @@ def train_and_wait( data_or_dataset_id, finetune_args=finetune_args, gpu_config=gpu_config, - enable_peft=enable_peft, peft_args=peft_args, is_public=is_public, use_cached_model=use_cached_model, diff --git a/lamini/api/rest_requests.py b/lamini/api/rest_requests.py index 05d0875..680e762 100644 --- a/lamini/api/rest_requests.py +++ b/lamini/api/rest_requests.py @@ -1,9 +1,9 @@ -import aiohttp import asyncio import importlib.metadata import logging -import requests +import aiohttp +import requests from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url from lamini.error.error import ( APIError, diff --git a/lamini/api/streaming_completion.py b/lamini/api/streaming_completion.py index 2564725..2d0f5a2 100644 --- a/lamini/api/streaming_completion.py +++ b/lamini/api/streaming_completion.py @@ -104,9 +104,8 @@ def __init__( self, api_key: str = None, api_url: str = None, - config={}, ): - self.config = get_config(config) + self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/" diff --git a/lamini/api/train.py b/lamini/api/train.py index 9f01307..2a3e53d 100644 --- a/lamini/api/train.py +++ b/lamini/api/train.py @@ -14,14 +14,12 @@ def __init__( self, api_key: Optional[str] = None, api_url: Optional[str] = None, - config: Optional[dict] = {}, ): - self.config = get_config(config) + self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/" self.ui_url = "https://app.lamini.ai" - self.model_config = self.config.get("model_config", None) def train( self, @@ -30,7 +28,6 @@ def train( upload_file_path: Optional[str] = None, finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, - enable_peft: Optional[bool] = None, peft_args: Optional[dict] = None, is_public: Optional[bool] = None, use_cached_model: Optional[bool] = None, @@ -44,16 +41,12 @@ def train( req_data["finetune_args"] = finetune_args if gpu_config is not None: req_data["gpu_config"] = gpu_config - if enable_peft is not None: - req_data["enable_peft"] = enable_peft if peft_args is not None: 
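# Tuning sketch after the removal of enable_peft above: PEFT behavior is now
# driven only by peft_args and finetune_args, so there is no boolean toggle to
# pass. The example data and argument values are illustrative; a previously
# uploaded dataset id can be passed as data_or_dataset_id instead.
from lamini import Lamini

llm = Lamini(model_name="meta-llama/Meta-Llama-3.1-8B-Instruct")
data = [
    {"input": "What is Lamini?", "output": "An enterprise platform for tuning LLMs."},
]
llm.train(
    data_or_dataset_id=data,
    finetune_args={"learning_rate": 3.0e-4},
    peft_args={"r_value": 32},
)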
req_data["peft_args"] = peft_args if is_public is not None: req_data["is_public"] = is_public if use_cached_model is not None: req_data["use_cached_model"] = use_cached_model - if self.model_config: - req_data["model_config"] = self.model_config.as_dict() if multi_node is not None: req_data["multi_node"] = multi_node url = self.api_prefix + "train" @@ -69,43 +62,6 @@ def train( # Add alias for tune tune = train - def precise_train( - self, - model_name: str, - dataset_id: str, - upload_file_path: Optional[str] = None, - finetune_args: Optional[dict] = None, - gpu_config: Optional[dict] = None, - is_public: Optional[bool] = None, - use_cached_model: Optional[bool] = None, - ): - req_data = {"model_name": model_name} - req_data["dataset_id"] = dataset_id - if upload_file_path is not None: - req_data["upload_file_path"] = upload_file_path - if finetune_args is not None: - req_data["finetune_args"] = finetune_args - if gpu_config is not None: - req_data["gpu_config"] = gpu_config - if is_public is not None: - req_data["is_public"] = is_public - if use_cached_model is not None: - req_data["use_cached_model"] = use_cached_model - if self.model_config: - req_data["model_config"] = self.model_config.as_dict() - url = self.api_prefix + "precise_train" - - job = make_web_request(self.api_key, url, "post", req_data) - self.job_id = job["job_id"] - print( - f"Tuning job submitted! Check status of job {self.job_id} here: {self.ui_url}/train/{self.job_id}" - ) - - return job - - # Add alias for tune - precise_tune = precise_train - def cancel_job(self, job_id=None): if job_id is None: job_id = self.job_id diff --git a/lamini/api/utils/completion.py b/lamini/api/utils/completion.py index 8631c62..ac9349b 100644 --- a/lamini/api/utils/completion.py +++ b/lamini/api/utils/completion.py @@ -1,3 +1,4 @@ +import logging from typing import List, Optional, Union import aiohttp @@ -5,14 +6,15 @@ from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url from lamini.api.rest_requests import make_async_web_request, make_web_request +logger = logging.getLogger(__name__) + class Completion: - def __init__(self, api_key, api_url, config): - self.config = get_config(config) + def __init__(self, api_key, api_url): + self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/" - self.model_config = self.config.get("model_config", None) def generate( self, @@ -70,11 +72,13 @@ def make_llm_req_map( req_data = {} req_data["model_name"] = model_name # TODO: prompt should be named prompt to signal it's a batch. 
+ if isinstance(prompt, list) and len(prompt) > 20: + print( + "For large inference batches, consider using a Generation Pipeline instead: https://github.com/lamini-ai/lamini-examples/blob/main/05_data_pipeline/README.md" + ) req_data["prompt"] = prompt req_data["output_type"] = output_type req_data["max_tokens"] = max_tokens if max_new_tokens is not None: req_data["max_new_tokens"] = max_new_tokens - if self.model_config: - req_data["model_config"] = self.model_config.as_dict() return req_data diff --git a/lamini/api/utils/reservations.py b/lamini/api/utils/reservations.py index 424e363..4f4739d 100644 --- a/lamini/api/utils/reservations.py +++ b/lamini/api/utils/reservations.py @@ -10,25 +10,15 @@ logger = logging.getLogger(__name__) -reservation_api = None - - -def create_reservation_api(api_key, api_url, config): - global reservation_api - if reservation_api is None: - reservation_api = Reservations(api_key, api_url, config) - return reservation_api - - -def get_reservation_api(): - global reservation_api - assert reservation_api is not None - return reservation_api - class Reservations: - def __init__(self, api_key: str = None, api_url: str = None, config={}): - self.config = get_config(config) + def __init__( + self, + api_key: str = None, + api_url: str = None, + variable_capacity=False, + ): + self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/reservation" @@ -41,13 +31,10 @@ def __init__(self, api_key: str = None, api_url: str = None, config={}): self.polling_task = None self.poll_for_reservation = asyncio.Event() self.is_polling = False + self.variable_capacity = variable_capacity def initialize_reservation( - self, - capacity: int, - model_name: Optional[str] = None, - batch_size: Optional[int] = None, - max_tokens: Optional[int] = None, + self, capacity: int, model_name: str, batch_size: int, max_tokens: Optional[int] ): try: logger.info( @@ -65,8 +52,6 @@ def initialize_reservation( }, ) logger.info("Made initial reservation " + str(reservation)) - if "dynamic_max_batch_size" not in reservation: - reservation["dynamic_max_batch_size"] = batch_size self.current_reservation = reservation self.capacity_needed = capacity self.model_name = model_name @@ -75,8 +60,11 @@ def initialize_reservation( self.dynamic_max_batch_size = min( reservation["dynamic_max_batch_size"], reservation["capacity_remaining"] ) + if self.variable_capacity: + self.capacity_needed = self.dynamic_max_batch_size * lamini.max_workers self.is_working = True self.batch_size = batch_size + except Exception as e: logger.warning(f"Error making reservation, continuing without one. 
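# The new length check above prints a pointer to the Generation Pipeline docs
# whenever more than 20 prompts go through the plain completion path; smaller
# batches are unaffected. Illustrative trigger:
from lamini import Lamini

llm = Lamini(model_name="meta-llama/Meta-Llama-3.1-8B-Instruct")
prompts = [f"Summarize ticket {i}" for i in range(25)]
summaries = llm.generate(prompts)   # still works, but prints the recommendation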
{e}") self.current_reservation = None @@ -110,15 +98,15 @@ async def wait_and_poll_for_reservation(self, client): "capacity": max(self.capacity_needed, self.batch_size), "model_name": self.model_name, "max_tokens": self.max_tokens, - "batch_size": self.batch_size, + "batch_size": self.get_dynamic_max_batch_size(), }, ) logger.info("Made reservation " + str(reservation)) - if "dynamic_max_batch_size" not in reservation: - reservation["dynamic_max_batch_size"] = lamini.batch_size self.current_reservation = reservation self.capacity_remaining = reservation["capacity_remaining"] self.dynamic_max_batch_size = reservation["dynamic_max_batch_size"] + if self.variable_capacity: + self.capacity_needed = self.dynamic_max_batch_size * lamini.max_workers async with self.condition: self.condition.notify(len(self.condition._waiters)) self.is_polling = False @@ -126,16 +114,36 @@ async def wait_and_poll_for_reservation(self, client): self.polling_task = asyncio.create_task( self.kickoff_reservation_polling(client) ) - _ = asyncio.create_task(self.timer_based_polling(reservation["end_time"])) + logger.info("Made reservation " + str(reservation)) + if "dynamic_max_batch_size" not in reservation: + reservation["dynamic_max_batch_size"] = lamini.batch_size + self.current_reservation = reservation + self.capacity_remaining = reservation["capacity_remaining"] + self.dynamic_max_batch_size = reservation["dynamic_max_batch_size"] + if self.variable_capacity: + self.capacity_needed = self.dynamic_max_batch_size * lamini.max_workers + async with self.condition: + self.condition.notify(len(self.condition._waiters)) + self.is_polling = False + if self.is_working: + self.polling_task = asyncio.create_task( + self.kickoff_reservation_polling(client) + ) + _ = asyncio.create_task( + self.timer_based_polling(reservation["end_time"]) + ) async def timer_based_polling(self, wakeup_time): - current_time = datetime.datetime.utcnow() - end_time = datetime.datetime.fromisoformat(wakeup_time) - sleep_time = end_time - current_time - if sleep_time.total_seconds() > 0: - logger.debug("timer_based_polling sleep time: " + str(sleep_time)) - await asyncio.sleep(sleep_time.total_seconds()) - self.poll_for_reservation.set() + try: + current_time = datetime.datetime.utcnow() + end_time = datetime.datetime.fromisoformat(wakeup_time) + sleep_time = end_time - current_time + if sleep_time.total_seconds() > 0: + logger.debug("timer_based_polling sleep time: " + str(sleep_time)) + await asyncio.sleep(sleep_time.total_seconds()) + self.poll_for_reservation.set() + except asyncio.CancelledError: + logger.debug("Task was cancelled") async def kickoff_reservation_polling(self, client): if self.current_reservation is None: @@ -169,9 +177,12 @@ def update_capacity_needed(self, queries: int): return self.capacity_needed -= queries + def get_dynamic_max_batch_size(self): + if lamini.static_batching: + return self.batch_size + + return self.dynamic_max_batch_size + def __del__(self): if self.polling_task is not None: self.polling_task.cancel() - - def get_dynamic_max_batch_size(self): - return self.dynamic_max_batch_size diff --git a/lamini/classify/lamini_classifier.py b/lamini/classify/lamini_classifier.py index 5b1be96..159a414 100644 --- a/lamini/classify/lamini_classifier.py +++ b/lamini/classify/lamini_classifier.py @@ -1,3 +1,4 @@ +import asyncio import logging import os import pickle @@ -13,8 +14,6 @@ logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) -logging.getLogger().setLevel(logging.DEBUG) class 
LaminiClassifier: """A zero shot classifier that uses the Lamini LlamaV2Runner to generate @@ -24,17 +23,15 @@ class LaminiClassifier: def __init__( self, - config: dict = {}, - model_name: str = "meta-llama/Meta-Llama-3-8B-Instruct", + model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct", augmented_example_count: int = 10, batch_size: int = 10, threads: int = 1, - saved_examples_path: str = "/tmp/saved_examples.jsonl", + saved_examples_path: str = "saved_examples.jsonl", generator_from_prompt=None, example_modifier=None, example_expander=None, ): - self.config = config self.model_name = model_name self.augmented_example_count = augmented_example_count self.batch_size = batch_size @@ -65,7 +62,6 @@ def prompt_train(self, prompts: dict): First, augment the examples for each class using the prompts. """ - for class_name, prompt in prompts.items(): try: logger.info( @@ -74,9 +70,8 @@ def prompt_train(self, prompts: dict): self.add_class(class_name) result = self.generate_examples_from_prompt( - class_name, - prompt, - self.examples.get(class_name, [])) + class_name, prompt, self.examples.get(class_name, []) + ) self.examples[class_name] = result @@ -86,7 +81,6 @@ def prompt_train(self, prompts: dict): except Exception as e: logger.error(f"Failed to generate examples for class {class_name}") logger.error(e) - logger.error(generated_examples.exception()) logger.error( "Consider rerunning the generation task if the error is transient, e.g. 500" ) @@ -138,7 +132,7 @@ def get_embeddings(self, examples): if isinstance(examples, str): examples = [examples] - embed = Embedding(self.config) + embed = Embedding() embeddings = embed.generate(examples) return [embedding[0] for embedding in embeddings] @@ -243,18 +237,14 @@ def load(filename): def create_new_example_generator(self, prompt, original_examples): example_generator = self.generator_from_prompt( prompt, - config=self.config, model_name=self.model_name, batch_size=self.batch_size // 5, ) example_modifier = self.example_modifier( - config=self.config, model_name=self.model_name, batch_size=self.batch_size // 5, ) - example_expander = self.example_expander( - prompt, config=self.config, model_name=self.model_name - ) + example_expander = self.example_expander(prompt, model_name=self.model_name) examples = original_examples.copy() @@ -379,12 +369,10 @@ class DefaultExampleGenerator: def __init__( self, prompt, - config=None, - model_name="meta-llama/Meta-Llama-3-8B-Instruct", + model_name="meta-llama/Meta-Llama-3.1-8B-Instruct", batch_size=10, ): self.prompt = prompt - self.config = config self.example_count = 5 self.model_name = model_name self.batch_size = batch_size @@ -392,11 +380,9 @@ def __init__( self.max_history = 2 def generate_examples(self, seed, examples): - prompt_batch = self.get_prompts( - seed=seed, examples=examples - ) + prompt_batch = self.get_prompts(seed=seed, examples=examples) - runner = Lamini(config=self.config, model_name=self.model_name) + runner = Lamini(model_name=self.model_name) results = runner.generate( prompt=prompt_batch, @@ -444,19 +430,18 @@ def get_prompts(self, seed, examples): def parse_result(self, result): return [ - result["example_1"], - result["example_2"], - result["example_3"], - ] + result["example_1"], + result["example_2"], + result["example_3"], + ] + class DefaultExampleModifier: def __init__( self, - config=None, - model_name="meta-llama/Meta-Llama-3-8B-Instruct", + model_name="meta-llama/Meta-Llama-3.1-8B-Instruct", batch_size=10, ): - self.config = config self.model_name = model_name 
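# Usage sketch for the classifier changes above: no config dict, a Llama 3.1
# default model, and augmented examples saved to ./saved_examples.jsonl instead
# of /tmp. The class prompts are illustrative; calls beyond prompt_train() are
# unchanged by this diff.
from lamini import LaminiClassifier

clf = LaminiClassifier()  # defaults to meta-llama/Meta-Llama-3.1-8B-Instruct
clf.prompt_train({
    "refund": "The customer wants their money back.",
    "shipping": "The customer asks where their order is.",
})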
self.required_examples = 5 self.batch_size = batch_size @@ -464,7 +449,7 @@ def __init__( def modify_examples(self, examples): prompts = self.get_prompts(examples) - runner = Lamini(config=self.config, model_name=self.model_name) + runner = Lamini(model_name=self.model_name) results = runner.generate( prompt=prompts, @@ -526,15 +511,12 @@ def parse_result(self, results): class DefaultExampleExpander: - def __init__( - self, prompt, config=None, model_name="meta-llama/Meta-Llama-3-8B-Instruct" - ): + def __init__(self, prompt, model_name="meta-llama/Meta-Llama-3.1-8B-Instruct"): self.prompt = prompt - self.config = config self.model_name = model_name def expand_example(self, example_batch): - runner = Lamini(config=self.config, model_name=self.model_name) + runner = Lamini(model_name=self.model_name) prompts = self.get_prompts(example_batch) @@ -572,4 +554,4 @@ def get_prompts(self, example_batch): prompts.append(prompt) - return prompts \ No newline at end of file + return prompts diff --git a/lamini/evaluators/benchmark.py b/lamini/evaluators/benchmark.py index d2b268e..ea19795 100644 --- a/lamini/evaluators/benchmark.py +++ b/lamini/evaluators/benchmark.py @@ -22,7 +22,6 @@ def __init__( model_name=self.model_name, api_key=api_key, api_url=api_url, - config=config, ) def _get_task_names(self, tasks): diff --git a/lamini/evaluators/custom/earnings_call_evaluator.py b/lamini/evaluators/custom/earnings_call_evaluator.py index 36f5b8f..b982eaf 100644 --- a/lamini/evaluators/custom/earnings_call_evaluator.py +++ b/lamini/evaluators/custom/earnings_call_evaluator.py @@ -57,8 +57,8 @@ async def evaluate_hallucination(self): class EarningsPipeline(GenerationPipeline): def __init__( self, - answer_model="mistralai/Mistral-7B-Instruct-v0.2", - score_model="mistralai/Mistral-7B-Instruct-v0.2", + answer_model="mistralai/Mistral-7B-Instruct-v0.3", + score_model="mistralai/Mistral-7B-Instruct-v0.3", ): super(EarningsPipeline, self).__init__() @@ -72,7 +72,7 @@ def forward(self, x): class EarningsAnswerGenerator(GenerationNode): - def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2"): + def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.3"): super(EarningsAnswerGenerator, self).__init__(model_name) def generate( @@ -128,7 +128,7 @@ def make_prompt(self, chunk): class EarningsScoreGenerator(GenerationNode): - def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2"): + def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.3"): super(EarningsScoreGenerator, self).__init__(model_name=model_name) def generate( diff --git a/lamini/evaluators/custom/ecommerce_evaluator.py b/lamini/evaluators/custom/ecommerce_evaluator.py index bfaf616..35a8f38 100644 --- a/lamini/evaluators/custom/ecommerce_evaluator.py +++ b/lamini/evaluators/custom/ecommerce_evaluator.py @@ -56,8 +56,8 @@ async def evaluate_hallucination(self): class AnswerScorePipeline(GenerationPipeline): def __init__( self, - answer_model="mistralai/Mistral-7B-Instruct-v0.2", - score_model="mistralai/Mistral-7B-Instruct-v0.2", + answer_model="mistralai/Mistral-7B-Instruct-v0.3", + score_model="mistralai/Mistral-7B-Instruct-v0.3", ): super(AnswerScorePipeline, self).__init__() @@ -71,7 +71,7 @@ def forward(self, x): class AnswerGenerator(GenerationNode): - def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2"): + def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.3"): super(AnswerGenerator, self).__init__(model_name, max_new_tokens=150) def generate( @@ -123,7 +123,7 @@ def 
make_prompt(self, chunk): class ScoreGenerator(GenerationNode): - def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2"): + def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.3"): super(ScoreGenerator, self).__init__(model_name=model_name, max_new_tokens=150) def generate( diff --git a/lamini/evaluators/custom/icd_evaluator.py b/lamini/evaluators/custom/icd_evaluator.py index 624f68f..b27c8dd 100644 --- a/lamini/evaluators/custom/icd_evaluator.py +++ b/lamini/evaluators/custom/icd_evaluator.py @@ -55,8 +55,8 @@ async def evaluate_hallucination(self): class ICDPipeline(GenerationPipeline): def __init__( self, - answer_model="mistralai/Mistral-7B-Instruct-v0.2", - score_model="mistralai/Mistral-7B-Instruct-v0.2", + answer_model="mistralai/Mistral-7B-Instruct-v0.3", + score_model="mistralai/Mistral-7B-Instruct-v0.3", ): super(ICDPipeline, self).__init__() @@ -70,7 +70,7 @@ def forward(self, x): class ICDAnswerGenerator(GenerationNode): - def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2"): + def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.3"): super(ICDAnswerGenerator, self).__init__(model_name, max_new_tokens=150) def generate( @@ -118,7 +118,7 @@ def make_prompt(self, chunk): class ICDScoreGenerator(GenerationNode): - def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2"): + def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.3"): super(ICDScoreGenerator, self).__init__( model_name=model_name, max_new_tokens=150 ) diff --git a/lamini/generation/base_generation_queue.py b/lamini/generation/base_generation_queue.py index 136cc51..f09f928 100644 --- a/lamini/generation/base_generation_queue.py +++ b/lamini/generation/base_generation_queue.py @@ -1,39 +1,80 @@ -import json import logging -import os +from typing import Any, Dict, Optional import aiohttp import lamini from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url -from lamini.api.utils.reservations import create_reservation_api +from lamini.api.utils.reservations import Reservations logger = logging.getLogger(__name__) class BaseGenerationQueue: - def __init__(self, api_key, api_url, config): - self.config = get_config(config) + def __init__( + self, + api_key: Optional[str], + api_url: Optional[str], + variable_capacity: Optional[bool] = False, + ): + self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/" - self.reservation_api = None self.reservation_polling_task = None - # TODO: dedup code with base_async_inference_queue once stable self.connector = aiohttp.TCPConnector(limit=self.get_max_workers()) self.client = aiohttp.ClientSession(connector=self.connector) - self.reservation_api = create_reservation_api( - self.api_key, self.api_url, self.config + self.reservation_api = Reservations( + self.api_key, self.api_url, variable_capacity ) def get_max_workers(self): - return lamini.max_workers + """Return the Lamini API max number of workers + + Parameters + ---------- + None + + Returns + ------- + int + lamini.max_workers + """ + return int(lamini.max_workers) def get_batch_size(self): + """Return the Lamini API batch size + + Parameters + ---------- + None + + Returns + ------- + int + lamini.batch_size + """ return int(lamini.batch_size) def get_retry_limit(self): return int(lamini.retry_limit) + def get_dynamic_max_batch_size(self): + if 
lamini.static_batching: + return self.get_batch_size() + + return self.reservation_api.dynamic_max_batch_size + def __del__(self): + """Handle cancelling reservation_polling_task if one is present + upon deletion of this object. + + Parameters + ---------- + None + + Returns + ------- + None + """ if self.reservation_polling_task is not None: self.reservation_polling_task.cancel() diff --git a/lamini/generation/base_prompt_object.py b/lamini/generation/base_prompt_object.py index 30c4d67..ff75126 100644 --- a/lamini/generation/base_prompt_object.py +++ b/lamini/generation/base_prompt_object.py @@ -4,10 +4,10 @@ class PromptObject: def __init__(self, prompt: str, response: str = None, data: dict = {}) -> None: assert isinstance(prompt, str) - assert isinstance(data, dict) + # assert isinstance(data, dict) self.prompt = prompt self.response = response - self.error = None + self.error = [] self.data = data # Records the input prompt to the first node of the pipeline. self.orig_prompt: PromptObject = None diff --git a/lamini/generation/embedding_node.py b/lamini/generation/embedding_node.py index 2ca6878..4c8ed8b 100644 --- a/lamini/generation/embedding_node.py +++ b/lamini/generation/embedding_node.py @@ -10,7 +10,7 @@ class EmbeddingNode(GenerationNode): - """ + """ This child class of GenerationNode is for use of specific calls for an embedding generated response. The main change is a reduction in the needed parameters for an Embedding response which is seen through @@ -22,16 +22,15 @@ class EmbeddingNode(GenerationNode): - output_type - max_tokens - max_new_tokens - - model_config - "type" is the main difference with a hard coded "embedding" - + "type" is the main difference with a hard coded "embedding" + Parameters ---------- model_name: Optional[str] Model name as referred to on HuggingFace https://huggingface.co/models - + api_key: Optional[str] - Lamini platform API key, if not provided the key stored + Lamini platform API key, if not provided the key stored within ~.lamini/configure.yaml will be used. If either don't exist then an error is raised. @@ -41,7 +40,7 @@ class EmbeddingNode(GenerationNode): i.e. 
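# PromptObject.error above now starts as an empty list, so each retry can append
# its exception and the queue can compare len(error) against the retry limit.
# Minimal sketch:
from lamini.generation.base_prompt_object import PromptObject

p = PromptObject("Classify this ticket", data={"ticket_id": 123})
assert p.error == []                           # previously None
p.error.append(RuntimeError("transient 500"))  # what the batch processor does on failure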
localhost, staging.lamini.ai, or api.lamini.ai Additionally, LLAMA_ENVIRONMENT can be set as an environment variable that will be grabbed for the url before any of the above defaults - + config: dict Dictionary that is handled from the following script: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py @@ -52,15 +51,15 @@ class EmbeddingNode(GenerationNode): url: production: url: - + local: key: staging: key: production: key: - = (3, 10): - logger.info("Using 3.10 InferenceQueue Interface") - from lamini.generation.generation_queue_3_10 import ( - get_global_inference_queue as get_global_inference_queue_3_10, - ) - - self.async_inference_queue = get_global_inference_queue_3_10( - api_key, api_url, config=config - ) - else: - raise Exception("Must use Python 3.10 or greater for this feature") - - self.model_config = self.config.get("model_config", None) self.max_tokens = max_tokens self.max_new_tokens = max_new_tokens self.failed_prompts = [] + self.async_inference_queue = None def __call__(self, prompt, *args, **kwargs): prompt = self.transform_prompt(prompt) @@ -60,6 +44,7 @@ def generate( max_tokens=self.max_tokens, max_new_tokens=self.max_new_tokens, ) + assert self.async_inference_queue is not None return self.async_inference_queue.submit(req_data, self.token_optimizer) def make_llm_req_map( @@ -77,8 +62,6 @@ def make_llm_req_map( req_data["max_tokens"] = max_tokens if max_new_tokens is not None: req_data["max_new_tokens"] = max_new_tokens - if self.model_config: - req_data["model_config"] = self.model_config.as_dict() req_data["type"] = "completion" return req_data diff --git a/lamini/generation/generation_pipeline.py b/lamini/generation/generation_pipeline.py index 2fbf90b..2fbbdfd 100644 --- a/lamini/generation/generation_pipeline.py +++ b/lamini/generation/generation_pipeline.py @@ -4,9 +4,7 @@ from typing import AsyncIterator, Iterator, Optional import lamini -from lamini.api.utils.reservations import create_reservation_api from lamini.generation.base_node_object import BaseGenerationNode -from lamini.generation.token_optimizer import TokenOptimizer logger = logging.getLogger(__name__) @@ -16,25 +14,9 @@ def __init__( self, api_key: Optional[str] = None, api_url: Optional[str] = None, - config: dict = {}, ): self.api_key = api_key self.api_url = api_url - self.config = config - if sys.version_info >= (3, 10): - logger.info("Using 3.10 InferenceQueue Interface") - from lamini.generation.generation_queue_3_10 import ( - get_global_inference_queue as get_global_inference_queue_3_10, - ) - - self.async_inference_queue = get_global_inference_queue_3_10( - api_key, api_url, config=config - ) - else: - raise Exception("Must use Python 3.10 or greater for this feature") - self.reservation_api = create_reservation_api( - self.api_key, self.api_url, self.config - ) def forward(self, prompt: AsyncIterator) -> AsyncIterator: """NOTE: You must implement this function. 
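# Structural sketch of a pipeline under the new wiring above: nodes no longer
# build their own inference queue, the pipeline attaches one when it runs
# (Python 3.10+ only). The node, model name, and forward body are illustrative.
from lamini.generation.generation_node import GenerationNode
from lamini.generation.generation_pipeline import GenerationPipeline


class AnswerNode(GenerationNode):
    def __init__(self, model_name="meta-llama/Meta-Llama-3.1-8B-Instruct"):
        super().__init__(model_name, max_new_tokens=150)


class AnswerPipeline(GenerationPipeline):
    def __init__(self):
        super().__init__()
        self.answer = AnswerNode()

    def forward(self, x):
        return self.answer(x)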
@@ -54,28 +36,40 @@ async def __call( self, prompt: AsyncIterator, ) -> AsyncIterator: - assert isinstance(prompt, Iterator) or isinstance(prompt, AsyncIterator) - iterator = self.forward(prompt) - assert isinstance(iterator, AsyncIterator) + if sys.version_info >= (3, 10): + logger.info("Using 3.10 InferenceQueue Interface") + from lamini.generation.generation_queue_3_10 import ( + get_global_inference_queue as get_global_inference_queue_3_10, + ) + self.async_inference_queue = get_global_inference_queue_3_10( + self.api_key, + self.api_url, + ) + self.reservation_api = self.async_inference_queue.reservation_api + else: + raise Exception("Must use Python 3.10 or greater for this feature") model_names = [] max_tokens = [] for _, val in vars(self).items(): if isinstance(val, BaseGenerationNode): + val.async_inference_queue = self.async_inference_queue try: model_names.append(val.model_name) max_tokens.append(val.max_tokens) except: continue assert len(model_names) > 0 + assert isinstance(prompt, Iterator) or isinstance(prompt, AsyncIterator) + iterator = self.forward(prompt) + assert isinstance(iterator, AsyncIterator) - capacity = lamini.max_workers * lamini.batch_size if not any(max_tokens): max_tokens = None else: max_tokens = max(max_tokens) self.reservation_api.initialize_reservation( - capacity, + capacity=lamini.batch_size * lamini.max_workers, model_name=model_names[0], batch_size=lamini.batch_size, max_tokens=max_tokens, diff --git a/lamini/generation/generation_queue_3_10.py b/lamini/generation/generation_queue_3_10.py index a8c91f8..0962c96 100644 --- a/lamini/generation/generation_queue_3_10.py +++ b/lamini/generation/generation_queue_3_10.py @@ -1,9 +1,7 @@ import asyncio import functools import logging -from typing import AsyncIterator, Iterator, Optional, TypeVar, Union, Tuple, Any - -import lamini +from typing import Any, AsyncIterator, Iterator, Optional, Tuple, TypeVar, Union from lamini.generation.base_generation_queue import BaseGenerationQueue from lamini.generation.process_generation_batch import process_generation_batch @@ -16,10 +14,10 @@ global_inference_queue = None -def get_global_inference_queue(api_key, api_url, config): +def get_global_inference_queue(api_key, api_url): global global_inference_queue if global_inference_queue is None or global_inference_queue.client.closed: - global_inference_queue = GenerationQueue(api_key, api_url, config=config) + global_inference_queue = GenerationQueue(api_key, api_url) return global_inference_queue @@ -49,6 +47,9 @@ def append(self, item): class GenerationQueue(BaseGenerationQueue): + def __init__(self, *args, **kwargs): + super().__init__(variable_capacity=True, *args, **kwargs) + async def submit( self, request: dict, @@ -67,11 +68,8 @@ async def submit( async for result in async_iterator: if isinstance(result[1], Exception): - if ( - result[0]["batch"]["prompt"][0].error is not None - and len(result[0]["batch"]["prompt"][0].error) - < self.get_retry_limit() - ): + logger.debug(f"exception: {result[1]}") + if len(result[0]["batch"]["prompt"][0].error) < self.get_retry_limit(): logger.debug( f"Retrying up to {self.get_retry_limit()}, prompt: {result[0]}" ) @@ -100,7 +98,7 @@ async def form_batches( api_prefix, token_optimizer: Optional[TokenOptimizer], ): - batch_size_func = self.reservation_api.get_dynamic_max_batch_size + batch_size_func = self.get_dynamic_max_batch_size async for prompt in next_n_w_step_func(request["prompt"], batch_size_func): batch = request.copy() batch["prompt"] = prompt @@ -116,6 +114,7 @@ async 
def form_batches( "key": key, "batch": batch, "client": client, + "reservation_api": self.reservation_api, } @@ -151,6 +150,7 @@ async def limit_concurrency(aws, limit): aws_ended = False pending = set() + # TODO: there is a bug here, see https://github.com/lamini-ai/lamini-platform/issues/2899 while pending or not aws_ended: while len(pending) < limit and not aws_ended: try: @@ -230,7 +230,7 @@ async def async_chunks( while not finished: results: list[T] = [] size = size_fn() - + assert size != 0 for _ in range(size): try: result = None diff --git a/lamini/generation/process_generation_batch.py b/lamini/generation/process_generation_batch.py index 060be52..af783fe 100644 --- a/lamini/generation/process_generation_batch.py +++ b/lamini/generation/process_generation_batch.py @@ -1,9 +1,6 @@ -import json import logging -import traceback from lamini.api.rest_requests import make_async_web_request -from lamini.api.utils.reservations import get_reservation_api logger = logging.getLogger(__name__) @@ -12,9 +9,10 @@ async def process_generation_batch(args: dict): client = args["client"] key = args["key"] batch = args["batch"] + reservation_api = args["reservation_api"] + url = get_url_from_args(args) # this will block until there is space in capacity - reservation_api = get_reservation_api() await reservation_api.async_pause_for_reservation_start() def can_submit_query(): @@ -25,28 +23,26 @@ def can_submit_query(): # Now we can consume credits and send batch reservation_api.update_capacity_use(len(batch["prompt"])) logger.debug( - f"yes reservation_api.capacity_remaining {reservation_api.capacity_remaining}" + f"reservation_api.capacity_remaining {reservation_api.capacity_remaining}" ) return True if not can_submit_query(): async with reservation_api.condition: await reservation_api.condition.wait_for(can_submit_query) - # Separate thread updates existing reservations try: reservation_id = None if reservation_api.current_reservation is not None: reservation_id = reservation_api.current_reservation["reservation_id"] json = get_body_from_args(batch, reservation_id) + logger.debug(f"Sending batch with {len(batch['prompt'])}") result = await query_api(client, key, url, json, batch) except Exception as e: logger.debug( f"Error in process_generation_batch, type: {type(e)}, message: {e}" ) for prompt_obj in batch["prompt"]: - if prompt_obj.error is None: - prompt_obj.error = [] prompt_obj.error.append(e) raise e finally: @@ -97,6 +93,5 @@ def get_body_from_args(batch: dict, reservation_id: str) -> dict: "output_type": batch["output_type"], "max_tokens": batch["max_tokens"], "max_new_tokens": batch.get("max_new_tokens", None), - "model_config": batch.get("model_config", None), } return json diff --git a/lamini/index/lamini_index.py b/lamini/index/lamini_index.py index 0066c60..3548ff5 100644 --- a/lamini/index/lamini_index.py +++ b/lamini/index/lamini_index.py @@ -12,9 +12,8 @@ class LaminiIndex: - def __init__(self, config={}): - self.config = config - self.embedding_api = Embedding(self.config) + def __init__(self): + self.embedding_api = Embedding() self.index = None self.splits = [] diff --git a/lamini/runners/base_runner.py b/lamini/runners/base_runner.py index c64f0c8..1feed2e 100644 --- a/lamini/runners/base_runner.py +++ b/lamini/runners/base_runner.py @@ -22,7 +22,6 @@ def __init__( model_name=model_name, api_key=api_key, api_url=api_url, - config=self.config, local_cache_file=local_cache_file, ) self.prompt_template = prompt_template diff --git a/lamini/runners/llama_v2_runner.py 
b/lamini/runners/llama_v2_runner.py
index e514c02..6b5cb8a 100644
--- a/lamini/runners/llama_v2_runner.py
+++ b/lamini/runners/llama_v2_runner.py
@@ -14,7 +14,7 @@ class LlamaV2Runner(BaseRunner):
     def __init__(
         self,
-        model_name: str = "meta-llama/Meta-Llama-3-8B-Instruct",
+        model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct",
         system_prompt: str = None,
         prompt_template="[INST] <<SYS>>\n{system}\n<</SYS>>\n\n{user} [/INST]",
         api_key=None,
diff --git a/lamini/runners/llama_v3_runner.py b/lamini/runners/llama_v3_runner.py
index a7e40c4..93392f0 100644
--- a/lamini/runners/llama_v3_runner.py
+++ b/lamini/runners/llama_v3_runner.py
@@ -14,7 +14,7 @@ class LlamaV3Runner(BaseRunner):
     def __init__(
         self,
-        model_name: str = "meta-llama/Meta-Llama-3-8B-Instruct",
+        model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct",
         system_prompt: str = None,
         prompt_template="<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system}<|eot_id|><|start_header_id|>user<|end_header_id|>\n\n{user}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n",
         api_key=None,
diff --git a/lamini/runners/mistral_runner.py b/lamini/runners/mistral_runner.py
index b1aab9b..6eb0f46 100644
--- a/lamini/runners/mistral_runner.py
+++ b/lamini/runners/mistral_runner.py
@@ -9,7 +9,7 @@ class MistralRunner(BaseRunner):
     def __init__(
         self,
-        model_name="mistralai/Mistral-7B-Instruct-v0.2",
+        model_name="mistralai/Mistral-7B-Instruct-v0.3",
         system_prompt: str = None,
         prompt_template="[INST] {system} {user} [/INST]",
         api_key=None,
diff --git a/pyproject.toml b/pyproject.toml
index 027d449..7e7747f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "lamini"
-version = "2.5.7"
+version = "3.0.0"
 authors = [
     { name="PowerML", email="info@powerml.co" },
 ]
@@ -24,11 +24,14 @@ dependencies = [
     "jsonlines",
     "pandas",
     "azure-storage-blob",
-    "scikit-learn", 
+    "scikit-learn",
     "aiohttp",
-    "faiss-cpu",
 ]

+[project.optional-dependencies]
+index = ["faiss-cpu"]
+classifier = ["scikit-learn"]
+
 [tool.setuptools]
 packages = [
     "lamini",
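# With faiss-cpu moved to an optional extra above, installs opt in as needed;
# the extras names come from the [project.optional-dependencies] table:
#   pip install lamini                # core SDK
#   pip install "lamini[index]"       # adds faiss-cpu for LaminiIndex
#   pip install "lamini[classifier]"  # classifier extras (scikit-learn)
# A guarded import keeps code working without the extra:
try:
    import faiss  # only available with the "index" extra
except ImportError:
    faiss = None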