Statistics & Features Demo¤
Demonstrates feature extraction: numeric statistics, time-grouped aggregations, and multi-column feature tables.
Run it: python examples/features_statistics_demo.py
Modules demonstrated: NumericStatistics, TimeGroupedStatistics, FeatureTable
Related guides: Signal Analytics
#!/usr/bin/env python3
"""
Features and Statistics Demo for ts-shape
==========================================
Demonstrates all feature-extraction and statistics classes:
Stats:
- NumericStatistics (numeric_stats)
- BooleanStatistics (boolean_stats)
- StringStatistics (string_stats)
- TimestampStatistics (timestamp_stats)
- DescriptiveFeatures (feature_table)
Time-grouped stats:
- TimeGroupedStatistics (time_stats_numeric)
Cycle analysis:
- CycleExtractor (cycles_extractor)
- CycleDataProcessor (cycle_processor)
Scenario: A semiconductor wafer-fab chamber collects temperature, pressure,
gas-flow, recipe labels, and chamber-active status over several production
cycles.
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from ts_shape.features.stats.numeric_stats import NumericStatistics
from ts_shape.features.stats.boolean_stats import BooleanStatistics
from ts_shape.features.stats.string_stats import StringStatistics
from ts_shape.features.stats.timestamp_stats import TimestampStatistics
from ts_shape.features.stats.feature_table import DescriptiveFeatures
from ts_shape.features.time_stats.time_stats_numeric import TimeGroupedStatistics
from ts_shape.features.cycles.cycles_extractor import CycleExtractor
from ts_shape.features.cycles.cycle_processor import CycleDataProcessor
# ---------------------------------------------------------------------------
# Helper: build realistic wafer-fab chamber data
# ---------------------------------------------------------------------------
def create_chamber_data(n_cycles: int = 5, points_per_cycle: int = 120) -> pd.DataFrame:
"""
Simulate semiconductor chamber sensor data across multiple process cycles.
Each cycle lasts ~10 minutes (one reading per 5 seconds). Between cycles
there is a 2-minute idle gap.
Signals:
- 'chamber_temp' : double, target 350 C with noise
- 'chamber_pressure': double, target 2.5 Torr
- 'gas_flow_rate' : integer, sccm (standard cubic centimetres per min)
- 'recipe_name' : string, the recipe loaded for the cycle
- 'chamber_active' : boolean, True while processing, False while idle
"""
np.random.seed(2025)
rows = []
recipes = ["OXIDE_DEP_A", "NITRIDE_DEP_B", "ETCH_C", "OXIDE_DEP_A", "NITRIDE_DEP_B"]
t = datetime(2025, 8, 15, 7, 0, 0)
for cycle_idx in range(n_cycles):
recipe = recipes[cycle_idx % len(recipes)]
# --- Active phase ---
for i in range(points_per_cycle):
temp = 350.0 + np.random.normal(0, 1.2) + (cycle_idx * 0.3) # slight drift per cycle
pressure = 2.5 + np.random.normal(0, 0.05)
gas_flow = int(200 + np.random.normal(0, 5))
# Temperature row
rows.append({
"systime": t,
"uuid": "chamber_temp",
"value_bool": None,
"value_integer": None,
"value_double": round(temp, 3),
"value_string": None,
"is_delta": True,
})
# Pressure row
rows.append({
"systime": t + timedelta(seconds=1),
"uuid": "chamber_pressure",
"value_bool": None,
"value_integer": None,
"value_double": round(pressure, 4),
"value_string": None,
"is_delta": True,
})
# Gas flow row
rows.append({
"systime": t + timedelta(seconds=2),
"uuid": "gas_flow_rate",
"value_bool": None,
"value_integer": gas_flow,
"value_double": None,
"value_string": None,
"is_delta": True,
})
# Recipe label (only on first sample of each cycle)
if i == 0:
rows.append({
"systime": t + timedelta(seconds=3),
"uuid": "recipe_name",
"value_bool": None,
"value_integer": None,
"value_double": None,
"value_string": recipe,
"is_delta": True,
})
# Chamber active status
rows.append({
"systime": t + timedelta(seconds=3),
"uuid": "chamber_active",
"value_bool": True,
"value_integer": None,
"value_double": None,
"value_string": None,
"is_delta": True,
})
t += timedelta(seconds=5)
# --- Idle gap (chamber off) ---
for i in range(24): # 2 minutes at 5-second intervals
rows.append({
"systime": t,
"uuid": "chamber_active",
"value_bool": False,
"value_integer": None,
"value_double": None,
"value_string": None,
"is_delta": True,
})
t += timedelta(seconds=5)
return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Demo functions
# ---------------------------------------------------------------------------
def demo_numeric_statistics(df: pd.DataFrame):
"""Demonstrate NumericStatistics on chamber temperature data."""
print("=" * 72)
print("1. NUMERIC STATISTICS")
print("=" * 72)
temp_df = df[df["uuid"] == "chamber_temp"].copy()
col = "value_double"
print(f"\nChamber temperature: {len(temp_df)} readings")
print(f" Mean: {NumericStatistics.column_mean(temp_df, col):.3f} C")
print(f" Median: {NumericStatistics.column_median(temp_df, col):.3f} C")
print(f" Std dev: {NumericStatistics.column_std(temp_df, col):.4f} C")
print(f" Variance: {NumericStatistics.column_variance(temp_df, col):.4f}")
print(f" Min: {NumericStatistics.column_min(temp_df, col):.3f} C")
print(f" Max: {NumericStatistics.column_max(temp_df, col):.3f} C")
print(f" Range: {NumericStatistics.column_range(temp_df, col):.3f} C")
print(f" IQR: {NumericStatistics.column_iqr(temp_df, col):.4f}")
print(f" Skewness: {NumericStatistics.column_skewness(temp_df, col):.4f}")
print(f" Kurtosis: {NumericStatistics.column_kurtosis(temp_df, col):.4f}")
print(f" Q1 (25%): {NumericStatistics.column_quantile(temp_df, col, 0.25):.3f}")
print(f" Q3 (75%): {NumericStatistics.column_quantile(temp_df, col, 0.75):.3f}")
print(f" SEM: {NumericStatistics.standard_error_mean(temp_df, col):.5f}")
print(f" CV: {NumericStatistics.coefficient_of_variation(temp_df, col):.6f}")
# Full summary as DataFrame
print("\n Full describe():")
desc = NumericStatistics.describe(temp_df[[col]])
print(desc.to_string())
# Summary as dictionary
print("\n summary_as_dataframe() (first few columns):")
summary_df = NumericStatistics.summary_as_dataframe(temp_df, col)
print(summary_df.to_string(index=False))
print()
def demo_boolean_statistics(df: pd.DataFrame):
"""Demonstrate BooleanStatistics on chamber active status."""
print("=" * 72)
print("2. BOOLEAN STATISTICS")
print("=" * 72)
active_df = df[df["uuid"] == "chamber_active"].copy()
col = "value_bool"
print(f"\nChamber active status: {len(active_df)} readings")
print(f" True count: {BooleanStatistics.count_true(active_df, col)}")
print(f" False count: {BooleanStatistics.count_false(active_df, col)}")
print(f" Null count: {BooleanStatistics.count_null(active_df, col)}")
print(f" Not-null count: {BooleanStatistics.count_not_null(active_df, col)}")
print(f" True %: {BooleanStatistics.true_percentage(active_df, col):.1f}%")
print(f" False %: {BooleanStatistics.false_percentage(active_df, col):.1f}%")
print(f" Mode: {BooleanStatistics.mode(active_df, col)}")
print(f" Is balanced? {BooleanStatistics.is_balanced(active_df, col)}")
# Summary as DataFrame
print("\n summary_as_dataframe():")
summary_df = BooleanStatistics.summary_as_dataframe(active_df, col)
print(summary_df.to_string(index=False))
print()
def demo_string_statistics(df: pd.DataFrame):
"""Demonstrate StringStatistics on recipe names."""
print("=" * 72)
print("3. STRING STATISTICS")
print("=" * 72)
# Collect recipe rows plus some chamber_active rows to have more string variety
recipe_df = df[df["uuid"] == "recipe_name"].copy()
col = "value_string"
print(f"\nRecipe name entries: {len(recipe_df)}")
print(f" Unique values: {StringStatistics.count_unique(recipe_df, col)}")
print(f" Most frequent: {StringStatistics.most_frequent(recipe_df, col)}")
print(f" Count most freq: {StringStatistics.count_most_frequent(recipe_df, col)}")
print(f" Null count: {StringStatistics.count_null(recipe_df, col)}")
print(f" Avg string length: {StringStatistics.average_string_length(recipe_df, col):.1f}")
print(f" Longest string: {StringStatistics.longest_string(recipe_df, col)}")
print(f" Shortest string: {StringStatistics.shortest_string(recipe_df, col)}")
print(f" Uppercase %: {StringStatistics.uppercase_percentage(recipe_df, col):.1f}%")
print(f" Contains digit: {StringStatistics.contains_digit_count(recipe_df, col)}")
print(f" Starts with 'OXIDE': {StringStatistics.starts_with_count(recipe_df, 'OXIDE', col)}")
print(f" Ends with 'B': {StringStatistics.ends_with_count(recipe_df, 'B', col)}")
print(f" Contains 'DEP': {StringStatistics.contains_substring_count(recipe_df, 'DEP', col)}")
# Top N most common
print(f"\n Top 3 most common recipes:")
top3 = StringStatistics.most_common_n_strings(recipe_df, n=3, column_name=col)
for name, count in top3.items():
print(f" {name}: {count}")
# String length summary
print("\n String length summary:")
len_summary = StringStatistics.string_length_summary(recipe_df, col)
print(len_summary.to_string(index=False))
# Full summary
print("\n summary_as_dataframe():")
summary_df = StringStatistics.summary_as_dataframe(recipe_df, col)
print(summary_df.to_string(index=False))
print()
def demo_timestamp_statistics(df: pd.DataFrame):
"""Demonstrate TimestampStatistics on the systime column."""
print("=" * 72)
print("4. TIMESTAMP STATISTICS")
print("=" * 72)
# Use the full DataFrame for timestamp analysis
ts_df = df.copy()
ts_df["systime"] = pd.to_datetime(ts_df["systime"])
col = "systime"
print(f"\nTotal rows: {len(ts_df)}")
print(f" Earliest: {TimestampStatistics.earliest_timestamp(ts_df, col)}")
print(f" Latest: {TimestampStatistics.latest_timestamp(ts_df, col)}")
print(f" Range: {TimestampStatistics.timestamp_range(ts_df, col)}")
print(f" Median: {TimestampStatistics.median_timestamp(ts_df, col)}")
print(f" Null count: {TimestampStatistics.count_null(ts_df, col)}")
print(f" Not-null count: {TimestampStatistics.count_not_null(ts_df, col)}")
print(f" Avg time gap: {TimestampStatistics.average_time_gap(ts_df, col)}")
print(f" Std dev of gaps: {TimestampStatistics.standard_deviation_timestamps(ts_df, col)}")
print(f" Most frequent day: {TimestampStatistics.most_frequent_day(ts_df, col)} (0=Mon)")
print(f" Most frequent hr: {TimestampStatistics.most_frequent_hour(ts_df, col)}")
# Distributions
print("\n Hour distribution (top 5):")
hour_dist = TimestampStatistics.hour_distribution(ts_df, col)
for hour, count in hour_dist.head(5).items():
print(f" Hour {hour:02d}: {count} rows")
# Quartiles
print("\n Timestamp quartiles:")
quartiles = TimestampStatistics.timestamp_quartiles(ts_df, col)
for q, val in quartiles.items():
print(f" Q{q}: {val}")
# Days with most activity
print("\n Top 3 days with most activity:")
top_days = TimestampStatistics.days_with_most_activity(ts_df, col, n=3)
for day, count in top_days.items():
print(f" {day}: {count} rows")
print()
def demo_descriptive_features(df: pd.DataFrame):
"""Demonstrate DescriptiveFeatures (feature table) grouped by UUID."""
print("=" * 72)
print("5. DESCRIPTIVE FEATURES (Feature Table)")
print("=" * 72)
# Use a subset to keep output manageable
subset = df[df["uuid"].isin(["chamber_temp", "chamber_active"])].copy()
subset["systime"] = pd.to_datetime(subset["systime"])
print(f"\nUsing {len(subset)} rows (chamber_temp + chamber_active)")
feat = DescriptiveFeatures(subset)
# Compute as dictionary
print("\n Compute as dict (showing keys per UUID):")
result_dict = feat.compute(output_format="dict")
for uuid_key, stats in result_dict.items():
print(f"\n UUID: {uuid_key}")
if isinstance(stats, dict):
for section, content in stats.items():
print(f" Section: {section}")
if isinstance(content, dict):
for k, v in list(content.items())[:3]:
print(f" {k}: {v}")
if len(content) > 3:
print(f" ... ({len(content)} total keys)")
else:
print(f" {content}")
# Compute as DataFrame
print("\n Compute as DataFrame (first 5 columns):")
result_df = feat.compute(output_format="dataframe")
cols = list(result_df.columns)[:5]
print(result_df[cols].to_string(index=False))
print(f" ... total columns: {len(result_df.columns)}")
print()
def demo_time_grouped_statistics(df: pd.DataFrame):
"""Demonstrate TimeGroupedStatistics with time-binned aggregations."""
print("=" * 72)
print("6. TIME-GROUPED STATISTICS")
print("=" * 72)
temp_df = df[df["uuid"] == "chamber_temp"][["systime", "value_double"]].copy()
temp_df["systime"] = pd.to_datetime(temp_df["systime"])
print(f"\nChamber temperature: {len(temp_df)} readings")
# Single statistic: 10-minute mean
print("\n--- 10-minute mean temperature ---")
mean_10m = TimeGroupedStatistics.calculate_statistic(
temp_df, time_column="systime", value_column="value_double", freq="10min", stat_method="mean"
)
print(mean_10m.head(8).to_string())
# Multiple statistics at once
print("\n--- 10-minute mean, min, max, range ---")
multi = TimeGroupedStatistics.calculate_statistics(
temp_df,
time_column="systime",
value_column="value_double",
freq="10min",
stat_methods=["mean", "min", "max", "range"],
)
print(multi.head(8).to_string())
# Hourly difference (last - first)
print("\n--- Hourly difference (last - first) ---")
diff_h = TimeGroupedStatistics.calculate_statistic(
temp_df, time_column="systime", value_column="value_double", freq="h", stat_method="diff"
)
print(diff_h.to_string())
# Custom aggregation function (coefficient of variation per 10-min bin)
print("\n--- 10-minute coefficient of variation (custom func) ---")
cv_func = lambda x: x.std() / x.mean() if x.mean() != 0 else 0
cv_10m = TimeGroupedStatistics.calculate_custom_func(
temp_df, time_column="systime", value_column="value_double", freq="10min", func=cv_func
)
print(cv_10m.head(8).to_string())
print()
def demo_cycle_extraction(df: pd.DataFrame):
"""Demonstrate CycleExtractor with persistent-cycle and step-sequence methods."""
print("=" * 72)
print("7. CYCLE EXTRACTION")
print("=" * 72)
# --- Persistent cycle: chamber_active True/False transitions ---
active_df = df[df["uuid"] == "chamber_active"].copy()
active_df["systime"] = pd.to_datetime(active_df["systime"])
print(f"\nChamber active rows: {len(active_df)}")
extractor = CycleExtractor(
dataframe=active_df,
start_uuid="chamber_active",
)
# Method suggestion
print("\n--- Method suggestion ---")
suggestion = extractor.suggest_method()
for method, reason in zip(suggestion["recommended_methods"], suggestion["reasoning"]):
print(f" {method}: {reason}")
# Extract persistent cycles (True -> False transitions)
print("\n--- Persistent cycles (active=True -> active=False) ---")
cycles = extractor.process_persistent_cycle()
print(f" Extracted {len(cycles)} cycles")
if not cycles.empty:
print(cycles[["cycle_start", "cycle_end", "is_complete"]].to_string(index=False))
# Extraction statistics
stats = extractor.get_extraction_stats()
print(f"\n Extraction stats:")
print(f" Total: {stats['total_cycles']}")
print(f" Complete: {stats['complete_cycles']}")
print(f" Incomplete: {stats['incomplete_cycles']}")
print(f" Success rate: {stats['success_rate']:.1%}")
# Validate cycles (min 30s, max 15min)
print("\n--- Validate cycles ---")
validated = extractor.validate_cycles(cycles, min_duration="30s", max_duration="15m")
valid_count = validated["is_valid"].sum()
invalid_count = (~validated["is_valid"]).sum()
print(f" Valid: {valid_count}, Invalid: {invalid_count}")
if "validation_issue" in validated.columns:
issues = validated[~validated["is_valid"]]
if not issues.empty:
print(f" Issues found:")
for _, row in issues.iterrows():
print(f" {row['cycle_start']} -> {row.get('validation_issue', 'unknown')}")
# Detect overlapping cycles
print("\n--- Overlap detection ---")
overlap_result = extractor.detect_overlapping_cycles(cycles, resolve="flag")
overlaps = overlap_result["has_overlap"].sum()
print(f" Cycles with overlap: {overlaps}")
print()
def demo_cycle_processor(df: pd.DataFrame):
"""Demonstrate CycleDataProcessor: split, merge, statistics, compare, golden cycles."""
print("=" * 72)
print("8. CYCLE DATA PROCESSOR")
print("=" * 72)
# First extract cycles
active_df = df[df["uuid"] == "chamber_active"].copy()
active_df["systime"] = pd.to_datetime(active_df["systime"])
extractor = CycleExtractor(dataframe=active_df, start_uuid="chamber_active")
cycles = extractor.process_persistent_cycle()
# Filter to complete cycles only
complete_cycles = cycles[cycles["is_complete"]].copy()
print(f"\nComplete cycles: {len(complete_cycles)}")
if complete_cycles.empty:
print(" No complete cycles found -- skipping processor demo")
return
# Prepare values DataFrame (temperature readings)
temp_df = df[df["uuid"] == "chamber_temp"].copy()
temp_df["systime"] = pd.to_datetime(temp_df["systime"])
print(f"Temperature readings: {len(temp_df)}")
processor = CycleDataProcessor(
cycles_df=complete_cycles,
values_df=temp_df,
cycle_uuid_col="cycle_uuid",
systime_col="systime",
)
# --- Merge values with cycles ---
print("\n--- Merge temperature data with cycles ---")
merged = processor.merge_dataframes_by_cycle()
print(f" Merged rows: {len(merged)}")
print(f" Cycles represented: {merged['cycle_uuid'].nunique()}")
# --- Split by cycle ---
print("\n--- Split data by cycle ---")
split_data = processor.split_by_cycle()
for cycle_uuid, cycle_df in list(split_data.items())[:3]:
print(f" Cycle {cycle_uuid[:12]}...: {len(cycle_df)} readings, "
f"mean temp = {cycle_df['value_double'].mean():.2f} C")
# --- Cycle statistics ---
print("\n--- Cycle statistics ---")
cycle_stats = processor.compute_cycle_statistics()
if not cycle_stats.empty:
display_cols = [c for c in ["cycle_uuid", "duration_seconds", "value_count", "mean_value_double", "std_value_double"]
if c in cycle_stats.columns]
# Truncate cycle_uuid for display
stats_display = cycle_stats[display_cols].copy()
stats_display["cycle_uuid"] = stats_display["cycle_uuid"].str[:12] + "..."
print(stats_display.to_string(index=False))
# --- Compare cycles against a reference ---
print("\n--- Compare cycles against first cycle ---")
ref_uuid = complete_cycles["cycle_uuid"].iloc[0]
comparison = processor.compare_cycles(reference_cycle_uuid=ref_uuid, metric="value_double")
if not comparison.empty:
comp_display = comparison[["cycle_uuid", "is_reference", "mean_value", "deviation_from_ref", "deviation_pct"]].copy()
comp_display["cycle_uuid"] = comp_display["cycle_uuid"].str[:12] + "..."
print(comp_display.to_string(index=False))
# --- Identify golden cycles ---
print("\n--- Identify golden cycles (low variability) ---")
golden = processor.identify_golden_cycles(metric="value_double", method="low_variability", top_n=2)
for i, gc in enumerate(golden):
print(f" #{i+1}: {gc[:12]}...")
# --- Group by cycle UUID ---
print("\n--- Group by cycle UUID ---")
grouped = processor.group_by_cycle_uuid(merged)
print(f" Number of groups: {len(grouped)}")
if grouped:
print(f" Rows in first group: {len(grouped[0])}")
# --- Split groups further by uuid (signal type) ---
print("\n--- Split groups by signal uuid ---")
sub_groups = processor.split_dataframes_by_group(grouped[:2], column="uuid")
print(f" Sub-groups from first 2 cycles: {len(sub_groups)}")
print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
df = create_chamber_data(n_cycles=5, points_per_cycle=120)
print(f"Generated {len(df)} rows of wafer-fab chamber data")
print(f"UUIDs: {df['uuid'].unique().tolist()}\n")
demo_numeric_statistics(df)
demo_boolean_statistics(df)
demo_string_statistics(df)
demo_timestamp_statistics(df)
demo_descriptive_features(df)
demo_time_grouped_statistics(df)
demo_cycle_extraction(df)
demo_cycle_processor(df)
print("=" * 72)
print("Features and statistics demo complete.")
print("=" * 72)