diff --git a/.github/workflows/integration-test-gcp-pr.yml b/.github/workflows/integration-test-gcp-pr.yml deleted file mode 100644 index 99ef6b7..0000000 --- a/.github/workflows/integration-test-gcp-pr.yml +++ /dev/null @@ -1,45 +0,0 @@ -name: GCP integration test PR - -on: - pull_request: - -jobs: - run-databricks-notebook: - runs-on: ubuntu-latest - steps: - - name: Checkout repo - uses: actions/checkout@v2 - - name: Run a databricks notebook - uses: databricks/run-notebook@v0 - with: - local-notebook-path: RUNME.py - git-commit: ${{ github.event.pull_request.head.sha }} - databricks-host: https://416411475796958.8.gcp.databricks.com - databricks-token: ${{ secrets.DEPLOYMENT_TARGET_TOKEN_GCP }} - new-cluster-json: > - { - "num_workers": 0, - "spark_version": "14.3.x-cpu-ml-scala2.12", - "node_type_id": "n1-highmem-4", - "gcp_attributes": { - "availability": "ON_DEMAND_GCP" - }, - "spark_conf": { - "spark.master": "local[*, 4]", - "spark.databricks.cluster.profile": "singleNode" - }, - "custom_tags": { - "ResourceClass": "SingleNode" - } - } - notebook-params-json: > - { - "run_job": "True" - } - access-control-list-json: > - [ - { - "group_name": "users", - "permission_level": "CAN_VIEW" - } - ] \ No newline at end of file diff --git a/.github/workflows/integration-test-gcp-push.yml b/.github/workflows/integration-test-gcp-push.yml deleted file mode 100644 index a379991..0000000 --- a/.github/workflows/integration-test-gcp-push.yml +++ /dev/null @@ -1,49 +0,0 @@ -name: GCP integration test push - -on: - workflow_dispatch: - push: - branches: - - main - - web-sync - -jobs: - run-databricks-notebook: - runs-on: ubuntu-latest - steps: - - name: Checkout repo - uses: actions/checkout@v2 - - name: Run a databricks notebook - uses: databricks/run-notebook@v0 - with: - local-notebook-path: RUNME.py - git-commit: ${{ github.sha }} - databricks-host: https://416411475796958.8.gcp.databricks.com - databricks-token: ${{ secrets.DEPLOYMENT_TARGET_TOKEN_GCP }} - new-cluster-json: > - { - "num_workers": 0, - "spark_version": "14.3.x-cpu-ml-scala2.12", - "node_type_id": "n1-highmem-4", - "gcp_attributes": { - "availability": "ON_DEMAND_GCP" - }, - "spark_conf": { - "spark.master": "local[*, 4]", - "spark.databricks.cluster.profile": "singleNode" - }, - "custom_tags": { - "ResourceClass": "SingleNode" - } - } - notebook-params-json: > - { - "run_job": "True" - } - access-control-list-json: > - [ - { - "group_name": "users", - "permission_level": "CAN_VIEW" - } - ] \ No newline at end of file diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index b3804d6..26de1f3 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -88,7 +88,7 @@ def resolve_source(self, key: str) -> DataFrame: else: return self.spark.read.table(self.conf[key]) - def prepare_data_for_global_model(self, mode: str): + def prepare_data_for_global_model(self, mode: str = None): src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() if (mode == "scoring") \ @@ -284,7 +284,7 @@ def evaluate_global_model(self, model_conf): model=model, train_df=train_df, val_df=val_df, - model_uri=model_info.model_uri, # This model_uri is from the final model + model_uri=model_info.model_uri, # This model_uri is from the final model write=True, ) print(f"Finished training {model_conf.get('name')}") @@ -360,17 +360,15 @@ def evaluate_foundation_model(self, model_conf): ) hist_df, removed = self.prepare_data_for_global_model("evaluating") # Reuse the same as global train_df, val_df = self.split_df_train_val(hist_df) + model_uri = f"runs:/{run.info.run_id}/model" metrics = self.backtest_global_model( # Reuse the same as global model=model, train_df=train_df, val_df=val_df, - model_uri="", + model_uri=model_uri, write=True, ) - mlflow.log_metric(self.conf["metric"], metrics) - mlflow.set_tag("action", "evaluate") - mlflow.set_tag("candidate", "true") mlflow.set_tag("model_name", model.params["name"]) mlflow.set_tag("run_id", self.run_id) mlflow.log_params(model.get_params()) @@ -485,8 +483,9 @@ def score_global_model(self, model_conf): def score_foundation_model(self, model_conf): print(f"Running scoring for {model_conf['name']}...") model_name = model_conf["name"] + _, model_uri = self.get_model_for_scoring(model_conf) model = self.model_registry.get_model(model_name) - hist_df, removed = self.prepare_data_for_global_model("evaluating") + hist_df, removed = self.prepare_data_for_global_model() prediction_df, model_pretrained = model.forecast(hist_df, spark=self.spark) sdf = self.spark.createDataFrame(prediction_df).drop('index') ( @@ -496,20 +495,22 @@ def score_foundation_model(self, model_conf): .withColumn("run_date", lit(self.run_date)) .withColumn("use_case", lit(self.conf["use_case_name"])) .withColumn("model_pickle", lit(b"")) - .withColumn("model_uri", lit("")) + .withColumn("model_uri", lit(model_uri)) .write.mode("append") .saveAsTable(self.conf["scoring_output"]) ) def get_model_for_scoring(self, model_conf): client = MlflowClient() + registered_name = f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}" + model_info = self.get_latest_model_info(client, registered_name) + model_version = model_info.version + model_uri = f"runs:/{model_info.run_id}/model" if model_conf.get("model_type", None) == "global": - registered_name = f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}" - model_info = Forecaster.get_latest_model_info(client, registered_name) - model_version = model_info.version - model_uri = f"runs:/{model_info.run_id}/model" model = mlflow.sklearn.load_model(f"models:/{registered_name}/{model_version}") return model, model_uri + elif model_conf.get("model_type", None) == "foundation": + return None, model_uri else: return self.model_registry.get_model(model_conf["name"]), None diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py index d422662..af03175 100644 --- a/mmf_sa/models/abstract_model.py +++ b/mmf_sa/models/abstract_model.py @@ -36,7 +36,7 @@ def predict(self, x, y=None): pass @abstractmethod - def forecast(self, x): + def forecast(self, x, spark=None): # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length pass diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py index c9c33cb..4c55f72 100644 --- a/mmf_sa/models/chronosforecast/ChronosPipeline.py +++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py @@ -45,6 +45,7 @@ def register(self, registered_model_name: str): pip_requirements=[ "git+https://github.com/amazon-science/chronos-forecasting.git", "git+https://github.com/databricks-industry-solutions/many-model-forecasting.git", + "pyspark==3.5.0", ], ) diff --git a/notebooks/demo_foundation_daily.py b/notebooks/demo_foundation_daily.py index 0471684..3ec24d5 100644 --- a/notebooks/demo_foundation_daily.py +++ b/notebooks/demo_foundation_daily.py @@ -149,12 +149,12 @@ def transform_group(df): # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.daily_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_foundation_monthly.py b/notebooks/demo_foundation_monthly.py index 7a55203..852b5d8 100644 --- a/notebooks/demo_foundation_monthly.py +++ b/notebooks/demo_foundation_monthly.py @@ -159,12 +159,12 @@ def transform_group(df): # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output diff --git a/notebooks/demo_global_daily.py b/notebooks/demo_global_daily.py index 259aafc..e426203 100644 --- a/notebooks/demo_global_daily.py +++ b/notebooks/demo_global_daily.py @@ -150,12 +150,12 @@ def transform_group(df): # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.daily_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_global_external_regressors_daily.py b/notebooks/demo_global_external_regressors_daily.py index ebb59fc..c5c9c28 100644 --- a/notebooks/demo_global_external_regressors_daily.py +++ b/notebooks/demo_global_external_regressors_daily.py @@ -109,12 +109,12 @@ # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.rossmann_daily_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.rossmann_daily_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.rossmann_daily_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_ensemble_output diff --git a/notebooks/demo_global_monthly.py b/notebooks/demo_global_monthly.py index 812f412..beabe0a 100644 --- a/notebooks/demo_global_monthly.py +++ b/notebooks/demo_global_monthly.py @@ -160,12 +160,12 @@ def transform_group(df): # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output diff --git a/notebooks/demo_local_univariate_daily.py b/notebooks/demo_local_univariate_daily.py index 16d2c8f..56b3139 100644 --- a/notebooks/demo_local_univariate_daily.py +++ b/notebooks/demo_local_univariate_daily.py @@ -180,12 +180,12 @@ def transform_group(df): # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.daily_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.daily_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_local_univariate_external_regressors_daily.py b/notebooks/demo_local_univariate_external_regressors_daily.py index 4a9536e..56f794d 100644 --- a/notebooks/demo_local_univariate_external_regressors_daily.py +++ b/notebooks/demo_local_univariate_external_regressors_daily.py @@ -131,12 +131,12 @@ # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.rossmann_daily_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.rossmann_daily_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.rossmann_daily_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_ensemble_output diff --git a/notebooks/demo_local_univariate_monthly.py b/notebooks/demo_local_univariate_monthly.py index 6322ef2..9a3932a 100644 --- a/notebooks/demo_local_univariate_monthly.py +++ b/notebooks/demo_local_univariate_monthly.py @@ -190,12 +190,12 @@ def transform_group(df): # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_evaluation_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_evaluation_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_scoring_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output # COMMAND ---------- -# MAGIC #%sql delete from solacc_uc.mmf.monthly_ensemble_output +# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output diff --git a/pyproject.toml b/pyproject.toml index 7b95a3f..e589b79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,6 +2,7 @@ name = "mmf_sa" version = "0.0.1" dependencies = [ + #"rpy2==3.5.16", # causes issue when deploying the model to Model Serving "kaleido==0.2.1", "Jinja2", "omegaconf==2.3.0",