Production Tracking Demoยค
Demonstrates daily production tracking modules: part tracking, cycle times, downtime analysis, quality tracking, and shift reporting.
Run it: python examples/production_tracking_demo.py
Modules demonstrated: PartProductionTracking, CycleTimeTracking, DowntimeTracking, QualityTracking, ShiftReporting
Related guides: Production Monitoring | Shift Reports & KPIs
#!/usr/bin/env python3
"""
Demonstration of production tracking in ts-shape.
This script shows how to use:
1. ShiftReporting (shift production, comparison, targets)
2. DowntimeTracking (downtime by shift and reason)
3. CycleTimeTracking (cycle time analysis by part number)
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sys
import os
# Add parent directory to path to import ts_shape
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from ts_shape.events.production.shift_reporting import ShiftReporting
from ts_shape.events.production.downtime_tracking import DowntimeTracking
from ts_shape.events.production.cycle_time_tracking import CycleTimeTracking
def create_shift_production_data():
"""Create synthetic production counter data spanning multiple shifts and days."""
rows = []
np.random.seed(42)
for day_offset in range(5):
base_date = datetime(2024, 1, 1 + day_offset)
counter = 10000 + day_offset * 1500
# Generate data every 10 minutes across all 3 shifts
for hour in range(24):
for minute in range(0, 60, 10):
t = base_date + timedelta(hours=hour, minutes=minute)
# Different production rates per shift
if 6 <= hour < 14:
rate = np.random.randint(6, 10) # Shift 1: higher rate
elif 14 <= hour < 22:
rate = np.random.randint(5, 9) # Shift 2: medium rate
else:
rate = np.random.randint(4, 7) # Shift 3: lower rate
counter += rate
rows.append({
'systime': t,
'uuid': 'prod_counter',
'value_integer': counter,
'value_string': None,
'is_delta': True,
})
# Part number signal (changes once per day)
parts = ['PART_X100', 'PART_Y200', 'PART_X100', 'PART_Z300', 'PART_Y200']
rows.append({
'systime': base_date + timedelta(hours=6),
'uuid': 'part_number',
'value_integer': None,
'value_string': parts[day_offset],
'is_delta': True,
})
return pd.DataFrame(rows)
def create_downtime_data():
"""Create synthetic machine state and downtime reason data."""
rows = []
np.random.seed(77)
base_date = datetime(2024, 1, 1, 6, 0, 0)
states = ['Running', 'Stopped', 'Idle']
reasons = ['Material_Shortage', 'Tool_Change', 'Quality_Issue',
'Operator_Break', 'Mechanical_Failure']
t = base_date
for i in range(500):
# Mostly running, with occasional stops
if np.random.random() > 0.85:
state = np.random.choice(['Stopped', 'Idle'], p=[0.7, 0.3])
reason = np.random.choice(reasons)
else:
state = 'Running'
reason = reasons[0] # placeholder, not used when running
rows.append({
'systime': t,
'uuid': 'machine_state',
'value_string': state,
'is_delta': True,
})
rows.append({
'systime': t,
'uuid': 'downtime_reason',
'value_string': reason,
'is_delta': True,
})
t += timedelta(minutes=np.random.randint(2, 8))
return pd.DataFrame(rows)
def create_cycle_time_data():
"""Create synthetic cycle time data for multiple part numbers."""
rows = []
np.random.seed(33)
base_time = datetime(2024, 1, 1, 8, 0, 0)
parts = {
'PART_A': {'cycle_time': 45.0, 'std': 3.0, 'count': 80},
'PART_B': {'cycle_time': 62.0, 'std': 5.0, 'count': 60},
}
t = base_time
for part_name, config in parts.items():
# Set part number
rows.append({
'systime': t,
'uuid': 'part_id_signal',
'value_string': part_name,
'value_bool': None,
'is_delta': True,
})
for i in range(config['count']):
cycle_time = max(20, np.random.normal(config['cycle_time'], config['std']))
# Inject a few slow cycles
if i in [15, 40, 65]:
cycle_time = config['cycle_time'] * 1.8
t += timedelta(seconds=cycle_time)
# Cycle trigger rising edge (False -> True)
rows.append({
'systime': t - timedelta(seconds=1),
'uuid': 'cycle_complete',
'value_string': None,
'value_bool': False,
'is_delta': True,
})
rows.append({
'systime': t,
'uuid': 'cycle_complete',
'value_string': None,
'value_bool': True,
'is_delta': True,
})
return pd.DataFrame(rows)
def demo_shift_reporting():
"""Demo 1: Shift-based production reporting."""
print("\n" + "=" * 70)
print("DEMO 1: Shift Reporting")
print("=" * 70)
df = create_shift_production_data()
print(f"\nDataset: {len(df)} records over 5 days")
reporter = ShiftReporting(
df,
shift_definitions={
'shift_1': ('06:00', '14:00'),
'shift_2': ('14:00', '22:00'),
'shift_3': ('22:00', '06:00'),
},
)
# Production per shift
print("\n--- Shift Production ---")
shift_prod = reporter.shift_production(
counter_uuid='prod_counter',
value_column_counter='value_integer',
)
if not shift_prod.empty:
print(shift_prod.to_string(index=False))
# Shift comparison
print("\n--- Shift Comparison (last 5 days) ---")
comparison = reporter.shift_comparison(
counter_uuid='prod_counter',
value_column_counter='value_integer',
days=5,
)
if not comparison.empty:
print(comparison.to_string(index=False))
# Shift targets
print("\n--- Shift Target Achievement ---")
targets = {'shift_1': 400, 'shift_2': 380, 'shift_3': 300}
target_results = reporter.shift_targets(
counter_uuid='prod_counter',
targets=targets,
value_column_counter='value_integer',
)
if not target_results.empty:
print(target_results.to_string(index=False))
# Best and worst shifts
print("\n--- Best & Worst Shifts ---")
results = reporter.best_and_worst_shifts(
counter_uuid='prod_counter',
value_column_counter='value_integer',
days=5,
)
print(" Best shifts:")
if not results['best'].empty:
print(results['best'].to_string(index=False))
print(" Worst shifts:")
if not results['worst'].empty:
print(results['worst'].to_string(index=False))
def demo_downtime_tracking():
"""Demo 2: Downtime tracking by shift and reason."""
print("\n" + "=" * 70)
print("DEMO 2: Downtime Tracking")
print("=" * 70)
df = create_downtime_data()
print(f"\nDataset: {len(df)} records")
tracker = DowntimeTracking(
df,
shift_definitions={
'shift_1': ('06:00', '14:00'),
'shift_2': ('14:00', '22:00'),
'shift_3': ('22:00', '06:00'),
},
)
# Downtime by shift
print("\n--- Downtime by Shift ---")
shift_downtime = tracker.downtime_by_shift(
state_uuid='machine_state',
running_value='Running',
)
if not shift_downtime.empty:
print(shift_downtime.to_string(index=False))
# Downtime by reason
print("\n--- Downtime by Reason ---")
reason_analysis = tracker.downtime_by_reason(
state_uuid='machine_state',
reason_uuid='downtime_reason',
stopped_value='Stopped',
)
if not reason_analysis.empty:
print(reason_analysis.to_string(index=False))
# Top downtime reasons (Pareto)
print("\n--- Top 3 Downtime Reasons (Pareto) ---")
top_reasons = tracker.top_downtime_reasons(
state_uuid='machine_state',
reason_uuid='downtime_reason',
top_n=3,
stopped_value='Stopped',
)
if not top_reasons.empty:
print(top_reasons.to_string(index=False))
# Availability trend
print("\n--- Availability Trend (daily) ---")
trend = tracker.availability_trend(
state_uuid='machine_state',
running_value='Running',
window='1D',
)
if not trend.empty:
print(trend.to_string(index=False))
def demo_cycle_time_tracking():
"""Demo 3: Cycle time analysis by part number."""
print("\n" + "=" * 70)
print("DEMO 3: Cycle Time Tracking")
print("=" * 70)
df = create_cycle_time_data()
print(f"\nDataset: {len(df)} records (2 part types)")
tracker = CycleTimeTracking(df)
# Cycle times by part
print("\n--- Cycle Times by Part ---")
cycles = tracker.cycle_time_by_part(
part_id_uuid='part_id_signal',
cycle_trigger_uuid='cycle_complete',
)
if not cycles.empty:
print(f" Total cycles: {len(cycles)}")
print(cycles.head(10).to_string(index=False))
# Statistics
print("\n--- Cycle Time Statistics ---")
stats = tracker.cycle_time_statistics(
part_id_uuid='part_id_signal',
cycle_trigger_uuid='cycle_complete',
)
if not stats.empty:
print(stats.to_string(index=False))
# Detect slow cycles
print("\n--- Slow Cycle Detection (threshold_factor=1.5) ---")
slow = tracker.detect_slow_cycles(
part_id_uuid='part_id_signal',
cycle_trigger_uuid='cycle_complete',
threshold_factor=1.5,
)
print(f" Slow cycles found: {len(slow)}")
if not slow.empty:
print(slow[['systime', 'part_number', 'cycle_time_seconds',
'median_seconds', 'deviation_factor']].to_string(index=False))
# Cycle time trend for PART_A
print("\n--- Cycle Time Trend (PART_A, window=10) ---")
trend = tracker.cycle_time_trend(
part_id_uuid='part_id_signal',
cycle_trigger_uuid='cycle_complete',
part_number='PART_A',
window_size=10,
)
if not trend.empty:
print(f" Trend data points: {len(trend)}")
print(trend[['systime', 'cycle_time_seconds', 'moving_avg', 'trend']].tail(10).to_string(index=False))
# Hourly summary
print("\n--- Hourly Cycle Time Summary ---")
hourly = tracker.hourly_cycle_time_summary(
part_id_uuid='part_id_signal',
cycle_trigger_uuid='cycle_complete',
)
if not hourly.empty:
print(hourly.to_string(index=False))
def main():
"""Run all production tracking demonstrations."""
print("\n" + "=" * 70)
print("Production Tracking Demonstration")
print("=" * 70)
try:
demo_shift_reporting()
demo_downtime_tracking()
demo_cycle_time_tracking()
print("\n" + "=" * 70)
print("All demonstrations completed successfully!")
print("=" * 70)
except Exception as e:
print(f"\nError during demonstration: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == "__main__":
exit(main())