Files
mt/tools/animate_score.py
2025-03-14 18:02:23 +01:00

152 lines
5.3 KiB
Python

from functools import partial
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import animation
# from util import calculate_average_frame_rate, load_dataset
from rich.progress import Progress, track
# --- Input selection -------------------------------------------------------
# Inference scores to animate. The commented-out alternatives below are other
# runs; uncomment exactly one `scores_path` to render a different experiment.
scores_path = Path(
    "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/subter_selective_test/inference/3_smoke_human_walking_2023-01-23.npy"
)
# scores_path = Path(
#     "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/subter_split_test/inference/3_smoke_human_walking_2023-01-23.npy"
# )
# scores_path = Path(
#     "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/subter_split_test/inference/1_loop_closure_illuminated_two_LiDARs_2023-01-23.npy"
# )
# scores_path = Path(
#     "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/esmera_split_test/inference/experiment_3.npy"
# )

# Raw dataset loading is currently disabled (see the commented `util` import).
# dataset_path = Path(
#     "/home/fedex/mt/data/subter/1_loop_closure_illuminated_two_LiDARs_2023-01-23.bag"
# )
# dataset_path = Path("/home/fedex/mt/data/subter/3_smoke_human_walking_2023-01-23.bag")
# dataset = load_dataset(dataset_path)

# --- Score loading ---------------------------------------------------------
# Frame rate handed to the video writer; `animate` also uses it as the refresh
# interval (in frames) of the on-plot "Current Avg" text.
fps = 10

all_scores = np.load(scores_path)

# Upper y-limit: 10% headroom above the largest finite score, so inf/NaN
# entries cannot blow up the axis range.
y_limit = np.max(all_scores[np.isfinite(all_scores)]) * 1.1
# y_limit = 10
# all_scores = np.where(all_scores > 10, 10, all_scores)

# Arrange the scores as (number_of_plots, number_of_frames).
# all_scores = all_scores.reshape(-1, 16).T #SUBTER
# all_scores = all_scores.reshape(-1, 8).T # ESMERA
all_scores = np.transpose(all_scores.reshape(-1, 1))  # single series -> one plot
print(all_scores.shape, y_limit)

# --- Figure setup ----------------------------------------------------------
fig, axes = plt.subplots(
    nrows=1,
    ncols=1,
    figsize=(7.68, 7.2),
    # 1, 1, figsize=(7.68, 7.2), gridspec_kw={"wspace": 0.10, "hspace": 0.10}
)
# `animate` iterates over `axes`, so wrap the lone Axes object in a list.
axes = [axes]
# fig, axes = plt.subplots(
#     1, 8, figsize=(20.48, 7.2), gridspec_kw={"wspace": 0.05, "hspace": 0.05}
# )
# Flatten the axes for easier indexing
# axes = axes.flatten()

# Most recently displayed running-average value per plot (None until the first
# refresh happens inside `animate`).
last_running_avg = [None for _ in all_scores]
# Function to calculate the running average with a dynamic window size
def running_average_dynamic(data, current_frame, max_window=20):
    """Calculate the running average with dynamic window size up to max_window.

    The window length is the smallest of ``current_frame``, ``max_window`` and
    the number of samples in ``data``. Output element ``k`` is the mean of
    ``data[k : k + window]``, so the result has ``len(data) - window + 1``
    entries (NumPy "valid" convolution).

    Args:
        data: One-dimensional sequence of scores.
        current_frame: Number of frames seen so far; limits the window while
            fewer than ``max_window`` frames are available.
        max_window: Largest window length to average over.

    Returns:
        A NumPy array of window means. It is empty when no window of at least
        one sample fits (``current_frame <= 0``, ``max_window <= 0`` or empty
        ``data``).
    """
    values = np.asarray(data)
    # Clamp to the data length: when the kernel is longer than the signal,
    # np.convolve silently swaps its operands and the "valid" output is no
    # longer a series of window means.
    window = min(current_frame, max_window, values.size)
    if window < 1:
        # A zero-length kernel would make np.convolve raise ValueError.
        return np.empty(0, dtype=float)
    return np.convolve(values, np.ones(window) / window, mode="valid")
# Function to animate each subplot
def animate(i, progress_bar, progress_task):
    """Redraw every score subplot for animation frame ``i``.

    Reads the module-level ``axes``, ``all_scores``, ``y_limit`` and ``fps``
    and updates ``last_running_avg`` in place. For each subplot it plots the
    scores of the first ``i`` frames, overlays their trailing running average,
    and shows the most recently refreshed average as text.

    Args:
        i: Index of the frame being rendered; scores ``[:i]`` are drawn.
        progress_bar: Object with an ``update(task, completed=...)`` method
            (a ``rich`` ``Progress`` in this script).
        progress_task: Task handle passed through to ``progress_bar.update``.
    """
    max_window = 20  # Length (in frames) of the trailing running-average window.

    for score_id, ax in enumerate(axes):
        scores_so_far = all_scores[score_id][:i]

        ax.cla()  # Clear the axes for the current frame
        ax.plot(scores_so_far, label="Anomaly Score")  # Scores up to frame `i`

        if i > 0:
            avg_data = running_average_dynamic(
                scores_so_far, current_frame=i, max_window=max_window
            )
            # Each "valid" sample is the mean of a window that ENDS
            # `window - 1` frames after its own index. Shift the x coordinates
            # so the curve finishes at the latest plotted score rather than
            # lagging behind it by almost a full window.
            first_x = len(scores_so_far) - len(avg_data)
            ax.plot(
                range(first_x, first_x + len(avg_data)),
                avg_data,
                color="orange",
                label="Running Avg",
            )

        ax.set_xlim([0, all_scores[score_id].shape[0]])  # Set x limits
        ax.set_ylim([0, y_limit])  # Set y limits
        # ax.set_title(f"Score {score_id + 1}")  # Add a title for each subplot
        ax.set_ylabel("Score", fontsize=10)  # Add y-axis label
        ax.set_xlabel("Frame", fontsize=10)  # Add x-axis label
        ax.legend(loc="upper right", fontsize=10)  # Add a legend

        # Only show y-axis tick labels for the leftmost subplots
        # if score_id % 8 == 0:  # First column of subplots
        #     ax.set_ylabel("Score", fontsize=8)  # Add y-axis label
        # else:
        #     ax.set_yticklabels([])  # Remove y-axis tick labels
        # if score_id < 8:
        #     ax.set_xticklabels([])  # Remove x-axis tick labels
        # ax.tick_params(labelsize=6)

        # Refresh the displayed average only every `fps` frames so the number
        # stays readable in the rendered video.
        if i % fps == 0 and i > 0:
            # Mean of the trailing window (at most `max_window` frames).
            current_window = min(i, max_window)
            last_running_avg[score_id] = np.mean(
                all_scores[score_id][i - current_window : i]
            )

        # Display the last updated running average value (if available)
        if last_running_avg[score_id] is not None:
            ax.text(
                0.05,
                0.95,
                f"Current Avg: {last_running_avg[score_id]:>2.1f}",
                transform=ax.transAxes,
                fontsize=10,
                verticalalignment="top",
                horizontalalignment="left",
                color="black",
                fontfamily="monospace",
            )

    progress_bar.update(progress_task, completed=i)
# plt.subplots_adjust(
# left=0.02, right=0.98, top=0.95, bottom=0.05, wspace=0.05, hspace=0.05
# )
# Render every frame and encode the result as a video, reporting progress on
# the terminal while frames are drawn.
with Progress() as progress:
    # One frame per score sample plus a final frame showing the full series.
    total = len(all_scores[0]) + 1
    progress_task = progress.add_task("[cyan]Animating...", total=total)
    progress.update(progress_task, completed=0)

    # FuncAnimation calls its callback with the frame index only, so bind the
    # progress objects ahead of time.
    animate_partial = partial(
        animate,
        progress_bar=progress,
        progress_task=progress_task,
    )

    # anim = animation.FuncAnimation(fig, animate_partial, frames=50, interval=1, blit=False)
    anim = animation.FuncAnimation(
        fig,
        animate_partial,
        frames=total,
        interval=1,
        blit=False,
    )

    # Save the animation as a single video
    animated_score_filename = f"{scores_path.stem}_selective.mp4"
    anim.save(
        animated_score_filename,
        writer=animation.FFMpegWriter(fps=fps),
    )

    # Mark the task as finished once the file has been written.
    progress.update(progress_task, completed=total)

# Clear the figure after saving
fig.clear()