diff --git a/code_your_own_pandas_pipeline/aggregations.py b/code_your_own_pandas_pipeline/aggregations.py
index 06d99dc..be2867d 100644
--- a/code_your_own_pandas_pipeline/aggregations.py
+++ b/code_your_own_pandas_pipeline/aggregations.py
@@ -1,65 +1,197 @@
 """
-This modules provides function to pivot and summarize the practice level appointment data.
-"""
+This module provides functions to pivot and summarize practice-level appointment data.
 
-import pandas as pd
-from loguru import logger
+Functions:
+    pivot_practice_level_data(practice_level_data: pd.DataFrame, index: Optional[list[str]] = None, columns="APPT_STATUS", values="COUNT_OF_APPOINTMENTS", rename_columns: Optional[dict[str, str]] = None) -> pd.DataFrame:
 
-placeholder_df = pd.DataFrame()
+    summarize_monthly_appointment_status(practice_level_data: pd.DataFrame) -> pd.DataFrame:
+
+    summarize_monthly_aggregate_appointments(practice_level_pivot: pd.DataFrame, agg_cols: Optional[list[str]] = None, add_rate_cols: bool = True) -> pd.DataFrame:
+
+    batch_summarize_monthly_aggregate_appointments(practice_level_pivot: pd.DataFrame, agg_cols: Optional[list[str]] = None, add_rate_cols: bool = True) -> Dict[str, pd.DataFrame]:
+"""
+from typing import Dict, Optional
 
-def pivot_practice_level_data(practice_data: pd.DataFrame) -> pd.DataFrame:
+import pandas as pd
+from loguru import logger
+from tqdm import tqdm
+
+from code_your_own_pandas_pipeline.calculations import calculate_appointment_columns
+
+AGG_COLS = [
+    "GP_NAME",
+    "SUPPLIER",
+    "PCN_NAME",
+    "SUB_ICB_LOCATION_NAME",
+    "ICB_NAME",
+    "REGION_NAME",
+    "HCP_TYPE",
+    "APPT_MODE",
+    "NATIONAL_CATEGORY",
+    "TIME_BETWEEN_BOOK_AND_APPT",
+]
+
+
+def pivot_practice_level_data(
+    practice_level_data: pd.DataFrame,
+    index: Optional[list[str]] = None,
+    columns="APPT_STATUS",
+    values="COUNT_OF_APPOINTMENTS",
+    rename_columns: Optional[dict[str, str]] = None,
+) -> pd.DataFrame:
     """
     Pivot the practice level data.
 
     Parameters
     ----------
-    practice_data : pd.DataFrame
-        The practice data.
+    practice_level_data : pd.DataFrame
+        The DataFrame containing the practice level data.
+    index : list of str, optional
+        The columns to use as index for the pivot table. If None, defaults to
+        ["APPOINTMENT_MONTH_START_DATE", *AGG_COLS].
+    columns : str, optional
+        The column to use for the pivot table columns, by default "APPT_STATUS".
+    values : str, optional
+        The column to use for the pivot table values, by default "COUNT_OF_APPOINTMENTS".
+    rename_columns : dict of str, str, optional
+        Dictionary to rename columns, by default None.
 
     Returns
     -------
     pd.DataFrame
-        The pivoted data.
+        The pivoted DataFrame.
""" - logger.info("Pivoting the practice level data.") + if not index: + index = ["APPOINTMENT_MONTH_START_DATE", *AGG_COLS] + if not rename_columns: + rename_columns = {"DNA": "DID_NOT_ATTEND", "Attended": "ATTENDED", "Unknown": "UNKNOWN"} + + logger.info("Pivoting practice level data") + practice_level_pivot = practice_level_data.pivot(index=index, columns=columns, values=values) + + practice_level_pivot = practice_level_pivot.reset_index() + + practice_level_pivot = practice_level_pivot.rename(columns=rename_columns) + + return practice_level_pivot - logger.warning("This function is not yet implemented.") +def summarize_monthly_appointment_status(practice_level_data: pd.DataFrame) -> pd.DataFrame: + """ + Summarize the monthly appointment status. + + Parameters + ---------- + df : pd.DataFrame + The DataFrame containing the appointment data. + date_column : str + The name of the column containing the date information. + status_column : str + The name of the column containing the appointment status. -def summarize_monthly_gp_appointments(pivot_practice_data: pd.DataFrame) -> pd.DataFrame: + Returns + ------- + pd.DataFrame + A DataFrame summarizing the count of each appointment status per month. + """ + logger.info("Summarizing monthly appointment status") + month_and_status_appointments = ( + practice_level_data.groupby(["APPOINTMENT_MONTH_START_DATE", "APPT_STATUS"]) + .agg({"COUNT_OF_APPOINTMENTS": "sum"}) + .reset_index() + .rename(columns={"APPT_STATUS": "Appointment Status"}) + ) + + return month_and_status_appointments + + +def summarize_monthly_aggregate_appointments( + practice_level_pivot: pd.DataFrame, + agg_cols: Optional[list[str]] = None, + add_rate_cols: bool = True, +) -> pd.DataFrame: """ - Summarize the monthly appointments by GP and Appointment Status. + Summarize the monthly aggregate appointments. Parameters ---------- - practice_data : pd.DataFrame - The practice data. + df : pd.DataFrame + The DataFrame containing the appointment data. + date_column : str + The name of the column containing the date information. + agg_column : str + The name of the column to aggregate. + aggfunc : str or function, default 'sum' + Aggregation function to apply to the agg_column. Returns ------- pd.DataFrame - The summarized data. + A DataFrame summarizing the aggregated values per month. """ - logger.info("Summarizing the monthly GP appointments.") + if not agg_cols: + agg_cols = [] + + monthly_aggregate_appointments = ( + practice_level_pivot.groupby(["APPOINTMENT_MONTH_START_DATE", *agg_cols]) + .agg({"ATTENDED": "sum", "DID_NOT_ATTEND": "sum", "UNKNOWN": "sum"}) + .reset_index() + ) - logger.warning("This function is not yet implemented.") + if add_rate_cols: + monthly_aggregate_appointments = calculate_appointment_columns( + monthly_aggregate_appointments + ) + return monthly_aggregate_appointments -def summarize_monthly_region_appointments(pivot_practice_data: pd.DataFrame) -> pd.DataFrame: + +def batch_summarize_monthly_aggregate_appointments( + practice_level_pivot: pd.DataFrame, + agg_cols: Optional[list[str]] = None, + add_rate_cols: bool = True, +) -> Dict[str, pd.DataFrame]: """ - Summarize the monthly appointments by Region and Appointment Status. + Batch summarize monthly aggregate appointments. Parameters ---------- - practice_data : pd.DataFrame - The practice data. + practice_level_pivot : pd.DataFrame + DataFrame containing practice level pivot data. + agg_cols : list of str, optional + List of columns to aggregate. If None, defaults to AGG_COLS. 
diff --git a/code_your_own_pandas_pipeline/calculations.py b/code_your_own_pandas_pipeline/calculations.py
new file mode 100644
index 0000000..dd430eb
--- /dev/null
+++ b/code_your_own_pandas_pipeline/calculations.py
@@ -0,0 +1,117 @@
+"""
+This module provides functions to calculate appointment statistics for a given practice level DataFrame.
+
+Functions
+---------
+calculate_total_appointments(practice_level_pivot: pd.DataFrame) -> pd.DataFrame
+
+calculate_did_not_attend_rate(practice_level_pivot: pd.DataFrame) -> pd.DataFrame
+
+calculate_attended_rate(practice_level_pivot: pd.DataFrame) -> pd.DataFrame
+
+calculate_appointment_columns(practice_level_pivot: pd.DataFrame) -> pd.DataFrame
+
+"""
+
+from typing import Optional
+
+import pandas as pd
+
+# from loguru import logger
+
+
+def calculate_total_appointments(
+    practice_level_pivot: pd.DataFrame, appointment_cols: Optional[list[str]] = None
+) -> pd.DataFrame:
+    """
+    Calculate the total number of appointments by summing the attendance-status counts.
+
+    Parameters
+    ----------
+    practice_level_pivot : pd.DataFrame
+        A DataFrame containing the columns "ATTENDED", "DID_NOT_ATTEND" and "UNKNOWN" representing the number of appointments with each status.
+    appointment_cols : list of str, optional
+        The status columns to sum. If None, defaults to ["ATTENDED", "DID_NOT_ATTEND", "UNKNOWN"].
+
+    Returns
+    -------
+    pd.DataFrame
+        The input DataFrame with an additional column "TOTAL_APPOINTMENTS" which is the sum of the status columns.
+    """
+    if not appointment_cols:
+        appointment_cols = ["ATTENDED", "DID_NOT_ATTEND", "UNKNOWN"]
+
+    # logger.info("Calculating total appointments")
+    practice_level_pivot[appointment_cols] = practice_level_pivot[appointment_cols].fillna(0)
+
+    practice_level_pivot["TOTAL_APPOINTMENTS"] = practice_level_pivot[appointment_cols].sum(axis=1)
+
+    return practice_level_pivot
+
+
+def calculate_did_not_attend_rate(practice_level_pivot: pd.DataFrame) -> pd.DataFrame:
+    """
+    Calculate the rate of missed appointments.
+
+    Parameters
+    ----------
+    practice_level_pivot : pd.DataFrame
+        A DataFrame containing the columns "DID_NOT_ATTEND" and "TOTAL_APPOINTMENTS".
+
+    Returns
+    -------
+    pd.DataFrame
+        The input DataFrame with an additional column "DID_NOT_ATTEND_RATE" which is the rate of missed appointments.
+    """
+    # logger.info("Calculating did not attend rate")
+    practice_level_pivot["DID_NOT_ATTEND_RATE"] = (
+        practice_level_pivot["DID_NOT_ATTEND"] / practice_level_pivot["TOTAL_APPOINTMENTS"]
+    )
+
+    return practice_level_pivot
+
+
+def calculate_attended_rate(practice_level_pivot: pd.DataFrame) -> pd.DataFrame:
+    """
+    Calculate the rate of attended appointments.
+
+    Parameters
+    ----------
+    practice_level_pivot : pd.DataFrame
+        A DataFrame containing the columns "ATTENDED" and "TOTAL_APPOINTMENTS".
+
+    Returns
+    -------
+    pd.DataFrame
+        The input DataFrame with an additional column "ATTENDED_RATE" which is the rate of attended appointments.
+    """
+    # logger.info("Calculating attended rate")
+    practice_level_pivot["ATTENDED_RATE"] = (
+        practice_level_pivot["ATTENDED"] / practice_level_pivot["TOTAL_APPOINTMENTS"]
+    )
+
+    return practice_level_pivot
+
+
+def calculate_appointment_columns(practice_level_pivot: pd.DataFrame) -> pd.DataFrame:
+    """
+    Calculate the total number of appointments, the rate of attended appointments, and the rate of missed appointments.
+
+    Parameters
+    ----------
+    practice_level_pivot : pd.DataFrame
+        A DataFrame containing the columns "ATTENDED", "DID_NOT_ATTEND" and "UNKNOWN".
+
+    Returns
+    -------
+    pd.DataFrame
+        The input DataFrame with additional columns "TOTAL_APPOINTMENTS", "ATTENDED_RATE", and "DID_NOT_ATTEND_RATE".
+    """
+    # logger.info("Calculating appointment columns")
+    practice_level_pivot = (
+        practice_level_pivot.pipe(calculate_total_appointments)
+        .pipe(calculate_attended_rate)
+        .pipe(calculate_did_not_attend_rate)
+    )
+    return practice_level_pivot
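
The calculations module is pipe-friendly; a minimal sketch of the intended call
(the values are illustrative, not part of the patch):

    import pandas as pd

    from code_your_own_pandas_pipeline.calculations import calculate_appointment_columns

    counts = pd.DataFrame({"ATTENDED": [80, 45], "DID_NOT_ATTEND": [15, 5], "UNKNOWN": [5, 0]})
    counts = calculate_appointment_columns(counts)
    # Adds TOTAL_APPOINTMENTS, ATTENDED_RATE and DID_NOT_ATTEND_RATE, e.g. the
    # first row totals 100 appointments with an attended rate of 0.80.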
+ """ + # logger.info("Calculating did not attend rate") + practice_level_pivot["DID_NOT_ATTEND_RATE"] = ( + practice_level_pivot["DID_NOT_ATTEND"] / practice_level_pivot["TOTAL_APPOINTMENTS"] + ) + + return practice_level_pivot + + +def calculate_attended_rate(practice_level_pivot) -> pd.DataFrame: + """ + Calculate the rate of attended appointments. + + Parameters + ---------- + practice_level_pivot : pd.DataFrame + A DataFrame containing columns "ATTENDED" and "DID_NOT_ATTEND" representing the number of attended and missed appointments respectively. + + Returns + ------- + pd.DataFrame + The input DataFrame with an additional column "ATTENDED_RATE" which is the rate of attended appointments. + """ + # logger.info("Calculating attended rate") + practice_level_pivot["ATTENDED_RATE"] = ( + practice_level_pivot["ATTENDED"] / practice_level_pivot["TOTAL_APPOINTMENTS"] + ) + + return practice_level_pivot + + +def calculate_appointment_columns(practice_level_pivot) -> pd.DataFrame: + """ + Calculate the total number of appointments, the rate of missed appointments, and the rate of attended appointments. + + Parameters + ---------- + practice_level_pivot : pd.DataFrame + A DataFrame containing columns "ATTENDED" and "DID_NOT_ATTEND" representing the number of attended and missed appointments respectively. + + Returns + ------- + pd.DataFrame + The input DataFrame with additional columns "TOTAL_APPOINTMENTS", "DID_NOT_ATTEND_RATE", and "ATTENDED_RATE". + """ + # logger.info("Calculating appointment columns") + practice_level_pivot = ( + practice_level_pivot.pipe(calculate_total_appointments) + .pipe(calculate_attended_rate) + .pipe(calculate_did_not_attend_rate) + ) + return practice_level_pivot diff --git a/code_your_own_pandas_pipeline/data_in.py b/code_your_own_pandas_pipeline/data_in.py index d3de9d6..7428b16 100644 --- a/code_your_own_pandas_pipeline/data_in.py +++ b/code_your_own_pandas_pipeline/data_in.py @@ -1,40 +1,226 @@ """ -This module contains the function to read the mapping and practice crosstab data from the data -folder. +This module provides functions to read mapping data and practice level crosstab files +from CSV files into pandas DataFrames. + +Functions +--------- +read_mapping_data(file_path: Optional[Path] = None) -> pd.DataFrame + +read_practice_level_crosstab_files(directory: Optional[Path] = None) -> list[pd.DataFrame] """ +import os +import zipfile +from pathlib import Path +from typing import Callable, Optional + import pandas as pd +import requests from loguru import logger +from tqdm import tqdm + +from code_your_own_pandas_pipeline.config import RAW_DATA_DIR + + +def download_and_extract_zip( + url: str, + overwrite: bool = False, + extract_path: Optional[Path] = None, + skip_if_exists: bool = False, +) -> None: + """ + Downloads a zip file from the given URL and extracts its contents into the RAW_DATA_DIR. + + Parameters + ---------- + url : str + The URL of the zip file to download. + overwrite : bool, default False + Whether to overwrite existing files or skip extraction if they already exist. 
+
+    Returns
+    -------
+    None
+    """
+    if not extract_path:
+        extract_path = RAW_DATA_DIR
+
+    zip_path = extract_path / "_data.zip"
+
+    skip_logger = logger.info if skip_if_exists else logger.warning
+
+    download_zip(url=url, zip_path=zip_path, skip_logger=skip_logger)
+    extract_zip_contents(
+        overwrite=overwrite, extract_path=extract_path, skip_logger=skip_logger, zip_path=zip_path
+    )
+
+    logger.success(f"Download and extraction of {url} complete!")
+
+
+def extract_zip_contents(
+    overwrite: bool, extract_path: Path, skip_logger: Callable, zip_path: Path
+) -> None:
+    """
+    Extracts the contents of a zip file to a specified directory.
+
+    Parameters
+    ----------
+    overwrite : bool
+        If True, existing files will be overwritten. If False, existing files will be skipped.
+    extract_path : pathlib.Path
+        The directory where the contents of the zip file will be extracted.
+    skip_logger : Callable
+        A logging function to call when a file is skipped or overwritten.
+    zip_path : pathlib.Path
+        The path to the zip file to be extracted.
+
+    Returns
+    -------
+    None
+    """
+    logger.info(f"Extracting contents of {zip_path} to {extract_path}")
+    with zipfile.ZipFile(zip_path, "r") as zip_ref:
+        for member in zip_ref.namelist():
+            member_path = extract_path / member
+            if member_path.exists():
+                if not overwrite:
+                    skip_logger(f"File {member_path} already exists, skipping extraction.")
+                    continue
+                skip_logger(f"File {member_path} already exists, overwriting.")
+            zip_ref.extract(member, extract_path)
+
+    logger.info(f"Extraction complete. Removing zip file {zip_path}")
+    os.remove(zip_path)
+
+
+def download_zip(
+    url: str,
+    overwrite: bool = False,
+    zip_path: Optional[Path] = None,
+    skip_logger: Callable = logger.info,
+) -> None:
+    """
+    Downloads a zip file from a given URL to a specified path.
+
+    Parameters
+    ----------
+    url : str
+        The URL from which to download the zip file.
+    overwrite : bool, optional
+        If True, overwrite the existing file if it exists. Default is False.
+    zip_path : Optional[Path], optional
+        The path where the zip file should be saved. If not provided, defaults to RAW_DATA_DIR / "data.zip".
+    skip_logger : Callable, optional
+        A logging function to call if the file already exists and is not being overwritten. Default is logger.info.
+
+    Returns
+    -------
+    None
+
+    Raises
+    ------
+    requests.exceptions.RequestException
+        If there is an issue with the HTTP request.
+    """
+
+    if not zip_path:
+        zip_path = RAW_DATA_DIR / "data.zip"
+
+    if zip_path.exists() and not overwrite:
+        skip_logger(f"File {zip_path} already exists, skipping download.")
+        return
+
+    logger.info(f"Downloading zip file from {url} to {zip_path}")
+    response = requests.get(url, stream=True, timeout=10)
+    total_size = int(response.headers.get("content-length", 0))
+    block_size = 1024  # 1 Kibibyte
+    t = tqdm(total=total_size, unit="iB", unit_scale=True)
+
+    with open(zip_path, "wb") as file:
+        for data in response.iter_content(block_size):
+            t.update(len(data))
+            file.write(data)
+    t.close()
+
+
+def read_mapping_data(file_path: Optional[Path] = None) -> pd.DataFrame:
+    """
+    Reads mapping data from a CSV file and returns it as a pandas DataFrame.
+
+    Parameters
+    ----------
+    file_path : Optional[Path], optional
+        The path to the CSV file containing the mapping data. If not provided,
+        the default path "RAW_DATA_DIR/Mapping.csv" will be used.
 
-def read_mapping_data() -> pd.DataFrame:
+    Returns
+    -------
+    pd.DataFrame
+        A DataFrame containing the mapping data from the CSV file.
     """
-    Read the mapping data from the data folder.
+    if file_path is None:
+        file_path = RAW_DATA_DIR / "Mapping.csv"
+
+    logger.info(f"Reading mapping data from {file_path}")
+
+    return pd.read_csv(file_path)
 
-    Returns
-    -------
-    pd.DataFrame
-        The mapping data.
-    """
-    logger.info(f"Reading mapping data from {""}")
 
-    logger.warning("This function is not yet implemented.")
+def concatenate_practice_level_data(practice_level_datasets: list[pd.DataFrame]) -> pd.DataFrame:
+    """
+    Concatenates the practice data and returns the result.
+
+    Parameters
+    ----------
+    practice_level_datasets : list[pd.DataFrame]
+        The practice data to concatenate.
 
+    Returns
+    -------
+    pd.DataFrame
+        The concatenated DataFrame.
+    """
+    logger.info("Concatenating practice data")
 
-def read_practice_crosstab_data() -> pd.DataFrame:
+    return pd.concat(practice_level_datasets, ignore_index=True)
+
+
+def read_practice_level_data(
+    directory: Optional[Path] = None, file_starts_with: Optional[str] = "Practice_Level_Crosstab"
+) -> pd.DataFrame:
     """
-    Read the practice crosstab data from the data folder.
+    Reads and concatenates practice level data from CSV files in the specified directory.
+
+    Parameters
+    ----------
+    directory : Optional[Path], default None
+        The directory to search for CSV files. If None, uses RAW_DATA_DIR.
+    file_starts_with : Optional[str], default "Practice_Level_Crosstab"
+        The prefix of the CSV files to read.
 
     Returns
     -------
     pd.DataFrame
-        The practice crosstab data.
+        A DataFrame containing the concatenated practice level data from the CSV files.
+
+    Raises
+    ------
+    AssertionError
+        If no files are found in the specified directory with the given prefix.
+
+    Notes
+    -----
+    This function uses the `tqdm` library to display a progress bar while reading files.
     """
-    logger.info(f"Reading practice crosstab data from {""}")
+    if directory is None:
+        directory = RAW_DATA_DIR
 
-    logger.warning("This function is not yet implemented.")
+    file_paths = list(directory.glob(f"{file_starts_with}*.csv"))
+    assert (
+        file_paths
+    ), f"No files found in {directory} with name starting with '{file_starts_with}'"
 
+    dataframes = []
+    for file_path in tqdm(file_paths, desc="Reading Practice Level Crosstab Files"):
+        logger.info(f"Reading file {file_path}")
+        dataframes.append(pd.read_csv(file_path))
 
-if __name__ == "__main__":
-    read_mapping_data().head()
-    read_practice_crosstab_data().head()
+    practice_level_data = concatenate_practice_level_data(dataframes)
+    return practice_level_data
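
Taken together, the read helpers are expected to be used like this (a sketch,
not part of the patch; it assumes the NHS zip referenced in pipeline.py has
already been downloaded and extracted into RAW_DATA_DIR):

    from code_your_own_pandas_pipeline import data_in

    data_in.download_and_extract_zip(
        "https://files.digital.nhs.uk/A5/B4AB19/Practice_Level_Crosstab_Sep_24.zip"
    )
    mapping = data_in.read_mapping_data()          # RAW_DATA_DIR/Mapping.csv
    practice = data_in.read_practice_level_data()  # Practice_Level_Crosstab*.csv files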
diff --git a/code_your_own_pandas_pipeline/pipeline.py b/code_your_own_pandas_pipeline/pipeline.py
index 64b8d2e..deb758f 100644
--- a/code_your_own_pandas_pipeline/pipeline.py
+++ b/code_your_own_pandas_pipeline/pipeline.py
@@ -1,18 +1,31 @@
 """
-Main pipeline for the code_your_own_pandas_pipeline package.
+Main pipeline script to run the GP Appointment Data Pipeline.
+
+This script downloads and processes GP appointment data, summarizes the data, and creates plots.
+
+Functions
+---------
+main(_save_interim_output: bool = False) -> None
+    Main function to run the GP Appointment Data Pipeline.
 """
-import pandas as pd
-from loguru import logger
+from pathlib import Path
 
-from code_your_own_pandas_pipeline import aggregations, data_in, plots, processing
+from loguru import logger
 
-placeholder_df = pd.DataFrame()
+from code_your_own_pandas_pipeline import aggregations as agg
+from code_your_own_pandas_pipeline import data_in, plots, processing, utils
 
 
-def main() -> None:
+@utils.timeit
+def main(_save_interim_output: bool = False) -> None:
     """
-    Main function to run the pipeline.
+    Main function to run the GP Appointment Data Pipeline.
+
+    Parameters
+    ----------
+    _save_interim_output : bool, optional
+        Flag to determine whether to save interim output files, by default False.
 
     Returns
     -------
@@ -21,20 +34,32 @@ def main() -> None:
     logger.level("START", no=15, color="")
     logger.log("START", "Starting the GP Appointment Data Pipeline")
 
-    data_in.read_mapping_data()
-    data_in.read_practice_crosstab_data()
+    data_in.download_and_extract_zip(
+        "https://files.digital.nhs.uk/A5/B4AB19/Practice_Level_Crosstab_Sep_24.zip"
+    )
+
+    mapping_data = data_in.read_mapping_data()
+
+    practice_level_data = (
+        data_in.read_practice_level_data()
+        .pipe(processing.tidy_practice_level_data, _save_interim_output)
+        .pipe(
+            processing.merge_mapping_data_with_practice_level_data,
+            mapping_data=mapping_data,
+            _save_interim_output=_save_interim_output,
+        )
+    )
 
-    processing.tidy_practice_level_data(placeholder_df)
-    processing.merge_mapping_and_practice_data(placeholder_df, placeholder_df)
+    practice_level_pivot = agg.pivot_practice_level_data(practice_level_data)
+    summaries = agg.batch_summarize_monthly_aggregate_appointments(practice_level_pivot)
 
-    aggregations.pivot_practice_level_data(placeholder_df)
-    aggregations.summarize_monthly_gp_appointments(placeholder_df)
-    aggregations.summarize_monthly_region_appointments(placeholder_df)
+    if _save_interim_output:
+        for key, value in summaries.items():
+            value.to_csv(Path("data", f"{key}_summary.csv"), index=False)
 
-    plots.plot_monthly_gp_appointments(placeholder_df, "placeholder_str")
-    plots.plot_monthly_region_appointments(placeholder_df, "placeholder_str")
+    plots.batch_plot_monthly_attd_rate_by(summaries=summaries)
 
-    logger.success("GP Appointment Data Pipeline Completed")
+    logger.success("GP Appointment Data Pipeline completed")
 
 
 if __name__ == "__main__":
diff --git a/code_your_own_pandas_pipeline/plots.py b/code_your_own_pandas_pipeline/plots.py
index cf4d9cd..c83e3a8 100644
--- a/code_your_own_pandas_pipeline/plots.py
+++ b/code_your_own_pandas_pipeline/plots.py
@@ -1,72 +1,173 @@
 """
-This module provides function for generating and saving plots.
+This module provides functions for generating and saving plots of monthly attendance rates using seaborn and pandas.
+
+Functions
+---------
+point_monthly_attd_rate_by(summary_df: pd.DataFrame, agg_by: str) -> FacetGrid
+    Generates a point plot of monthly attendance rates, aggregated by a specified column.
+
+violin_monthly_attd_rate_by(summary_df: pd.DataFrame, agg_by: str) -> FacetGrid
+    Generates a violin plot of monthly attendance rates, aggregated by a specified column.
+
+save_monthly_attendance_plot(plot: FacetGrid, col_name: str, figures_path: Optional[Path] = None)
+    Saves a monthly attendance plot to a specified directory.
+
+batch_plot_monthly_attd_rate_by(summaries: Dict[str, pd.DataFrame]) -> None
+    Generates and saves monthly attendance rate plots for each summary DataFrame in the given dictionary.
 """
+from pathlib import Path
+from typing import Dict, Optional
+
 import pandas as pd
+import seaborn as sns
 from loguru import logger
+from seaborn.axisgrid import FacetGrid
+from tqdm import tqdm
+
+from code_your_own_pandas_pipeline.config import FIGURES_DIR
 
 
-def save_plot(plot, output_folder: str, plot_name: str) -> None:
+def point_monthly_attd_rate_by(summary_df: pd.DataFrame, agg_by: str) -> FacetGrid:
     """
-    Save the plot to the output folder.
+    Generates a point plot of monthly attendance rates, aggregated by a specified column.
 
     Parameters
     ----------
-    plot : matplotlib.pyplot
-        The plot to save.
-    output_folder : str
-        The output folder to save the plot.
-    plot_name : str
-        The plot name.
+    summary_df : pd.DataFrame
+        DataFrame containing the summary data with columns 'APPOINTMENT_MONTH_START_DATE' and 'ATTENDED_RATE'.
+    agg_by : str
+        The column name by which to aggregate the data for the plot.
 
     Returns
     -------
-    None
+    FacetGrid
+        A seaborn FacetGrid object representing the point plot.
+
+    Raises
+    ------
+    AssertionError
+        If the number of unique categories in the aggregation column exceeds 20.
     """
-    logger.info(f"Saving the plot {plot_name} to {output_folder}.")
-    logger.warning("This function is not yet implemented.")
+    assert summary_df[agg_by].nunique() <= 20, "Too many unique categories for display"
 
+    plt = sns.catplot(
+        x="APPOINTMENT_MONTH_START_DATE",
+        y="ATTENDED_RATE",
+        hue=agg_by,
+        data=summary_df,
+        aspect=2,
+        kind="point",
+    )
 
-def plot_monthly_gp_appointments(
-    monthly_gp_appointments: pd.DataFrame, output_folder: str
-) -> None:
+    plt.set(
+        title=f"Monthly Attendance Rate by {agg_by}",
+        xlabel="Month Start Date",
+        ylabel="Attendance Rate",
+        # ylim=(0.85, 1.0),
+    )
+
+    return plt
+
+
+def violin_monthly_attd_rate_by(summary_df: pd.DataFrame, agg_by: str) -> FacetGrid:
     """
-    Plot the monthly GP appointments.
+    Generates a violin plot of monthly attendance rates, aggregated by a specified column.
 
     Parameters
     ----------
-    monthly_gp_appointments : pd.DataFrame
-        The monthly GP appointments data.
-    output_folder : str
-        The output folder to save the plots.
+    summary_df : pd.DataFrame
+        DataFrame containing the summary data with columns 'APPOINTMENT_MONTH_START_DATE' and 'ATTENDED_RATE'.
+    agg_by : str
+        The column name by which to aggregate the data for the plot.
 
     Returns
     -------
-    None
+    FacetGrid
+        A seaborn FacetGrid object representing the violin plot.
+
+    Raises
+    ------
+    AssertionError
+        If the number of unique categories in the aggregation column is 20 or fewer.
     """
-    logger.info("Plotting the monthly GP appointments.")
-    logger.warning("This function is not yet implemented.")
+    assert summary_df[agg_by].nunique() > 20, "Too few unique categories for display"
+
+    plt = sns.catplot(
+        x="APPOINTMENT_MONTH_START_DATE",
+        y="ATTENDED_RATE",
+        data=summary_df,
+        aspect=2,
+        kind="violin",
+    )
+
+    plt.set(
+        title=f"Monthly Attendance Rate by {agg_by}",
+        xlabel="Month Start Date",
+        ylabel="Attendance Rate",
+        # ylim=(0.85, 1.0),
+    )
 
+    return plt
 
-def plot_monthly_region_appointments(
-    monthly_region_appointments: pd.DataFrame, output_folder: str
-) -> None:
+
+def save_monthly_attendance_plot(
+    plot: FacetGrid, col_name: str, figures_path: Optional[Path] = None
+):
     """
-    Plot the monthly region appointments.
+    Save a monthly attendance plot to a specified directory.
 
     Parameters
     ----------
-    monthly_region_appointments : pd.DataFrame
-        The monthly region appointments data.
-    output_folder : str
-        The output folder to save the plots.
+    plot : FacetGrid
+        The plot object to be saved.
+    col_name : str
+        The name of the column used for the plot.
+    figures_path : Optional[Path], default=None
+        The directory path where the plot will be saved. If None, a default directory is used.
 
     Returns
     -------
     None
     """
-    logger.info("Plotting the monthly region appointments.")
-    logger.warning("This function is not yet implemented.")
+    if figures_path is None:
+        figures_path = FIGURES_DIR
+
+    file_path = figures_path / f"monthly_attendance_rate_by_{col_name}.png"
+    plot.savefig(file_path)
+
+    logger.info(f"Saved plot to {file_path.relative_to(Path.cwd())}")
+
+
+def batch_plot_monthly_attd_rate_by(summaries: Dict[str, pd.DataFrame]) -> None:
+    """
+    Generate and save monthly attendance rate plots for each summary DataFrame in the given dictionary.
+
+    For each key-value pair in the `summaries` dictionary, this function generates a plot of the monthly attendance rate.
+    If the number of unique values in the aggregation column is greater than 20, a violin plot is generated.
+    Otherwise, a point plot is generated. The resulting plot is then saved.
+
+    Parameters
+    ----------
+    summaries : Dict[str, pd.DataFrame]
+        A dictionary where the keys are column names to aggregate by, and the values are summary DataFrames containing
+        the data to be plotted.
+
+    Returns
+    -------
+    None
+    """
+    logger.info("Batch plotting monthly attendance rate by aggregation column")
+    for agg_column, summary_df in tqdm(summaries.items(), desc="Plotting summaries"):
+        if summary_df[agg_column].nunique() > 20:
+            monthly_attendance_rate_by_agg_plt = violin_monthly_attd_rate_by(
+                agg_by=agg_column, summary_df=summary_df
+            )
+        else:
+            monthly_attendance_rate_by_agg_plt = point_monthly_attd_rate_by(
+                agg_by=agg_column, summary_df=summary_df
+            )
+
+        save_monthly_attendance_plot(monthly_attendance_rate_by_agg_plt, agg_column)
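
A minimal sketch of driving the plotting helpers by hand (the toy summary values
are illustrative, not part of the patch; saving assumes FIGURES_DIR exists under
the project root):

    import pandas as pd

    from code_your_own_pandas_pipeline import plots

    toy_summary = pd.DataFrame(
        {
            "APPOINTMENT_MONTH_START_DATE": ["2024-08-01", "2024-09-01"] * 2,
            "HCP_TYPE": ["GP", "GP", "Other Practice staff", "Other Practice staff"],
            "ATTENDED_RATE": [0.91, 0.92, 0.88, 0.90],
        }
    )

    # Two categories, so the point plot (not the violin) is the right choice.
    grid = plots.point_monthly_attd_rate_by(toy_summary, agg_by="HCP_TYPE")
    plots.save_monthly_attendance_plot(grid, "HCP_TYPE")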
+ """ + logger.info("Processing practice data") + + logger.info("Selecting columns: 'APPOINTMENT_MONTH_START_DATE', 'GP_CODE' and 6 other columns") + practice_level_data = practice_level_data.loc[ + :, + [ + "APPOINTMENT_MONTH_START_DATE", + "GP_CODE", + "HCP_TYPE", + "APPT_MODE", + "NATIONAL_CATEGORY", + "TIME_BETWEEN_BOOK_AND_APPT", + "COUNT_OF_APPOINTMENTS", + "APPT_STATUS", + ], + ] + + # We are commenting out this code as we want to test out the pipeline without loosing this data + # logger.info('Change "Unknown" to missing data') + # practice_level_data.replace("Unknown", None, inplace=True) + + # logger.info("Dropping rows where APPT_STATUS is missing") + # practice_level_data.dropna(subset=["APPT_STATUS"], inplace=True) + logger.info("Convert APPOINTMENT_MONTH_START_DATE to datetime") + practice_level_data["APPOINTMENT_MONTH_START_DATE"] = pd.to_datetime( + practice_level_data["APPOINTMENT_MONTH_START_DATE"], format="%d%b%Y" + ) -def tidy_practice_level_data(practice_data: pd.DataFrame) -> pd.DataFrame: + if _save_interim_output: + save_path = INTERIM_DATA_DIR / "processed_practice_level_data.csv" + logger.info(f"Saving interim output to {save_path}") + practice_level_data.to_csv(save_path, index=False) + + return practice_level_data + + +def check_merge(merged_df: pd.DataFrame, merge_column: str = "_merge") -> pd.DataFrame: """ - Tidy the practice crosstab data. + Check the merge for any issues and return the merged DataFrame. Parameters ---------- - practice_crosstab : pd.DataFrame - The practice crosstab data. + merged_df : pd.DataFrame + The merged DataFrame to check. + merge_column : str, optional + The column to check for issues, by default "_merge" Returns ------- pd.DataFrame - The tidy practice crosstab data. + The merged DataFrame. """ - logger.info("Tidying the practice crosstab data.") + bad_merge = False + for bad_merge in ("left_only", "right_only"): + bad_merge_count = merged_df[merge_column].value_counts().get(bad_merge, 0) + if bad_merge_count: + logger.warning(f"There are {bad_merge_count} '{bad_merge}' rows in the merged data") + bad_merge = True - logger.warning("This function is not yet implemented.") + if not bad_merge: + logger.info("The merge was healthy.") + merged_df.drop(columns="_merge", inplace=True) -def merge_mapping_and_practice_data( - mapping_data: pd.DataFrame, practice_data: pd.DataFrame + return merged_df + + +def merge_mapping_data_with_practice_level_data( + practice_level_data: pd.DataFrame, + mapping_data: pd.DataFrame, + _save_interim_output: bool = False, ) -> pd.DataFrame: """ - Merge the mapping and practice data. + Merges the mapping data with the practice data and returns the result. Parameters ---------- mapping_data : pd.DataFrame - The mapping data. - practice_data : pd.DataFrame - The practice data. + The mapping data to merge. + practice_level_data : list[pd.DataFrame] + The practice data to merge. + _save_interim_output : bool, optional + Whether to save the interim output, by default False Returns ------- pd.DataFrame - The merged data. + The merged DataFrame. 
""" - logger.info("Merging the mapping and practice data.") + logger.info("Merging mapping data with practice data") + merged_data = pd.merge( + left=practice_level_data, + right=mapping_data, + on="GP_CODE", + # how="left", + indicator=True, + ).pipe(check_merge) + + if _save_interim_output: + save_path = INTERIM_DATA_DIR / "merged_data.csv" + logger.info(f"Saving interim output to {save_path}") + merged_data.to_csv(save_path, index=False) - logger.warning("This function is not yet implemented.") + return merged_data diff --git a/code_your_own_pandas_pipeline/utils.py b/code_your_own_pandas_pipeline/utils.py new file mode 100644 index 0000000..5d2fa78 --- /dev/null +++ b/code_your_own_pandas_pipeline/utils.py @@ -0,0 +1,41 @@ +""" +This module provides utility functions for performance measurement and logging. + +Functions +--------- +timeit(func) +""" + +import time + +from loguru import logger + + +def timeit(func): + """ + A decorator that measures the execution time of a function and logs the duration. + + Parameters + ---------- + func : callable + The function to be wrapped and timed. + + Returns + ------- + callable + The wrapped function with added timing functionality. + + Notes + ----- + The execution time is logged using the `logger.debug` method. + """ + + def wrapped(*args, **kwargs): + start = time.time() + result = func(*args, **kwargs) + end = time.time() + duration = end - start + logger.debug(f"Function '{func.__name__}' executed in {duration:f} s") + return result + + return wrapped diff --git a/pyproject.toml b/pyproject.toml index 809b3a6..7938e71 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,6 @@ exclude = ''' )/ ''' -[tool.ruff.lint.isort] -known_first_party = ["code_your_own_pandas_pipeline"] -force_sort_within_sections = true +# [tool.ruff.lint.isort] +# known_first_party = ["code_your_own_pandas_pipeline"] +# force_sort_within_sections = true diff --git a/tests/unittests/test_aggregations.py b/tests/unittests/test_aggregations.py new file mode 100644 index 0000000..c6329f4 --- /dev/null +++ b/tests/unittests/test_aggregations.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.aggregations +""" +import pytest + +import code_your_own_pandas_pipeline.aggregations + + +class TestExample: + """Example test class""" + def test_example(self): + """Example test case""" + assert True diff --git a/tests/unittests/test_calculations.py b/tests/unittests/test_calculations.py new file mode 100644 index 0000000..6d47564 --- /dev/null +++ b/tests/unittests/test_calculations.py @@ -0,0 +1,88 @@ +""" +Tests for code_your_own_pandas_pipeline.calculations +""" + +import pytest +import pandas as pd +from code_your_own_pandas_pipeline.calculations import ( + calculate_total_appointments, + calculate_did_not_attend_rate, + calculate_attended_rate, + calculate_appointment_columns, +) + + +def test_calculate_total_appointments(): + """Test calculate_total_appointments function""" + data = {"ATTENDED": [10, 20, 30], "DID_NOT_ATTEND": [1, 2, 3], "UNKNOWN": [0, 1, 0]} + df = pd.DataFrame(data) + result = calculate_total_appointments(df) + expected_total = [11, 23, 33] + assert result["TOTAL_APPOINTMENTS"].tolist() == expected_total + + +def test_calculate_did_not_attend_rate(): + """Test calculate_did_not_attend_rate function""" + data = { + "ATTENDED": [10, 20, 30], + "DID_NOT_ATTEND": [1, 2, 3], + "TOTAL_APPOINTMENTS": [11, 22, 33], + } + df = pd.DataFrame(data) + result = calculate_did_not_attend_rate(df) + expected_rate = [1 / 11, 2 / 22, 3 / 33] + assert 
result["DID_NOT_ATTEND_RATE"].tolist() == expected_rate + + +def test_calculate_attended_rate(): + """Test calculate_attended_rate function""" + data = { + "ATTENDED": [10, 20, 30], + "DID_NOT_ATTEND": [1, 2, 3], + "TOTAL_APPOINTMENTS": [11, 22, 33], + } + df = pd.DataFrame(data) + result = calculate_attended_rate(df) + expected_rate = [10 / 11, 20 / 22, 30 / 33] + assert result["ATTENDED_RATE"].tolist() == expected_rate + + +@pytest.mark.parametrize( + "data, expected_total, expected_dna_rate, expected_attended_rate", + [ + ( + {"ATTENDED": [10, 20, 30], "DID_NOT_ATTEND": [1, 2, 3]}, + [11, 22, 33], + [1 / 11, 2 / 22, 3 / 33], + [10 / 11, 20 / 22, 30 / 33], + ) + ], +) +def test_calculate_appointment_columns( + mocker, data, expected_total, expected_dna_rate, expected_attended_rate +): + """Test calculate_appointment_columns function with mocks""" + df = pd.DataFrame(data) + + mock_calculate_total_appointments = mocker.patch( + "code_your_own_pandas_pipeline.calculations.calculate_total_appointments", + return_value=df.assign(TOTAL_APPOINTMENTS=expected_total), + ) + mock_calculate_did_not_attend_rate = mocker.patch( + "code_your_own_pandas_pipeline.calculations.calculate_did_not_attend_rate", + return_value=df.assign(DID_NOT_ATTEND_RATE=expected_dna_rate), + ) + mock_calculate_attended_rate = mocker.patch( + "code_your_own_pandas_pipeline.calculations.calculate_attended_rate", + return_value=df.assign(ATTENDED_RATE=expected_attended_rate), + ) + + result = calculate_appointment_columns(df) + + mock_calculate_total_appointments.assert_called_once_with(df) + mock_calculate_did_not_attend_rate.assert_called_once_with(df) + mock_calculate_attended_rate.assert_called_once_with(df) + + assert result["TOTAL_APPOINTMENTS"].tolist() == expected_total + assert result["DID_NOT_ATTEND_RATE"].tolist() == expected_dna_rate + assert result["ATTENDED_RATE"].tolist() == expected_attended_rate diff --git a/tests/unittests/test_config.py b/tests/unittests/test_config.py new file mode 100644 index 0000000..9477600 --- /dev/null +++ b/tests/unittests/test_config.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.config +""" +import pytest + +import code_your_own_pandas_pipeline.config + + +class TestExample: + """Example test class""" + def test_example(self): + """Example test case""" + assert True diff --git a/tests/unittests/test_data_in.py b/tests/unittests/test_data_in.py new file mode 100644 index 0000000..791e3f4 --- /dev/null +++ b/tests/unittests/test_data_in.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.data_in +""" +import pytest + +import code_your_own_pandas_pipeline.data_in + + +class TestExample: + """Example test class""" + def test_example(self): + """Example test case""" + assert True diff --git a/tests/unittests/test_pipeline.py b/tests/unittests/test_pipeline.py new file mode 100644 index 0000000..59c441a --- /dev/null +++ b/tests/unittests/test_pipeline.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.pipeline +""" +import pytest + +import code_your_own_pandas_pipeline.pipeline + + +class TestExample: + """Example test class""" + def test_example(self): + """Example test case""" + assert True diff --git a/tests/unittests/test_plots.py b/tests/unittests/test_plots.py new file mode 100644 index 0000000..c4d1fed --- /dev/null +++ b/tests/unittests/test_plots.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.plots +""" +import pytest + +import code_your_own_pandas_pipeline.plots + + +class TestExample: + """Example test 
class""" + def test_example(self): + """Example test case""" + assert True diff --git a/tests/unittests/test_processing.py b/tests/unittests/test_processing.py new file mode 100644 index 0000000..243dbe9 --- /dev/null +++ b/tests/unittests/test_processing.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.processing +""" +import pytest + +import code_your_own_pandas_pipeline.processing + + +class TestExample: + """Example test class""" + def test_example(self): + """Example test case""" + assert True diff --git a/tests/unittests/test_utils.py b/tests/unittests/test_utils.py new file mode 100644 index 0000000..59d84fa --- /dev/null +++ b/tests/unittests/test_utils.py @@ -0,0 +1,13 @@ +""" +Tests for code_your_own_pandas_pipeline.utils +""" +import pytest + +import code_your_own_pandas_pipeline.utils + + +class TestExample: + """Example test class""" + def test_example(self): + """Example test case""" + assert True