From 05d9ab2e5eae72d2530e1def8614bf115c4e14fd Mon Sep 17 00:00:00 2001 From: Mohcine Chraibi Date: Mon, 30 Sep 2024 12:45:28 +0900 Subject: [PATCH] format ruff files --- src/tabs/LargeView_analysis.py | 147 ++++++++++++++++++++++++++------- src/tabs/survey_tab.py | 35 +++++--- 2 files changed, 138 insertions(+), 44 deletions(-) diff --git a/src/tabs/LargeView_analysis.py b/src/tabs/LargeView_analysis.py index f0bf946..a373596 100644 --- a/src/tabs/LargeView_analysis.py +++ b/src/tabs/LargeView_analysis.py @@ -112,7 +112,9 @@ def gaussian_kernel_scalar(r: float, r_c: float, xi: float) -> float: @njit -def get_r(i_line: int, j_column: int, r_cg: float, x_min: float, y_min: float) -> Tuple[float, float]: +def get_r( + i_line: int, j_column: int, r_cg: float, x_min: float, y_min: float +) -> Tuple[float, float]: """ Calculate the coordinates (x, y) of a point in a grid based on its indices and grid parameters. @@ -152,7 +154,9 @@ def get_cell(r: tuple, x_min: float, y_min, r_cg: float) -> Tuple[int, int]: # ensuring that the output signal has no phase shift relative to the input. # The padlen parameter is set to int(1 / delta_t) + 1 to determine the number of samples for edge padding, # which helps in reducing boundary effects in the filtering process. -def butter_lowpass_filter(pd_series: pd.Series, delta_t: float, order: int, cutoff: float) -> np.ndarray: +def butter_lowpass_filter( + pd_series: pd.Series, delta_t: float, order: int, cutoff: float +) -> np.ndarray: """ Apply a Butterworth lowpass filter to a pandas Series. @@ -168,7 +172,9 @@ def butter_lowpass_filter(pd_series: pd.Series, delta_t: float, order: int, cuto nyquist_freq = 0.5 / delta_t # Nyquist Frequency normal_cutoff = cutoff / nyquist_freq # Normalized cutoff frequency # Get the filter coefficients - b, a = butter(order, normal_cutoff, btype="low", analog=False) # Generate the filter coefficients + b, a = butter( + order, normal_cutoff, btype="low", analog=False + ) # Generate the filter coefficients return filtfilt(b, a, pd_series, padlen=int(1 / delta_t) + 1) # Apply the filter. @@ -198,12 +204,17 @@ def create_save_folder(pa: Parameters) -> Path: Path: The path of the created save folder. """ - save_folder = pa.FOLDER_SAVE / f"{pa.SELECTED_NAME}_start={pa.START_TIME:.1f}_dur={pa.DURATION:.1f}_dt={pa.DELTA_T:.1f}_xi={pa.XI:.1f}_rcg={pa.R_CG:.1f}" + save_folder = ( + pa.FOLDER_SAVE + / f"{pa.SELECTED_NAME}_start={pa.START_TIME:.1f}_dur={pa.DURATION:.1f}_dt={pa.DELTA_T:.1f}_xi={pa.XI:.1f}_rcg={pa.R_CG:.1f}" + ) save_folder.mkdir(parents=True, exist_ok=True) return save_folder -def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, pd.DataFrame]: +def process_trajectories( + all_datas: pd.DataFrame, pa: Parameters +) -> Dict[str, pd.DataFrame]: """ Process trajectories and return a dictionary of processed dataframes. @@ -249,8 +260,12 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p try: # Apply Butterworth low-pass filter to x and y trajectory data - X_bw = butter_lowpass_filter(traj_data["x_m"].values, pa.DELTA_T, 2, pa.CUTOFF) - Y_bw = butter_lowpass_filter(traj_data["y_m"].values, pa.DELTA_T, 2, pa.CUTOFF) + X_bw = butter_lowpass_filter( + traj_data["x_m"].values, pa.DELTA_T, 2, pa.CUTOFF + ) + Y_bw = butter_lowpass_filter( + traj_data["y_m"].values, pa.DELTA_T, 2, pa.CUTOFF + ) except ValueError as e: # Log the exception with the trajectory number logging.error("Filtering failed for trajectory %s: %s", traj_nb, e) @@ -269,7 +284,9 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p # Compute the alpha values for interpolation alpha = np.maximum( - np.exp(np.clip(-4.0 * pa.CUTOFF * (t_s_values - starting_time_traj), -700, 700)), + np.exp( + np.clip(-4.0 * pa.CUTOFF * (t_s_values - starting_time_traj), -700, 700) + ), np.exp(np.clip(-4.0 * pa.CUTOFF * (end_time - t_s_values), -700, 700)), ) @@ -278,7 +295,10 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p traj_data.loc[:, "y_m"] = (1.0 - alpha) * Y_bw + alpha * traj_data["y_m"].values # Filter the data to the desired time interval - traj_data = traj_data[(traj_data["t_s"] >= pa.START_TIME) & (traj_data["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT)] + traj_data = traj_data[ + (traj_data["t_s"] >= pa.START_TIME) + & (traj_data["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT) + ] # If the trajectory is empty, skip it if traj_data.empty: @@ -286,7 +306,9 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p continue # Compute the cumulative time - pa.CUM_TIME += traj_data["t_s"].max(skipna=True) - traj_data["t_s"].min(skipna=True) + pa.CUM_TIME += traj_data["t_s"].max(skipna=True) - traj_data["t_s"].min( + skipna=True + ) # Update min and max coordinates pa.X_MIN = min(pa.X_MIN, traj_data["x_m"].min(skipna=True)) @@ -403,7 +425,10 @@ def compute_fields( nb_traj = len(all_trajs) # Iterate over all trajectories for i_traj, traj in tqdm(enumerate(all_trajs.values()), desc="Processing grid"): - traj = traj.loc[(traj["t_s"] >= pa.START_TIME) & (traj["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT)] + traj = traj.loc[ + (traj["t_s"] >= pa.START_TIME) + & (traj["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT) + ] if traj.shape[0] == 0: continue # Iterate over all rows in the trajectory (ie all time steps) @@ -413,14 +438,18 @@ def compute_fields( # Iterate over the cells in the grid for i in range(i_rel - pa.DELTA, i_rel + pa.DELTA + 1): for j in range(j_rel - pa.DELTA, j_rel + pa.DELTA + 1): - if i < 0 or i >= pa.NB_CG_X or j < 0 or j >= pa.NB_CG_Y: # Check if pedestrian is outside the grid + if ( + i < 0 or i >= pa.NB_CG_X or j < 0 or j >= pa.NB_CG_Y + ): # Check if pedestrian is outside the grid continue # Compute the position of the grid cell pos_grid_cell = get_r(i, j, pa.R_CG, pa.X_MIN, pa.Y_MIN) # Compute the Gaussian kernel - phi_r = gaussian_kernel_scalar(calculate_distance(pos_grid_cell, pos_ped), pa.R_C, pa.XI) + phi_r = gaussian_kernel_scalar( + calculate_distance(pos_grid_cell, pos_ped), pa.R_C, pa.XI + ) # Update the fields df_observables["X"][i, j] = pos_grid_cell[0] @@ -473,7 +502,12 @@ def normalize_density(density: np.ndarray, pa: Parameters) -> np.ndarray: return density -def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_density: bool, zsmooth=False) -> go.Figure: +def update_figure( + df_observables: Dict[str, np.ndarray], + pa: Parameters, + plot_density: bool, + zsmooth=False, +) -> go.Figure: """ Update the figure with the given observables and parameters. @@ -487,14 +521,25 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de """ # Create the X and Y axes - X_axis = np.linspace(pa.X_MIN, pa.X_MIN + len(df_observables["rho"][0]) * pa.R_CG, len(df_observables["rho"][0])) - Y_axis = np.linspace(pa.Y_MIN, pa.Y_MIN + len(df_observables["rho"]) * pa.R_CG, len(df_observables["rho"])) + X_axis = np.linspace( + pa.X_MIN, + pa.X_MIN + len(df_observables["rho"][0]) * pa.R_CG, + len(df_observables["rho"][0]), + ) + Y_axis = np.linspace( + pa.Y_MIN, + pa.Y_MIN + len(df_observables["rho"]) * pa.R_CG, + len(df_observables["rho"]), + ) # Create the heatmap if plot_density: hovertext = np.array( [ - [f"x: {X_axis[i]:.2f} m
y: {Y_axis[j]:.2f} m
Density: {df_observables['rho'][j,i]:.2f} ped/m²" for i in range(df_observables["rho"].shape[1])] + [ + f"x: {X_axis[i]:.2f} m
y: {Y_axis[j]:.2f} m
Density: {df_observables['rho'][j,i]:.2f} ped/m²" + for i in range(df_observables["rho"].shape[1]) + ] for j in range(df_observables["rho"].shape[0]) ] ) @@ -516,7 +561,10 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de else: hovertext = np.array( [ - [f"x: {X_axis[i]:.2f} m
y: {Y_axis[j]:.2f} m
Var_v: {(df_observables['var_vs'][j,i]):.2f} m²/s²" for i in range(df_observables["var_vs"].shape[1])] + [ + f"x: {X_axis[i]:.2f} m
y: {Y_axis[j]:.2f} m
Var_v: {(df_observables['var_vs'][j,i]):.2f} m²/s²" + for i in range(df_observables["var_vs"].shape[1]) + ] for j in range(df_observables["var_vs"].shape[0]) ] ) @@ -554,7 +602,9 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de arrow_velocity = 1 # Velocity represented by the annotation arrow (1 m/s) arrow_start_x = 8.0 arrow_start_y = 3.0 - arrow_end_x = arrow_start_x + (arrow_velocity * pa.QUIVER_SCALE) # Adjusted to represent 1 m/s + arrow_end_x = arrow_start_x + ( + arrow_velocity * pa.QUIVER_SCALE + ) # Adjusted to represent 1 m/s arrow_end_y = arrow_start_y # Add arrow annotation @@ -576,12 +626,25 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de ) # Add text annotation for the arrow - fig.add_annotation(x=9.5, y=3.8, text="1 m/s", showarrow=False, font={"color": "#009999", "size": 25}) + fig.add_annotation( + x=9.5, + y=3.8, + text="1 m/s", + showarrow=False, + font={"color": "#009999", "size": 25}, + ) # Correct layout update with proper domain fig.update_layout( font={"size": 20}, - title={"text": ("Velocity field for density heatmap" if plot_density else "Velocity field for variance velocity heatmap"), "font_size": 20}, + title={ + "text": ( + "Velocity field for density heatmap" + if plot_density + else "Velocity field for variance velocity heatmap" + ), + "font_size": 20, + }, xaxis={ "title": {"text": "x [m]", "font_size": 20}, "scaleanchor": "y", @@ -605,7 +668,7 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de def main(selected_file: str): """ - Main function for the CCTV_analysis module. + Run main function for the CCTV_analysis module. Args: selected_file (str): The path to the selected file. @@ -628,7 +691,9 @@ def main(selected_file: str): path = Path(__file__) pa = Parameters( - FOLDER_TRAJ=Path(path.parent.parent.parent.absolute() / "data" / "trajectories"), + FOLDER_TRAJ=Path( + path.parent.parent.parent.absolute() / "data" / "trajectories" + ), FOLDER_SAVE=Path(path.parent.parent.parent.absolute() / "data" / "pickle"), SELECTED_NAME=Path(selected_file).stem, START_TIME=0.0, @@ -648,10 +713,16 @@ def main(selected_file: str): QUIVER_SCALE=3.0, ) pa.R_CG = slider_value_R_CG - pa.DELTA = int(ceil(pa.R_C / pa.R_CG)) + 1 # Number of cells to consider around the cell containing the point + pa.DELTA = ( + int(ceil(pa.R_C / pa.R_CG)) + 1 + ) # Number of cells to consider around the cell containing the point folder_save = create_save_folder(pa) - if not Path(folder_save / "traj_data.pkl").exists() or not Path(folder_save / "parameters.pkl").exists() or not Path(folder_save / "dictionnary_observables.pkl").exists(): + if ( + not Path(folder_save / "traj_data.pkl").exists() + or not Path(folder_save / "parameters.pkl").exists() + or not Path(folder_save / "dictionnary_observables.pkl").exists() + ): # PROGRESS BAR title_text = st.text("Progress Bar") my_progress_bar = st.progress(0) @@ -680,7 +751,9 @@ def main(selected_file: str): st.session_state["zsmooth"] = False # Button to toggle interpolation - toggle_interpolation = st.sidebar.checkbox("Interpolation", value=st.session_state["zsmooth"]) + toggle_interpolation = st.sidebar.checkbox( + "Interpolation", value=st.session_state["zsmooth"] + ) # Update session state based on checkbox st.session_state["zsmooth"] = "best" if toggle_interpolation else False @@ -690,16 +763,28 @@ def main(selected_file: str): with col1: fig = update_figure(dict_observables, params, True, st.session_state["zsmooth"]) st.plotly_chart(fig) - figname = pa.SELECTED_NAME + f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf" + figname = ( + pa.SELECTED_NAME + + f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf" + ) img_bytes = fig.to_image(format="pdf") - st.sidebar.download_button(label="Download Density Heatmap", data=img_bytes, file_name=figname) + st.sidebar.download_button( + label="Download Density Heatmap", data=img_bytes, file_name=figname + ) plt.close() with col2: - fig = update_figure(dict_observables, params, False, st.session_state["zsmooth"]) + fig = update_figure( + dict_observables, params, False, st.session_state["zsmooth"] + ) st.plotly_chart(fig) - figname = pa.SELECTED_NAME + f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf" + figname = ( + pa.SELECTED_NAME + + f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf" + ) img_bytes = fig.to_image(format="pdf") - st.sidebar.download_button(label="Download Variance Heatmap", data=img_bytes, file_name=figname) + st.sidebar.download_button( + label="Download Variance Heatmap", data=img_bytes, file_name=figname + ) plt.close() diff --git a/src/tabs/survey_tab.py b/src/tabs/survey_tab.py index 5a84c8f..9b575f0 100644 --- a/src/tabs/survey_tab.py +++ b/src/tabs/survey_tab.py @@ -38,10 +38,17 @@ def histogram_survey(df_survey: pd.DataFrame, remove_outlier: bool) -> Figure: # Add histogram traces for each category fig.add_trace(go.Histogram(x=values_children, name="Children", marker_color="pink")) - fig.add_trace(go.Histogram(x=values_both, name="Adults and children", marker_color="blue")) + fig.add_trace( + go.Histogram(x=values_both, name="Adults and children", marker_color="blue") + ) # Create a DataFrame for custom data in hover templates - data = pd.DataFrame({"Categories": ["Adults and children"] * len(values_both) + ["Children"] * len(values_children)}) + data = pd.DataFrame( + { + "Categories": ["Adults and children"] * len(values_both) + + ["Children"] * len(values_children) + } + ) # Update hover template with custom data fig.update_traces( @@ -61,16 +68,16 @@ def histogram_survey(df_survey: pd.DataFrame, remove_outlier: bool) -> Figure: title={"text": "Distribution of Group Sizes", "font_size": 28}, width=1000, height=700, - xaxis=dict( - title={"text": "Group Size", "font_size": 30, "font_color": "black"}, - tickfont_size=30, - tickfont_color="black", - ), - yaxis=dict( - title={"text": "Counts", "font_size": 30, "font_color": "black"}, - tickfont_size=30, - tickfont_color="black", - ), + xaxis={ + "title": {"text": "Group Size", "font_size": 30, "font_color": "black"}, + "tickfont_size": 30, + "tickfont_color": "black", + }, + yaxis={ + "title": {"text": "Counts", "font_size": 30, "font_color": "black"}, + "tickfont_size": 30, + "tickfont_color": "black", + }, legend={"font_size": 30, "font_color": "black"}, ) @@ -92,7 +99,9 @@ def main() -> None: 5. Provides a sidebar button to download the histogram as a PDF file. """ path = Path(__file__).resolve() - survey_path = path.parent.parent.parent.absolute() / "data" / "surveys" / "survey_results.csv" + survey_path = ( + path.parent.parent.parent.absolute() / "data" / "surveys" / "survey_results.csv" + ) path_pickle = path.parent.parent.parent.absolute() / "data" / "pickle" # Check if survey.pkl exists, if not create it, else load it