diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index 5a937cc..7098274 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -10,11 +10,10 @@ import numpy as np import cloudpickle import mlflow -from mlflow.exceptions import MlflowException from mlflow.tracking import MlflowClient from mlflow.models import ModelSignature, infer_signature from mlflow.types.schema import Schema, ColSpec -from omegaconf import OmegaConf, DictConfig +from omegaconf import OmegaConf from omegaconf.basecontainer import BaseContainer from pyspark.sql import SparkSession, DataFrame from pyspark.sql.types import ( @@ -89,92 +88,6 @@ def resolve_source(self, key: str) -> DataFrame: else: return self.spark.read.table(self.conf[key]) - def train_eval_score(self, scoring=True) -> str: - print("Starting train_evaluate_models") - self.train_models() - self.evaluate_models() - if scoring: - self.score_models() - self.ensemble() - print("Finished train_evaluate_models") - return self.run_id - - def ensemble(self): - if self.conf.get("ensemble") and self.conf["ensemble_scoring_output"]: - metrics_df = ( - self.spark.table(self.conf["evaluation_output"]) - .where(col("run_id").eqNullSafe(lit(self.run_id))) - .where( - col("metric_name").eqNullSafe( - lit(self.conf.get("ensemble_metric", "smape")) - ) - ) - ) - models_df = ( - metrics_df.groupby(self.conf["group_id"], "model") - .agg( - avg("metric_value").alias("metric_avg"), - min("metric_value").alias("metric_min"), - max("metric_value").alias("metric_max"), - ) - .where( - col("metric_avg") < lit(self.conf.get("ensemble_metric_avg", 0.2)) - ) - .where( - col("metric_max") < lit(self.conf.get("ensemble_metric_max", 0.5)) - ) - .where(col("metric_min") > lit(0.01)) - ) - df = ( - self.spark.table(self.conf["scoring_output"]) - .where(col("run_id").eqNullSafe(lit(self.run_id))) - .join( - models_df.select(self.conf["group_id"], "model"), - on=[self.conf["group_id"], "model"], - ) - ) - - left = df.select( - self.conf["group_id"], "run_id", "run_date", "use_case", "model", - posexplode(self.conf["date_col"]) - ).withColumnRenamed('col', self.conf["date_col"]) - - right = df.select( - self.conf["group_id"], "run_id", "run_date", "use_case", "model", - posexplode(self.conf["target"]) - ).withColumnRenamed('col', self.conf["target"]) - - merged = left.join(right, [ - self.conf["group_id"], 'run_id', 'run_date', 'use_case', 'model', - 'pos'], 'inner').drop("pos") - - aggregated_df = merged.groupby( - self.conf["group_id"], self.conf["date_col"] - ).agg( - avg(self.conf["target"]).alias(self.conf["target"] + "_avg"), - min(self.conf["target"]).alias(self.conf["target"] + "_min"), - max(self.conf["target"]).alias(self.conf["target"] + "_max"), - ) - - aggregated_df = aggregated_df.orderBy(self.conf["group_id"], self.conf["date_col"])\ - .groupBy(self.conf["group_id"]).agg( - collect_list(self.conf["date_col"]).alias(self.conf["date_col"]), - collect_list(self.conf["target"] + "_avg").alias(self.conf["target"] + "_avg"), - collect_list(self.conf["target"] + "_min").alias(self.conf["target"] + "_min"), - collect_list(self.conf["target"] + "_max").alias(self.conf["target"] + "_max") - ) - - ( - aggregated_df.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) - .withColumn("run_id", lit(self.run_id)) - .withColumn("run_date", lit(self.run_date)) - .withColumn("use_case", lit(self.conf["use_case_name"])) - .withColumn("model", lit("ensemble")) - .write.format("delta") - .mode("append") - .saveAsTable(self.conf["ensemble_scoring_output"]) 
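
(The `ensemble()` method removed above is re-added essentially unchanged later in this diff, after `get_model_for_scoring`.) Its core step is aligning the per-model forecast arrays by position before averaging across models. A minimal standalone sketch of that `posexplode` pattern, using illustrative column names rather than the configured `group_id` / `date_col` / `target`:

```python
# Minimal sketch of the positional-explode alignment used by ensemble().
# Column names and data are illustrative, not the configured ones.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, collect_list, posexplode

spark = SparkSession.builder.getOrCreate()

# One row per (series, model); dates and forecasts are stored as parallel arrays.
df = spark.createDataFrame(
    [
        ("store_1", "model_a", ["2024-01-01", "2024-01-02"], [10.0, 12.0]),
        ("store_1", "model_b", ["2024-01-01", "2024-01-02"], [14.0, 16.0]),
    ],
    ["group_id", "model", "date", "forecast"],
)

# Explode dates and forecasts separately, keeping the array position ...
left = df.select("group_id", "model", posexplode("date")).withColumnRenamed("col", "date")
right = df.select("group_id", "model", posexplode("forecast")).withColumnRenamed("col", "forecast")

# ... join on position so each date is paired with its value, average across
# models per (series, date), then re-collect the result into arrays per series.
merged = left.join(right, ["group_id", "model", "pos"]).drop("pos")
ensembled = (
    merged.groupBy("group_id", "date")
    .agg(avg("forecast").alias("forecast_avg"))
    .orderBy("group_id", "date")
    .groupBy("group_id")
    .agg(collect_list("date").alias("date"), collect_list("forecast_avg").alias("forecast_avg"))
)
ensembled.show(truncate=False)
```

The real method additionally filters the candidate models by their backtest metrics (average, min and max of the chosen metric) before this join.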
- ) - def prepare_data_for_global_model(self, mode: str): src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() @@ -197,172 +110,44 @@ def split_df_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFra - pd.DateOffset(months=self.conf["backtest_months"]) ] # Validate with data after the backtest months cutoff... - val_true_df = df[ + val_df = df[ df[self.conf["date_col"]] > df[self.conf["date_col"]].max() - pd.DateOffset(months=self.conf["backtest_months"]) ] - return train_df, val_true_df - - def train_models(self): - """Trains and evaluates all models from the configuration file with the configuration's training data. - Then evaluates the current best model with the configuration's training data. - Saves the params, models and metrics, so the runs will all have evaluation data - """ - print("Starting train_models") - for model_name in self.model_registry.get_active_model_keys(): - model_conf = self.model_registry.get_model_conf(model_name) - if ( - model_conf.get("trainable", False) - and model_conf.get("model_type", None) == "global" - ): - with mlflow.start_run(experiment_id=self.experiment_id) as run: - try: - model = self.model_registry.get_model(model_name) - # Get training and scoring data - hist_df, removed = self.prepare_data_for_global_model("training") - train_df, val_train_df = self.split_df_train_val(hist_df) - # Train and evaluate new models - results are saved to MLFlow - print(f"Training model: {model}") - self.train_global_model(train_df, val_train_df, model_conf, model) - except Exception as err: - _logger.error( - f"Error has occurred while training model {model_name}: {repr(err)}", - exc_info=err, - ) - raise err - print("Finished train_models") - - def train_global_model( - self, - train_df: pd.DataFrame, - val_df: pd.DataFrame, - model_conf: DictConfig, - model: ForecastingRegressor, - ): - print(f"Started training {model_conf['name']}") - # Todo fix - model.fit(pd.concat([train_df, val_df])) - - input_example = train_df[train_df[self.conf['group_id']] == train_df[self.conf['group_id']]\ - .unique()[0]].sort_values(by=[self.conf['date_col']]) - input_schema = infer_signature(model_input=input_example).inputs - output_schema = Schema( - [ - ColSpec("integer", "index"), - ColSpec("string", self.conf['group_id']), - ColSpec("datetime", self.conf['date_col']), - ColSpec("float", self.conf['target']), - ] - ) - signature = ModelSignature(inputs=input_schema, outputs=output_schema) - model_info = mlflow.sklearn.log_model( - model, - "model", - registered_model_name=f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}", - input_example=input_example, - signature=signature, - pip_requirements=[], - ) - print(f"Model registered: {self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}") - try: - mlflow.log_params(model.get_params()) - except MlflowException: - # MLflow log_params has a parameter length limit of 500 - # When using ensemble models parameters consist of - # nested parameter dictionaries which are flattened here - mlflow.log_params( - flatten_nested_parameters(OmegaConf.to_object(model.get_params())) - ) - metrics = self.backtest_global_model( - model=model, - train_df=train_df, - val_df=val_df, - model_uri=model_info.model_uri, - write=True, - ) - mlflow.set_tag("action", "train") - mlflow.set_tag("candidate", "true") - mlflow.set_tag("model_name", model.params["name"]) - print(f"Finished training {model_conf.get('name')}") - - def 
backtest_global_model( - self, - model: ForecastingRegressor, - train_df: pd.DataFrame, - val_df: pd.DataFrame, - model_uri: str, - write: bool = True, - ): - res_pdf = ( - model.backtest( - pd.concat([train_df, val_df]), - start=train_df[self.conf["date_col"]].max(), - retrain=self.conf["backtest_retrain"], - spark=self.spark, - )) - - group_id_dtype = IntegerType() \ - if train_df[self.conf["group_id"]].dtype == 'int' else StringType() - - schema = StructType( - [ - StructField(self.conf["group_id"], group_id_dtype), - StructField("backtest_window_start_date", DateType()), - StructField("metric_name", StringType()), - StructField("metric_value", DoubleType()), - StructField("forecast", ArrayType(DoubleType())), - StructField("actual", ArrayType(DoubleType())), - StructField("model_pickle", BinaryType()), - ] - ) - res_sdf = self.spark.createDataFrame(res_pdf, schema) - - if write: - if self.conf.get("evaluation_output", None): - ( - res_sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) - .withColumn("run_id", lit(self.run_id)) - .withColumn("run_date", lit(self.run_date)) - .withColumn("model", lit(model.params.name)) - .withColumn("use_case", lit(self.conf["use_case_name"])) - .withColumn("model_uri", lit(model_uri)) - .write.mode("append") - .saveAsTable(self.conf.get("evaluation_output")) - ) - - res_df = ( - res_sdf.groupby(["metric_name"]) - .mean("metric_value") - .withColumnRenamed("avg(metric_value)", "metric_value") - .toPandas() - ) - - metric_name = None - metric_value = None - - for rec in res_df.values: - metric_name, metric_value = rec - if write: - mlflow.log_metric(metric_name, metric_value) - print(res_df) - return metric_value + return train_df, val_df + + def evaluate_score( + self, evaluating: bool = True, scoring: bool = True, ensemble: bool = True + ) -> str: + print("Starting evaluate_score") + if evaluating: + self.evaluate_models() + if scoring: + self.score_models() + if ensemble: + self.ensemble() + print("Finished evaluate_score") + return self.run_id def evaluate_models(self): + """ + Trains and evaluates all models from the active models list. 
+ """ print("Starting evaluate_models") for model_name in self.model_registry.get_active_model_keys(): print(f"Started evaluating {model_name}") try: model_conf = self.model_registry.get_model_conf(model_name) - if model_conf["model_type"] == "global": - self.evaluate_global_model(model_conf) - elif model_conf["model_type"] == "local": + if model_conf["model_type"] == "local": self.evaluate_local_model(model_conf) + elif model_conf["model_type"] == "global": + self.evaluate_global_model(model_conf) elif model_conf["model_type"] == "foundation": self.evaluate_foundation_model(model_conf) except Exception as err: _logger.error( - f"Error has occurred while training model {model_name}: {repr(err)}", + f"Error has occurred while evaluating model {model_name}: {repr(err)}", exc_info=err, ) print(f"Finished evaluating {model_name}") @@ -387,12 +172,12 @@ def evaluate_local_model(self, model_conf): model = self.model_registry.get_model(model_conf["name"]) # Use Pandas UDF to forecast - evaluate_one_model_fn = functools.partial( - Forecaster.evaluate_one_model, model=model + evaluate_one_local_model_fn = functools.partial( + Forecaster.evaluate_one_local_model, model=model ) res_sdf = ( src_df.groupby(self.conf["group_id"]) - .applyInPandas(evaluate_one_model_fn, schema=output_schema) + .applyInPandas(evaluate_one_local_model_fn, schema=output_schema) ) if self.conf.get("evaluation_output", None) is not None: ( @@ -423,7 +208,7 @@ def evaluate_local_model(self, model_conf): mlflow.set_tag("run_id", self.run_id) @staticmethod - def evaluate_one_model( + def evaluate_one_local_model( pdf: pd.DataFrame, model: ForecastingRegressor ) -> pd.DataFrame: pdf[model.params["date_col"]] = pd.to_datetime(pdf[model.params["date_col"]]) @@ -436,7 +221,7 @@ def evaluate_one_model( pdf = pdf.fillna(0.1) # Fix here pdf[model.params["target"]] = pdf[model.params["target"]].clip(0.1) - metrics_df = model.backtest(pdf, start=split_date, group_id=group_id, retrain=False) + metrics_df = model.backtest(pdf, start=split_date, group_id=group_id) return metrics_df except Exception as err: _logger.error( @@ -458,61 +243,113 @@ def evaluate_one_model( ) def evaluate_global_model(self, model_conf): - mlflow_client = mlflow.tracking.MlflowClient() - hist_df, removed = self.prepare_data_for_global_model("evaluating") - train_df, val_df = self.split_df_train_val(hist_df) - model_name = model_conf["name"] - mlflow_model_name = f"{self.conf['model_output']}.{model_name}_{self.conf['use_case_name']}" - try: - champion = mlflow_client.get_model_version_by_alias(mlflow_model_name, "champion") - champion_version = champion.version - champion_run_id = f"runs:/{champion.run_id}/model" - champion_model = mlflow.sklearn.load_model( - f"models:/{mlflow_model_name}/{champion_version}" + with mlflow.start_run(experiment_id=self.experiment_id) as run: + model_name = model_conf["name"] + hist_df, removed = self.prepare_data_for_global_model("evaluating") + train_df, val_df = self.split_df_train_val(hist_df) + + # First, we train the model on the entire history (train_df, val_df) + # and then register this model as our final model in Unity Catalog + final_model = self.model_registry.get_model(model_name) + final_model.fit(pd.concat([train_df, val_df])) + input_example = train_df[train_df[self.conf['group_id']] == train_df[self.conf['group_id']] \ + .unique()[0]].sort_values(by=[self.conf['date_col']]) + input_schema = infer_signature(model_input=input_example).inputs + output_schema = Schema( + [ + ColSpec("integer", "index"), + 
ColSpec("string", self.conf['group_id']), + ColSpec("datetime", self.conf['date_col']), + ColSpec("float", self.conf['target']), + ] ) - champion_metrics = self.backtest_global_model( - model=champion_model, + signature = ModelSignature(inputs=input_schema, outputs=output_schema) + model_info = mlflow.sklearn.log_model( + final_model, + "model", + registered_model_name=f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}", + input_example=input_example, + signature=signature, + pip_requirements=[], + ) + mlflow.log_params(final_model.get_params()) + mlflow.set_tag("run_id", self.run_id) + mlflow.set_tag("model_name", final_model.params["name"]) + print(f"Model registered: {self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}") + + # Next, we train the model only with train_df and run detailed backtesting + model = self.model_registry.get_model(model_name) + model.fit(pd.concat([train_df])) + metrics = self.backtest_global_model( + model=model, train_df=train_df, val_df=val_df, - model_uri=champion_run_id, - write=False, + model_uri=model_info.model_uri, # This model_uri is from the final model + write=True, ) - except: - print(f"No deployed model yet available for model: {mlflow_model_name}") - champion_model = None - - new_runs = mlflow_client.search_runs( - experiment_ids=[self.experiment_id], - filter_string=f"tags.candidate='true' and tags.model_name='{model_name}'", - order_by=["start_time DESC"], - max_results=10, + print(f"Finished training {model_conf.get('name')}") + + def backtest_global_model( + self, + model: ForecastingRegressor, + train_df: pd.DataFrame, + val_df: pd.DataFrame, + model_uri: str, + write: bool = True, + ): + res_pdf = ( + model.backtest( + pd.concat([train_df, val_df]), + start=train_df[self.conf["date_col"]].max(), + spark=self.spark, + #backtest_retrain=self.conf["backtest_retrain"], + )) + + group_id_dtype = IntegerType() \ + if train_df[self.conf["group_id"]].dtype == 'int' else StringType() + + schema = StructType( + [ + StructField(self.conf["group_id"], group_id_dtype), + StructField("backtest_window_start_date", DateType()), + StructField("metric_name", StringType()), + StructField("metric_value", DoubleType()), + StructField("forecast", ArrayType(DoubleType())), + StructField("actual", ArrayType(DoubleType())), + StructField("model_pickle", BinaryType()), + ] ) - if len(new_runs) == 0: - print( - f"No candidate models found for model {model_name}! Nothing to deploy! Exiting.." 
- ) - return - new_run = new_runs[0] - new_model_uri = f"runs:/{new_run.info.run_uuid}/model" - new_model = mlflow.sklearn.load_model(new_model_uri) - new_metrics = self.backtest_global_model( - model=new_model, - train_df=train_df, - val_df=val_df, - model_uri=new_model_uri, - write=False, + res_sdf = self.spark.createDataFrame(res_pdf, schema) + + if write: + if self.conf.get("evaluation_output", None): + ( + res_sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) + .withColumn("run_id", lit(self.run_id)) + .withColumn("run_date", lit(self.run_date)) + .withColumn("model", lit(model.params.name)) + .withColumn("use_case", lit(self.conf["use_case_name"])) + .withColumn("model_uri", lit(model_uri)) + .write.mode("append") + .saveAsTable(self.conf.get("evaluation_output")) + ) + + res_df = ( + res_sdf.groupby(["metric_name"]) + .mean("metric_value") + .withColumnRenamed("avg(metric_value)", "metric_value") + .toPandas() ) - if (champion_model is None) or (new_metrics <= champion_metrics): - model_details = mlflow.register_model( - model_uri=new_model_uri, name=mlflow_model_name) - # wait_until_ready(model_details.name, model_details.version) - # TODO: Add description, version, metadata in general - mlflow_client.set_registered_model_alias( - mlflow_model_name, - "champion", - model_details.version) - print(f"Champion alias assigned to the new model") + metric_name = None + metric_value = None + + for rec in res_df.values: + metric_name, metric_value = rec + if write: + mlflow.log_metric(metric_name, metric_value) + print(res_df) + return metric_value def evaluate_foundation_model(self, model_conf): with mlflow.start_run(experiment_id=self.experiment_id) as run: @@ -571,10 +408,10 @@ def score_local_model(self, model_conf): ] ) model = self.model_registry.get_model(model_conf["name"]) - score_one_model_fn = functools.partial(Forecaster.score_one_model, model=model) + score_one_local_model_fn = functools.partial(Forecaster.score_one_local_model, model=model) res_sdf = ( src_df.groupby(self.conf["group_id"]) - .applyInPandas(score_one_model_fn, schema=output_schema) + .applyInPandas(score_one_local_model_fn, schema=output_schema) ) ( res_sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) @@ -588,7 +425,7 @@ def score_local_model(self, model_conf): ) @staticmethod - def score_one_model( + def score_one_local_model( pdf: pd.DataFrame, model: ForecastingRegressor ) -> pd.DataFrame: pdf[model.params["date_col"]] = pd.to_datetime(pdf[model.params["date_col"]]) @@ -614,9 +451,9 @@ def score_one_model( def score_global_model(self, model_conf): print(f"Running scoring for {model_conf['name']}...") - champion_model, champion_model_uri = self.get_model_for_scoring(model_conf) + model, model_uri = self.get_model_for_scoring(model_conf) score_df, removed = self.prepare_data_for_global_model("scoring") - prediction_df, model_fitted = champion_model.forecast(score_df) + prediction_df, model_fitted = model.forecast(score_df) if prediction_df[self.conf["date_col"]].dtype.type != np.datetime64: prediction_df[self.conf["date_col"]] = prediction_df[ self.conf["date_col"] @@ -636,7 +473,7 @@ def score_global_model(self, model_conf): .withColumn("run_date", lit(self.run_date)) .withColumn("use_case", lit(self.conf["use_case_name"])) .withColumn("model_pickle", lit(b"")) - .withColumn("model_uri", lit(champion_model_uri)) + .withColumn("model_uri", lit(model_uri)) .write.mode("append") .saveAsTable(self.conf["scoring_output"]) ) @@ -661,29 +498,98 
@@ def score_foundation_model(self, model_conf): ) def get_model_for_scoring(self, model_conf): - mlflow_client = MlflowClient() - if model_conf["trainable"]: - mlflow_model_name = f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}" - champion = mlflow_client.get_model_version_by_alias(mlflow_model_name, "champion") - champion_version = champion.version - champion_model_uri = f"runs:/{champion.run_id}/model" - champion_model = mlflow.sklearn.load_model( - f"models:/{mlflow_model_name}/{champion_version}" - ) - return champion_model, champion_model_uri + client = MlflowClient() + if model_conf.get("model_type", None) == "global": + registered_name = f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}" + model_info = Forecaster.get_latest_model_info(client, registered_name) + model_version = model_info.version + model_uri = f"runs:/{model_info.run_id}/model" + model = mlflow.sklearn.load_model(f"models:/{registered_name}/{model_version}") + return model, model_uri else: return self.model_registry.get_model(model_conf["name"]), None + @staticmethod + def get_latest_model_info(client, registered_name): + latest_version = 1 + for mv in client.search_model_versions(f"name='{registered_name}'"): + version_int = int(mv.version) + if version_int > latest_version: + latest_version = version_int + return client.get_model_version(registered_name, latest_version) -def flatten_nested_parameters(d): - out = {} - for key, val in d.items(): - if isinstance(val, dict): - val = [val] - if isinstance(val, list) and all(isinstance(item, dict) for item in val): - for subdict in val: - deeper = flatten_nested_parameters(subdict).items() - out.update({key + "_" + key2: val2 for key2, val2 in deeper}) - else: - out[key] = val - return out + def ensemble(self): + if self.conf.get("ensemble") and self.conf["ensemble_scoring_output"]: + metrics_df = ( + self.spark.table(self.conf["evaluation_output"]) + .where(col("run_id").eqNullSafe(lit(self.run_id))) + .where( + col("metric_name").eqNullSafe( + lit(self.conf.get("ensemble_metric", "smape")) + ) + ) + ) + models_df = ( + metrics_df.groupby(self.conf["group_id"], "model") + .agg( + avg("metric_value").alias("metric_avg"), + min("metric_value").alias("metric_min"), + max("metric_value").alias("metric_max"), + ) + .where( + col("metric_avg") < lit(self.conf.get("ensemble_metric_avg", 0.2)) + ) + .where( + col("metric_max") < lit(self.conf.get("ensemble_metric_max", 0.5)) + ) + .where(col("metric_min") > lit(0.01)) + ) + df = ( + self.spark.table(self.conf["scoring_output"]) + .where(col("run_id").eqNullSafe(lit(self.run_id))) + .join( + models_df.select(self.conf["group_id"], "model"), + on=[self.conf["group_id"], "model"], + ) + ) + + left = df.select( + self.conf["group_id"], "run_id", "run_date", "use_case", "model", + posexplode(self.conf["date_col"]) + ).withColumnRenamed('col', self.conf["date_col"]) + + right = df.select( + self.conf["group_id"], "run_id", "run_date", "use_case", "model", + posexplode(self.conf["target"]) + ).withColumnRenamed('col', self.conf["target"]) + + merged = left.join(right, [ + self.conf["group_id"], 'run_id', 'run_date', 'use_case', 'model', + 'pos'], 'inner').drop("pos") + + aggregated_df = merged.groupby( + self.conf["group_id"], self.conf["date_col"] + ).agg( + avg(self.conf["target"]).alias(self.conf["target"] + "_avg"), + min(self.conf["target"]).alias(self.conf["target"] + "_min"), + max(self.conf["target"]).alias(self.conf["target"] + "_max"), + ) + + aggregated_df = 
aggregated_df.orderBy(self.conf["group_id"], self.conf["date_col"])\ + .groupBy(self.conf["group_id"]).agg( + collect_list(self.conf["date_col"]).alias(self.conf["date_col"]), + collect_list(self.conf["target"] + "_avg").alias(self.conf["target"] + "_avg"), + collect_list(self.conf["target"] + "_min").alias(self.conf["target"] + "_min"), + collect_list(self.conf["target"] + "_max").alias(self.conf["target"] + "_max") + ) + + ( + aggregated_df.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) + .withColumn("run_id", lit(self.run_id)) + .withColumn("run_date", lit(self.run_date)) + .withColumn("use_case", lit(self.conf["use_case_name"])) + .withColumn("model", lit("ensemble")) + .write.format("delta") + .mode("append") + .saveAsTable(self.conf["ensemble_scoring_output"]) + ) diff --git a/mmf_sa/__init__.py b/mmf_sa/__init__.py index 29ba20f..50d1f57 100644 --- a/mmf_sa/__init__.py +++ b/mmf_sa/__init__.py @@ -77,6 +77,7 @@ def run_forecast( _conf["metric"] = metric _conf["resample"] = resample + run_evaluation = True run_scoring = False if scoring_data is not None and scoring_output is not None: run_scoring = True @@ -85,6 +86,7 @@ def run_forecast( _data_conf["scoring_data"] = scoring_data else: _conf["scoring_data"] = scoring_data + run_ensemble = True if use_case_name is not None: _conf["use_case_name"] = use_case_name @@ -126,7 +128,11 @@ def run_forecast( _conf["dynamic_historical"] = dynamic_historical f = Forecaster(conf=_conf, data_conf=_data_conf, spark=spark) - run_id = f.train_eval_score(scoring=run_scoring) + run_id = f.evaluate_score( + evaluating=run_evaluation, + scoring=run_scoring, + ensemble=run_ensemble, + ) return run_id diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py index c22f2e2..603cfda 100644 --- a/mmf_sa/models/abstract_model.py +++ b/mmf_sa/models/abstract_model.py @@ -3,7 +3,6 @@ import pandas as pd import cloudpickle from typing import Dict, Union -from transformers import pipeline from sklearn.base import BaseEstimator, RegressorMixin from sktime.performance_metrics.forecasting import mean_absolute_percentage_error import mlflow @@ -47,7 +46,7 @@ def backtest( start: pd.Timestamp, group_id: Union[str, int] = None, stride: int = None, - retrain: bool = True, + backtest_retrain: bool = False, spark=None, ) -> pd.DataFrame: if stride is None: @@ -74,7 +73,8 @@ def backtest( < np.datetime64(curr_date + self.prediction_length_offset) )] - if retrain: + # backtest_retrain for global models is currently not supported + if backtest_retrain and self.params["model_type"] == "global": self.fit(_df) metrics = self.calculate_metrics(_df, actuals_df, curr_date, spark) diff --git a/mmf_sa/models/models_conf.yaml b/mmf_sa/models/models_conf.yaml index ddc961f..5a9e3f9 100644 --- a/mmf_sa/models/models_conf.yaml +++ b/mmf_sa/models/models_conf.yaml @@ -20,7 +20,6 @@ models: model_class: StatsFcBaselineWindowAverage framework: StatsForecast model_type: local - trainable: false model_spec: window_size: 7 @@ -29,7 +28,6 @@ models: model_class: StatsFcBaselineSeasonalWindowAverage framework: StatsForecast model_type: local - trainable: false model_spec: season_length: 7 window_size: 7 @@ -39,14 +37,12 @@ models: model_class: StatsFcBaselineNaive framework: StatsForecast model_type: local - trainable: false StatsForecastBaselineSeasonalNaive: module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline model_class: StatsFcBaselineSeasonalNaive framework: StatsForecast model_type: local - trainable: false model_spec: 
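
The `models_conf.yaml` hunks in this part of the diff drop the per-model `trainable` flag; behaviour now keys off `model_type` instead, and `get_model_for_scoring` (above) loads the latest registered version for global models rather than resolving a "champion" alias. A condensed sketch of that latest-version lookup, with a placeholder registered model name:

```python
# Condensed sketch of the get_latest_model_info / get_model_for_scoring idea;
# the registered name is a placeholder.
import mlflow
from mlflow.tracking import MlflowClient

def load_latest_version(registered_name: str):
    """Load the highest registered version of `registered_name` for scoring."""
    client = MlflowClient()
    versions = client.search_model_versions(f"name='{registered_name}'")
    if not versions:
        raise ValueError(f"No versions registered under {registered_name}")
    latest = max(versions, key=lambda mv: int(mv.version))
    model = mlflow.sklearn.load_model(f"models:/{registered_name}/{latest.version}")
    return model, f"runs:/{latest.run_id}/model"
```

Unlike the alias-based flow it replaces, this always scores with the most recently registered version, so no separate promotion step is needed.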
season_length: 7 @@ -55,7 +51,6 @@ models: model_class: StatsFcAutoArima framework: StatsForecast model_type: local - trainable: false model_spec: season_length: approximation: @@ -66,7 +61,6 @@ models: model_class: StatsFcAutoETS framework: StatsForecast model_type: local - trainable: false model_spec: season_length: model_type: "ZZZ" @@ -76,7 +70,6 @@ models: model_class: StatsFcAutoCES framework: StatsForecast model_type: local - trainable: false model_spec: season_length: 1 model_type: "Z" @@ -86,7 +79,6 @@ models: model_class: StatsFcAutoTheta framework: StatsForecast model_type: local - trainable: false model_spec: season_length: 1 decomposition_type: "multiplicative" @@ -96,7 +88,6 @@ models: model_class: StatsFcTSB framework: StatsForecast model_type: local - trainable: false model_spec: alpha_d: 0.2 alpha_p: 0.2 @@ -106,7 +97,6 @@ models: model_class: StatsFcADIDA framework: StatsForecast model_type: local - trainable: false model_spec: StatsForecastIMAPA: @@ -114,7 +104,6 @@ models: model_class: StatsFcIMAPA framework: StatsForecast model_type: local - trainable: false model_spec: StatsForecastCrostonClassic: @@ -122,7 +111,6 @@ models: model_class: StatsFcCrostonClassic framework: StatsForecast model_type: local - trainable: false model_spec: StatsForecastCrostonOptimized: @@ -130,7 +118,6 @@ models: model_class: StatsFcCrostonOptimized framework: StatsForecast model_type: local - trainable: false model_spec: StatsForecastCrostonSBA: @@ -138,7 +125,6 @@ models: model_class: StatsFcCrostonSBA framework: StatsForecast model_type: local - trainable: false model_spec: RFableArima: @@ -146,7 +132,6 @@ models: model_class: RFableModel framework: RFable model_type: local - trainable: false model_spec: season_length: @@ -155,7 +140,6 @@ models: model_class: RFableModel framework: RFable model_type: local - trainable: false model_spec: season_length: @@ -164,7 +148,6 @@ models: model_class: RFableModel framework: RFable model_type: local - trainable: false model_spec: season_length: @@ -173,14 +156,12 @@ models: model_class: RFableModel framework: RFable model_type: local - trainable: false RDynamicHarmonicRegression: module: mmf_sa.models.r_fable.RFableForecastingPipeline model_class: RFableModel framework: RFable model_type: local - trainable: false model_spec: fourier_terms: @@ -189,7 +170,6 @@ models: model_class: SKTimeLgbmDsDt framework: SKTime model_type: local - trainable: false enable_gcv: false model_spec: deseasonalise_model: multiplicative @@ -201,7 +181,6 @@ models: model_class: SKTimeTBats framework: SKTime model_type: local - trainable: false enable_gcv: false model_spec: box_cox: True @@ -213,7 +192,6 @@ models: model_class: NeuralFcRNN framework: NeuralForecast model_type: global - trainable: true max_steps: 200 input_size_factor: 2 loss: smape @@ -232,7 +210,6 @@ models: model_class: NeuralFcLSTM framework: NeuralForecast model_type: global - trainable: true max_steps: 200 input_size_factor: 2 loss: smape @@ -250,7 +227,6 @@ models: model_class: NeuralFcNBEATSx framework: NeuralForecast model_type: global - trainable: true max_steps: 100 input_size_factor: 2 loss: smape @@ -265,7 +241,6 @@ models: model_class: NeuralFcNHITS framework: NeuralForecast model_type: global - trainable: true max_steps: 200 input_size_factor: 2 loss: smape @@ -284,7 +259,6 @@ models: model_class: NeuralFcAutoRNN framework: NeuralForecast model_type: global - trainable: true max_steps: 200 num_samples: 20 loss: smape @@ -298,7 +272,6 @@ models: model_class: NeuralFcAutoLSTM framework: 
NeuralForecast model_type: global - trainable: true max_steps: 200 num_samples: 20 loss: smape @@ -312,7 +285,6 @@ models: model_class: NeuralFcAutoNBEATSx framework: NeuralForecast model_type: global - trainable: true max_steps: 200 num_samples: 20 loss: smape @@ -324,7 +296,6 @@ models: model_class: NeuralFcAutoNHITS framework: NeuralForecast model_type: global - trainable: true max_steps: 200 num_samples: 20 loss: smape @@ -338,7 +309,6 @@ models: model_class: NeuralFcAutoTiDE framework: NeuralForecast model_type: global - trainable: true max_steps: 200 num_samples: 20 loss: smape @@ -357,7 +327,6 @@ models: model_class: NeuralFcAutoPatchTST framework: NeuralForecast model_type: global - trainable: true max_steps: 200 num_samples: 20 loss: smape @@ -373,7 +342,6 @@ models: model_class: ChronosT5Tiny framework: Chronos model_type: foundation - trainable: false num_samples: 20 ChronosT5Mini: @@ -381,7 +349,6 @@ models: model_class: ChronosT5Mini framework: Chronos model_type: foundation - trainable: false num_samples: 20 ChronosT5Small: @@ -389,7 +356,6 @@ models: model_class: ChronosT5Small framework: Chronos model_type: foundation - trainable: false num_samples: 20 ChronosT5Base: @@ -397,7 +363,6 @@ models: model_class: ChronosT5Base framework: Chronos model_type: foundation - trainable: false num_samples: 20 ChronosT5Large: @@ -405,7 +370,6 @@ models: model_class: ChronosT5Large framework: Chronos model_type: foundation - trainable: false num_samples: 20 Moment1Large: @@ -413,7 +377,6 @@ models: model_class: Moment1Large framework: Moment model_type: foundation - trainable: false num_samples: 20 MoiraiSmall: @@ -421,7 +384,6 @@ models: model_class: MoiraiSmall framework: Moirai model_type: foundation - trainable: false num_samples: 100 patch_size: 32 @@ -430,7 +392,6 @@ models: model_class: MoiraiBase framework: Moirai model_type: foundation - trainable: false num_samples: 100 patch_size: 32 @@ -439,6 +400,5 @@ models: model_class: MoiraiLarge framework: Moirai model_type: foundation - trainable: false num_samples: 100 patch_size: 32 \ No newline at end of file diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py index 4933f58..19c1e9b 100644 --- a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py +++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py @@ -584,7 +584,7 @@ def config(trial): 'dropout', list(self.params.dropout)), layernorm=trial.suggest_categorical( 'layernorm', list(self.params.layernorm)), - #**self.exogs, + #**self.exogs, #exogenous regressors not yet supported **self.distributed_kwargs, ) self.model = NeuralForecast( @@ -638,7 +638,7 @@ def config(trial): 'scaler_type', list(self.params.scaler_type)), revin=trial.suggest_categorical( 'revin', list(self.params.revin)), - #**self.exogs, + #**self.exogs, #exogenous regressors not yet supported **self.distributed_kwargs, ) self.model = NeuralForecast(
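
For context on the `abstract_model.py` change above: `retrain` becomes `backtest_retrain` (default `False`), and the refit inside the rolling backtest is now only applied to global models when the flag is set. A standalone, pandas-only illustration of that rolling-origin loop; the day-based offsets, column names, and the stubbed refit are illustrative, not the library's frequency-aware implementation:

```python
# Illustration of a rolling-origin backtest with an optional refit gate,
# mirroring ForecastingRegressor.backtest(); fitting/metrics are stubbed out.
import pandas as pd

def rolling_backtest(df, start, prediction_length, stride, backtest_retrain=False):
    """Yield (window_start, train_slice, actuals_slice) for each backtest window."""
    end = df["date"].max()
    curr_date = start
    while curr_date + pd.DateOffset(days=prediction_length) <= end:
        train_slice = df[df["date"] <= curr_date]
        actuals = df[
            (df["date"] > curr_date)
            & (df["date"] <= curr_date + pd.DateOffset(days=prediction_length))
        ]
        if backtest_retrain:
            pass  # refit the model on train_slice here (global models only)
        yield curr_date, train_slice, actuals
        curr_date += pd.DateOffset(days=stride)

# Example with daily data, a 7-day horizon and a 7-day stride:
df = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=60, freq="D"),
    "y": range(60),
})
for window_start, train, actuals in rolling_backtest(
    df, start=pd.Timestamp("2024-02-01"), prediction_length=7, stride=7
):
    print(window_start.date(), len(train), len(actuals))
```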