From c0b058142f7f009df40f6f7fad557aac09e45c2b Mon Sep 17 00:00:00 2001 From: Ryuta Yoshimatsu Date: Thu, 16 Jan 2025 11:31:28 +0100 Subject: [PATCH] fixed a bug on the chronos pipeline --- examples/foundation_daily.py | 1 - examples/foundation_monthly.py | 1 - examples/m5-examples/foundation_daily_m5.py | 1 - mmf_sa/Forecaster.py | 3 -- .../models/chronosforecast/ChronosPipeline.py | 41 ++++++------------- 5 files changed, 13 insertions(+), 34 deletions(-) diff --git a/examples/foundation_daily.py b/examples/foundation_daily.py index 321dd73..e6d12c3 100644 --- a/examples/foundation_daily.py +++ b/examples/foundation_daily.py @@ -129,7 +129,6 @@ def transform_group(df): "MoiraiLarge", "MoiraiMoESmall", "MoiraiMoEBase", - "MoiraiMoELarge", "TimesFM_1_0_200m", "TimesFM_2_0_500m", ] diff --git a/examples/foundation_monthly.py b/examples/foundation_monthly.py index 62509ce..b0f404b 100644 --- a/examples/foundation_monthly.py +++ b/examples/foundation_monthly.py @@ -135,7 +135,6 @@ def transform_group(df): "MoiraiLarge", "MoiraiMoESmall", "MoiraiMoEBase", - "MoiraiMoELarge", "TimesFM_1_0_200m", "TimesFM_2_0_500m", ] diff --git a/examples/m5-examples/foundation_daily_m5.py b/examples/m5-examples/foundation_daily_m5.py index 6dfe9cf..de5f9f4 100644 --- a/examples/m5-examples/foundation_daily_m5.py +++ b/examples/m5-examples/foundation_daily_m5.py @@ -35,7 +35,6 @@ "MoiraiLarge", "MoiraiMoESmall", "MoiraiMoEBase", - "MoiraiMoELarge", "TimesFM_1_0_200m", "TimesFM_2_0_500m", ] diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index f4909cd..62260cb 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -383,7 +383,6 @@ def backtest_global_model( spark=self.spark, # backtest_retrain=self.conf["backtest_retrain"], )) - group_id_dtype = IntegerType() \ if train_df[self.conf["group_id"]].dtype == 'int' else StringType() @@ -399,7 +398,6 @@ def backtest_global_model( ] ) res_sdf = self.spark.createDataFrame(res_pdf, schema) - # Write evaluation results to a delta table if write: if self.conf.get("evaluation_output", None): @@ -413,7 +411,6 @@ def backtest_global_model( .write.mode("append") .saveAsTable(self.conf.get("evaluation_output")) ) - # Compute aggregated metrics res_df = ( res_sdf.groupby(["metric_name"]) diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py index 4c35eac..998ee65 100644 --- a/mmf_sa/models/chronosforecast/ChronosPipeline.py +++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py @@ -90,7 +90,6 @@ def predict(self, horizon_timestamps_udf(hist_df.ds).alias("ds"), forecast_udf(hist_df.y).alias("y")) ).toPandas() - forecast_df = forecast_df.reset_index(drop=False).rename( columns={ "unique_id": self.params.group_id, @@ -98,7 +97,6 @@ def predict(self, "y": self.params.target, } ) - # Todo # forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) return forecast_df, self.model @@ -165,19 +163,13 @@ def predict_udf(bulk_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: import numpy as np import pandas as pd # Initialize the ChronosPipeline with a pretrained model from the specified repository - from chronos import BaseChronosPipeline, ChronosBoltPipeline - if "bolt" in self.repo: - pipeline = ChronosBoltPipeline.from_pretrained( - self.repo, - device_map=self.device, - torch_dtype=torch.bfloat16, - ) - else: - pipeline = BaseChronosPipeline.from_pretrained( - self.repo, - device_map=self.device, - torch_dtype=torch.bfloat16, - ) + from chronos import BaseChronosPipeline + pipeline = BaseChronosPipeline.from_pretrained( + self.repo, + device_map='cuda', + torch_dtype=torch.bfloat16, + ) + # inference for bulk in bulk_iterator: median = [] @@ -262,19 +254,12 @@ def __init__(self, repository, prediction_length): self.prediction_length = prediction_length self.device = "cuda" if torch.cuda.is_available() else "cpu" # Initialize the ChronosPipeline with a pretrained model from the specified repository - from chronos import BaseChronosPipeline, ChronosBoltPipeline - if "bolt" in self.repository: - self.pipeline = ChronosBoltPipeline.from_pretrained( - self.repository, - device_map=self.device, - torch_dtype=torch.bfloat16, - ) - else: - self.pipeline = BaseChronosPipeline.from_pretrained( - self.repository, - device_map=self.device, - torch_dtype=torch.bfloat16, - ) + from chronos import BaseChronosPipeline + self.pipeline = BaseChronosPipeline.from_pretrained( + self.repository, + device_map='cuda', + torch_dtype=torch.bfloat16, + ) def predict(self, context, input_data, params=None): history = [torch.tensor(list(series)) for series in input_data]