From e55b341b754e5e5e8586f965a4a70124acb88707 Mon Sep 17 00:00:00 2001 From: Ryuta Yoshimatsu Date: Fri, 17 Jan 2025 11:38:28 +0100 Subject: [PATCH] update neural forecast pipeline for serving --- mmf_sa/Forecaster.py | 36 +++------------ .../models/chronosforecast/ChronosPipeline.py | 8 ++-- .../models/moiraiforecast/MoiraiPipeline.py | 13 +++--- .../neuralforecast/NeuralForecastPipeline.py | 44 +++++++++++++++++++ .../models/timesfmforecast/TimesFMPipeline.py | 28 +++++------- 5 files changed, 74 insertions(+), 55 deletions(-) diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index 62260cb..efc1671 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -11,8 +11,6 @@ import cloudpickle import mlflow from mlflow.tracking import MlflowClient -from mlflow.models import ModelSignature, infer_signature -from mlflow.types.schema import Schema, ColSpec from omegaconf import OmegaConf from omegaconf.basecontainer import BaseContainer from pyspark.sql import SparkSession, DataFrame @@ -313,37 +311,17 @@ def evaluate_global_model(self, model_conf): train_df, val_df = self.split_df_train_val(hist_df) # First, we train the model on the entire history (train_df, val_df). - # Then we register this model as our final model in Unity Catalog. + # Then, we register this model as our final model in Unity Catalog. final_model = self.model_registry.get_model(model_name) final_model.fit(pd.concat([train_df, val_df])) input_example = train_df[train_df[self.conf['group_id']] == train_df[self.conf['group_id']] \ - .unique()[0]].sort_values(by=[self.conf['date_col']]) - - # Prepare model signature for model registry - input_schema = infer_signature(model_input=input_example).inputs - output_schema = Schema( - [ - ColSpec("integer", "index"), - ColSpec("string", self.conf['group_id']), - ColSpec("datetime", self.conf['date_col']), - ColSpec("float", self.conf['target']), - ] - ) - signature = ModelSignature(inputs=input_schema, outputs=output_schema) - - # Register the model - model_info = mlflow.sklearn.log_model( - final_model, - "model", + .unique()[0]].sort_values(by=[self.conf['date_col']]) + model_info = final_model.register( + model=final_model, registered_model_name=f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}", input_example=input_example, - signature=signature, - pip_requirements=[], + run_id=self.run_id, ) - mlflow.log_params(final_model.get_params()) - mlflow.set_tag("run_id", self.run_id) - mlflow.set_tag("model_name", final_model.params["name"]) - print(f"Model registered: {self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}") # Next, we train the model only with train_df and run detailed backtesting model = self.model_registry.get_model(model_name) @@ -565,7 +543,7 @@ def score_global_model(self, model_conf): print(f"Running scoring for {model_conf['name']}...") model, model_uri = self.get_model_for_scoring(model_conf) score_df, removed = self.prepare_data_for_global_model("scoring") - prediction_df, model_fitted = model.forecast(score_df) + prediction_df = model.predict(score_df) if prediction_df[self.conf["date_col"]].dtype.type != np.datetime64: prediction_df[self.conf["date_col"]] = prediction_df[ self.conf["date_col"] @@ -630,7 +608,7 @@ def get_model_for_scoring(self, model_conf): model_version = model_info.version model_uri = f"runs:/{model_info.run_id}/model" if model_conf.get("model_type", None) == "global": - model = mlflow.sklearn.load_model(f"models:/{registered_name}/{model_version}") + model = mlflow.pyfunc.load_model(f"models:/{registered_name}/{model_version}") return model, model_uri elif model_conf.get("model_type", None) == "foundation": return None, model_uri diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py index 998ee65..f966280 100644 --- a/mmf_sa/models/chronosforecast/ChronosPipeline.py +++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py @@ -248,15 +248,15 @@ def __init__(self, params): class ChronosModel(mlflow.pyfunc.PythonModel): - def __init__(self, repository, prediction_length): + def __init__(self, repo, prediction_length): import torch - self.repository = repository + self.repo = repo self.prediction_length = prediction_length self.device = "cuda" if torch.cuda.is_available() else "cpu" - # Initialize the ChronosPipeline with a pretrained model from the specified repository + # Initialize the BaseChronosPipeline with a pretrained model from the specified repository from chronos import BaseChronosPipeline self.pipeline = BaseChronosPipeline.from_pretrained( - self.repository, + self.repo, device_map='cuda', torch_dtype=torch.bfloat16, ) diff --git a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py index 6896caf..3f0daf6 100644 --- a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py +++ b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py @@ -21,6 +21,7 @@ def __init__(self, params): self.params = params self.device = None self.model = None + self.repo = None def register(self, registered_model_name: str): pipeline = MoiraiModel( @@ -253,24 +254,24 @@ def __init__(self, params): self.repo = "Salesforce/moirai-moe-1.0-R-large" class MoiraiModel(mlflow.pyfunc.PythonModel): - def __init__(self, repository, prediction_length, patch_size, num_samples): + def __init__(self, repo, prediction_length, patch_size, num_samples): from uni2ts.model.moirai import MoiraiForecast, MoiraiModule from uni2ts.model.moirai_moe import MoiraiMoEForecast, MoiraiMoEModule - self.repository = repository + self.repo = repo self.prediction_length = prediction_length self.patch_size = patch_size self.num_samples = num_samples - if 'moe' in self.repository: - self.module = MoiraiMoEModule.from_pretrained(self.repository) + if 'moe' in self.repo: + self.module = MoiraiMoEModule.from_pretrained(self.repo) else: - self.module = MoiraiModule.from_pretrained(self.repository) + self.module = MoiraiModule.from_pretrained(self.repo) self.pipeline = None def predict(self, context, input_data, params=None): from einops import rearrange from uni2ts.model.moirai import MoiraiForecast, MoiraiModule from uni2ts.model.moirai_moe import MoiraiMoEForecast, MoiraiMoEModule - if 'moe' in self.repository: + if 'moe' in self.repo: self.pipeline = MoiraiMoEForecast( module=self.module, prediction_length=self.prediction_length, diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py index de1f027..550e48d 100644 --- a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py +++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py @@ -1,5 +1,8 @@ import pandas as pd import numpy as np +import mlflow +from mlflow.models import ModelSignature, infer_signature +from mlflow.types.schema import Schema, ColSpec from sktime.performance_metrics.forecasting import ( MeanAbsoluteError, MeanSquaredError, @@ -30,6 +33,38 @@ def __init__(self, params): self.params = params self.model = None + def register(self, model, registered_model_name: str, input_example, run_id): + pipeline = NeuralForecastModel(model) + # Prepare model signature for model registry + input_schema = infer_signature(model_input=input_example).inputs + output_schema = Schema( + [ + ColSpec("integer", "index"), + ColSpec("string", self.params.group_id), + ColSpec("datetime", self.params.date_col), + ColSpec("float", self.params.target), + ] + ) + signature = ModelSignature(inputs=input_schema, outputs=output_schema) + model_info = mlflow.pyfunc.log_model( + "model", + python_model=pipeline, + registered_model_name=registered_model_name, + #input_example=input_example, + signature=signature, + pip_requirements=[ + "cloudpickle==2.2.1", + "neuralforecast==1.7.2", + "git+https://github.com/databricks-industry-solutions/many-model-forecasting.git", + "pyspark==3.5.0", + ], + ) + mlflow.log_params(model.get_params()) + mlflow.set_tag("run_id", run_id) + mlflow.set_tag("model_name", model.params["name"]) + print(f"Model registered: {registered_model_name}") + return model_info + def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame: if future: # Prepare future dataframe with exogenous regressors for forecasting @@ -680,3 +715,12 @@ def config(trial): ], freq=self.params["freq"] ) + + +class NeuralForecastModel(mlflow.pyfunc.PythonModel): + def __init__(self, pipeline): + self.pipeline = pipeline + + def predict(self, context, input_data, params=None): + forecast, model = self.pipeline.forecast(input_data) + return forecast diff --git a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py index fe5dc9f..c48a692 100644 --- a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py +++ b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py @@ -18,9 +18,10 @@ def __init__(self, params): self.params = params self.device = None self.model = None + self.repo = None def register(self, registered_model_name: str): - pipeline = TimesFMModel(self.params) + pipeline = TimesFMModel(self.params, self.repo) input_schema = Schema([TensorSpec(np.dtype(np.double), (-1, -1))]) output_schema = Schema([TensorSpec(np.dtype(np.uint8), (-1, -1))]) signature = ModelSignature(inputs=input_schema, outputs=output_schema) @@ -142,11 +143,11 @@ def __init__(self, params): super().__init__(params) import timesfm self.params = params - self.backend = "gpu" if torch.cuda.is_available() else "cpu" + #self.backend = "gpu" if torch.cuda.is_available() else "cpu" self.repo = "google/timesfm-1.0-200m-pytorch" self.model = timesfm.TimesFm( hparams=timesfm.TimesFmHparams( - backend=self.backend, + backend="gpu", per_core_batch_size=32, horizon_len=self.params.prediction_length, ), @@ -160,11 +161,11 @@ def __init__(self, params): super().__init__(params) import timesfm self.params = params - self.backend = "gpu" if torch.cuda.is_available() else "cpu" + #self.backend = "gpu" if torch.cuda.is_available() else "cpu" self.repo = "google/timesfm-2.0-500m-pytorch" self.model = timesfm.TimesFm( hparams=timesfm.TimesFmHparams( - backend=self.backend, + backend="gpu", per_core_batch_size=32, horizon_len=self.params.prediction_length, num_layers=50, @@ -177,28 +178,23 @@ def __init__(self, params): ) class TimesFMModel(mlflow.pyfunc.PythonModel): - def __init__(self, params): - self.params = params - self.model = None # Initialize the model attribute to None - - def load_model(self): - # Initialize the TimesFm model with specified parameters + def __init__(self, params, repo): import timesfm + self.params = params + self.repo = repo + #self.backend = "gpu" if torch.cuda.is_available() else "cpu" self.model = timesfm.TimesFm( hparams=timesfm.TimesFmHparams( - backend=self.params.device, + backend="gpu", per_core_batch_size=32, horizon_len=self.params.prediction_length, ), checkpoint=timesfm.TimesFmCheckpoint( - huggingface_repo_id=self.params.repo + huggingface_repo_id=self.repo, ), ) def predict(self, context, input_df, params=None): - # Load the model if it hasn't been loaded yet - if self.model is None: - self.load_model() # Generate forecasts on the input DataFrame forecast_df = self.model.forecast_on_df( inputs=input_df, # Input DataFrame containing the time series data.