Correlation Events Demo
Demonstrates cross-signal correlation analysis and anomaly correlation for root cause identification.
Run it: python examples/correlation_events_demo.py
Modules demonstrated: SignalCorrelationEvents, AnomalyCorrelationEvents
Related guides: Module Reference: Correlation
"""Correlation Events Demo
Demonstrates cross-signal correlation analysis and anomaly correlation
for detecting related process issues in manufacturing.
Run: python examples/correlation_events_demo.py
"""
import pandas as pd
import numpy as np
from ts_shape.events.correlation.signal_correlation import SignalCorrelationEvents
from ts_shape.events.correlation.anomaly_correlation import AnomalyCorrelationEvents
def _add_ramp(signal: np.ndarray, start: int, length: int, peak: float) -> None:
    """Add a linear ramp rising from 0 to ``peak`` onto ``signal[start:start + length]``.

    Modifies ``signal`` in place. If the series ends before ``start + length``
    the ramp is truncated to the available samples (and skipped entirely when
    ``start`` is past the end), so short series never raise a broadcast error.
    """
    # Basic slicing returns a view, so the in-place add writes into ``signal``.
    segment = signal[start:start + length]
    segment += np.linspace(0, peak, length)[: len(segment)]


def _add_spike(signal: np.ndarray, index: int, amount: float) -> None:
    """Add ``amount`` to ``signal[index]`` in place when ``index`` is in bounds."""
    if 0 <= index < len(signal):
        signal[index] += amount


def create_correlated_process_data(n: int = 1000, *, seed: int = 42) -> pd.DataFrame:
    """Create realistic multi-signal process data with correlations.

    Simulates a process sampled once per minute starting 2024-01-01 with:

    - Temperature and pressure (normally correlated)
    - Vibration that increases when temperature spikes
    - Flow rate that follows pressure

    Two disturbances are injected:

    - a gradual upset: temperature ramps up over samples 400-449, pressure
      ramps down over 410-459 and vibration ramps up over 420-469;
    - a point anomaly: temperature spike at sample 700, followed by a
      vibration spike at 705 and a pressure drop at 710.

    Disturbances that fall past the end of a short series are truncated or
    skipped, so any ``n >= 0`` is accepted.

    Args:
        n: Number of time steps (one per minute) to generate.
        seed: Seed for a private random generator. The default reproduces the
            original dataset exactly; the global NumPy RNG state is not touched.

    Returns:
        Long-format DataFrame with columns ``systime``, ``uuid``,
        ``value_double`` and ``is_delta`` (always True), holding ``4 * n``
        rows ordered by time, then by signal (temperature, pressure,
        vibration, flow_rate).
    """
    # Legacy RandomState(seed) yields the same draws np.random.seed(seed)
    # followed by np.random.normal(...) did, without mutating global state.
    rng = np.random.RandomState(seed)
    base = pd.Timestamp("2024-01-01")

    # Base temperature signal with gradual drift
    temp = np.cumsum(rng.normal(0, 0.1, n)) + 85.0
    # Pressure tracks temperature plus noise; computed before the upset is
    # injected so the relationship breaks down during it.
    pressure = temp * 0.12 + 2.5 + rng.normal(0, 0.3, n)

    # Process upset: temperature ramps up while pressure diverges downward
    _add_ramp(temp, 400, 50, 15)
    _add_ramp(pressure, 410, 50, -3)

    # Vibration: normally low, ramps up with a delay after the temperature upset
    vibration = np.abs(rng.normal(2.0, 0.5, n))
    _add_ramp(vibration, 420, 50, 8)

    # Flow rate follows pressure. NOTE: computed before the t=710 pressure
    # drop injected below, so flow does not reflect that point anomaly.
    flow = 100 + pressure * 5 + rng.normal(0, 1, n)

    # Point anomaly at t=700 with delayed vibration and pressure responses
    _add_spike(temp, 700, 25)
    _add_spike(vibration, 705, 12)
    _add_spike(pressure, 710, -5)

    signals = {
        "proc:temperature": temp,
        "proc:pressure": pressure,
        "proc:vibration": vibration,
        "proc:flow_rate": flow,
    }
    timestamps = pd.date_range(base, periods=n, freq="min")
    # Row-major ravel of the (n, 4) matrix gives time-major, signal-minor
    # order, matching the repeated timestamps and the tiled uuid list.
    return pd.DataFrame({
        "systime": timestamps.repeat(len(signals)),
        "uuid": list(signals) * n,
        "value_double": np.column_stack(list(signals.values())).ravel(),
        "is_delta": True,
    })
def _print_section(title: str) -> None:
    """Print ``title`` framed by 70-character dashed rules, preceded by a blank line."""
    print("\n" + "-" * 70)
    print(title)
    print("-" * 70)


def main() -> None:
    """Generate the synthetic dataset and print the output of each demo section.

    Sections 1-3 use ``SignalCorrelationEvents`` (rolling correlation,
    correlation breakdown, lag correlation); sections 4-6 use
    ``AnomalyCorrelationEvents`` (coincident anomalies, cascade detection,
    root-cause ranking).
    """
    print("=" * 70)
    print("CORRELATION EVENTS DEMO")
    print("=" * 70)

    df = create_correlated_process_data()
    print(f"\nCreated dataset: {len(df)} rows, {df['uuid'].nunique()} signals")

    # 1. Signal correlation analysis
    _print_section("1. ROLLING CORRELATION: Temperature vs Pressure")
    sc = SignalCorrelationEvents(df)
    corr = sc.rolling_correlation(
        "proc:temperature", "proc:pressure", resample="1min", window=30
    )
    print(f"\nRolling correlation points: {len(corr)}")
    print(f"Mean correlation: {corr['correlation'].mean():.3f}")
    print(f"Min correlation: {corr['correlation'].min():.3f}")
    print(f"Max correlation: {corr['correlation'].max():.3f}")

    # 2. Correlation breakdown detection
    _print_section("2. CORRELATION BREAKDOWN: Temperature vs Pressure")
    breakdowns = sc.correlation_breakdown(
        "proc:temperature", "proc:pressure",
        resample="1min", window=30, threshold=0.3,
    )
    print(f"\nCorrelation breakdowns detected: {len(breakdowns)}")
    # iterrows() on an empty frame yields nothing, so no emptiness guard is needed.
    for _, row in breakdowns.iterrows():
        print(f" {row['start']} to {row['end']} "
              f"(min_corr={row['min_correlation']:.3f}, "
              f"duration={row['duration_seconds']:.0f}s)")

    # 3. Lag correlation analysis
    _print_section("3. LAG CORRELATION: Temperature -> Vibration")
    lag = sc.lag_correlation(
        "proc:temperature", "proc:vibration", resample="1min", max_lag=20
    )
    if lag.empty:
        print("\nNo lag correlations could be computed.")
    else:
        best_rows = lag[lag["is_best_lag"]]
        # Guard instead of letting .iloc[0] raise IndexError when no row is flagged.
        if best_rows.empty:
            print("\nNo best lag could be determined.")
        else:
            best = best_rows.iloc[0]
            print(f"\nBest lag: {int(best['lag_periods'])} periods "
                  f"(correlation = {best['correlation']:.3f})")
        print("Top 5 lag correlations:")
        for _, row in lag.nlargest(5, "correlation").iterrows():
            marker = " <-- best" if row["is_best_lag"] else ""
            # iterrows() does not preserve dtypes; int() keeps the :d format valid.
            print(f" Lag {int(row['lag_periods']):+3d}: r = {row['correlation']:.3f}{marker}")

    # 4. Anomaly correlation
    _print_section("4. COINCIDENT ANOMALIES across all signals")
    ac = AnomalyCorrelationEvents(df)
    signals = ["proc:temperature", "proc:pressure", "proc:vibration", "proc:flow_rate"]
    coincident = ac.coincident_anomalies(
        signals, z_threshold=2.5, coincidence_window="15min", min_signals=2
    )
    print(f"\nCoincident anomaly windows: {len(coincident)}")
    for _, row in coincident.iterrows():
        print(f" {row['window_start']}: "
              f"{row['anomaly_count']} anomalies across "
              f"[{row['signal_uuids_involved']}]")

    # 5. Cascade detection
    _print_section("5. CASCADE DETECTION: Temperature -> Vibration")
    cascades = ac.cascade_detection(
        "proc:temperature", "proc:vibration",
        z_threshold=2.5, max_delay="15min",
    )
    print(f"\nCascade events detected: {len(cascades)}")
    for _, row in cascades.iterrows():
        print(f" Leader: {row['leader_time']} -> "
              f"Follower: {row['follower_time']} "
              f"(delay={row['delay_seconds']:.0f}s)")

    # 6. Root cause ranking
    _print_section("6. ROOT CAUSE RANKING")
    ranking = ac.root_cause_ranking(signals, z_threshold=2.5, max_delay="15min")
    print("\nSignal ranking by anomaly leadership:")
    for _, row in ranking.iterrows():
        print(f" #{int(row['rank'])} {row['signal_uuid']}: "
              f"leader_ratio={row['leader_ratio']:.2f} "
              f"(leads={int(row['leader_count'])}, follows={int(row['follower_count'])})")

    print("\n" + "=" * 70)
    print("Demo complete.")
    print("=" * 70)


if __name__ == "__main__":
    main()