# mt/tools/plot_scripts/results_inference_timeline.py
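"""Plot per-frame anomaly scores for a single experiment against cached LiDAR
degradation statistics (missing-point and near-sensor particle percentages).

Expects per-method score files named {EXPERIMENT_NAME}_{method}_scores.npy for
the methods "deepsad", "ocsvm", and "isoforest", plus the pickled stats caches
produced by the original statistics script.
"""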
import json
import pickle
import shutil
from datetime import datetime
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
# =========================
# User-configurable params
# =========================
# Single experiment to plot (stem of the .bag file, e.g. "3_smoke_human_walking_2023-01-23")
EXPERIMENT_NAME = "3_smoke_human_walking_2023-01-23"
# Directory that contains {EXPERIMENT_NAME}_{method}_scores.npy for methods in {"deepsad","ocsvm","isoforest"}
# Adjust this to where you save your per-method scores.
methods_scores_path = Path(
    "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/test/inference"
)
# Root data path containing .bag files used to build the cached stats
all_data_path = Path("/home/fedex/mt/data/subter")
# Output base directory (timestamped subfolder will be created here, then archived and copied to "latest/")
output_path = Path("/home/fedex/mt/plots/results_inference_timeline")
# Cache (stats + labels) directory — same as your original script
cache_path = output_path
# Assumed LiDAR frame resolution to convert counts -> percent (unchanged from original)
data_resolution = 32 * 2048
# Frames per second for x-axis time
FPS = 10.0
# Whether to try to align score sign so that higher = more degraded.
# If manual labels exist for this experiment, alignment uses anomaly window mean vs. outside.
ALIGN_SCORE_DIRECTION = True
# =========================
# Setup output folders
# =========================
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)
# =========================
# Discover experiments to reconstruct indices consistent with caches
# =========================
normal_experiment_paths, anomaly_experiment_paths = [], []
if not all_data_path.exists():
    raise FileNotFoundError(f"all_data_path does not exist: {all_data_path}")
for bag_file_path in all_data_path.iterdir():
    if bag_file_path.suffix != ".bag":
        continue
    if "smoke" in bag_file_path.name:
        anomaly_experiment_paths.append(bag_file_path)
    else:
        normal_experiment_paths.append(bag_file_path)
# Sort by filesize to match original ordering used when caches were generated
normal_experiment_paths = sorted(
    normal_experiment_paths, key=lambda p: p.stat().st_size
)
anomaly_experiment_paths = sorted(
    anomaly_experiment_paths, key=lambda p: p.stat().st_size
)
# Find the path for the requested experiment
exp_path = None
exp_is_anomaly = None
for p in anomaly_experiment_paths:
    if p.stem == EXPERIMENT_NAME:
        exp_path = p
        exp_is_anomaly = True
        break
if exp_path is None:
    for p in normal_experiment_paths:
        if p.stem == EXPERIMENT_NAME:
            exp_path = p
            exp_is_anomaly = False
            break
if exp_path is None:
    raise FileNotFoundError(
        f"Experiment '{EXPERIMENT_NAME}' not found as a .bag in {all_data_path}"
    )
# Get the index within the appropriate list
if exp_is_anomaly:
    exp_index = anomaly_experiment_paths.index(exp_path)
else:
    exp_index = normal_experiment_paths.index(exp_path)
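# exp_index reproduces the position this experiment had when the caches were
# written, since both rely on the same filesize-sorted ordering.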
# =========================
# Load cached statistical data
# =========================
missing_points_cache = cache_path / "missing_points.pkl"
near_sensor_cache = cache_path / "particles_near_sensor_counts_500.pkl"
if not missing_points_cache.exists():
    raise FileNotFoundError(f"Missing points cache not found: {missing_points_cache}")
if not near_sensor_cache.exists():
    raise FileNotFoundError(f"Near-sensor cache not found: {near_sensor_cache}")
with open(missing_points_cache, "rb") as f:
    missing_points_normal, missing_points_anomaly = pickle.load(f)
with open(near_sensor_cache, "rb") as f:
    near_sensor_normal, near_sensor_anomaly = pickle.load(f)
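# Assumption (based on how the tuples are unpacked above): each cache stores a
# (normal, anomaly) pair of per-experiment count sequences, indexed in the same
# filesize-sorted order reconstructed earlier in this script.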
if exp_is_anomaly:
    missing_points_series = np.asarray(missing_points_anomaly[exp_index], dtype=float)
    near_sensor_series = np.asarray(near_sensor_anomaly[exp_index], dtype=float)
else:
    missing_points_series = np.asarray(missing_points_normal[exp_index], dtype=float)
    near_sensor_series = np.asarray(near_sensor_normal[exp_index], dtype=float)
# Convert counts to percentages of total points
missing_points_pct = (missing_points_series / data_resolution) * 100.0
near_sensor_pct = (near_sensor_series / data_resolution) * 100.0
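# Illustrative arithmetic: 6554 missing points out of 32 * 2048 = 65536 is ~10.0 %.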
# =========================
# Load manual anomaly frame borders (optional; used for sign alignment + vertical markers)
# =========================
manually_labeled_anomaly_frames = {}
labels_json_path = cache_path / "manually_labeled_anomaly_frames.json"
if labels_json_path.exists():
    with open(labels_json_path, "r") as frame_borders_file:
        manually_labeled_anomaly_frames_json = json.load(frame_borders_file)
    for file in manually_labeled_anomaly_frames_json.get("files", []):
        manually_labeled_anomaly_frames[file["filename"]] = (
            file.get("semi_target_begin_frame", None),
            file.get("semi_target_end_frame", None),
        )
# The JSON keys are .npy filenames (as in the original script), so build this experiment's key.
exp_npy_filename = exp_path.with_suffix(".npy").name
anomaly_window = manually_labeled_anomaly_frames.get(exp_npy_filename, (None, None))
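# anomaly_window is a (begin_frame, end_frame) pair, or (None, None) when no
# manual labels exist for this experiment.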
# =========================
# Load method scores and z-score normalize per method
# =========================
def zscore_1d(x: np.ndarray, eps=1e-12):
    """Standardize a 1-D array to zero mean and unit variance."""
    x = np.asarray(x, dtype=float)
    mu = np.mean(x)
    sigma = np.std(x, ddof=0)
    if sigma < eps:
        return np.zeros_like(x)
    return (x - mu) / sigma
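# Quick check (illustrative values): zscore_1d(np.array([1.0, 2.0, 3.0]))
# -> array([-1.2247, 0.0, 1.2247]); a constant series returns all zeros.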
def maybe_align_direction(z: np.ndarray, window):
    """Flip sign so that the anomaly window mean is higher than the outside mean, if labels exist."""
    start, end = window
    if start is None or end is None:
        return z  # no labels → leave as-is
    start = int(max(0, start))
    end = int(min(len(z), end))
    if end <= start:
        return z
    inside_mean = float(np.mean(z[start:end]))
    # outside: everything except [start:end]; handle edge cases
    if start == 0 and end == len(z):
        return z
    outside_parts = []
    if start > 0:
        outside_parts.append(z[:start])
    if end < len(z):
        outside_parts.append(z[end:])
    if not outside_parts:
        return z
    outside_mean = float(np.mean(np.concatenate(outside_parts)))
    return z if inside_mean >= outside_mean else -z
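# Illustrative example: with window = (100, 250), the scores are negated when the
# mean z-score inside frames 100..249 is lower than the mean outside, so that
# higher values consistently indicate degradation.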
methods = ["deepsad", "ocsvm", "isoforest"]
method_scores = {}
method_zscores = {}
if not methods_scores_path.exists():
    raise FileNotFoundError(
        f"Methods scores path does not exist: {methods_scores_path}"
    )
# Load raw scores first, then align all series to a common length (truncating
# per method inside the loop could leave earlier methods' arrays longer than
# the stats and time axis used for plotting).
for m in methods:
    file_path = methods_scores_path / f"{EXPERIMENT_NAME}_{m}_scores.npy"
    if not file_path.exists():
        raise FileNotFoundError(f"Missing scores file for method '{m}': {file_path}")
    method_scores[m] = np.asarray(np.load(file_path), dtype=float).reshape(-1)
# Truncate everything to the shortest common length (lengths should already
# match if scores and caches were generated consistently).
n = min(len(missing_points_pct), *(len(s) for s in method_scores.values()))
missing_points_pct = missing_points_pct[:n]
near_sensor_pct = near_sensor_pct[:n]
for m in methods:
    s = method_scores[m][:n]
    method_scores[m] = s
    z = zscore_1d(s)
    if ALIGN_SCORE_DIRECTION:
        z = maybe_align_direction(z, anomaly_window)
    method_zscores[m] = z
# Common time axis in seconds
num_frames = len(missing_points_pct)
t = np.arange(num_frames) / FPS
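# e.g. at FPS = 10.0, frame 1500 corresponds to t = 150.0 s on the x-axis.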
# =========================
# Plot 1: Missing points (%) vs. method z-scores
# =========================
fig1, axz1 = plt.subplots(figsize=(14, 6), constrained_layout=True)
axy1 = axz1.twinx()
# plot z-scores
for m in methods:
    axz1.plot(t, method_zscores[m], label=f"{m} (z)", alpha=0.9)
# plot missing points (%)
axy1.plot(t, missing_points_pct, linestyle="--", alpha=0.7, label="Missing points (%)")
# vertical markers for anomaly window if available
start, end = anomaly_window
if start is not None and end is not None and 0 <= start < end <= num_frames:
    axz1.axvline(x=start / FPS, linestyle=":", alpha=0.6)
    axz1.axvline(x=end / FPS, linestyle=":", alpha=0.6)
axz1.set_xlabel("Time (s)")
axz1.set_ylabel("Anomaly score (z-score, ↑ = more degraded)")
axy1.set_ylabel("Missing points (%)")
axz1.set_title(f"{EXPERIMENT_NAME}\nDegradation vs. Missing Points")
# Build a combined legend
lines1, labels1 = axz1.get_legend_handles_labels()
lines2, labels2 = axy1.get_legend_handles_labels()
axz1.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
axz1.grid(True, alpha=0.3)
fig1.savefig(
    output_datetime_path / f"{EXPERIMENT_NAME}_zscores_vs_missing_points.png", dpi=150
)
plt.close(fig1)
# =========================
# Plot 2: Near-sensor (%) vs. method z-scores
# =========================
fig2, axz2 = plt.subplots(figsize=(14, 6), constrained_layout=True)
axy2 = axz2.twinx()
for m in methods:
    axz2.plot(t, method_zscores[m], label=f"{m} (z)", alpha=0.9)
axy2.plot(t, near_sensor_pct, linestyle="--", alpha=0.7, label="Near-sensor <0.5m (%)")
start, end = anomaly_window
if start is not None and end is not None and 0 <= start < end <= num_frames:
    axz2.axvline(x=start / FPS, linestyle=":", alpha=0.6)
    axz2.axvline(x=end / FPS, linestyle=":", alpha=0.6)
axz2.set_xlabel("Time (s)")
axz2.set_ylabel("Anomaly score (z-score, ↑ = more degraded)")
axy2.set_ylabel("Near-sensor points (%)")
axz2.set_title(f"{EXPERIMENT_NAME}\nDegradation vs. Near-Sensor Points (<0.5 m)")
lines1, labels1 = axz2.get_legend_handles_labels()
lines2, labels2 = axy2.get_legend_handles_labels()
axz2.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
axz2.grid(True, alpha=0.3)
fig2.savefig(
    output_datetime_path / f"{EXPERIMENT_NAME}_zscores_vs_near_sensor.png", dpi=150
)
plt.close(fig2)
# =========================
# Preserve latest/, archive/, copy script
# =========================
# delete current latest folder
shutil.rmtree(latest_folder_path, ignore_errors=True)
# create new latest folder
latest_folder_path.mkdir(exist_ok=True, parents=True)
# copy contents of output folder to the latest folder
for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)
# copy this python script to preserve the code used
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)
# move output date folder to archive
shutil.move(output_datetime_path, archive_folder_path)
print("Done. Plots saved and archived.")