Skip to content

Commit

Permalink
format ruff files
Browse files Browse the repository at this point in the history
  • Loading branch information
chraibi committed Sep 30, 2024
1 parent 05d4ae2 commit 05d9ab2
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 44 deletions.
147 changes: 116 additions & 31 deletions src/tabs/LargeView_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ def gaussian_kernel_scalar(r: float, r_c: float, xi: float) -> float:


@njit
def get_r(i_line: int, j_column: int, r_cg: float, x_min: float, y_min: float) -> Tuple[float, float]:
def get_r(
i_line: int, j_column: int, r_cg: float, x_min: float, y_min: float
) -> Tuple[float, float]:
"""
Calculate the coordinates (x, y) of a point in a grid based on its indices and grid parameters.
Expand Down Expand Up @@ -152,7 +154,9 @@ def get_cell(r: tuple, x_min: float, y_min, r_cg: float) -> Tuple[int, int]:
# ensuring that the output signal has no phase shift relative to the input.
# The padlen parameter is set to int(1 / delta_t) + 1 to determine the number of samples for edge padding,
# which helps in reducing boundary effects in the filtering process.
def butter_lowpass_filter(pd_series: pd.Series, delta_t: float, order: int, cutoff: float) -> np.ndarray:
def butter_lowpass_filter(
pd_series: pd.Series, delta_t: float, order: int, cutoff: float
) -> np.ndarray:
"""
Apply a Butterworth lowpass filter to a pandas Series.
Expand All @@ -168,7 +172,9 @@ def butter_lowpass_filter(pd_series: pd.Series, delta_t: float, order: int, cuto
nyquist_freq = 0.5 / delta_t # Nyquist Frequency
normal_cutoff = cutoff / nyquist_freq # Normalized cutoff frequency
# Get the filter coefficients
b, a = butter(order, normal_cutoff, btype="low", analog=False) # Generate the filter coefficients
b, a = butter(
order, normal_cutoff, btype="low", analog=False
) # Generate the filter coefficients
return filtfilt(b, a, pd_series, padlen=int(1 / delta_t) + 1) # Apply the filter.


Expand Down Expand Up @@ -198,12 +204,17 @@ def create_save_folder(pa: Parameters) -> Path:
Path: The path of the created save folder.
"""
save_folder = pa.FOLDER_SAVE / f"{pa.SELECTED_NAME}_start={pa.START_TIME:.1f}_dur={pa.DURATION:.1f}_dt={pa.DELTA_T:.1f}_xi={pa.XI:.1f}_rcg={pa.R_CG:.1f}"
save_folder = (
pa.FOLDER_SAVE
/ f"{pa.SELECTED_NAME}_start={pa.START_TIME:.1f}_dur={pa.DURATION:.1f}_dt={pa.DELTA_T:.1f}_xi={pa.XI:.1f}_rcg={pa.R_CG:.1f}"
)
save_folder.mkdir(parents=True, exist_ok=True)
return save_folder


def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, pd.DataFrame]:
def process_trajectories(
all_datas: pd.DataFrame, pa: Parameters
) -> Dict[str, pd.DataFrame]:
"""
Process trajectories and return a dictionary of processed dataframes.
Expand Down Expand Up @@ -249,8 +260,12 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p

try:
# Apply Butterworth low-pass filter to x and y trajectory data
X_bw = butter_lowpass_filter(traj_data["x_m"].values, pa.DELTA_T, 2, pa.CUTOFF)
Y_bw = butter_lowpass_filter(traj_data["y_m"].values, pa.DELTA_T, 2, pa.CUTOFF)
X_bw = butter_lowpass_filter(
traj_data["x_m"].values, pa.DELTA_T, 2, pa.CUTOFF
)
Y_bw = butter_lowpass_filter(
traj_data["y_m"].values, pa.DELTA_T, 2, pa.CUTOFF
)
except ValueError as e:
# Log the exception with the trajectory number
logging.error("Filtering failed for trajectory %s: %s", traj_nb, e)
Expand All @@ -269,7 +284,9 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p

# Compute the alpha values for interpolation
alpha = np.maximum(
np.exp(np.clip(-4.0 * pa.CUTOFF * (t_s_values - starting_time_traj), -700, 700)),
np.exp(
np.clip(-4.0 * pa.CUTOFF * (t_s_values - starting_time_traj), -700, 700)
),
np.exp(np.clip(-4.0 * pa.CUTOFF * (end_time - t_s_values), -700, 700)),
)

Expand All @@ -278,15 +295,20 @@ def process_trajectories(all_datas: pd.DataFrame, pa: Parameters) -> Dict[str, p
traj_data.loc[:, "y_m"] = (1.0 - alpha) * Y_bw + alpha * traj_data["y_m"].values

# Filter the data to the desired time interval
traj_data = traj_data[(traj_data["t_s"] >= pa.START_TIME) & (traj_data["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT)]
traj_data = traj_data[
(traj_data["t_s"] >= pa.START_TIME)
& (traj_data["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT)
]

# If the trajectory is empty, skip it
if traj_data.empty:
list_files_skipped.append(traj_nb)
continue

# Compute the cumulative time
pa.CUM_TIME += traj_data["t_s"].max(skipna=True) - traj_data["t_s"].min(skipna=True)
pa.CUM_TIME += traj_data["t_s"].max(skipna=True) - traj_data["t_s"].min(
skipna=True
)

# Update min and max coordinates
pa.X_MIN = min(pa.X_MIN, traj_data["x_m"].min(skipna=True))
Expand Down Expand Up @@ -403,7 +425,10 @@ def compute_fields(
nb_traj = len(all_trajs)
# Iterate over all trajectories
for i_traj, traj in tqdm(enumerate(all_trajs.values()), desc="Processing grid"):
traj = traj.loc[(traj["t_s"] >= pa.START_TIME) & (traj["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT)]
traj = traj.loc[
(traj["t_s"] >= pa.START_TIME)
& (traj["t_s"] < pa.START_TIME + pa.DURATION + 2.0 * pa.DT)
]
if traj.shape[0] == 0:
continue
# Iterate over all rows in the trajectory (ie all time steps)
Expand All @@ -413,14 +438,18 @@ def compute_fields(
# Iterate over the cells in the grid
for i in range(i_rel - pa.DELTA, i_rel + pa.DELTA + 1):
for j in range(j_rel - pa.DELTA, j_rel + pa.DELTA + 1):
if i < 0 or i >= pa.NB_CG_X or j < 0 or j >= pa.NB_CG_Y: # Check if pedestrian is outside the grid
if (
i < 0 or i >= pa.NB_CG_X or j < 0 or j >= pa.NB_CG_Y
): # Check if pedestrian is outside the grid
continue

# Compute the position of the grid cell
pos_grid_cell = get_r(i, j, pa.R_CG, pa.X_MIN, pa.Y_MIN)

# Compute the Gaussian kernel
phi_r = gaussian_kernel_scalar(calculate_distance(pos_grid_cell, pos_ped), pa.R_C, pa.XI)
phi_r = gaussian_kernel_scalar(
calculate_distance(pos_grid_cell, pos_ped), pa.R_C, pa.XI
)

# Update the fields
df_observables["X"][i, j] = pos_grid_cell[0]
Expand Down Expand Up @@ -473,7 +502,12 @@ def normalize_density(density: np.ndarray, pa: Parameters) -> np.ndarray:
return density


def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_density: bool, zsmooth=False) -> go.Figure:
def update_figure(
df_observables: Dict[str, np.ndarray],
pa: Parameters,
plot_density: bool,
zsmooth=False,
) -> go.Figure:
"""
Update the figure with the given observables and parameters.
Expand All @@ -487,14 +521,25 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de
"""
# Create the X and Y axes
X_axis = np.linspace(pa.X_MIN, pa.X_MIN + len(df_observables["rho"][0]) * pa.R_CG, len(df_observables["rho"][0]))
Y_axis = np.linspace(pa.Y_MIN, pa.Y_MIN + len(df_observables["rho"]) * pa.R_CG, len(df_observables["rho"]))
X_axis = np.linspace(
pa.X_MIN,
pa.X_MIN + len(df_observables["rho"][0]) * pa.R_CG,
len(df_observables["rho"][0]),
)
Y_axis = np.linspace(
pa.Y_MIN,
pa.Y_MIN + len(df_observables["rho"]) * pa.R_CG,
len(df_observables["rho"]),
)

# Create the heatmap
if plot_density:
hovertext = np.array(
[
[f"x: {X_axis[i]:.2f} m<br>y: {Y_axis[j]:.2f} m<br>Density: {df_observables['rho'][j,i]:.2f} ped/m²" for i in range(df_observables["rho"].shape[1])]
[
f"x: {X_axis[i]:.2f} m<br>y: {Y_axis[j]:.2f} m<br>Density: {df_observables['rho'][j,i]:.2f} ped/m²"
for i in range(df_observables["rho"].shape[1])
]
for j in range(df_observables["rho"].shape[0])
]
)
Expand All @@ -516,7 +561,10 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de
else:
hovertext = np.array(
[
[f"x: {X_axis[i]:.2f} m<br>y: {Y_axis[j]:.2f} m<br>Var_v: {(df_observables['var_vs'][j,i]):.2f} m²/s²" for i in range(df_observables["var_vs"].shape[1])]
[
f"x: {X_axis[i]:.2f} m<br>y: {Y_axis[j]:.2f} m<br>Var_v: {(df_observables['var_vs'][j,i]):.2f} m²/s²"
for i in range(df_observables["var_vs"].shape[1])
]
for j in range(df_observables["var_vs"].shape[0])
]
)
Expand Down Expand Up @@ -554,7 +602,9 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de
arrow_velocity = 1 # Velocity represented by the annotation arrow (1 m/s)
arrow_start_x = 8.0
arrow_start_y = 3.0
arrow_end_x = arrow_start_x + (arrow_velocity * pa.QUIVER_SCALE) # Adjusted to represent 1 m/s
arrow_end_x = arrow_start_x + (
arrow_velocity * pa.QUIVER_SCALE
) # Adjusted to represent 1 m/s
arrow_end_y = arrow_start_y

# Add arrow annotation
Expand All @@ -576,12 +626,25 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de
)

# Add text annotation for the arrow
fig.add_annotation(x=9.5, y=3.8, text="<b>1 m/s</b>", showarrow=False, font={"color": "#009999", "size": 25})
fig.add_annotation(
x=9.5,
y=3.8,
text="<b>1 m/s</b>",
showarrow=False,
font={"color": "#009999", "size": 25},
)

# Correct layout update with proper domain
fig.update_layout(
font={"size": 20},
title={"text": ("Velocity field for density heatmap" if plot_density else "Velocity field for variance velocity heatmap"), "font_size": 20},
title={
"text": (
"Velocity field for density heatmap"
if plot_density
else "Velocity field for variance velocity heatmap"
),
"font_size": 20,
},
xaxis={
"title": {"text": "x [m]", "font_size": 20},
"scaleanchor": "y",
Expand All @@ -605,7 +668,7 @@ def update_figure(df_observables: Dict[str, np.ndarray], pa: Parameters, plot_de

def main(selected_file: str):
"""
Main function for the CCTV_analysis module.
Run main function for the CCTV_analysis module.
Args:
selected_file (str): The path to the selected file.
Expand All @@ -628,7 +691,9 @@ def main(selected_file: str):
path = Path(__file__)

pa = Parameters(
FOLDER_TRAJ=Path(path.parent.parent.parent.absolute() / "data" / "trajectories"),
FOLDER_TRAJ=Path(
path.parent.parent.parent.absolute() / "data" / "trajectories"
),
FOLDER_SAVE=Path(path.parent.parent.parent.absolute() / "data" / "pickle"),
SELECTED_NAME=Path(selected_file).stem,
START_TIME=0.0,
Expand All @@ -648,10 +713,16 @@ def main(selected_file: str):
QUIVER_SCALE=3.0,
)
pa.R_CG = slider_value_R_CG
pa.DELTA = int(ceil(pa.R_C / pa.R_CG)) + 1 # Number of cells to consider around the cell containing the point
pa.DELTA = (
int(ceil(pa.R_C / pa.R_CG)) + 1
) # Number of cells to consider around the cell containing the point
folder_save = create_save_folder(pa)

if not Path(folder_save / "traj_data.pkl").exists() or not Path(folder_save / "parameters.pkl").exists() or not Path(folder_save / "dictionnary_observables.pkl").exists():
if (
not Path(folder_save / "traj_data.pkl").exists()
or not Path(folder_save / "parameters.pkl").exists()
or not Path(folder_save / "dictionnary_observables.pkl").exists()
):
# PROGRESS BAR
title_text = st.text("Progress Bar")
my_progress_bar = st.progress(0)
Expand Down Expand Up @@ -680,7 +751,9 @@ def main(selected_file: str):
st.session_state["zsmooth"] = False

# Button to toggle interpolation
toggle_interpolation = st.sidebar.checkbox("Interpolation", value=st.session_state["zsmooth"])
toggle_interpolation = st.sidebar.checkbox(
"Interpolation", value=st.session_state["zsmooth"]
)

# Update session state based on checkbox
st.session_state["zsmooth"] = "best" if toggle_interpolation else False
Expand All @@ -690,16 +763,28 @@ def main(selected_file: str):
with col1:
fig = update_figure(dict_observables, params, True, st.session_state["zsmooth"])
st.plotly_chart(fig)
figname = pa.SELECTED_NAME + f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf"
figname = (
pa.SELECTED_NAME
+ f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf"
)
img_bytes = fig.to_image(format="pdf")
st.sidebar.download_button(label="Download Density Heatmap", data=img_bytes, file_name=figname)
st.sidebar.download_button(
label="Download Density Heatmap", data=img_bytes, file_name=figname
)
plt.close()
with col2:
fig = update_figure(dict_observables, params, False, st.session_state["zsmooth"])
fig = update_figure(
dict_observables, params, False, st.session_state["zsmooth"]
)
st.plotly_chart(fig)
figname = pa.SELECTED_NAME + f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf"
figname = (
pa.SELECTED_NAME
+ f"_velocity_field_for_density_heatmap_zsmooth{st.session_state['zsmooth']}.pdf"
)
img_bytes = fig.to_image(format="pdf")
st.sidebar.download_button(label="Download Variance Heatmap", data=img_bytes, file_name=figname)
st.sidebar.download_button(
label="Download Variance Heatmap", data=img_bytes, file_name=figname
)
plt.close()


Expand Down
35 changes: 22 additions & 13 deletions src/tabs/survey_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ def histogram_survey(df_survey: pd.DataFrame, remove_outlier: bool) -> Figure:

# Add histogram traces for each category
fig.add_trace(go.Histogram(x=values_children, name="Children", marker_color="pink"))
fig.add_trace(go.Histogram(x=values_both, name="Adults and children", marker_color="blue"))
fig.add_trace(
go.Histogram(x=values_both, name="Adults and children", marker_color="blue")
)

# Create a DataFrame for custom data in hover templates
data = pd.DataFrame({"Categories": ["Adults and children"] * len(values_both) + ["Children"] * len(values_children)})
data = pd.DataFrame(
{
"Categories": ["Adults and children"] * len(values_both)
+ ["Children"] * len(values_children)
}
)

# Update hover template with custom data
fig.update_traces(
Expand All @@ -61,16 +68,16 @@ def histogram_survey(df_survey: pd.DataFrame, remove_outlier: bool) -> Figure:
title={"text": "Distribution of Group Sizes", "font_size": 28},
width=1000,
height=700,
xaxis=dict(
title={"text": "Group Size", "font_size": 30, "font_color": "black"},
tickfont_size=30,
tickfont_color="black",
),
yaxis=dict(
title={"text": "Counts", "font_size": 30, "font_color": "black"},
tickfont_size=30,
tickfont_color="black",
),
xaxis={
"title": {"text": "Group Size", "font_size": 30, "font_color": "black"},
"tickfont_size": 30,
"tickfont_color": "black",
},
yaxis={
"title": {"text": "Counts", "font_size": 30, "font_color": "black"},
"tickfont_size": 30,
"tickfont_color": "black",
},
legend={"font_size": 30, "font_color": "black"},
)

Expand All @@ -92,7 +99,9 @@ def main() -> None:
5. Provides a sidebar button to download the histogram as a PDF file.
"""
path = Path(__file__).resolve()
survey_path = path.parent.parent.parent.absolute() / "data" / "surveys" / "survey_results.csv"
survey_path = (
path.parent.parent.parent.absolute() / "data" / "surveys" / "survey_results.csv"
)
path_pickle = path.parent.parent.parent.absolute() / "data" / "pickle"

# Check if survey.pkl exists, if not create it, else load it
Expand Down

0 comments on commit 05d9ab2

Please sign in to comment.