"""Plot lidar degradation indicators over a normalized experiment timeline.

Reads cached per-frame statistics (missing points, near-sensor returns) for a
baseline run and the moving smoke-anomaly runs, bins each experiment onto a
common 0-100% timeline, and writes a combined two-axis plot.  Outputs go to a
datetime-stamped folder which is mirrored to "latest" and archived.
"""

import json
import pickle
import shutil
import sys
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.lines import Line2D

# define data paths
all_data_path = Path("/home/fedex/mt/data/subter")
output_path = Path("/home/fedex/mt/plots/data_anomalies_timeline")
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
cache_path = output_path

# if output does not exist, create it
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)

# points per lidar frame used to turn raw counts into percentages
# (presumably 32 rings x 2048 azimuth steps — TODO confirm against sensor spec)
data_resolution = 32 * 2048

# find all bag files; "smoke" in the filename marks an anomaly experiment
normal_experiment_paths, anomaly_experiment_paths = [], []
for bag_file_path in all_data_path.iterdir():
    if bag_file_path.suffix != ".bag":
        continue
    if "smoke" in bag_file_path.name:
        anomaly_experiment_paths.append(bag_file_path)
    else:
        normal_experiment_paths.append(bag_file_path)

# Load manually labeled frames
with open(
    cache_path / "manually_labeled_anomaly_frames.json", "r"
) as frame_borders_file:
    manually_labeled_anomaly_frames_json = json.load(frame_borders_file)

if not manually_labeled_anomaly_frames_json:
    print("No manually labeled anomaly frames found. Exiting...")
    sys.exit(1)

# Validate each labeled file against the discovered anomaly experiments.
# Build the membership set once instead of re-creating a generator per file
# (the original rebuilt the generator on every loop iteration).
anomaly_npy_names = {p.with_suffix(".npy").name for p in anomaly_experiment_paths}

manually_labeled_anomaly_frames = {}
try:
    for file in manually_labeled_anomaly_frames_json["files"]:
        if file["filename"] not in anomaly_npy_names:
            print(
                f"File {file['filename']} from manually labeled frames not found in anomaly experiments. Exiting..."
            )
            sys.exit(1)
        manually_labeled_anomaly_frames[file["filename"]] = (
            file["semi_target_begin_frame"],
            file["semi_target_end_frame"],
        )
except KeyError as e:
    print(f"Missing key in manually labeled frames JSON: {e}")
    sys.exit(1)


def plot_combined_timeline(
    normal_experiment_paths, anomaly_experiment_paths, title, num_bins=50
):
    """Plot both missing points and near-sensor measurements over normalized timeline.

    Args:
        normal_experiment_paths: paths of smoke-free experiment bag files.
        anomaly_experiment_paths: paths of smoke experiment bag files.
        title: figure title (currently unused — title rendering is disabled —
            but kept so the call signature stays stable).
        num_bins: number of bins the normalized 0-100% timeline is averaged into.

    Writes ``combined_anomalies_timeline.png`` into ``output_datetime_path``.
    """
    # Sort experiments by filesize first (to match original processing order);
    # the cached pickles below were produced in this same order, so the
    # hard-coded index -3 here must agree with the [-3] used on the caches.
    normal_experiment_paths = sorted(
        normal_experiment_paths, key=lambda path: path.stat().st_size
    )
    anomaly_experiment_paths = sorted(
        anomaly_experiment_paths, key=lambda path: path.stat().st_size
    )

    # Baseline is the THIRD-largest normal experiment (index -3, not the
    # largest as a previous comment claimed), plus all moving (non-stationary)
    # anomaly experiments.
    baseline_path = normal_experiment_paths[-3]
    moving_exp_indices = [
        i
        for i, path in enumerate(anomaly_experiment_paths)
        if "stationary" not in path.name
    ]
    moving_anomaly_paths = [anomaly_experiment_paths[i] for i in moving_exp_indices]

    # Load missing points data (cache_path / ... is already a Path; no wrapper needed)
    missing_points_cache = cache_path / "missing_points.pkl"
    if not missing_points_cache.exists():
        print("Missing points cache not found!")
        return

    # Load near-sensor data (using 500mm threshold)
    near_sensor_cache = cache_path / "particles_near_sensor_counts_500.pkl"
    if not near_sensor_cache.exists():
        print("Near-sensor measurements cache not found!")
        return

    # Load both cached datasets
    with open(missing_points_cache, "rb") as file:
        missing_points_normal, missing_points_anomaly = pickle.load(file)
    with open(near_sensor_cache, "rb") as file:
        near_sensor_normal, near_sensor_anomaly = pickle.load(file)

    # Get data for baseline and moving experiments (same [-3] as baseline_path)
    missing_data = [missing_points_normal[-3]] + [
        missing_points_anomaly[i] for i in moving_exp_indices
    ]
    near_sensor_data = [near_sensor_normal[-3]] + [
        near_sensor_anomaly[i] for i in moving_exp_indices
    ]
    all_paths = [baseline_path] + moving_anomaly_paths

    # Create figure with two y-axes and dynamic layout
    fig, ax1 = plt.subplots(figsize=(12, 6), constrained_layout=True)
    ax2 = ax1.twinx()

    # Color schemes - gray for baseline, colors for anomaly experiments
    experiment_colors = ["#808080"] + ["#1f77b4", "#ff7f0e", "#2ca02c"]  # gray + colors

    # Custom legend handles for the metric line styles (shared across experiments)
    metric_legend = [
        Line2D([0], [0], color="gray", linestyle="-", label="Missing Points"),
        Line2D([0], [0], color="gray", linestyle="--", label="Near-Sensor (<0.5m)"),
        Line2D([0], [0], color="gray", linestyle=":", label="Manually Labeled Borders"),
    ]

    # Plot each experiment's data
    for i, (missing_exp, near_sensor_exp) in enumerate(
        zip(missing_data, near_sensor_data)
    ):
        # Get experiment name without the full path
        exp_name = all_paths[i].stem
        # Shorten experiment name if needed
        exp_name = exp_name.replace("experiment_smoke_", "exp_")

        # Convert both raw counts to percentages of a full frame
        missing_pct = np.array(missing_exp) / data_resolution * 100
        near_sensor_pct = np.array(near_sensor_exp) / data_resolution * 100

        # Create normalized timeline bins for both
        exp_len = len(missing_pct)
        bins = np.linspace(0, exp_len - 1, num_bins)
        missing_binned = np.zeros(num_bins)
        near_sensor_binned = np.zeros(num_bins)

        # Bin both datasets (last bin absorbs the tail so no frames are dropped)
        for bin_idx in range(num_bins):
            if bin_idx == num_bins - 1:
                start_idx = int(bins[bin_idx])
                end_idx = exp_len
            else:
                start_idx = int(bins[bin_idx])
                end_idx = int(bins[bin_idx + 1])
            missing_binned[bin_idx] = np.mean(missing_pct[start_idx:end_idx])
            near_sensor_binned[bin_idx] = np.mean(near_sensor_pct[start_idx:end_idx])

        # Plot both metrics with same color but different line styles.
        # Index modulo palette size so a fourth moving experiment cannot raise
        # IndexError (it simply reuses a color).
        color = experiment_colors[i % len(experiment_colors)]
        ax1.plot(
            range(num_bins),
            missing_binned,
            color=color,
            linestyle="-",
            alpha=0.6,
            label=exp_name,
        )
        ax2.plot(
            range(num_bins), near_sensor_binned, color=color, linestyle="--", alpha=0.6
        )

        # NOTE(review): drawing dotted vertical lines at the manually labeled
        # begin/end frames (from manually_labeled_anomaly_frames) was disabled
        # in the original; the "Manually Labeled Borders" legend entry below is
        # therefore currently not backed by any plotted line.

    # Customize axes: x-axis is the normalized 0-100% timeline
    ax1.set_xlabel("Normalized Timeline")
    ax1.set_xticks(np.linspace(0, num_bins - 1, 5))
    ax1.set_xticklabels([f"{x:.0f}%" for x in np.linspace(0, 100, 5)])
    ax1.set_ylabel("Missing Points (%)")
    ax2.set_ylabel("Points with <0.5m Range (%)")
    # (figure title intentionally not rendered; `title` retained for interface)

    # Combine experiment labels and metric-style handles into one legend,
    # separated by an invisible spacer entry.
    lines1, labels1 = ax1.get_legend_handles_labels()
    all_handles = (
        lines1
        + [Line2D([0], [0], color="gray", linestyle="-", label="", alpha=0)]
        + metric_legend
    )
    all_labels = (
        labels1
        + [""]
        + ["Missing Points", "Points Near Sensor (<0.5m)", "Manually Labeled Borders"]
    )

    # Single legend in the top right corner with consistent margins
    fig.legend(all_handles, all_labels, bbox_to_anchor=(0.95, 0.99))

    plt.grid(True, alpha=0.3)

    # Save figure letting matplotlib handle the layout
    plt.savefig(output_datetime_path / "combined_anomalies_timeline.png", dpi=150)
    plt.close()


# Generate the combined timeline plot
plot_combined_timeline(
    normal_experiment_paths,
    anomaly_experiment_paths,
    "Lidar Degradation Indicators Throughout Experiments\n(Baseline and Moving Anomaly Experiments)",
)

# delete current latest folder
shutil.rmtree(latest_folder_path, ignore_errors=True)
# create new latest folder
latest_folder_path.mkdir(exist_ok=True, parents=True)
# copy contents of output folder to the latest folder
for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)

# copy this python script to preserve the code used
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)

# move output date folder to archive
shutil.move(output_datetime_path, archive_folder_path)