Commit
Merge pull request #33 from databricks-industry-solutions/mmf_sa
some updates on the neuralforecast models
ryuta-yoshimatsu authored May 22, 2024
2 parents 82c55f3 + 5fb71d6 commit 5216ba6
Showing 14 changed files with 362 additions and 84 deletions.
39 changes: 31 additions & 8 deletions mmf_sa/Forecaster.py
@@ -176,10 +176,12 @@ def ensemble(self):
.saveAsTable(self.conf["ensemble_scoring_output"])
)

def prepare_data_for_global_model(self, model_conf: DictConfig, mode: str):
def prepare_data_for_global_model(self, mode: str):
src_df = self.resolve_source("train_data")
src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run()
if mode == "scoring":
if (mode == "scoring") \
and (self.conf["scoring_data"]) \
and (self.conf["scoring_data"] != self.conf["train_data"]):
score_df = self.resolve_source("scoring_data")
score_df = score_df.where(~col(self.conf["group_id"]).isin(removed))
src_df = src_df.unionByName(score_df, allowMissingColumns=True)
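
As an aside, the union above leans on allowMissingColumns=True: the scoring rows carry no target column yet, so Spark fills it with nulls. A toy sketch of that behavior (toy frames, not repo data):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
train = spark.createDataFrame([("A", "2024-01-01", 10.0)], ["unique_id", "ds", "y"])
future = spark.createDataFrame([("A", "2024-01-02")], ["unique_id", "ds"])
# The future row comes back with y = null
train.unionByName(future, allowMissingColumns=True).show()
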
@@ -219,7 +221,7 @@ def train_models(self):
try:
model = self.model_registry.get_model(model_name)
# Get training and scoring data
hist_df, removed = self.prepare_data_for_global_model(model_conf, "training")
hist_df, removed = self.prepare_data_for_global_model("training")
train_df, val_train_df = self.split_df_train_val(hist_df)
# Train and evaluate new models - results are saved to MLFlow
print(f"Training model: {model}")
@@ -289,7 +291,8 @@ def backtest_global_model(
model.backtest(
pd.concat([train_df, val_df]),
start=train_df[self.conf["date_col"]].max(),
retrain=self.conf["backtest_retrain"]))
retrain=self.conf["backtest_retrain"],
))

group_id_dtype = IntegerType() \
if train_df[self.conf["group_id"]].dtype == 'int' else StringType()
@@ -347,6 +350,8 @@ def evaluate_models(self):
self.evaluate_global_model(model_conf)
elif model_conf["model_type"] == "local":
self.evaluate_local_model(model_conf)
elif model_conf["model_type"] == "foundation":
self.evaluate_foundation_model(model_conf)
except Exception as err:
_logger.error(
f"Error has occurred while training model {model_name}: {repr(err)}",
@@ -446,7 +451,7 @@ def evaluate_one_model(

def evaluate_global_model(self, model_conf):
mlflow_client = mlflow.tracking.MlflowClient()
hist_df, removed = self.prepare_data_for_global_model(model_conf, "evaluating")
hist_df, removed = self.prepare_data_for_global_model("evaluating")
train_df, val_df = self.split_df_train_val(hist_df)
model_name = model_conf["name"]
mlflow_model_name = f"{self.conf['model_output']}.{model_name}_{self.conf['use_case_name']}"
@@ -501,6 +506,23 @@ def evaluate_global_model(self, model_conf):
model_details.version)
print(f"Champion alias assigned to the new model")

def evaluate_foundation_model(self, model_conf):
model_name = model_conf["name"]
model = self.model_registry.get_model(model_name)
hist_df, removed = self.prepare_data_for_global_model("evaluating")
train_df, val_df = self.split_df_train_val(hist_df)
metrics = self.backtest_global_model(
model=model,
train_df=train_df,
val_df=val_df,
model_uri="",
write=True,
)
mlflow.set_tag("action", "train")
mlflow.set_tag("candidate", "true")
mlflow.set_tag("model_name", model.params["name"])
print(f"Finished training {model_conf.get('name')}")

def score_models(self):
print("Starting run_scoring")
for model_name in self.model_registry.get_active_model_keys():
@@ -518,8 +540,9 @@ def score_local_model(self, model_conf):
src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run()
# Check if external regressors are provided and framework is statsforecast
# External regressors are supported only with statsforecast and neuralforecast models
if (self.conf["train_data"] != self.conf["scoring_data"]) & \
(model_conf["framework"] == "StatsForecast"):
if (self.conf["scoring_data"])\
and (self.conf["train_data"] != self.conf["scoring_data"])\
and (model_conf["framework"] == "StatsForecast"):
score_df = self.resolve_source("scoring_data")
score_df = score_df.where(~col(self.conf["group_id"]).isin(removed))
src_df = src_df.unionByName(score_df, allowMissingColumns=True)
@@ -578,7 +601,7 @@ def score_one_model(
def score_global_model(self, model_conf):
print(f"Running scoring for {model_conf['name']}...")
champion_model, champion_model_uri = self.get_model_for_scoring(model_conf)
score_df, removed = self.prepare_data_for_global_model(model_conf, "scoring")
score_df, removed = self.prepare_data_for_global_model("scoring")
prediction_df, model_fitted = champion_model.forecast(score_df)
if prediction_df[self.conf["date_col"]].dtype.type != np.datetime64:
prediction_df[self.conf["date_col"]] = prediction_df[
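
For context, backtest_global_model drives model.backtest with start set to the end of the training window; conceptually that method walks the forecast origin forward in strides. A minimal sketch of that rolling-origin loop, with hypothetical names and daily frequency assumed (the repo's actual implementation lives in abstract_model.py and differs in detail):

import pandas as pd

def rolling_backtest(df, date_col, start, prediction_length, stride):
    # Walk the forecast origin forward `stride` days at a time, scoring
    # `prediction_length` days each round, until the horizon runs past the data.
    origin, windows = start, []
    while origin + pd.Timedelta(days=prediction_length) <= df[date_col].max():
        history = df[df[date_col] <= origin]
        actuals = df[(df[date_col] > origin)
                     & (df[date_col] <= origin + pd.Timedelta(days=prediction_length))]
        windows.append((origin, history, actuals))  # fit/predict/score here
        origin += pd.Timedelta(days=stride)
    return windows
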
7 changes: 0 additions & 7 deletions mmf_sa/__init__.py
@@ -1,17 +1,13 @@
__version__ = "0.0.1"

import pathlib
import sys
from typing import Union, Any, Dict, List

import importlib.resources as pkg_resources

import pandas as pd
import yaml
from omegaconf import OmegaConf
from omegaconf.basecontainer import BaseContainer
from pyspark.sql import SparkSession, DataFrame

from mmf_sa.Forecaster import Forecaster


@@ -40,7 +36,6 @@ def run_forecast(
static_features: List[str] = None,
dynamic_future: List[str] = None,
dynamic_historical: List[str] = None,
dynamic_reals: List[str] = None,
active_models: List[str] = None,
accelerator: str = None,
scoring_model_stage: str = None,
@@ -129,8 +124,6 @@ def run_forecast(
_conf["dynamic_future"] = dynamic_future
if dynamic_historical is not None:
_conf["dynamic_historical"] = dynamic_historical
if dynamic_reals is not None:
_conf["dynamic_reals"] = dynamic_reals

f = Forecaster(conf=_conf, data_conf=_data_conf, spark=spark)
run_id = f.train_eval_score(export_metrics=False, scoring=run_scoring)
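
With dynamic_reals removed, covariates are declared through dynamic_future and dynamic_historical alone. A sketch of a call to the updated entry point; the table names are placeholders, and keyword arguments not visible in this diff follow the package README and may differ:

from mmf_sa import run_forecast

run_id = run_forecast(
    spark=spark,
    train_data="catalog.schema.daily_train",      # placeholder table
    scoring_data="catalog.schema.daily_scoring",  # optional; may equal train_data
    group_id="unique_id",
    date_col="ds",
    target="y",
    freq="D",
    prediction_length=10,
    dynamic_future=["promo"],        # values known into the future
    dynamic_historical=["traffic"],  # values observed in history only
    active_models=["NeuralForecastAutoNHITS", "ChronosT5Large"],
    accelerator="gpu",
)
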
12 changes: 1 addition & 11 deletions mmf_sa/base_forecasting_conf.yaml
@@ -3,24 +3,13 @@ accelerator: cpu
temp_path: /Volumes/solacc_uc/mmf/partitions

metric: smape
scoring_model_stage: Production
selection_metric: smape_eval
backtest_retrain: false

resample: false
train_predict_ratio: 4
freq: D

tuning_enabled: false
tuning_max_trials: 16
tuning_parallelism: 4
tuning_retrain: false
tuning_distributed: true
tuning_max_epochs: 10
tuning_max_context_len: 40

dynamic_reals:

static_features:
#- State

@@ -63,6 +52,7 @@ active_models:
- NeuralForecastAutoNHITS
- NeuralForecastAutoTiDE
- NeuralForecastAutoPatchTST
- ChronosT5Large

#Here we can override hyperparameters for built-in models
models:
6 changes: 0 additions & 6 deletions mmf_sa/models/abstract_model.py
@@ -38,12 +38,6 @@ def forecast(self, x):
# TODO Shouldn't X be optional if we have a trainable model and provide a prediction length
pass

def supports_tuning(self) -> bool:
return False

def search_space(self):
pass

def backtest(
self,
df: pd.DataFrame,
104 changes: 104 additions & 0 deletions mmf_sa/models/chronosforecast/ChronosPipeline.py
@@ -0,0 +1,104 @@
import pandas as pd
import numpy as np
import torch
from chronos import ChronosPipeline
from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
from typing import Iterator
from pyspark.sql.functions import collect_list, pandas_udf
from mmf_sa.models.abstract_model import ForecastingRegressor


class ChronosForecaster(ForecastingRegressor):
    def __init__(self, params):
        super().__init__(params)
        self.params = params
        self.device = None
        self.model = None

    def prepare_data(self, df: pd.DataFrame, future: bool = False) -> list:
        df = df.rename(
            columns={
                self.params.group_id: "unique_id",
                self.params.date_col: "ds",
                self.params.target: "y",
            }
        )
        # Chronos consumes the raw history as tensors: one 1D tensor per series
        context = [
            torch.tensor(group["y"].values, dtype=torch.float32)
            for _, group in df.groupby("unique_id")
        ]
        return context

    def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):
        # context must be either a 1D tensor, a list of 1D tensors,
        # or a left-padded 2D tensor with batch as the first dimension
        # forecast shape: [num_series, num_samples, prediction_length]
        context = self.prepare_data(hist_df)
        forecast = self.model.predict(
            context=context,
            prediction_length=self.params["prediction_length"],
            num_samples=self.params["num_samples"],
        )
        # Median across the sample dimension is the point forecast; rebuild
        # a long-format frame keyed by group and future date.
        median = np.quantile(forecast.numpy(), 0.5, axis=1)
        groups = sorted(hist_df[self.params.group_id].unique())
        last_date = hist_df.groupby(self.params.group_id)[self.params.date_col].max()
        pieces = []
        for i, group in enumerate(groups):
            dates = pd.date_range(
                start=last_date[group],
                periods=self.params["prediction_length"] + 1,
                freq=self.params["freq"],
            )[1:]
            pieces.append(
                pd.DataFrame(
                    {
                        self.params.group_id: group,
                        self.params.date_col: dates,
                        self.params.target: median[i],
                    }
                )
            )
        forecast_df = pd.concat(pieces, ignore_index=True)
        forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
        return forecast_df, self.model

    def forecast(self, df: pd.DataFrame):
        return self.predict(df)

    def calculate_metrics(
        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date
    ) -> list:
        print(f"hist_df: {hist_df}")
        print(f"val_df: {val_df}")
        pred_df, model_fitted = self.predict(hist_df, val_df)
        keys = pred_df[self.params["group_id"]].unique()
        metrics = []
        if self.params["metric"] == "smape":
            metric_name = "smape"
        else:
            raise Exception(f"Metric {self.params['metric']} not supported!")
        for key in keys:
            actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]]
            forecast = (
                pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]]
                .iloc[-self.params["prediction_length"]:]
            )
            try:
                if metric_name == "smape":
                    metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
                    metrics.extend(
                        [(
                            key,
                            curr_date,
                            metric_name,
                            metric_value,
                            actual.to_numpy(),
                            forecast.to_numpy(),
                            b'',
                        )]
                    )
            except Exception:
                pass
        return metrics


class ChronosT5Large(ChronosForecaster):
    def __init__(self, params):
        super().__init__(params)
        self.params = params
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = ChronosPipeline.from_pretrained(
            "amazon/chronos-t5-large",
            device_map=self.device,  # use "cuda" for GPU and "cpu" for CPU inference
            torch_dtype=torch.bfloat16,
        )
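
For orientation, the chronos API this wrapper drives, following the pattern from the chronos-forecasting README (a smaller checkpoint substituted here to keep the example light):

import numpy as np
import pandas as pd
import torch
from chronos import ChronosPipeline

pipeline = ChronosPipeline.from_pretrained(
    "amazon/chronos-t5-small",  # smaller sibling of the t5-large wrapped above
    device_map="cpu",
    torch_dtype=torch.bfloat16,
)
y = pd.Series(np.sin(np.arange(120) / 8) + 2.0)  # toy series
# forecast shape: [num_series, num_samples, prediction_length]
forecast = pipeline.predict(
    context=torch.tensor(y.values, dtype=torch.float32),
    prediction_length=10,
    num_samples=20,
)
low, median, high = np.quantile(forecast[0].numpy(), [0.1, 0.5, 0.9], axis=0)
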
Empty file.
21 changes: 8 additions & 13 deletions mmf_sa/models/models_conf.yaml
@@ -7,11 +7,8 @@ promoted_props:
- freq
- temp_path
- accelerator
- tuning_max_epochs
- tuning_max_context_len
- backtest_months
- stride
- dynamic_reals
- static_features
- dynamic_future
- dynamic_historical
@@ -217,7 +214,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false
max_steps: 200
input_size_factor: 2
loss: smape
@@ -237,7 +233,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false
max_steps: 200
input_size_factor: 2
loss: smape
@@ -256,7 +251,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false
max_steps: 100
input_size_factor: 2
loss: smape
@@ -272,7 +266,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false
max_steps: 200
input_size_factor: 2
loss: smape
@@ -292,7 +285,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false #-> eventually true
max_steps: 200
num_samples: 20
loss: smape
@@ -307,7 +299,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false #-> eventually true
max_steps: 200
num_samples: 20
loss: smape
@@ -322,7 +313,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false #-> eventually true
max_steps: 200
num_samples: 20
loss: smape
@@ -335,7 +325,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false #-> eventually true
max_steps: 200
num_samples: 20
loss: smape
@@ -350,7 +339,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false #-> eventually true
max_steps: 200
num_samples: 20
loss: smape
@@ -370,7 +358,6 @@ models:
framework: NeuralForecast
model_type: global
trainable: true
tuning: false #-> eventually true
max_steps: 200
num_samples: 20
loss: smape
@@ -380,3 +367,11 @@ models:
patch_len: [ 16, 24 ]
scaler_type: [ "robust", "standard" ]
revin: [ False, True ]

ChronosT5Large:
module: mmf_sa.models.chronosforecast.ChronosPipeline
model_class: ChronosT5Large
framework: Chronos
model_type: foundation
trainable: false
num_samples: 20
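
The module/model_class pair above is what the model registry imports to build the forecaster; promoted properties such as freq and prediction_length are merged into the model's params at runtime. A minimal sketch of that lookup with a hypothetical build_model helper, not the registry's actual code:

import importlib
from omegaconf import OmegaConf

def build_model(model_conf: dict):
    # Hypothetical helper mirroring the registry's module/model_class lookup
    module = importlib.import_module(model_conf["module"])
    model_cls = getattr(module, model_conf["model_class"])
    return model_cls(params=OmegaConf.create(model_conf))

chronos_conf = {
    "module": "mmf_sa.models.chronosforecast.ChronosPipeline",
    "model_class": "ChronosT5Large",
    "framework": "Chronos",
    "model_type": "foundation",
    "trainable": False,
    "num_samples": 20,
    # promoted at runtime in the real registry:
    "group_id": "unique_id", "date_col": "ds", "target": "y",
    "freq": "D", "prediction_length": 10, "metric": "smape",
}
model = build_model(chronos_conf)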