update neural forecast pipeline for serving

ryuta-yoshimatsu committed Jan 17, 2025
1 parent 672cdc1 commit e55b341
Showing 5 changed files with 74 additions and 55 deletions.
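In summary, this commit moves global-model registration out of Forecaster.evaluate_global_model and into each pipeline's own register() method, wraps the NeuralForecast model in an mlflow.pyfunc.PythonModel so it can be served, and switches scoring to load the registered model through the pyfunc flavor. Below is a minimal sketch of loading such a registered model for batch scoring; the registered-model name, version, and the unique_id/ds/y column names are illustrative placeholders rather than values from the repo's configuration.

import mlflow
import pandas as pd

# Hypothetical three-level Unity Catalog name; the real name is built in Forecaster.py as
# f"{conf['model_output']}.{model_conf['name']}_{conf['use_case_name']}".
registered_name = "catalog.schema.NeuralForecastAutoNHITS_my_use_case"
loaded = mlflow.pyfunc.load_model(f"models:/{registered_name}/1")

# The input mirrors the logged signature: one row per (group_id, date_col) with the target history.
history = pd.DataFrame({
    "unique_id": ["A"] * 30,
    "ds": pd.date_range("2024-01-01", periods=30, freq="D"),
    "y": [float(i) for i in range(30)],
})
forecast = loaded.predict(history)  # returns only the forecast frame (see NeuralForecastModel below)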
36 changes: 7 additions & 29 deletions mmf_sa/Forecaster.py
@@ -11,8 +11,6 @@
import cloudpickle
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.models import ModelSignature, infer_signature
from mlflow.types.schema import Schema, ColSpec
from omegaconf import OmegaConf
from omegaconf.basecontainer import BaseContainer
from pyspark.sql import SparkSession, DataFrame
@@ -313,37 +311,17 @@ def evaluate_global_model(self, model_conf):
train_df, val_df = self.split_df_train_val(hist_df)

# First, we train the model on the entire history (train_df, val_df).
# Then we register this model as our final model in Unity Catalog.
# Then, we register this model as our final model in Unity Catalog.
final_model = self.model_registry.get_model(model_name)
final_model.fit(pd.concat([train_df, val_df]))
input_example = train_df[train_df[self.conf['group_id']] == train_df[self.conf['group_id']] \
.unique()[0]].sort_values(by=[self.conf['date_col']])

# Prepare model signature for model registry
input_schema = infer_signature(model_input=input_example).inputs
output_schema = Schema(
[
ColSpec("integer", "index"),
ColSpec("string", self.conf['group_id']),
ColSpec("datetime", self.conf['date_col']),
ColSpec("float", self.conf['target']),
]
)
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Register the model
model_info = mlflow.sklearn.log_model(
final_model,
"model",
.unique()[0]].sort_values(by=[self.conf['date_col']])
model_info = final_model.register(
model=final_model,
registered_model_name=f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}",
input_example=input_example,
signature=signature,
pip_requirements=[],
run_id=self.run_id,
)
mlflow.log_params(final_model.get_params())
mlflow.set_tag("run_id", self.run_id)
mlflow.set_tag("model_name", final_model.params["name"])
print(f"Model registered: {self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}")

# Next, we train the model only with train_df and run detailed backtesting
model = self.model_registry.get_model(model_name)
@@ -565,7 +543,7 @@ def score_global_model(self, model_conf):
print(f"Running scoring for {model_conf['name']}...")
model, model_uri = self.get_model_for_scoring(model_conf)
score_df, removed = self.prepare_data_for_global_model("scoring")
prediction_df, model_fitted = model.forecast(score_df)
prediction_df = model.predict(score_df)
if prediction_df[self.conf["date_col"]].dtype.type != np.datetime64:
prediction_df[self.conf["date_col"]] = prediction_df[
self.conf["date_col"]
@@ -630,7 +608,7 @@ def get_model_for_scoring(self, model_conf):
model_version = model_info.version
model_uri = f"runs:/{model_info.run_id}/model"
if model_conf.get("model_type", None) == "global":
model = mlflow.sklearn.load_model(f"models:/{registered_name}/{model_version}")
model = mlflow.pyfunc.load_model(f"models:/{registered_name}/{model_version}")
return model, model_uri
elif model_conf.get("model_type", None) == "foundation":
return None, model_uri
8 changes: 4 additions & 4 deletions mmf_sa/models/chronosforecast/ChronosPipeline.py
@@ -248,15 +248,15 @@ def __init__(self, params):


class ChronosModel(mlflow.pyfunc.PythonModel):
def __init__(self, repository, prediction_length):
def __init__(self, repo, prediction_length):
import torch
self.repository = repository
self.repo = repo
self.prediction_length = prediction_length
self.device = "cuda" if torch.cuda.is_available() else "cpu"
# Initialize the ChronosPipeline with a pretrained model from the specified repository
# Initialize the BaseChronosPipeline with a pretrained model from the specified repository
from chronos import BaseChronosPipeline
self.pipeline = BaseChronosPipeline.from_pretrained(
self.repository,
self.repo,
device_map='cuda',
torch_dtype=torch.bfloat16,
)
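For context on the renamed attribute, here is a hedged sketch of the BaseChronosPipeline API wrapped above, assuming a recent chronos-forecasting release in which BaseChronosPipeline exposes from_pretrained and predict_quantiles; the repo id, horizon, and quantile levels are illustrative.

import torch
from chronos import BaseChronosPipeline

pipeline = BaseChronosPipeline.from_pretrained(
    "amazon/chronos-bolt-base",  # illustrative repo id
    device_map="cuda" if torch.cuda.is_available() else "cpu",
    torch_dtype=torch.bfloat16,
)
context = torch.arange(64, dtype=torch.float32)  # a single toy series
quantiles, mean = pipeline.predict_quantiles(
    context=context,
    prediction_length=10,
    quantile_levels=[0.1, 0.5, 0.9],
)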
13 changes: 7 additions & 6 deletions mmf_sa/models/moiraiforecast/MoiraiPipeline.py
@@ -21,6 +21,7 @@ def __init__(self, params):
self.params = params
self.device = None
self.model = None
self.repo = None

def register(self, registered_model_name: str):
pipeline = MoiraiModel(
@@ -253,24 +254,24 @@ def __init__(self, params):
self.repo = "Salesforce/moirai-moe-1.0-R-large"

class MoiraiModel(mlflow.pyfunc.PythonModel):
def __init__(self, repository, prediction_length, patch_size, num_samples):
def __init__(self, repo, prediction_length, patch_size, num_samples):
from uni2ts.model.moirai import MoiraiForecast, MoiraiModule
from uni2ts.model.moirai_moe import MoiraiMoEForecast, MoiraiMoEModule
self.repository = repository
self.repo = repo
self.prediction_length = prediction_length
self.patch_size = patch_size
self.num_samples = num_samples
if 'moe' in self.repository:
self.module = MoiraiMoEModule.from_pretrained(self.repository)
if 'moe' in self.repo:
self.module = MoiraiMoEModule.from_pretrained(self.repo)
else:
self.module = MoiraiModule.from_pretrained(self.repository)
self.module = MoiraiModule.from_pretrained(self.repo)
self.pipeline = None

def predict(self, context, input_data, params=None):
from einops import rearrange
from uni2ts.model.moirai import MoiraiForecast, MoiraiModule
from uni2ts.model.moirai_moe import MoiraiMoEForecast, MoiraiMoEModule
if 'moe' in self.repository:
if 'moe' in self.repo:
self.pipeline = MoiraiMoEForecast(
module=self.module,
prediction_length=self.prediction_length,
44 changes: 44 additions & 0 deletions mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
@@ -1,5 +1,8 @@
import pandas as pd
import numpy as np
import mlflow
from mlflow.models import ModelSignature, infer_signature
from mlflow.types.schema import Schema, ColSpec
from sktime.performance_metrics.forecasting import (
MeanAbsoluteError,
MeanSquaredError,
@@ -30,6 +33,38 @@ def __init__(self, params):
self.params = params
self.model = None

def register(self, model, registered_model_name: str, input_example, run_id):
pipeline = NeuralForecastModel(model)
# Prepare model signature for model registry
input_schema = infer_signature(model_input=input_example).inputs
output_schema = Schema(
[
ColSpec("integer", "index"),
ColSpec("string", self.params.group_id),
ColSpec("datetime", self.params.date_col),
ColSpec("float", self.params.target),
]
)
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
model_info = mlflow.pyfunc.log_model(
"model",
python_model=pipeline,
registered_model_name=registered_model_name,
#input_example=input_example,
signature=signature,
pip_requirements=[
"cloudpickle==2.2.1",
"neuralforecast==1.7.2",
"git+https://github.com/databricks-industry-solutions/many-model-forecasting.git",
"pyspark==3.5.0",
],
)
mlflow.log_params(model.get_params())
mlflow.set_tag("run_id", run_id)
mlflow.set_tag("model_name", model.params["name"])
print(f"Model registered: {registered_model_name}")
return model_info

def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame:
if future:
# Prepare future dataframe with exogenous regressors for forecasting
@@ -680,3 +715,12 @@ def config(trial):
],
freq=self.params["freq"]
)


class NeuralForecastModel(mlflow.pyfunc.PythonModel):
def __init__(self, pipeline):
self.pipeline = pipeline

def predict(self, context, input_data, params=None):
forecast, model = self.pipeline.forecast(input_data)
return forecast
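Because the commit targets serving, here is a hedged sketch of querying a Databricks Model Serving endpoint backed by the pyfunc model logged by register() above. The endpoint URL, token, and column names are placeholders; the dataframe_split payload is the standard MLflow scoring-server format.

import requests

payload = {
    "dataframe_split": {
        "columns": ["unique_id", "ds", "y"],  # placeholder column names
        "data": [["A", "2024-01-01", 10.0], ["A", "2024-01-02", 12.0]],
    }
}
response = requests.post(
    "https://<workspace-url>/serving-endpoints/<endpoint-name>/invocations",  # placeholder
    headers={"Authorization": "Bearer <databricks-token>"},  # placeholder
    json=payload,
)
print(response.json())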
28 changes: 12 additions & 16 deletions mmf_sa/models/timesfmforecast/TimesFMPipeline.py
@@ -18,9 +18,10 @@ def __init__(self, params):
self.params = params
self.device = None
self.model = None
self.repo = None

def register(self, registered_model_name: str):
pipeline = TimesFMModel(self.params)
pipeline = TimesFMModel(self.params, self.repo)
input_schema = Schema([TensorSpec(np.dtype(np.double), (-1, -1))])
output_schema = Schema([TensorSpec(np.dtype(np.uint8), (-1, -1))])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
@@ -142,11 +143,11 @@ def __init__(self, params):
super().__init__(params)
import timesfm
self.params = params
self.backend = "gpu" if torch.cuda.is_available() else "cpu"
#self.backend = "gpu" if torch.cuda.is_available() else "cpu"
self.repo = "google/timesfm-1.0-200m-pytorch"
self.model = timesfm.TimesFm(
hparams=timesfm.TimesFmHparams(
backend=self.backend,
backend="gpu",
per_core_batch_size=32,
horizon_len=self.params.prediction_length,
),
@@ -160,11 +161,11 @@ def __init__(self, params):
super().__init__(params)
import timesfm
self.params = params
self.backend = "gpu" if torch.cuda.is_available() else "cpu"
#self.backend = "gpu" if torch.cuda.is_available() else "cpu"
self.repo = "google/timesfm-2.0-500m-pytorch"
self.model = timesfm.TimesFm(
hparams=timesfm.TimesFmHparams(
backend=self.backend,
backend="gpu",
per_core_batch_size=32,
horizon_len=self.params.prediction_length,
num_layers=50,
@@ -177,28 +178,23 @@ def __init__(self, params):
)

class TimesFMModel(mlflow.pyfunc.PythonModel):
def __init__(self, params):
self.params = params
self.model = None # Initialize the model attribute to None

def load_model(self):
# Initialize the TimesFm model with specified parameters
def __init__(self, params, repo):
import timesfm
self.params = params
self.repo = repo
#self.backend = "gpu" if torch.cuda.is_available() else "cpu"
self.model = timesfm.TimesFm(
hparams=timesfm.TimesFmHparams(
backend=self.params.device,
backend="gpu",
per_core_batch_size=32,
horizon_len=self.params.prediction_length,
),
checkpoint=timesfm.TimesFmCheckpoint(
huggingface_repo_id=self.params.repo
huggingface_repo_id=self.repo,
),
)

def predict(self, context, input_df, params=None):
# Load the model if it hasn't been loaded yet
if self.model is None:
self.load_model()
# Generate forecasts on the input DataFrame
forecast_df = self.model.forecast_on_df(
inputs=input_df, # Input DataFrame containing the time series data.
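The wrapper above now receives the checkpoint repo explicitly and builds the TimesFM model eagerly in __init__ rather than lazily on first predict, so the checkpoint download happens when the pyfunc model is loaded. A minimal local smoke test might look like the following sketch; the config keys, column names, and repo are illustrative — a GPU is assumed since backend="gpu" is hard-coded, and the value column must match whatever value_name the pipeline passes to forecast_on_df, which this hunk elides.

import pandas as pd
from omegaconf import OmegaConf
from mmf_sa.models.timesfmforecast.TimesFMPipeline import TimesFMModel

# Minimal stand-in config; the elided forecast_on_df kwargs may require additional keys.
params = OmegaConf.create({"prediction_length": 10, "freq": "D"})

df = pd.DataFrame({
    "unique_id": ["A"] * 64,
    "ds": pd.date_range("2024-01-01", periods=64, freq="D"),
    "y": [float(i) for i in range(64)],
})
# Constructing the wrapper downloads the checkpoint immediately (eager __init__).
wrapper = TimesFMModel(params, repo="google/timesfm-2.0-500m-pytorch")
forecast_df = wrapper.predict(context=None, input_df=df)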
