diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py
index aa08b47..838eeb9 100644
--- a/mmf_sa/Forecaster.py
+++ b/mmf_sa/Forecaster.py
@@ -292,6 +292,7 @@ def backtest_global_model(
                 pd.concat([train_df, val_df]),
                 start=train_df[self.conf["date_col"]].max(),
                 retrain=self.conf["backtest_retrain"],
+                spark=self.spark,
             ))
 
         group_id_dtype = IntegerType() \
@@ -507,21 +508,23 @@ def evaluate_global_model(self, model_conf):
         print(f"Champion alias assigned to the new model")
 
     def evaluate_foundation_model(self, model_conf):
-        model_name = model_conf["name"]
-        model = self.model_registry.get_model(model_name)
-        hist_df, removed = self.prepare_data_for_global_model("evaluating")
-        train_df, val_df = self.split_df_train_val(hist_df)
-        metrics = self.backtest_global_model(
-            model=model,
-            train_df=train_df,
-            val_df=val_df,
-            model_uri="",
-            write=True,
-        )
-        mlflow.set_tag("action", "train")
-        mlflow.set_tag("candidate", "true")
-        mlflow.set_tag("model_name", model.params["name"])
-        print(f"Finished training {model_conf.get('name')}")
+        with mlflow.start_run(experiment_id=self.experiment_id) as run:
+            model_name = model_conf["name"]
+            model = self.model_registry.get_model(model_name)
+            hist_df, removed = self.prepare_data_for_global_model("evaluating")  # Reuse the same as global
+            train_df, val_df = self.split_df_train_val(hist_df)
+            metrics = self.backtest_global_model(  # Reuse the same as global
+                model=model,
+                train_df=train_df,
+                val_df=val_df,
+                model_uri="",
+                write=True,
+            )
+            mlflow.log_metric(self.conf["metric"], metrics)
+            mlflow.set_tag("action", "evaluate")
+            mlflow.set_tag("candidate", "true")
+            mlflow.set_tag("model_name", model.params["name"])
+            print(f"Finished evaluating {model_conf.get('name')}")
 
     def score_models(self):
         print("Starting run_scoring")
@@ -532,6 +535,8 @@ def score_models(self):
                 self.score_global_model(model_conf)
             elif model_conf["model_type"] == "local":
                 self.score_local_model(model_conf)
+            elif model_conf["model_type"] == "foundation":
+                self.score_foundation_model(model_conf)
             print(f"Finished scoring with {model_name}")
         print("Finished run_scoring")
 
@@ -627,13 +632,24 @@ def score_global_model(self, model_conf):
             .saveAsTable(self.conf["scoring_output"])
         )
 
-    def get_latest_model_version(self, mlflow_client, registered_name):
-        latest_version = 1
-        for mv in mlflow_client.search_model_versions(f"name='{registered_name}'"):
-            version_int = int(mv.version)
-            if version_int > latest_version:
-                latest_version = version_int
-        return latest_version
+    def score_foundation_model(self, model_conf):
+        print(f"Running scoring for {model_conf['name']}...")
+        model_name = model_conf["name"]
+        model = self.model_registry.get_model(model_name)
+        hist_df, removed = self.prepare_data_for_global_model("evaluating")
+        prediction_df, model_pretrained = model.forecast(hist_df, spark=self.spark)
+        sdf = self.spark.createDataFrame(prediction_df).drop('index')
+        (
+            sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType()))
+            .withColumn("model", lit(model_conf["name"]))
+            .withColumn("run_id", lit(self.run_id))
+            .withColumn("run_date", lit(self.run_date))
+            .withColumn("use_case", lit(self.conf["use_case_name"]))
+            .withColumn("model_pickle", lit(b""))
+            .withColumn("model_uri", lit(""))
+            .write.mode("append")
+            .saveAsTable(self.conf["scoring_output"])
+        )
 
     def get_model_for_scoring(self, model_conf):
         mlflow_client = MlflowClient()
@@ -649,6 +665,7 @@ def get_model_for_scoring(self, model_conf):
         else:
             return self.model_registry.get_model(model_conf["name"]), None
 
+
 def flatten_nested_parameters(d):
     out = {}
     for key, val in d.items():
@@ -661,3 +678,12 @@ def flatten_nested_parameters(d):
         else:
             out[key] = val
     return out
+
+
+def get_latest_model_version(self, mlflow_client, registered_name):
+    latest_version = 1
+    for mv in mlflow_client.search_model_versions(f"name='{registered_name}'"):
+        version_int = int(mv.version)
+        if version_int > latest_version:
+            latest_version = version_int
+    return latest_version
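Unlike the global models, `score_foundation_model` registers no fitted artifact: it appends rows with an empty `model_pickle`/`model_uri` to the scoring table, and each row carries the whole forecast horizon as arrays (the Chronos UDFs return one timestamp array and one value array per series). A minimal sketch of reading those rows back — the table and column names below assume the monthly demo configuration and should be replaced with your own `conf` values:

```python
# Sketch only: assumes scoring_output="solacc_uc.mmf.monthly_scoring_output",
# group_id="unique_id", date_col="date", target="y" from the demo config.
from pyspark.sql import functions as F

foundation_scores = (
    spark.read.table("solacc_uc.mmf.monthly_scoring_output")
    .where(F.col("model") == "ChronosT5Large")
    .where(F.col("model_uri") == "")  # foundation rows carry no registered model
    .select("unique_id", "date", "y", "run_id", "run_date")
)
foundation_scores.show(5, truncate=False)
```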
diff --git a/mmf_sa/base_forecasting_conf.yaml b/mmf_sa/base_forecasting_conf.yaml
index 0ba29dc..6cf5077 100644
--- a/mmf_sa/base_forecasting_conf.yaml
+++ b/mmf_sa/base_forecasting_conf.yaml
@@ -52,6 +52,10 @@ active_models:
   - NeuralForecastAutoNHITS
   - NeuralForecastAutoTiDE
   - NeuralForecastAutoPatchTST
+  - ChronosT5Tiny
+  - ChronosT5Mini
+  - ChronosT5Small
+  - ChronosT5Base
   - ChronosT5Large
 
 #Here we can override hyperparameters for built-in models
diff --git a/mmf_sa/models/__init__.py b/mmf_sa/models/__init__.py
index 271d3aa..0844a37 100644
--- a/mmf_sa/models/__init__.py
+++ b/mmf_sa/models/__init__.py
@@ -46,7 +46,10 @@ def load_models_conf():
         return conf
 
     def get_model(
-        self, model_name: str, override_conf: DictConfig = None
+        self,
+        model_name: str,
+        override_conf: DictConfig = None,
+        spark=None,
     ) -> ForecastingRegressor:
         model_conf = self.active_models.get(model_name)
         if override_conf is not None:
diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py
index 4090e5a..c22f2e2 100644
--- a/mmf_sa/models/abstract_model.py
+++ b/mmf_sa/models/abstract_model.py
@@ -3,8 +3,11 @@
 import pandas as pd
 import cloudpickle
 from typing import Dict, Union
+from transformers import pipeline
 from sklearn.base import BaseEstimator, RegressorMixin
 from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
+import mlflow
+mlflow.set_registry_uri("databricks-uc")
 
 
 class ForecastingRegressor(BaseEstimator, RegressorMixin):
@@ -45,6 +48,7 @@ def backtest(
         group_id: Union[str, int] = None,
         stride: int = None,
         retrain: bool = True,
+        spark=None,
     ) -> pd.DataFrame:
         if stride is None:
             stride = int(self.params.get("stride", 7))
@@ -73,7 +77,7 @@ def backtest(
 
             if retrain:
                 self.fit(_df)
-            metrics = self.calculate_metrics(_df, actuals_df, curr_date)
+            metrics = self.calculate_metrics(_df, actuals_df, curr_date, spark)
 
             if isinstance(metrics, dict):
                 evaluation_results = [
@@ -103,10 +107,11 @@ def backtest(
                     "actual",
                     "model_pickle"],
             )
+
         return res_df
 
     def calculate_metrics(
-        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date
+        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date, spark=None
    ) -> Dict[str, Union[str, float, bytes]]:
         pred_df, model_fitted = self.predict(hist_df, val_df)
         smape = mean_absolute_percentage_error(
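`backtest` walks a cutoff date forward from `start` in steps of `stride` days, scoring one holdout window per step; the `spark` handle is only threaded through so that foundation models can reach the cluster from `calculate_metrics`. A simplified, self-contained sketch of that windowing for a daily series (illustrative only — the real method also refits when `retrain=True` and assembles the evaluation result frame):

```python
# Simplified sketch of the rolling backtest windows (daily frequency assumed).
import pandas as pd

def rolling_backtest_windows(df, start, end, prediction_length=10, stride=7, date_col="ds"):
    curr_date = pd.Timestamp(start)
    while curr_date + pd.DateOffset(days=prediction_length) <= pd.Timestamp(end):
        history = df[df[date_col] <= curr_date]  # context handed to the model
        actuals = df[
            (df[date_col] > curr_date)
            & (df[date_col] <= curr_date + pd.DateOffset(days=prediction_length))
        ]  # what calculate_metrics compares against
        yield curr_date, history, actuals
        curr_date = curr_date + pd.DateOffset(days=stride)
```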
diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py
index 808c7c7..8123c6e 100644
--- a/mmf_sa/models/chronosforecast/ChronosPipeline.py
+++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py
@@ -1,11 +1,17 @@
+from abc import ABC
+
 import pandas as pd
 import numpy as np
 import torch
 from chronos import ChronosPipeline
-from sktime.performance_metrics.forecasting import
+from transformers import pipeline
+from sktime.performance_metrics.forecasting import mean_absolute_percentage_error
 from typing import Iterator
 from pyspark.sql.functions import collect_list, pandas_udf
+from pyspark.sql import DataFrame
+import mlflow
 from mmf_sa.models.abstract_model import ForecastingRegressor
+mlflow.set_registry_uri("databricks-uc")
 
 
 class ChronosForecaster(ForecastingRegressor):
@@ -15,55 +21,72 @@ def __init__(self, params):
         self.device = None
         self.model = None
 
-    def prepare_data(self, df: pd.DataFrame, future: bool = False) -> pd.DataFrame:
-        context = (
-            df.rename(
-                columns={
-                    self.params.group_id: "unique_id",
-                    self.params.date_col: "ds",
-                    self.params.target: "y",
-                }
-            )
-        )
-
-        torch.tensor(_df[self.params.target])
+    def create_horizon_timestamps_udf(self):
+        @pandas_udf('array<timestamp>')
+        def horizon_timestamps_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+            batch_horizon_timestamps = []
+            for batch in batch_iterator:
+                for series in batch:
+                    last = series.max()
+                    horizon_timestamps = []
+                    for i in range(self.params["prediction_length"]):
+                        last = last + self.one_ts_offset
+                        horizon_timestamps.append(last)
+                    batch_horizon_timestamps.append(np.array(horizon_timestamps))
+            yield pd.Series(batch_horizon_timestamps)
+        return horizon_timestamps_udf
 
-        return context
+    def prepare_data(self, df: pd.DataFrame, future: bool = False, spark=None) -> DataFrame:
+        df = spark.createDataFrame(df)
+        df = (
+            df.groupBy(self.params.group_id)
+            .agg(
+                collect_list(self.params.date_col).alias('ds'),
+                collect_list(self.params.target).alias('y'),
+            ))
+        return df
 
-    def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):
-        # context must be either a 1D tensor, a list of 1D tensors,
-        # or a left-padded 2D tensor with batch as the first dimension
-        # forecast shape: [num_series, num_samples, prediction_length]
-        hist_df = self.spark
-        context = self.prepare_data(hist_df)
-        forecast_df = self.model.predict(
-            context=context,
+    def predict(self,
+                hist_df: pd.DataFrame,
+                val_df: pd.DataFrame = None,
+                curr_date=None,
+                spark=None):
+        hist_df = self.prepare_data(hist_df, spark=spark)
+        forecast_udf = create_predict_udf(
             prediction_length=self.params["prediction_length"],
-            num_samples=self.params["num_samples"],
+            num_samples=self.params["num_samples"]
         )
+        horizon_timestamps_udf = self.create_horizon_timestamps_udf()
+
+        # Todo figure out the distribution
+        forecast_df = (
+            hist_df.repartition(4)
+            .select(
+                hist_df.unique_id,
+                horizon_timestamps_udf(hist_df.ds).alias("ds"),
+                forecast_udf(hist_df.y).alias("y"))
+        ).toPandas()
+
         forecast_df = forecast_df.reset_index(drop=False).rename(
             columns={
                 "unique_id": self.params.group_id,
                 "ds": self.params.date_col,
-                target: self.params.target,
+                "y": self.params.target,
             }
         )
-        forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
+
+        #forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01)
 
         return forecast_df, self.model
 
-    def forecast(self, df: pd.DataFrame):
-        return self.predict(df)
+    def forecast(self, df: pd.DataFrame, spark=None):
+        return self.predict(df, spark=spark)
 
     def calculate_metrics(
-        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date
+        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date, spark=None
     ) -> list:
-
-        print(f"hist_df: {hist_df}")
-        print(f"val_df: {val_df}")
-        pred_df, model_fitted = self.predict(hist_df, val_df)
-
+        pred_df, model_pretrained = self.predict(hist_df, val_df, curr_date, spark)
         keys = pred_df[self.params["group_id"]].unique()
         metrics = []
         if self.params["metric"] == "smape":
@@ -71,9 +94,8 @@
         else:
             raise Exception(f"Metric {self.params['metric']} not supported!")
         for key in keys:
-            actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]]
-            forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].\
-                iloc[-self.params["prediction_length"]:]
+            actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()
+            forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].to_numpy()[0]
             try:
                 if metric_name == "smape":
                     metric_value = mean_absolute_percentage_error(actual, forecast, symmetric=True)
@@ -83,8 +105,8 @@
                         curr_date,
                         metric_name,
                         metric_value,
-                        actual.to_numpy(),
-                        forecast.to_numpy(),
+                        actual,
+                        forecast,
                         b'',
                     )])
             except:
@@ -92,6 +114,54 @@
         return metrics
 
 
+class ChronosT5Tiny(ChronosForecaster):
+    def __init__(self, params):
+        super().__init__(params)
+        self.params = params
+        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+        self.model = ChronosPipeline.from_pretrained(
+            "amazon/chronos-t5-tiny",
+            device_map=self.device,  # use "cuda" for GPU and "cpu" for CPU inference
+            torch_dtype=torch.bfloat16,
+        )
+
+
+class ChronosT5Mini(ChronosForecaster):
+    def __init__(self, params):
+        super().__init__(params)
+        self.params = params
+        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+        self.model = ChronosPipeline.from_pretrained(
+            "amazon/chronos-t5-mini",
+            device_map=self.device,  # use "cuda" for GPU and "cpu" for CPU inference
+            torch_dtype=torch.bfloat16,
+        )
+
+
+class ChronosT5Small(ChronosForecaster):
+    def __init__(self, params):
+        super().__init__(params)
+        self.params = params
+        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+        self.model = ChronosPipeline.from_pretrained(
+            "amazon/chronos-t5-small",
+            device_map=self.device,  # use "cuda" for GPU and "cpu" for CPU inference
+            torch_dtype=torch.bfloat16,
+        )
+
+
+class ChronosT5Base(ChronosForecaster):
+    def __init__(self, params):
+        super().__init__(params)
+        self.params = params
+        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+        self.model = ChronosPipeline.from_pretrained(
+            "amazon/chronos-t5-base",
+            device_map=self.device,  # use "cuda" for GPU and "cpu" for CPU inference
+            torch_dtype=torch.bfloat16,
+        )
+
+
 class ChronosT5Large(ChronosForecaster):
     def __init__(self, params):
         super().__init__(params)
@@ -102,3 +172,33 @@ def __init__(self, params):
             device_map=self.device,  # use "cuda" for GPU and "cpu" for CPU inference
             torch_dtype=torch.bfloat16,
         )
+
+
+def create_predict_udf(prediction_length: int, num_samples: int):
+
+    @pandas_udf('array<double>')
+    def predict_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        # initialization step
+        import torch
+        import numpy as np
+        import pandas as pd
+        from chronos import ChronosPipeline
+        pipeline = ChronosPipeline.from_pretrained(
+            "amazon/chronos-t5-large",
+            device_map="cuda",
+            torch_dtype=torch.bfloat16,
+        )
+        # inference
+        for batch in batch_iterator:
+            median = []
+            for series in batch:
+                context = torch.tensor(list(series))
+                forecast = pipeline.predict(
+                    context=context,
+                    prediction_length=prediction_length,
+                    num_samples=num_samples
+                )
+                median.append(np.quantile(forecast[0].numpy(), [0.5], axis=0)[0])
+            yield pd.Series(median)
+
+    return predict_udf
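One thing worth noting: `create_predict_udf` always loads `amazon/chronos-t5-large` on the workers, so distributed scoring runs the large checkpoint even when `ChronosT5Tiny`/`Mini`/`Small`/`Base` triggered it. A possible refinement — hypothetical, not part of this patch — would thread the Hugging Face repo id through the factory:

```python
# Hypothetical variant of create_predict_udf that takes the checkpoint as an argument.
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf


def create_predict_udf(prediction_length: int, num_samples: int,
                       hf_model: str = "amazon/chronos-t5-large"):

    @pandas_udf('array<double>')
    def predict_udf(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        import torch
        import numpy as np
        from chronos import ChronosPipeline
        pipeline = ChronosPipeline.from_pretrained(
            hf_model,  # checkpoint chosen by the caller instead of being hardcoded
            device_map="cuda",
            torch_dtype=torch.bfloat16,
        )
        for batch in batch_iterator:
            median = []
            for series in batch:
                forecast = pipeline.predict(
                    context=torch.tensor(list(series)),
                    prediction_length=prediction_length,
                    num_samples=num_samples,
                )
                median.append(np.quantile(forecast[0].numpy(), [0.5], axis=0)[0])
            yield pd.Series(median)

    return predict_udf

# ChronosForecaster.predict could then pass e.g. hf_model="amazon/chronos-t5-tiny".
```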
diff --git a/mmf_sa/models/models_conf.yaml b/mmf_sa/models/models_conf.yaml
index 1806ad3..d705396 100644
--- a/mmf_sa/models/models_conf.yaml
+++ b/mmf_sa/models/models_conf.yaml
@@ -368,6 +368,38 @@ models:
       scaler_type: [ "robust", "standard" ]
       revin: [ False, True ]
 
+  ChronosT5Tiny:
+    module: mmf_sa.models.chronosforecast.ChronosPipeline
+    model_class: ChronosT5Tiny
+    framework: Chronos
+    model_type: foundation
+    trainable: false
+    num_samples: 20
+
+  ChronosT5Mini:
+    module: mmf_sa.models.chronosforecast.ChronosPipeline
+    model_class: ChronosT5Mini
+    framework: Chronos
+    model_type: foundation
+    trainable: false
+    num_samples: 20
+
+  ChronosT5Small:
+    module: mmf_sa.models.chronosforecast.ChronosPipeline
+    model_class: ChronosT5Small
+    framework: Chronos
+    model_type: foundation
+    trainable: false
+    num_samples: 20
+
+  ChronosT5Base:
+    module: mmf_sa.models.chronosforecast.ChronosPipeline
+    model_class: ChronosT5Base
+    framework: Chronos
+    model_type: foundation
+    trainable: false
+    num_samples: 20
+
   ChronosT5Large:
     module: mmf_sa.models.chronosforecast.ChronosPipeline
     model_class: ChronosT5Large
diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
index ccdc889..b81df36 100644
--- a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
+++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
@@ -160,7 +160,7 @@ def forecast(self, df: pd.DataFrame):
         return forecast_df, self.model
 
     def calculate_metrics(
-        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date
+        self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date, spark=None
     ) -> list:
         pred_df, model_fitted = self.predict(hist_df, val_df)
         keys = pred_df[self.params["group_id"]].unique()
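Each new Chronos entry above is resolved by the model registry through its `module` and `model_class` keys. Roughly — a sketch of that pattern, not the actual `ModelRegistry.get_model` code, and the parameter set shown is only illustrative:

```python
# Sketch of how a models_conf.yaml entry becomes a forecaster instance.
import importlib
from omegaconf import OmegaConf

entry = OmegaConf.create({
    "name": "ChronosT5Tiny",
    "module": "mmf_sa.models.chronosforecast.ChronosPipeline",
    "model_class": "ChronosT5Tiny",
    "model_type": "foundation",
    "trainable": False,
    "num_samples": 20,
    "prediction_length": 10,   # illustrative; normally merged in from the run config
})

module = importlib.import_module(entry["module"])        # ChronosPipeline.py
forecaster_cls = getattr(module, entry["model_class"])   # ChronosT5Tiny
forecaster = forecaster_cls(entry)                       # a ForecastingRegressor subclass
```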
diff --git a/notebooks/demo_foundation_daily.py b/notebooks/demo_foundation_daily.py
index 566e278..906558f 100644
--- a/notebooks/demo_foundation_daily.py
+++ b/notebooks/demo_foundation_daily.py
@@ -88,6 +88,10 @@ def transform_group(df):
 # COMMAND ----------
 
 active_models = [
+    "ChronosT5Tiny",
+    "ChronosT5Mini",
+    "ChronosT5Small",
+    "ChronosT5Base",
     "ChronosT5Large",
 ]
 
@@ -104,7 +108,7 @@ def transform_group(df):
 
 for model in active_models:
     dbutils.notebook.run(
-        "run_foundation_daily",
+        "run_daily",
         timeout_seconds=0,
         arguments={"catalog": catalog, "db": db, "model": model})
 
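Each name in `active_models` maps to one of the Chronos checkpoints on the Hugging Face Hub. For reference, outside of Spark a single series can be forecast with the same calls the scoring UDF makes per series — a sketch using the tiny checkpoint and a toy input series:

```python
# Single-series sketch mirroring what the pandas UDF does for each unique_id.
import numpy as np
import torch
from chronos import ChronosPipeline

pipeline = ChronosPipeline.from_pretrained(
    "amazon/chronos-t5-tiny",
    device_map="cuda" if torch.cuda.is_available() else "cpu",
    torch_dtype=torch.bfloat16,
)

history = torch.tensor([float(i % 12) for i in range(120)])  # toy seasonal series
samples = pipeline.predict(context=history, prediction_length=10, num_samples=20)
# samples has shape [num_series, num_samples, prediction_length]; take the median path.
median_forecast = np.quantile(samples[0].numpy(), [0.5], axis=0)[0]
print(median_forecast)
```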
diff --git a/notebooks/demo_foundation_monthly.py b/notebooks/demo_foundation_monthly.py
new file mode 100644
index 0000000..8fa1823
--- /dev/null
+++ b/notebooks/demo_foundation_monthly.py
@@ -0,0 +1,166 @@
+# Databricks notebook source
+# MAGIC %md
+# MAGIC # Many Models Forecasting SA (MMFSA) Demo
+# MAGIC This demo highlights how to configure MMF SA to use M4 competition data.
+
+# COMMAND ----------
+
+# MAGIC %pip install -r ../requirements.txt --quiet
+dbutils.library.restartPython()
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ### Data preparation steps
+# MAGIC We use the `datasetsforecast` package to download the M4 data.
+# MAGIC
+# MAGIC The M4 dataset contains a set of time series which we use to test MMF SA.
+# MAGIC
+# MAGIC Below, we define a few functions to convert the M4 time series into the expected format.
+
+# COMMAND ----------
+
+import pathlib
+import pandas as pd
+from mmf_sa import run_forecast
+import logging
+logger = spark._jvm.org.apache.log4j
+logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
+logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
+from datasetsforecast.m4 import M4
+
+# COMMAND ----------
+
+# Make sure that the catalog and the schema exist
+catalog = "solacc_uc"  # Name of the catalog we use to manage our assets
+db = "mmf"  # Name of the schema we use to manage our assets (e.g. datasets)
+
+_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
+_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
+
+# COMMAND ----------
+
+# Number of time series
+n = 100
+
+
+def create_m4_monthly():
+    y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Monthly")
+    _ids = [f"M{i}" for i in range(1, n + 1)]
+    y_df = (
+        y_df.groupby("unique_id")
+        .filter(lambda x: x.unique_id.iloc[0] in _ids)
+        .groupby("unique_id")
+        .apply(transform_group)
+        .reset_index(drop=True)
+    )
+    return y_df
+
+
+def transform_group(df):
+    unique_id = df.unique_id.iloc[0]
+    _cnt = 60  # df.count()[0]
+    _start = pd.Timestamp("2018-01-01")
+    _end = _start + pd.DateOffset(months=_cnt)
+    date_idx = pd.date_range(start=_start, end=_end, freq="M", name="date")
+    _df = (
+        pd.DataFrame(data=[], index=date_idx)
+        .reset_index()
+        .rename(columns={"index": "date"})
+    )
+    _df["unique_id"] = unique_id
+    _df["y"] = df[:60].y.values
+    return _df
+
+
+(
+    spark.createDataFrame(create_m4_monthly())
+    .write.format("delta").mode("overwrite")
+    .saveAsTable(f"{catalog}.{db}.m4_monthly_train")
+)
+
+# COMMAND ----------
+
+# MAGIC %md ### The dataset now looks like this:
+
+# COMMAND ----------
+
+# MAGIC %sql select unique_id, count(date) as count from solacc_uc.mmf.m4_monthly_train group by unique_id order by unique_id
+
+# COMMAND ----------
+
+# MAGIC %sql select count(distinct(unique_id)) from solacc_uc.mmf.m4_monthly_train
+
+# COMMAND ----------
+
+# MAGIC %md ### Let's configure the list of models we are going to use for training:
+
+# COMMAND ----------
+
+active_models = [
+    "ChronosT5Tiny",
+    "ChronosT5Mini",
+    "ChronosT5Small",
+    "ChronosT5Base",
+    "ChronosT5Large",
+]
+
+# COMMAND ----------
+
+# MAGIC %md ### Now we can run the forecasting process using the `run_forecast` function.
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC We have to loop through the models one at a time; otherwise CUDA will throw an error.
+
+# COMMAND ----------
+
+for model in active_models:
+    dbutils.notebook.run(
+        "run_monthly",
+        timeout_seconds=0,
+        arguments={"catalog": catalog, "db": db, "model": model})
+
+# COMMAND ----------
+
+# MAGIC %md ### Evaluation Output
+# MAGIC In the evaluation output table, the evaluation results for all backtest windows and all models are stored. This information can be used to monitor model performance or to decide which models should be taken into the final aggregated forecast.
+
+# COMMAND ----------
+
+# MAGIC %sql select * from solacc_uc.mmf.monthly_evaluation_output order by unique_id, model, backtest_window_start_date
+
+# COMMAND ----------
+
+# MAGIC %md ### Forecast Output
+# MAGIC In the forecast output table, the final forecast for each model and each time series is stored.
+
+# COMMAND ----------
+
+# MAGIC %sql select * from solacc_uc.mmf.monthly_scoring_output order by unique_id, model, date
+
+# COMMAND ----------
+
+# MAGIC %md ### Ensemble Output
+# MAGIC In the final ensemble output table, we store the averaged forecast. Only the models that meet the thresholds defined by the ensembling parameters are taken into consideration.
+
+# COMMAND ----------
+
+# MAGIC %sql select * from solacc_uc.mmf.monthly_ensemble_output order by unique_id, model, date
+
+# COMMAND ----------
+
+# MAGIC %md ### Delete Tables
+
+# COMMAND ----------
+
+# MAGIC #%sql delete from solacc_uc.mmf.monthly_evaluation_output
+
+# COMMAND ----------
+
+# MAGIC #%sql delete from solacc_uc.mmf.monthly_scoring_output
+
+# COMMAND ----------
+
+# MAGIC #%sql delete from solacc_uc.mmf.monthly_ensemble_output
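With several Chronos sizes in one run, the evaluation table queried above makes it easy to compare them before deciding what should feed the ensemble. A sketch — the column names are assumed to follow the MMF evaluation schema used in the query above (`unique_id`, `model`, `metric_value` holding the sMAPE per backtest window):

```python
# Sketch: average backtest sMAPE per Chronos checkpoint across all series and windows.
from pyspark.sql import functions as F

(
    spark.read.table("solacc_uc.mmf.monthly_evaluation_output")
    .where(F.col("model").startswith("ChronosT5"))
    .groupBy("model")
    .agg(F.avg("metric_value").alias("avg_smape"))
    .orderBy("avg_smape")
    .show()
)
```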
diff --git a/notebooks/demo_global_daily.py b/notebooks/demo_global_daily.py
index b704f76..259aafc 100644
--- a/notebooks/demo_global_daily.py
+++ b/notebooks/demo_global_daily.py
@@ -113,7 +113,7 @@ def transform_group(df):
 
 for model in active_models:
     dbutils.notebook.run(
-        "run_global_daily",
+        "run_daily",
         timeout_seconds=0,
         arguments={"catalog": catalog, "db": db, "model": model})
 
diff --git a/notebooks/demo_global_external_regressors_daily.py b/notebooks/demo_global_external_regressors_daily.py
index 1861adc..ebb59fc 100644
--- a/notebooks/demo_global_external_regressors_daily.py
+++ b/notebooks/demo_global_external_regressors_daily.py
@@ -87,7 +87,7 @@
 
 for model in active_models:
     dbutils.notebook.run(
-        "run_global_external_regressors_daily",
+        "run_external_regressors_daily",
         timeout_seconds=0,
         arguments={"catalog": catalog, "db": db, "model": model})
 
diff --git a/notebooks/demo_global_monthly.py b/notebooks/demo_global_monthly.py
index ab6a647..812f412 100644
--- a/notebooks/demo_global_monthly.py
+++ b/notebooks/demo_global_monthly.py
@@ -123,7 +123,7 @@ def transform_group(df):
 
 for model in active_models:
     dbutils.notebook.run(
-        "run_global_monthly",
+        "run_monthly",
         timeout_seconds=0,
         arguments={"catalog": catalog, "db": db, "model": model})
 
diff --git a/notebooks/run_foundation_daily.py b/notebooks/run_daily.py
similarity index 100%
rename from notebooks/run_foundation_daily.py
rename to notebooks/run_daily.py
diff --git a/notebooks/run_global_external_regressors_daily.py b/notebooks/run_external_regressors_daily.py
similarity index 100%
rename from notebooks/run_global_external_regressors_daily.py
rename to notebooks/run_external_regressors_daily.py
diff --git a/notebooks/run_global_daily.py b/notebooks/run_global_daily.py
deleted file mode 100644
index 98dd9bc..0000000
--- a/notebooks/run_global_daily.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Databricks notebook source
-# MAGIC %pip install -r ../requirements.txt --quiet
-# MAGIC dbutils.library.restartPython()
-
-# COMMAND ----------
-
-dbutils.widgets.text("catalog", "")
-dbutils.widgets.text("db", "")
-dbutils.widgets.text("model", "")
-
-catalog = dbutils.widgets.get("catalog")
-db = dbutils.widgets.get("db")
-model = dbutils.widgets.get("model")
-
-# COMMAND ----------
-
-from mmf_sa import run_forecast
-import logging
-logger = spark._jvm.org.apache.log4j
-logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
-logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
-
-
-run_forecast(
-    spark=spark,
-    train_data=f"{catalog}.{db}.m4_daily_train",
-    scoring_data=f"{catalog}.{db}.m4_daily_train",
-    scoring_output=f"{catalog}.{db}.daily_scoring_output",
-    evaluation_output=f"{catalog}.{db}.daily_evaluation_output",
-    model_output=f"{catalog}.{db}",
-    group_id="unique_id",
-    date_col="ds",
-    target="y",
-    freq="D",
-    prediction_length=10,
-    backtest_months=1,
-    stride=10,
-    train_predict_ratio=2,
-    data_quality_check=True,
-    resample=False,
-    ensemble=True,
-    ensemble_metric="smape",
-    ensemble_metric_avg=0.3,
-    ensemble_metric_max=0.5,
-    ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output",
-    active_models=[model],
-    experiment_path=f"/Shared/mmf_experiment",
-    use_case_name="m4_daily",
-    accelerator="gpu",
-)
diff --git a/notebooks/run_global_monthly.py b/notebooks/run_monthly.py
similarity index 100%
rename from notebooks/run_global_monthly.py
rename to notebooks/run_monthly.py
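Once a run finishes, the evaluation runs that `evaluate_foundation_model` creates — tagged `action=evaluate` and `candidate=true`, with the configured metric logged — can be pulled back from the MLflow experiment. A sketch, assuming the demo experiment path and sMAPE as the configured metric:

```python
# Sketch: list candidate evaluation runs and their logged metric.
import mlflow

runs = mlflow.search_runs(
    experiment_names=["/Shared/mmf_experiment"],  # assumed demo experiment path
    filter_string="tags.action = 'evaluate' and tags.candidate = 'true'",
)
print(runs[["run_id", "tags.model_name", "metrics.smape"]].sort_values("metrics.smape"))
```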