Skip to content

Quality Events Demo

Demonstrates outlier detection, SPC rules, and tolerance deviation analysis for manufacturing measurement data.

Run it: python examples/quality_events_demo.py

Modules demonstrated: OutlierDetectionEvents, StatisticalProcessControlRuleBased, ToleranceDeviationEvents

Related guides: Quality Control & SPC


#!/usr/bin/env python3
"""
Quality Events Demo for ts-shape
=================================

Demonstrates quality event detection classes for manufacturing measurement data:
1. OutlierDetectionEvents  - Detect outliers via Z-score, IQR, MAD, and Isolation Forest
2. StatisticalProcessControlRuleBased - Western Electric SPC rules and CUSUM analysis
3. ToleranceDeviationEvents - Tolerance deviation detection with severity classification

Scenario: A CNC milling machine produces parts with a target bore diameter of 25.000 mm.
Sensor readings are collected every 30 seconds over an 8-hour shift.
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

from ts_shape.events.quality.outlier_detection import OutlierDetectionEvents
from ts_shape.events.quality.statistical_process_control import StatisticalProcessControlRuleBased
from ts_shape.events.quality.tolerance_deviation import ToleranceDeviationEvents


# ---------------------------------------------------------------------------
# Helper: build a realistic manufacturing timeseries DataFrame
# ---------------------------------------------------------------------------

def create_bore_diameter_data(n_points: int = 960) -> pd.DataFrame:
    """
    Simulate 8 hours of bore-diameter measurements (one every 30 s).

    The synthetic signal consists of:
      - a stable baseline around 25.000 mm with Gaussian noise (sigma ~0.003)
      - a slow upward drift from sample 600 onward (simulating tool wear)
      - a few injected spike outliers at known positions

    The RNG is seeded, so repeated calls return identical data.
    """
    np.random.seed(42)
    shift_start = datetime(2025, 6, 10, 6, 0, 0)
    timestamps = [shift_start + timedelta(seconds=30 * i) for i in range(n_points)]

    # Stable baseline with measurement noise.
    readings = 25.000 + np.random.normal(0, 0.003, n_points)

    # Tool-wear drift: zero until sample 600, then a linear ramp to +0.012 mm.
    wear = np.zeros(n_points)
    wear[600:] = np.linspace(0, 0.012, n_points - 600)
    readings = readings + wear

    # Inject obvious spikes at fixed positions (random sign and magnitude).
    for spike_at in (120, 350, 351, 780):
        sign = np.random.choice([-1, 1])
        magnitude = np.random.uniform(0.025, 0.040)
        readings[spike_at] += sign * magnitude

    return pd.DataFrame({
        "systime": pd.to_datetime(timestamps),
        "uuid": "bore_diameter_sensor",
        "value_bool": None,
        "value_integer": None,
        "value_double": np.round(readings, 6),
        "value_string": None,
        "is_delta": True,
    })


def create_spc_data() -> pd.DataFrame:
    """
    Build a DataFrame suitable for SPC analysis.

    Contains two uuid groups:
      - 'bore_tolerance': 100 tightly controlled reference measurements
      - 'bore_actual':    500 production measurements with injected rule
                          violations (mean shift, trend, 3-sigma point)

    The RNG is seeded, so repeated calls return identical data.
    """
    np.random.seed(99)
    shift_start = datetime(2025, 6, 10, 6, 0, 0)
    n_tol, n_act = 100, 500

    def _frame(sensor_uuid, stamps, samples):
        # Shared timeseries layout for both uuid groups.
        return pd.DataFrame({
            "systime": pd.to_datetime(stamps),
            "uuid": sensor_uuid,
            "value_bool": None,
            "value_integer": None,
            "value_double": np.round(samples, 6),
            "value_string": None,
            "is_delta": True,
        })

    # Tolerance baseline: tightly controlled around the 25.000 mm target.
    tol_stamps = [shift_start + timedelta(seconds=30 * i) for i in range(n_tol)]
    tol_samples = 25.000 + np.random.normal(0, 0.002, n_tol)

    # Production run starts immediately after the tolerance window.
    act_stamps = [shift_start + timedelta(seconds=30 * (n_tol + i)) for i in range(n_act)]
    act_samples = 25.000 + np.random.normal(0, 0.003, n_act)

    # Mean shift (SPC rule 2) at samples 200-219.
    act_samples[200:220] += 0.007
    # Steady upward trend (SPC rule 3) at samples 350-359.
    act_samples[350:360] += np.linspace(0, 0.015, 10)
    # Single point beyond 3-sigma (SPC rule 1).
    act_samples[450] = 25.035

    return pd.concat(
        [
            _frame("bore_tolerance", tol_stamps, tol_samples),
            _frame("bore_actual", act_stamps, act_samples),
        ],
        ignore_index=True,
    )


def create_tolerance_data() -> pd.DataFrame:
    """
    Build a DataFrame for tolerance deviation analysis.

    Contains three uuid groups:
      - 'upper_spec_limit': upper tolerance (25.010 mm)
      - 'lower_spec_limit': lower tolerance (24.990 mm)
      - 'bore_measurement': 300 measurements, a few forced out of spec

    The RNG is seeded, so repeated calls return identical data.
    """
    np.random.seed(7)
    shift_start = datetime(2025, 6, 10, 6, 0, 0)

    def _row(stamp, sensor_uuid, reading):
        # Single timeseries record in the shared schema.
        return {
            "systime": stamp,
            "uuid": sensor_uuid,
            "value_bool": None,
            "value_integer": None,
            "value_double": reading,
            "value_string": None,
            "is_delta": True,
        }

    # Spec-limit setting events at shift start.
    records = [
        _row(shift_start, "upper_spec_limit", 25.010),
        _row(shift_start, "lower_spec_limit", 24.990),
    ]

    # 300 measurements; indices 100-104 forced above USL, 250-252 below LSL.
    for i in range(300):
        reading = 25.000 + np.random.normal(0, 0.005)
        if 100 <= i <= 104:
            reading = 25.015 + np.random.uniform(0, 0.005)
        if 250 <= i <= 252:
            reading = 24.983 - np.random.uniform(0, 0.003)
        records.append(
            _row(shift_start + timedelta(seconds=30 * (i + 1)),
                 "bore_measurement", round(reading, 6))
        )

    return pd.DataFrame(records)


# ---------------------------------------------------------------------------
# Demo functions
# ---------------------------------------------------------------------------

def demo_outlier_detection():
    """Demonstrate all outlier detection methods."""
    banner = "=" * 72
    print(banner)
    print("1. OUTLIER DETECTION EVENTS")
    print(banner)

    readings = create_bore_diameter_data()
    print(f"\nInput data: {len(readings)} bore-diameter readings over 8 hours")
    print(f"Value range: {readings['value_double'].min():.4f} - {readings['value_double'].max():.4f} mm\n")

    detector = OutlierDetectionEvents(
        dataframe=readings,
        value_column="value_double",
        event_uuid="outlier_event",
        time_threshold="5min",
    )

    def _show(events, head=None):
        # Shared result reporting: count, optional preview, trailing blank line.
        print(f"  Detected {len(events)} outlier event rows")
        if not events.empty:
            view = events[["systime", "value_double", "severity_score"]]
            if head is not None:
                view = view.head(head)
            print(view.to_string(index=False))
        print()

    print("--- Z-score method (threshold=3.0) ---")
    _show(detector.detect_outliers_zscore(threshold=3.0))

    print("--- IQR method (threshold=(1.5, 1.5)) ---")
    _show(detector.detect_outliers_iqr(threshold=(1.5, 1.5)), head=10)

    print("--- MAD method (threshold=3.5) ---")
    _show(detector.detect_outliers_mad(threshold=3.5), head=10)

    print("--- Isolation Forest method (contamination=0.05) ---")
    try:
        # detect_outliers_isolation_forest needs scikit-learn at runtime.
        _show(detector.detect_outliers_isolation_forest(contamination=0.05), head=10)
    except ImportError:
        print("  scikit-learn not installed -- skipping Isolation Forest demo")
        print()


def demo_spc():
    """Demonstrate Statistical Process Control (Western Electric Rules + CUSUM)."""
    banner = "=" * 72
    print(banner)
    print("2. STATISTICAL PROCESS CONTROL (SPC)")
    print(banner)

    spc_df = create_spc_data()
    print(f"\nInput data: {len(spc_df)} rows ({spc_df['uuid'].value_counts().to_dict()})")

    spc = StatisticalProcessControlRuleBased(
        dataframe=spc_df,
        value_column="value_double",
        tolerance_uuid="bore_tolerance",
        actual_uuid="bore_actual",
        event_uuid="spc_violation_event",
    )

    # Static limits derived from the tolerance reference group.
    print("\n--- Static control limits (from tolerance data) ---")
    static_limits = spc.calculate_control_limits()
    for limit_name in static_limits.columns:
        print(f"  {limit_name}: {static_limits[limit_name].values[0]:.6f}")

    # Rolling EWMA-based limits over the production data.
    print("\n--- Dynamic control limits (EWMA, window=20) ---")
    ewma_limits = spc.calculate_dynamic_control_limits(method="ewma", window=20)
    print(f"  Generated {len(ewma_limits)} rows of dynamic limits")
    print(ewma_limits.tail(5).to_string(index=False))

    print("\n--- Apply all Western Electric rules ---")
    all_violations = spc.process(include_severity=True)
    if all_violations.empty:
        print("  No violations detected (data is well-behaved)")
    else:
        print(f"  Found {len(all_violations)} violations")
        print(all_violations.head(10).to_string(index=False))

    print("\n--- Vectorized rules: rule_1, rule_2, rule_3 ---")
    selected = spc.apply_rules_vectorized(selected_rules=["rule_1", "rule_2", "rule_3"])
    if selected.empty:
        print("  No violations from selected rules")
    else:
        print(f"  Found {len(selected)} violations")
        print(selected.head(10).to_string(index=False))

        # Interpretation only makes sense when violations exist.
        print("\n--- Interpret violations ---")
        interpreted = spc.interpret_violations(selected)
        for _, violation in interpreted.head(3).iterrows():
            print(f"  Rule: {violation['rule']}")
            print(f"    Severity:       {violation['severity']}")
            print(f"    Interpretation: {violation['interpretation']}")
            print(f"    Recommendation: {violation['recommendation']}")
            print()

    print("--- CUSUM shift detection ---")
    shifts = spc.detect_cusum_shifts(k=0.5, h=5.0)
    if shifts.empty:
        print("  No CUSUM shifts detected")
    else:
        print(f"  Detected {len(shifts)} CUSUM shift points")
        print(shifts.head(10).to_string(index=False))
    print()


def demo_tolerance_deviation():
    """Demonstrate tolerance deviation detection with separate upper/lower limits.

    Builds a DataFrame containing upper/lower spec-limit events plus actual
    bore measurements, runs grouped tolerance-deviation detection, and
    reports process capability indices (Cp/Cpk/Pp/Ppk).
    """
    print("=" * 72)
    print("3. TOLERANCE DEVIATION EVENTS")
    print("=" * 72)

    df = create_tolerance_data()
    print(f"\nInput data: {len(df)} rows")
    # Fixed: was an f-string with no placeholders (ruff F541).
    print("  Upper spec: 25.010 mm, Lower spec: 24.990 mm")

    tol = ToleranceDeviationEvents(
        dataframe=df,
        tolerance_column="value_double",
        actual_column="value_double",
        actual_uuid="bore_measurement",
        event_uuid="tolerance_deviation_event",
        upper_tolerance_uuid="upper_spec_limit",
        lower_tolerance_uuid="lower_spec_limit",
        warning_threshold=0.8,
        time_threshold="5min",
    )

    # --- Process and group tolerance events ---
    print("\n--- Process tolerance deviation events ---")
    events = tol.process_and_group_data_with_events()
    if not events.empty:
        print(f"  Generated {len(events)} event rows")
        # Only show columns that the events frame actually provides.
        display_cols = [c for c in ["systime", "uuid", "deviation_abs", "deviation_pct", "severity"] if c in events.columns]
        if display_cols:
            print(events[display_cols].head(10).to_string(index=False))
    else:
        print("  No grouped tolerance deviation events (individual deviations may exist)")

    # --- Process capability indices ---
    print("\n--- Process capability indices ---")
    try:
        capability = tol.compute_capability_indices()
        print(f"  Cp  = {capability['Cp']:.4f}  (potential capability)")
        print(f"  Cpk = {capability['Cpk']:.4f}  (actual capability)")
        print(f"  Pp  = {capability['Pp']:.4f}  (process performance)")
        print(f"  Ppk = {capability['Ppk']:.4f}  (process performance index)")
        print(f"  USL = {capability['usl']:.4f} mm")
        print(f"  LSL = {capability['lsl']:.4f} mm")
        print(f"  Process mean = {capability['mean']:.4f} mm")
        print(f"  Process std  = {capability['std']:.6f} mm")
        # Cpk >= 1.33 is the conventional "capable process" threshold.
        if capability['Cpk'] >= 1.33:
            print("  --> Process is CAPABLE (Cpk >= 1.33)")
        else:
            print("  --> Process is NOT capable (Cpk < 1.33) -- investigate variation sources")
    except Exception as e:
        # Deliberately broad: the demo should report, not crash, if the
        # library cannot compute capability indices for this data.
        print(f"  Could not compute capability indices: {e}")
    print()


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """Run all three quality-event demos in sequence."""
    demo_outlier_detection()
    demo_spc()
    demo_tolerance_deviation()

    closing = "=" * 72
    print(closing)
    print("Quality events demo complete.")
    print(closing)


if __name__ == "__main__":
    main()