Update flows.py
BayoAdejare authored Nov 3, 2024
1 parent 1f94005 commit 100feaa
Showing 1 changed file with 73 additions and 96 deletions.
169 changes: 73 additions & 96 deletions src/flows.py
@@ -1,8 +1,7 @@
 from datetime import datetime, timedelta
-from typing import List, Optional
+from typing import List, Optional, Tuple, Dict, Any
 from prefect import flow, task, get_run_logger
 from prefect.tasks import task_input_hash
-from prefect.utilities.annotations import quote
 from pathlib import Path
 import pandas as pd
 from tasks.analytics import (
@@ -12,125 +11,103 @@
     elbow_evaluator,
 )

-# Configuration
-class Config:
-    DEFAULT_RETRY_DELAYS = [30, 60, 120]  # Exponential backoff
+class PipelineConfig:
+    # Base configuration remains the same...
+    BASE_DIR = Path(__file__).parent.resolve()
+    DATA_DIR = BASE_DIR / "data"
+    LOAD_DIR = DATA_DIR / "Load"
+    PROCESSED_DIR = DATA_DIR / "Processed"
+    ANALYTICS_DIR = DATA_DIR / "Analytics"
+
+    DEFAULT_RETRY_DELAYS = [30, 60, 120]
     DEFAULT_CACHE_TTL = "1h"
-    DATA_PATH = Path("data")
-    DEFAULT_HOURS = list(map(lambda x: str(x).zfill(2), range(24)))  # All 24 hours
+    DEFAULT_HOURS = [str(x).zfill(2) for x in range(24)]
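
The new PipelineConfig keeps DEFAULT_RETRY_DELAYS, though the decorators visible in this diff still hard-code retries=3, retry_delay_seconds=30. As a rough sketch only, and assuming Prefect 2.x, where a task's retry_delay_seconds accepts a list of per-attempt delays, the list could be wired in like this (load_hour is a hypothetical task, not part of the commit):

from datetime import timedelta
from prefect import task
from prefect.tasks import task_input_hash

@task(
    retries=len(PipelineConfig.DEFAULT_RETRY_DELAYS),
    retry_delay_seconds=PipelineConfig.DEFAULT_RETRY_DELAYS,  # 30s, 60s, 120s between attempts
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def load_hour(hour: str) -> str:
    """Hypothetical placeholder task; the real ingestion tasks are not shown in this diff."""
    target = PipelineConfig.LOAD_DIR / f"hour={hour}.parquet"
    return str(target)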

-@task(retries=3,
-      retry_delay_seconds=30,
-      cache_key_fn=task_input_hash,
-      cache_expiration=timedelta(hours=1))
-def validate_dates(start_date: str, end_date: str) -> tuple[datetime, datetime]:
-    """Validate and parse input dates"""
-    logger = get_run_logger()
-    try:
-        start = datetime.strptime(start_date, "%d/%m/%Y")
-        end = datetime.strptime(end_date, "%d/%m/%Y")
-        if end < start:
-            raise ValueError("End date must be after start date")
-        logger.info(f"Date range validated: {start_date} to {end_date}")
-        return start, end
-    except ValueError as e:
-        logger.error(f"Date validation failed: {str(e)}")
-        raise
-
-@task
-def prepare_hours(hours: Optional[List[str]] = None) -> List[str]:
-    """Prepare and validate hours list"""
-    if not hours:
-        return Config.DEFAULT_HOURS
-    return [str(h).zfill(2) for h in hours]
-
-@flow(name="ETL Pipeline",
-      retries=3,
-      retry_delay_seconds=30)
-def etl_ingest(
-    start_date: str,
-    end_date: str,
-    hours: Optional[List[str]] = None
-) -> str:
-    """Main ETL ingestion flow"""
-    logger = get_run_logger()
-    logger.info(f"Starting ETL ingestion process")
-
-    # Validate inputs
-    start, end = validate_dates(start_date, end_date)
-    validated_hours = prepare_hours(hours)
-
-    try:
-        # Ensure data directory exists
-        Config.DATA_PATH.mkdir(exist_ok=True)
-
-        # TODO: Replace this temporary return until ingestion task is implemented
-        logger.info("ETL ingestion completed successfully")
-        return "ETL Flow completed successfully!"
-
-        # Original code that caused the error:
-        # result = ingestion(
-        #     start_date=start.strftime("%d/%m/%Y"),
-        #     end_date=end.strftime("%d/%m/%Y"),
-        #     hours=validated_hours
-        # )
-
-    except Exception as e:
-        logger.error(f"ETL ingestion failed: {str(e)}")
-        raise
+    @classmethod
+    def initialize_directories(cls) -> None:
+        """Create all required directories if they don't exist"""
+        directories = [
+            cls.DATA_DIR,
+            cls.LOAD_DIR,
+            cls.PROCESSED_DIR,
+            cls.ANALYTICS_DIR
+        ]
+        for directory in directories:
+            directory.mkdir(parents=True, exist_ok=True)
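
The reworked main() further down still calls etl_ingest, while the only etl_ingest definition visible in this diff sits in the removed block above; the flow may be re-added in the collapsed part of the file or imported from elsewhere. Purely as an illustration of a placeholder consistent with the new PipelineConfig, and not as part of the commit, it might look like:

from typing import List, Optional
from prefect import flow, get_run_logger

@flow(name="ETL Pipeline", retries=3, retry_delay_seconds=30)
def etl_ingest(
    start_date: str,
    end_date: str,
    hours: Optional[List[str]] = None,
) -> str:
    """Hypothetical stand-in: prepares directories and normalizes inputs only."""
    logger = get_run_logger()
    hours = [str(h).zfill(2) for h in (hours or PipelineConfig.DEFAULT_HOURS)]
    PipelineConfig.initialize_directories()
    logger.info(f"Would ingest {start_date} to {end_date} for hours {hours}")
    # Actual extraction and loading into PipelineConfig.LOAD_DIR would go here.
    return "ETL Flow completed successfully!"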

@flow(name="Clustering Analysis",
retries=3,
retry_delay_seconds=30)
def cluster_analysis() -> str:
"""Clustering analysis flow"""
def cluster_analysis() -> Dict[str, Any]:
"""Clustering analysis flow with proper error handling and data persistence"""
logger = get_run_logger()
logger.info("Starting clustering analysis")

try:
# Preprocess data
processed_data = preprocessor()
# Load data from processed directory
data_path = PipelineConfig.PROCESSED_DIR / "processed_data.parquet"
if not data_path.exists():
raise FileNotFoundError(f"Processed data not found at {data_path}")

raw_data = pd.read_parquet(data_path)

# Preprocess
processed_data = preprocessor(raw_data)
if processed_data.empty:
raise ValueError("No data available for clustering")

# Perform clustering
clustered_data = kmeans_cluster(processed_data)
cluster_results = kmeans_cluster(processed_data)

# Evaluate using both methods
# Evaluate results
results = {}
try:
silhouette_score = silhouette_evaluator(clustered_data)
logger.info(f"Silhouette analysis completed: {silhouette_score}")
results["silhouette_score"] = silhouette_evaluator(cluster_results)
logger.info(f"Silhouette analysis completed: {results['silhouette_score']}")
except Exception as e:
logger.warning(f"Silhouette analysis failed, falling back to elbow method: {str(e)}")
elbow_score = elbow_evaluator(clustered_data)
logger.info(f"Elbow analysis completed: {elbow_score}")
results["elbow_analysis"] = elbow_evaluator(cluster_results)
logger.info(f"Elbow analysis completed: {results['elbow_analysis']}")

# Save results
results_path = PipelineConfig.ANALYTICS_DIR / "clustering_results.parquet"
pd.DataFrame({
"cluster": cluster_results["clusters"],
**{f"feature_{i}": processed_data.iloc[:, i]
for i in range(processed_data.shape[1])}
}).to_parquet(results_path)

logger.info(f"Clustering results saved to {results_path}")
return results

return "Clustering Flow completed successfully!"
except Exception as e:
logger.error(f"Clustering analysis failed: {str(e)}")
logger.exception(f"Clustering analysis failed: {str(e)}")
raise
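
The rewritten cluster_analysis assumes a particular contract for the tasks.analytics helpers: kmeans_cluster must return an object indexable by "clusters", and both evaluators must accept that same object. Those tasks are not part of this diff; a sketch of interfaces that would satisfy the flow, written against scikit-learn purely as an assumption, is:

from typing import Any, Dict
import pandas as pd
from prefect import task
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

@task
def preprocessor(raw: pd.DataFrame) -> pd.DataFrame:
    # Assumption: keep numeric columns only and drop missing values.
    return raw.select_dtypes(include="number").dropna()

@task
def kmeans_cluster(data: pd.DataFrame, n_clusters: int = 3) -> Dict[str, Any]:
    model = KMeans(n_clusters=n_clusters, n_init=10, random_state=42).fit(data)
    # "clusters" is the key the flow reads when persisting results.
    return {"clusters": model.labels_, "features": data, "model": model}

@task
def silhouette_evaluator(results: Dict[str, Any]) -> float:
    return float(silhouette_score(results["features"], results["clusters"]))

@task
def elbow_evaluator(results: Dict[str, Any]) -> float:
    # Inertia as a simple elbow-style diagnostic for the fitted model.
    return float(results["model"].inertia_)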

@flow(name="Dashboard Refresh",
retries=2,
retry_delay_seconds=5)
def dashboard_refresh() -> str:
"""Dashboard refresh flow"""
logger = get_run_logger()
logger.info("Starting dashboard refresh")
return "Dashboard Flow completed successfully!"

@flow(name="Main Pipeline")
def main():
"""Main entry point for the application"""
def main() -> None:
"""Main pipeline with proper analytics integration"""
logger = get_run_logger()

# Example usage with environment variables or command line args
start_date = "09/10/2024" # Example date
end_date = "09/10/2024" # Example date
hours = ["00", "01", "02"] # Example hours

try:
etl_ingest(start_date, end_date, hours)
cluster_analysis()
# Initialize environment
PipelineConfig.initialize_directories()

# Run ETL
etl_results = etl_ingest(
start_date="09/10/2024",
end_date="09/10/2024",
hours=["00", "01", "02"]
)

# Run clustering if ETL successful
if etl_results:
clustering_results = cluster_analysis()
logger.info(f"Clustering completed with results: {clustering_results}")

# Update dashboard
dashboard_refresh()

except Exception as e:
logger.error(f"Pipeline failed: {str(e)}")
raise
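
The visible hunk ends inside main()'s exception handler, and any module-level entry point sits below the fold. If one is not already present there, a conventional guard is all that is needed (an assumption about the hidden lines, not something shown in the commit):

if __name__ == "__main__":
    main()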
