From 5fb71d61c35c1f686e8d7e22d4d81fbed396a838 Mon Sep 17 00:00:00 2001 From: Ryuta Yoshimatsu Date: Wed, 22 May 2024 20:28:52 +0200 Subject: [PATCH] some updates on the neuralforecast models --- mmf_sa/Forecaster.py | 39 ++++- mmf_sa/__init__.py | 7 - mmf_sa/base_forecasting_conf.yaml | 12 +- mmf_sa/models/abstract_model.py | 6 - .../models/chronosforecast/ChronosPipeline.py | 104 ++++++++++++ mmf_sa/models/chronosforecast/__init__.py | 0 mmf_sa/models/models_conf.yaml | 21 +-- .../neuralforecast/NeuralForecastPipeline.py | 38 +---- .../StatsFcForecastingPipeline.py | 10 +- notebooks/demo_foundation_daily.py | 152 ++++++++++++++++++ ...al_univariate_external_regressors_daily.py | 2 +- notebooks/run_foundation_daily.py | 50 ++++++ .../run_global_external_regressors_daily.py | 2 +- requirements.txt | 3 +- 14 files changed, 362 insertions(+), 84 deletions(-) create mode 100644 mmf_sa/models/chronosforecast/ChronosPipeline.py create mode 100644 mmf_sa/models/chronosforecast/__init__.py create mode 100644 notebooks/demo_foundation_daily.py create mode 100644 notebooks/run_foundation_daily.py diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index 0bf0440..aa08b47 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -176,10 +176,12 @@ def ensemble(self): .saveAsTable(self.conf["ensemble_scoring_output"]) ) - def prepare_data_for_global_model(self, model_conf: DictConfig, mode: str): + def prepare_data_for_global_model(self, mode: str): src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() - if mode == "scoring": + if (mode == "scoring") \ + and (self.conf["scoring_data"]) \ + and (self.conf["scoring_data"] != self.conf["train_data"]): score_df = self.resolve_source("scoring_data") score_df = score_df.where(~col(self.conf["group_id"]).isin(removed)) src_df = src_df.unionByName(score_df, allowMissingColumns=True) @@ -219,7 +221,7 @@ def train_models(self): try: model = self.model_registry.get_model(model_name) # Get training and scoring data - hist_df, removed = self.prepare_data_for_global_model(model_conf, "training") + hist_df, removed = self.prepare_data_for_global_model("training") train_df, val_train_df = self.split_df_train_val(hist_df) # Train and evaluate new models - results are saved to MLFlow print(f"Training model: {model}") @@ -289,7 +291,8 @@ def backtest_global_model( model.backtest( pd.concat([train_df, val_df]), start=train_df[self.conf["date_col"]].max(), - retrain=self.conf["backtest_retrain"])) + retrain=self.conf["backtest_retrain"], + )) group_id_dtype = IntegerType() \ if train_df[self.conf["group_id"]].dtype == 'int' else StringType() @@ -347,6 +350,8 @@ def evaluate_models(self): self.evaluate_global_model(model_conf) elif model_conf["model_type"] == "local": self.evaluate_local_model(model_conf) + elif model_conf["model_type"] == "foundation": + self.evaluate_foundation_model(model_conf) except Exception as err: _logger.error( f"Error has occurred while training model {model_name}: {repr(err)}", @@ -446,7 +451,7 @@ def evaluate_one_model( def evaluate_global_model(self, model_conf): mlflow_client = mlflow.tracking.MlflowClient() - hist_df, removed = self.prepare_data_for_global_model(model_conf, "evaluating") + hist_df, removed = self.prepare_data_for_global_model("evaluating") train_df, val_df = self.split_df_train_val(hist_df) model_name = model_conf["name"] mlflow_model_name = f"{self.conf['model_output']}.{model_name}_{self.conf['use_case_name']}" @@ -501,6 +506,23 @@ def evaluate_global_model(self, model_conf): model_details.version) print(f"Champion alias assigned to the new model") + def evaluate_foundation_model(self, model_conf): + model_name = model_conf["name"] + model = self.model_registry.get_model(model_name) + hist_df, removed = self.prepare_data_for_global_model("evaluating") + train_df, val_df = self.split_df_train_val(hist_df) + metrics = self.backtest_global_model( + model=model, + train_df=train_df, + val_df=val_df, + model_uri="", + write=True, + ) + mlflow.set_tag("action", "train") + mlflow.set_tag("candidate", "true") + mlflow.set_tag("model_name", model.params["name"]) + print(f"Finished training {model_conf.get('name')}") + def score_models(self): print("Starting run_scoring") for model_name in self.model_registry.get_active_model_keys(): @@ -518,8 +540,9 @@ def score_local_model(self, model_conf): src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() # Check if external regressors are provided and framework is statsforecast # External regressors are supported only with statsforecast and neuralforecast models - if (self.conf["train_data"] != self.conf["scoring_data"]) & \ - (model_conf["framework"] == "StatsForecast"): + if (self.conf["scoring_data"])\ + and (self.conf["train_data"] != self.conf["scoring_data"])\ + and (model_conf["framework"] == "StatsForecast"): score_df = self.resolve_source("scoring_data") score_df = score_df.where(~col(self.conf["group_id"]).isin(removed)) src_df = src_df.unionByName(score_df, allowMissingColumns=True) @@ -578,7 +601,7 @@ def score_one_model( def score_global_model(self, model_conf): print(f"Running scoring for {model_conf['name']}...") champion_model, champion_model_uri = self.get_model_for_scoring(model_conf) - score_df, removed = self.prepare_data_for_global_model(model_conf, "scoring") + score_df, removed = self.prepare_data_for_global_model("scoring") prediction_df, model_fitted = champion_model.forecast(score_df) if prediction_df[self.conf["date_col"]].dtype.type != np.datetime64: prediction_df[self.conf["date_col"]] = prediction_df[ diff --git a/mmf_sa/__init__.py b/mmf_sa/__init__.py index 2c85c7d..4a193b2 100644 --- a/mmf_sa/__init__.py +++ b/mmf_sa/__init__.py @@ -1,17 +1,13 @@ __version__ = "0.0.1" - import pathlib import sys from typing import Union, Any, Dict, List - import importlib.resources as pkg_resources - import pandas as pd import yaml from omegaconf import OmegaConf from omegaconf.basecontainer import BaseContainer from pyspark.sql import SparkSession, DataFrame - from mmf_sa.Forecaster import Forecaster @@ -40,7 +36,6 @@ def run_forecast( static_features: List[str] = None, dynamic_future: List[str] = None, dynamic_historical: List[str] = None, - dynamic_reals: List[str] = None, active_models: List[str] = None, accelerator: str = None, scoring_model_stage: str = None, @@ -129,8 +124,6 @@ def run_forecast( _conf["dynamic_future"] = dynamic_future if dynamic_historical is not None: _conf["dynamic_historical"] = dynamic_historical - if dynamic_reals is not None: - _conf["dynamic_reals"] = dynamic_reals f = Forecaster(conf=_conf, data_conf=_data_conf, spark=spark) run_id = f.train_eval_score(export_metrics=False, scoring=run_scoring) diff --git a/mmf_sa/base_forecasting_conf.yaml b/mmf_sa/base_forecasting_conf.yaml index 1035e59..0ba29dc 100644 --- a/mmf_sa/base_forecasting_conf.yaml +++ b/mmf_sa/base_forecasting_conf.yaml @@ -3,7 +3,6 @@ accelerator: cpu temp_path: /Volumes/solacc_uc/mmf/partitions metric: smape -scoring_model_stage: Production selection_metric: smape_eval backtest_retrain: false @@ -11,16 +10,6 @@ resample: false train_predict_ratio: 4 freq: D -tuning_enabled: false -tuning_max_trials: 16 -tuning_parallelism: 4 -tuning_retrain: false -tuning_distributed: true -tuning_max_epochs: 10 -tuning_max_context_len: 40 - -dynamic_reals: - static_features: #- State @@ -63,6 +52,7 @@ active_models: - NeuralForecastAutoNHITS - NeuralForecastAutoTiDE - NeuralForecastAutoPatchTST + - ChronosT5Large #Here we can override hyperparameters for built-in models models: diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py index 756002e..4090e5a 100644 --- a/mmf_sa/models/abstract_model.py +++ b/mmf_sa/models/abstract_model.py @@ -38,12 +38,6 @@ def forecast(self, x): # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length pass - def supports_tuning(self) -> bool: - return False - - def search_space(self): - pass - def backtest( self, df: pd.DataFrame, diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py new file mode 100644 index 0000000..808c7c7 --- /dev/null +++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py @@ -0,0 +1,104 @@ +import pandas as pd +import numpy as np +import torch +from chronos import ChronosPipeline +from sktime.performance_metrics.forecasting import +from typing import Iterator +from pyspark.sql.functions import collect_list, pandas_udf +from mmf_sa.models.abstract_model import ForecastingRegressor + + +class ChronosForecaster(ForecastingRegressor): + def __init__(self, params): + super().__init__(params) + self.params = params + self.device = None + self.model = None + + def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame: + context = ( + df.rename( + columns={ + self.params.group_id: "unique_id", + self.params.date_col: "ds", + self.params.target: "y", + } + ) + ) + + torch.tensor(_df[self.params.target]) + + return context + + def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): + # context must be either a 1D tensor, a list of 1D tensors, + # or a left-padded 2D tensor with batch as the first dimension + # forecast shape: [num_series, num_samples, prediction_length] + hist_df = self.spark + context = self.prepare_data(hist_df) + forecast_df = self.model.predict( + context=context, + prediction_length=self.params["prediction_length"], + num_samples=self.params["num_samples"], + ) + + forecast_df = forecast_df.reset_index(drop=False).rename( + columns={ + "unique_id": self.params.group_id, + "ds": self.params.date_col, + target: self.params.target, + } + ) + forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) + + return forecast_df, self.model + + def forecast(self, df: pd.DataFrame): + return self.predict(df) + + def calculate_metrics( + self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date + ) -> list: + + print(f"hist_df: {hist_df}") + print(f"val_df: {val_df}") + pred_df, model_fitted = self.predict(hist_df, val_df) + + keys = pred_df[self.params["group_id"]].unique() + metrics = [] + if self.params["metric"] == "smape": + metric_name = "smape" + else: + raise Exception(f"Metric {self.params['metric']} not supported!") + for key in keys: + actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]] + forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].\ + iloc[-self.params["prediction_length"]:] + try: + if metric_name == "smape": + metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True) + metrics.extend( + [( + key, + curr_date, + metric_name, + metric_value, + actual.to_numpy(), + forecast.to_numpy(), + b'', + )]) + except: + pass + return metrics + + +class ChronosT5Large(ChronosForecaster): + def __init__(self, params): + super().__init__(params) + self.params = params + self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + self.model = ChronosPipeline.from_pretrained( + "amazon/chronos-t5-large", + device_map=self.device, # use "cuda" for GPU and "cpu" for CPU inference + torch_dtype=torch.bfloat16, + ) diff --git a/mmf_sa/models/chronosforecast/__init__.py b/mmf_sa/models/chronosforecast/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mmf_sa/models/models_conf.yaml b/mmf_sa/models/models_conf.yaml index 49db718..1806ad3 100644 --- a/mmf_sa/models/models_conf.yaml +++ b/mmf_sa/models/models_conf.yaml @@ -7,11 +7,8 @@ promoted_props: - freq - temp_path - accelerator - - tuning_max_epochs - - tuning_max_context_len - backtest_months - stride - - dynamic_reals - static_features - dynamic_future - dynamic_historical @@ -217,7 +214,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false max_steps: 200 input_size_factor: 2 loss: smape @@ -237,7 +233,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false max_steps: 200 input_size_factor: 2 loss: smape @@ -256,7 +251,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false max_steps: 100 input_size_factor: 2 loss: smape @@ -272,7 +266,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false max_steps: 200 input_size_factor: 2 loss: smape @@ -292,7 +285,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false #-> eventually true max_steps: 200 num_samples: 20 loss: smape @@ -307,7 +299,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false #-> eventually true max_steps: 200 num_samples: 20 loss: smape @@ -322,7 +313,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false #-> eventually true max_steps: 200 num_samples: 20 loss: smape @@ -335,7 +325,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false #-> eventually true max_steps: 200 num_samples: 20 loss: smape @@ -350,7 +339,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false #-> eventually true max_steps: 200 num_samples: 20 loss: smape @@ -370,7 +358,6 @@ models: framework: NeuralForecast model_type: global trainable: true - tuning: false #-> eventually true max_steps: 200 num_samples: 20 loss: smape @@ -380,3 +367,11 @@ models: patch_len: [ 16, 24 ] scaler_type: [ "robust", "standard" ] revin: [ False, True ] + + ChronosT5Large: + module: mmf_sa.models.chronosforecast.ChronosPipeline + model_class: ChronosT5Large + framework: Chronos + model_type: foundation + trainable: false + num_samples: 20 diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py index c582be3..ccdc889 100644 --- a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py +++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py @@ -243,9 +243,6 @@ def __init__(self, params): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return False - class NeuralFcLSTM(NeuralFcForecaster): def __init__(self, params): @@ -279,9 +276,6 @@ def __init__(self, params): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return False - class NeuralFcNBEATSx(NeuralFcForecaster): def __init__(self, params): @@ -312,9 +306,6 @@ def __init__(self, params): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return False - class NeuralFcNHITS(NeuralFcForecaster): def __init__(self, params): @@ -349,9 +340,6 @@ def __init__(self, params): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return False - class NeuralFcAutoRNN(NeuralFcForecaster): def __init__(self, params): @@ -405,9 +393,6 @@ def config(trial): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return True - class NeuralFcAutoLSTM(NeuralFcForecaster): def __init__(self, params): @@ -460,9 +445,6 @@ def config(trial): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return True - class NeuralFcAutoNBEATSx(NeuralFcForecaster): def __init__(self, params): @@ -510,9 +492,6 @@ def config(trial): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return True - class NeuralFcAutoNHITS(NeuralFcForecaster): def __init__(self, params): @@ -563,9 +542,6 @@ def config(trial): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return True - class NeuralFcAutoTiDE(NeuralFcForecaster): def __init__(self, params): @@ -609,7 +585,7 @@ def config(trial): 'dropout', list(self.params.dropout)), layernorm=trial.suggest_categorical( 'layernorm', list(self.params.layernorm)), - **self.exogs, + #**self.exogs, **self.distributed_kwargs, ) self.model = NeuralForecast( @@ -627,9 +603,6 @@ def config(trial): freq=self.params["freq"] ) - def supports_tuning(self) -> bool: - return True - class NeuralFcAutoPatchTST(NeuralFcForecaster): def __init__(self, params): @@ -644,6 +617,11 @@ def __init__(self, params): logger=False, enable_checkpointing=False, ) + self.exogs = { + 'stat_exog_list': list(self.params.get("static_features", [])), + 'futr_exog_list': list(self.params.get("dynamic_future", [])), + 'hist_exog_list': list(self.params.get("dynamic_historical", [])), + } def config(trial): return dict( @@ -661,6 +639,7 @@ def config(trial): 'scaler_type', list(self.params.scaler_type)), revin=trial.suggest_categorical( 'revin', list(self.params.revin)), + #**self.exogs, **self.distributed_kwargs, ) self.model = NeuralForecast( @@ -677,6 +656,3 @@ def config(trial): ], freq=self.params["freq"] ) - - def supports_tuning(self) -> bool: - return True \ No newline at end of file diff --git a/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py b/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py index eedd5c0..d9060f4 100644 --- a/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py +++ b/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py @@ -32,11 +32,11 @@ def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame: # Prepare historical dataframe with/out exogenous regressors for training # Fix here df[self.params.target] = df[self.params.target].clip(0.1) - if 'dynamic_reals' in self.params.keys(): + if 'dynamic_future' in self.params.keys(): try: df_statsfc = ( df[[self.params.group_id, self.params.date_col, self.params.target] - + self.params.dynamic_reals] + + self.params.dynamic_future] ) except Exception as e: raise Exception(f"Exogenous regressors missing: {e}") @@ -54,11 +54,11 @@ def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame: ) else: # Prepare future dataframe with/out exogenous regressors for forecasting - if 'dynamic_reals' in self.params.keys(): + if 'dynamic_future' in self.params.keys(): try: df_statsfc = ( df[[self.params.group_id, self.params.date_col] - + self.params.dynamic_reals] + + self.params.dynamic_future] ) except Exception as e: raise Exception(f"Exogenous regressors missing: {e}") @@ -104,7 +104,7 @@ def forecast(self, df: pd.DataFrame): _df = df[df[self.params.target].notnull()] _df = self.prepare_data(_df) self.fit(_df) - if 'dynamic_reals' in self.params.keys(): + if 'dynamic_future' in self.params.keys(): _last_date = _df["ds"].max() _future_df = df[ (df[self.params["date_col"]] > np.datetime64(_last_date)) diff --git a/notebooks/demo_foundation_daily.py b/notebooks/demo_foundation_daily.py new file mode 100644 index 0000000..566e278 --- /dev/null +++ b/notebooks/demo_foundation_daily.py @@ -0,0 +1,152 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Many Models Forecasting SA (MMFSA) Demo +# MAGIC This demo highlights how to configure MMF SA to use M4 competition data + +# COMMAND ---------- + +# MAGIC %pip install -r ../requirements.txt --quiet +dbutils.library.restartPython() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Data preparation steps +# MAGIC We are using `datasetsforecast` package to download M4 data. +# MAGIC +# MAGIC M4 dataset contains a set of time series which we use for testing of MMF SA. +# MAGIC +# MAGIC Below we have developed a number of functions to convert M4 time series to the expected format. + +# COMMAND ---------- + +import pathlib +import pandas as pd +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) +from datasetsforecast.m4 import M4 + +# COMMAND ---------- + +# Make sure that the catalog and the schema exist +catalog = "solacc_uc" # Name of the catalog we use to manage our assets +db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets) + +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +# COMMAND ---------- + +# Number of time series +n = 100 + + +def create_m4_daily(): + y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Daily") + _ids = [f"D{i}" for i in range(1, n+1)] + y_df = ( + y_df.groupby("unique_id") + .filter(lambda x: x.unique_id.iloc[0] in _ids) + .groupby("unique_id") + .apply(transform_group) + .reset_index(drop=True) + ) + return y_df + + +def transform_group(df): + unique_id = df.unique_id.iloc[0] + _start = pd.Timestamp("2020-01-01") + _end = _start + pd.DateOffset(days=int(df.count()[0]) - 1) + date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds") + res_df = pd.DataFrame(data=[], index=date_idx).reset_index() + res_df["unique_id"] = unique_id + res_df["y"] = df.y.values + return res_df + + +( + spark.createDataFrame(create_m4_daily()) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.m4_daily_train") +) + +# COMMAND ---------- + +# MAGIC %md ### Now the dataset looks in the following way: + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.m4_daily_train where unique_id in ('D1', 'D2', 'D6', 'D7', 'D10') order by unique_id, ds + +# COMMAND ---------- + +# MAGIC %md ### Let's configure the list of models we are going to use for training: + +# COMMAND ---------- + +active_models = [ + "ChronosT5Large", +] + +# COMMAND ---------- + +# MAGIC %md ### Now we can run the forecasting process using `run_forecast` function. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC We have to loop through the model in the following way else cuda will throw an error. + +# COMMAND ---------- + +for model in active_models: + dbutils.notebook.run( + "run_foundation_daily", + timeout_seconds=0, + arguments={"catalog": catalog, "db": db, "model": model}) + +# COMMAND ---------- + +# MAGIC %md ### Evaluation output +# MAGIC In the evaluation output table, the evaluation for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast. + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.daily_evaluation_output order by unique_id, model, backtest_window_start_date + +# COMMAND ---------- + +# MAGIC %md ### Forecast Output +# MAGIC In the Forecast output table, the final forecast for each model and each time series is stored. + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.daily_scoring_output order by unique_id, model, ds + +# COMMAND ---------- + +# MAGIC %md ### Ensemble Output +# MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration + +# COMMAND ---------- + +# MAGIC %sql select * from solacc_uc.mmf.daily_ensemble_output order by unique_id, model, ds + +# COMMAND ---------- + +# MAGIC %md ### Delete Tables + +# COMMAND ---------- + +# MAGIC #%sql delete from solacc_uc.mmf.daily_evaluation_output + +# COMMAND ---------- + +# MAGIC #%sql delete from solacc_uc.mmf.daily_scoring_output + +# COMMAND ---------- + +# MAGIC #%sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_local_univariate_external_regressors_daily.py b/notebooks/demo_local_univariate_external_regressors_daily.py index 12cfa2d..4a9536e 100644 --- a/notebooks/demo_local_univariate_external_regressors_daily.py +++ b/notebooks/demo_local_univariate_external_regressors_daily.py @@ -95,7 +95,7 @@ date_col="Date", target="Sales", freq="D", - dynamic_reals=["DayOfWeek", "Open", "Promo", "SchoolHoliday"], + dynamic_future=["DayOfWeek", "Open", "Promo", "SchoolHoliday"], prediction_length=10, backtest_months=1, stride=10, diff --git a/notebooks/run_foundation_daily.py b/notebooks/run_foundation_daily.py new file mode 100644 index 0000000..98dd9bc --- /dev/null +++ b/notebooks/run_foundation_daily.py @@ -0,0 +1,50 @@ +# Databricks notebook source +# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +dbutils.widgets.text("catalog", "") +dbutils.widgets.text("db", "") +dbutils.widgets.text("model", "") + +catalog = dbutils.widgets.get("catalog") +db = dbutils.widgets.get("db") +model = dbutils.widgets.get("model") + +# COMMAND ---------- + +from mmf_sa import run_forecast +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + + +run_forecast( + spark=spark, + train_data=f"{catalog}.{db}.m4_daily_train", + scoring_data=f"{catalog}.{db}.m4_daily_train", + scoring_output=f"{catalog}.{db}.daily_scoring_output", + evaluation_output=f"{catalog}.{db}.daily_evaluation_output", + model_output=f"{catalog}.{db}", + group_id="unique_id", + date_col="ds", + target="y", + freq="D", + prediction_length=10, + backtest_months=1, + stride=10, + train_predict_ratio=2, + data_quality_check=True, + resample=False, + ensemble=True, + ensemble_metric="smape", + ensemble_metric_avg=0.3, + ensemble_metric_max=0.5, + ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", + active_models=[model], + experiment_path=f"/Shared/mmf_experiment", + use_case_name="m4_daily", + accelerator="gpu", +) diff --git a/notebooks/run_global_external_regressors_daily.py b/notebooks/run_global_external_regressors_daily.py index 191917d..d3f52e1 100644 --- a/notebooks/run_global_external_regressors_daily.py +++ b/notebooks/run_global_external_regressors_daily.py @@ -32,7 +32,7 @@ date_col="Date", target="Sales", freq="D", - dynamic_reals=["DayOfWeek", "Open", "Promo", "SchoolHoliday"], + dynamic_future=["DayOfWeek", "Open", "Promo", "SchoolHoliday"], prediction_length=10, backtest_months=1, stride=10, diff --git a/requirements.txt b/requirements.txt index c3cbeb3..69ab22d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ tbats==1.1.3 sktime==0.29.0 lightgbm==4.3.0 datasetsforecast==0.0.8 -fugue==0.9.0 \ No newline at end of file +fugue==0.9.0 +git+https://github.com/amazon-science/chronos-forecasting.git \ No newline at end of file