Skip to content

Commit

Permalink
Added notebook markdowns
Browse files Browse the repository at this point in the history
  • Loading branch information
ryuta-yoshimatsu committed Jun 7, 2024
1 parent ddfe283 commit 8ec896c
Show file tree
Hide file tree
Showing 213 changed files with 8,445 additions and 264 deletions.
48 changes: 24 additions & 24 deletions README.md

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions examples/foundation-model-examples/chronos-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

# MAGIC %md
# MAGIC ## Prepare Data
# MAGIC Make sure that the catalog and the schema already exist.

# COMMAND ----------

Expand All @@ -23,16 +24,10 @@
# COMMAND ----------

# This cell will create tables: {catalog}.{db}.m4_daily_train, {catalog}.{db}.m4_monthly_train, {catalog}.{db}.rossmann_daily_train, {catalog}.{db}.rossmann_daily_test

dbutils.notebook.run("data_preparation", timeout_seconds=0, arguments={"catalog": catalog, "db": db, "n": n})

# COMMAND ----------

# MAGIC %md
# MAGIC

# COMMAND ----------

from pyspark.sql.functions import collect_list

# Make sure that the data exists
Expand All @@ -44,6 +39,7 @@

# MAGIC %md
# MAGIC ## Distribute Inference
# MAGIC We use [Pandas UDF](https://docs.databricks.com/en/udf/pandas.html#iterator-of-series-to-iterator-of-series-udf) to distribute the inference.

# COMMAND ----------

Expand Down Expand Up @@ -184,6 +180,11 @@ def predict(self, context, input_data, params=None):

# COMMAND ----------

# MAGIC %md
# MAGIC ##Reload Model

# COMMAND ----------

from mlflow import MlflowClient
client = MlflowClient()

Expand All @@ -210,7 +211,7 @@ def get_latest_model_version(client, registered_model_name):
# COMMAND ----------

# MAGIC %md
# MAGIC ## Deploy Model for Online Forecast
# MAGIC ## Deploy Model on Databricks Model Serving

# COMMAND ----------

Expand Down
9 changes: 8 additions & 1 deletion examples/foundation-model-examples/moirai-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

# MAGIC %md
# MAGIC ## Prepare Data
# MAGIC Make sure that the catalog and the schema already exist.

# COMMAND ----------

Expand All @@ -39,6 +40,7 @@

# MAGIC %md
# MAGIC ## Distribute Inference
# MAGIC We use [Pandas UDF](https://docs.databricks.com/en/udf/pandas.html#iterator-of-series-to-iterator-of-series-udf) to distribute the inference.

# COMMAND ----------

Expand Down Expand Up @@ -210,6 +212,11 @@ def predict(self, context, input_data, params=None):

# COMMAND ----------

# MAGIC %md
# MAGIC ##Reload Model

# COMMAND ----------

from mlflow import MlflowClient
mlflow_client = MlflowClient()

Expand All @@ -235,7 +242,7 @@ def get_latest_model_version(mlflow_client, registered_model_name):
# COMMAND ----------

# MAGIC %md
# MAGIC ## Deploy Model for Online Forecast
# MAGIC ## Deploy Model on Databricks Model Serving

# COMMAND ----------

Expand Down
11 changes: 9 additions & 2 deletions examples/foundation-model-examples/moment-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

# MAGIC %md
# MAGIC ## Prepare Data
# MAGIC Make sure that the catalog and the schema already exist.

# COMMAND ----------

Expand All @@ -38,7 +39,8 @@
# COMMAND ----------

# MAGIC %md
# MAGIC ## Distributed Inference
# MAGIC ## Distribute Inference
# MAGIC We use [Pandas UDF](https://docs.databricks.com/en/udf/pandas.html#iterator-of-series-to-iterator-of-series-udf) to distribute the inference.

# COMMAND ----------

Expand Down Expand Up @@ -193,6 +195,11 @@ def predict(self, context, input_data, params=None):

# COMMAND ----------

# MAGIC %md
# MAGIC ##Reload Model

# COMMAND ----------

from mlflow import MlflowClient
mlflow_client = MlflowClient()

Expand All @@ -218,7 +225,7 @@ def get_latest_model_version(mlflow_client, registered_model_name):
# COMMAND ----------

# MAGIC %md
# MAGIC ## Deploy Model for Online Forecast
# MAGIC ## Deploy Model on Databricks Model Serving

# COMMAND ----------

Expand Down
12 changes: 9 additions & 3 deletions examples/foundation-model-examples/timesfm-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# MAGIC
# MAGIC The notebook loads the model, distributes the inference, registers the model, deploys the model and makes online forecasts.
# MAGIC
# MAGIC As of today (June 5, 2024), TimesFM supports python version below [3.10](https://github.com/google-research/timesfm/issues/60). So make sure your cluster is below DBR ML 14.3.
# MAGIC As of June 5, 2024, TimesFM supports python version below [3.10](https://github.com/google-research/timesfm/issues/60). So make sure your cluster is below DBR ML 14.3.

# COMMAND ----------

Expand All @@ -24,6 +24,7 @@

# MAGIC %md
# MAGIC ## Prepare Data
# MAGIC Make sure that the catalog and the schema already exist.

# COMMAND ----------

Expand Down Expand Up @@ -51,7 +52,7 @@
# COMMAND ----------

# MAGIC %md
# MAGIC See the [github repository](https://github.com/google-research/timesfm/tree/master?tab=readme-ov-file#initialize-the-model-and-load-a-checkpoint) of TimesFM for detailed description of the input parameters.
# MAGIC Distribution of the inference is managed by TimesFM so we don't need to use Pandas UDF. See the [github repository](https://github.com/google-research/timesfm/tree/master?tab=readme-ov-file#initialize-the-model-and-load-a-checkpoint) of TimesFM for detailed description of the input parameters.

# COMMAND ----------

Expand Down Expand Up @@ -166,6 +167,11 @@ def __setstate__(self, state):

# COMMAND ----------

# MAGIC %md
# MAGIC ##Reload Model

# COMMAND ----------

from mlflow import MlflowClient
client = MlflowClient()

Expand All @@ -189,7 +195,7 @@ def get_latest_model_version(client, registered_model_name):
# COMMAND ----------

# MAGIC %md
# MAGIC ## Deploy Model for Online Forecast
# MAGIC ## Deploy Model on Databricks Model Serving

# COMMAND ----------

Expand Down
113 changes: 77 additions & 36 deletions examples/foundation_daily.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,46 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Many Models Forecasting SA (MMFSA) Demo
# MAGIC This demo highlights how to configure MMF SA to use M4 competition data
# MAGIC # Many Models Forecasting Demo
# MAGIC This notebook showcases how to run MMF with foundation models on multiple time series of daily resolution. We will use [M4 competition](https://www.sciencedirect.com/science/article/pii/S0169207019301128#sec5) data.

# COMMAND ----------

# MAGIC %pip install -r ../requirements.txt --quiet
# MAGIC dbutils.library.restartPython()
# MAGIC %md
# MAGIC ### Cluster setup
# MAGIC
# MAGIC We recommend using a cluster with [Databricks Runtime 14.3 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html) or above. The cluster should be single-node with one or more GPU instances: e.g. [g4dn.12xlarge [T4]](https://aws.amazon.com/ec2/instance-types/g4/) on AWS or [Standard_NC64as_T4_v3](https://learn.microsoft.com/en-us/azure/virtual-machines/nct4-v3-series) on Azure. MMF leverages [Pandas UDF](https://docs.databricks.com/en/udf/pandas.html) for distributing the inference tasks and utilizing all the available resource.

# COMMAND ----------

# MAGIC %md
# MAGIC ### Data preparation steps
# MAGIC We are using `datasetsforecast` package to download M4 data.
# MAGIC
# MAGIC M4 dataset contains a set of time series which we use for testing of MMF SA.
# MAGIC
# MAGIC Below we have developed a number of functions to convert M4 time series to the expected format.
# MAGIC ### Install and import packages
# MAGIC Check out [requirements.txt](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/requirements.txt) if you're interested in the libraries we use. For foundation models, additional dependencies are installed and imported as per demand. See how this is done in `install` function defined in each model pipeline script: e.g. [mmf_sa/models/chronosforecast/ChronosPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/chronosforecast/ChronosPipeline.py).

# COMMAND ----------

# MAGIC %pip install -r ../requirements.txt --quiet
# MAGIC dbutils.library.restartPython()

# COMMAND ----------

import pathlib
import pandas as pd
import logging
logger = spark._jvm.org.apache.log4j
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
from datasetsforecast.m4 import M4
import uuid

# COMMAND ----------

# Make sure that the catalog and the schema exist
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)
import uuid
import pathlib
import pandas as pd
from datasetsforecast.m4 import M4
from mmf_sa import run_forecast

_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
# COMMAND ----------

# MAGIC %md
# MAGIC ### Prepare data
# MAGIC We are using [`datasetsforecast`](https://github.com/Nixtla/datasetsforecast/tree/main/) package to download M4 data. M4 dataset contains a set of time series which we use for testing MMF. Below we have written a number of custome functions to convert M4 time series to an expected format.

# COMMAND ----------

Expand Down Expand Up @@ -68,6 +72,20 @@ def transform_group(df):
return res_df


# COMMAND ----------

# MAGIC %md
# MAGIC We are going to save this data in a delta lake table. Provide catalog and database names where you want to store the data.

# COMMAND ----------

catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")

(
spark.createDataFrame(create_m4_daily())
.write.format("delta").mode("overwrite")
Expand All @@ -76,15 +94,20 @@ def transform_group(df):

# COMMAND ----------

# MAGIC %md ### Now the dataset looks in the following way:
# MAGIC %md Let's take a peak at the dataset:

# COMMAND ----------

# MAGIC %sql select * from solacc_uc.mmf.m4_daily_train where unique_id in ('D1', 'D2', 'D6', 'D7', 'D10') order by unique_id, ds
display(
spark.sql(f"select * from {catalog}.{db}.m4_daily_train where unique_id in ('D1', 'D2', 'D3', 'D4', 'D5') order by unique_id, ds")
)

# COMMAND ----------

# MAGIC %md ### Let's configure the list of models we are going to use for training:
# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: foundation`; these are the foundation models we import from [chronos](https://github.com/amazon-science/chronos-forecasting/tree/main), [uni2ts](https://github.com/SalesforceAIResearch/uni2ts) and [moment](https://github.com/moment-timeseries-foundation-model/moment). Check their documentation for the detailed description of each model.
# MAGIC
# MAGIC Foundation time series models are pretrained on millions or billions of time series. These models can produce analysis (i.e. forecasting, anomaly detection, classfication) on an unforeseen time series without training or tuning. You can modify the hyperparameters in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml) or overwrite the default values in [mmf_sa/forecasting_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/forecasting_conf.yaml).

# COMMAND ----------

Expand All @@ -102,15 +125,19 @@ def transform_group(df):

# COMMAND ----------

# MAGIC %md ### Now we can run the forecasting process using `run_forecast` function.

# COMMAND ----------

# MAGIC %md
# MAGIC We have to loop through the model in the following way else cuda will throw an error.
# MAGIC %md ### Run MMF
# MAGIC
# MAGIC Now, we can run the evaluation and forecasting using `run_forecast` function defined in [mmf_sa/models/__init__.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/__init__.py). Refer to [README.md](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/README.md#parameters-description) for a comprehensive description of each parameter.
# MAGIC
# MAGIC While the following cell is running, you can check the status of your run on Experiments. Make sure you look for the experiments with the path you provided as `experiment_path` within `run_forecast`. On the Experiments page, you see one entry per one model (i.e. ChronosT5Large). The metric provided here is a simple average over all back testing trials and all time series. This is intended to give you an initial feeling of how good each model performs on your entire data mix. But we will look into how you can scrutinize the evaluation using the `evaluation_output` table in a bit.
# MAGIC
# MAGIC If you are interested in how MMF achieves distributed inference on these foundation models using Pandas UDF, have a look at the model pipeline scripts: e.g. [mmf_sa/models/chronosforecast/ChronosPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/chronosforecast/ChronosPipeline.py).
# MAGIC
# MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function.

# COMMAND ----------

# The same run_id will be assigned to all the models. This makes it easier to run the post evaluation analysis later.
run_id = str(uuid.uuid4())

for model in active_models:
Expand All @@ -121,30 +148,44 @@ def transform_group(df):

# COMMAND ----------

# MAGIC %md ### Evaluation output
# MAGIC In the evaluation output table, the evaluation for all backtest windows and all models are stored. This info can be used to monitor model performance or decide which models should be taken into the final aggregated forecast.
# MAGIC %md ### Evaluate
# MAGIC In `evaluation_output` table, the we store all evaluation results for all backtesting trials from all models. This information can be used to understand which models performed well on which time series on which periods of backtesting. This is very important for selecting the final model for forecasting or models for ensembling. Maybe, it's faster to take a look at the table:

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date"))

# COMMAND ----------

# MAGIC %sql select * from solacc_uc.mmf.daily_evaluation_output order by unique_id, model, backtest_window_start_date
# MAGIC %md
# MAGIC For foundation models, we use the same pre-trained model to produce the as-if forecasts for all back testing periods. See how MMF implements backtesting in `backtest` method in [mmf_sa/models/abstract_model.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/abstract_model.py).
# MAGIC
# MAGIC We store the as-if forecasts together with the actuals for each backtesting period, so you can construct any metric of your interest. We provide a few out-of-the-box metrics for you (e.g. smape), but the idea here is that you construct your own metrics reflecting your business requirements and evaluate models based on those. For example, maybe you care more about the accuracy of the near-horizon forecasts than the far-horizon ones. In such case, you can apply a decreasing wieght to compute weighted aggregated metrics.
# MAGIC
# MAGIC Note that if you run local and/or global models against the same time series with the same input parameters (except for those specifying global and foundation models), you will get the entries from those models in the same table and will be able to compare across all types models, which is the biggest benefit of having all models integrated in one solution.
# MAGIC
# MAGIC We also register the model in Unity Catalog and store each model's URI in this table (`model_uri`). You can use MLFlow to [load the models](https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.load_model) and access their specifications or produce forecasts.
# MAGIC
# MAGIC Once you have your foundation models registered in Unity Catalog, you can deploy them behind a real-time endpoint on [Model Serving](https://docs.databricks.com/en/machine-learning/model-serving/index.html). You can then generate a multi-step ahead forecast at any point in time as long as you provide the history with the right resolution. This could be a game changing feature for applications relying on real-time tracking and monitoring of time series data. See the notebooks in [examples/foundation-model-examples](https://github.com/databricks-industry-solutions/many-model-forecasting/tree/main/examples/foundation-model-examples) for examples of how to register and deploy a model, and make an online forecasting request on that model.

# COMMAND ----------

# MAGIC %md ### Forecast Output
# MAGIC In the Forecast output table, the final forecast for each model and each time series is stored.
# MAGIC %md ### Forecast
# MAGIC In `scoring_output` table, forecasts for each time series from each model are stored. Based on the evaluation exercised performed on `evaluation_output` table, you can select the forecasts from the best performing models or a mix of models. We again store each model's URI in this table (`model_uri`). You can use MLFlow to [load the models](https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.load_model) and access their specifications or produce forecasts.

# COMMAND ----------

# MAGIC %sql select * from solacc_uc.mmf.daily_scoring_output order by unique_id, model, ds
display(spark.sql(f"select * from {catalog}.{db}.daily_scoring_output order by unique_id, model, ds"))

# COMMAND ----------

# MAGIC %md ### Delete Tables
# MAGIC Let's clean up the tables.

# COMMAND ----------

# MAGIC %sql delete from solacc_uc.mmf.daily_evaluation_output
display(spark.sql(f"delete from {catalog}.{db}.daily_evaluation_output"))

# COMMAND ----------

# MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output
display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output"))
Loading

0 comments on commit 8ec896c

Please sign in to comment.