Production Monitoring¤
Track machine states, line throughput, changeovers, downtime, cycle times, and shift performance on the shop floor. These classes form the operational backbone of plant analytics.
Production Class Map¤
flowchart TB
subgraph DETECT["<b>Event Detection</b><br/><i>raw signals → events</i>"]
MS["MachineStateEvents<br/><code>run / idle / setup</code>"]
LT["LineThroughputEvents<br/><code>parts per window</code>"]
CO["ChangeoverEvents<br/><code>product changes</code>"]
FC["FlowConstraintEvents<br/><code>blocked / starved</code>"]
end
subgraph TRACK["<b>Daily Tracking</b><br/><i>events → shift-level KPIs</i>"]
PT["PartProductionTracking"]
CT["CycleTimeTracking"]
DT["DowntimeTracking"]
QT["QualityTracking"]
SR["ShiftReporting"]
end
MS -->|"run/idle intervals"| DT
LT -->|"part counts"| PT
CO -->|"product changes"| CT
FC -->|"constraint events"| DT
PT -->|"date, shift"| SR
CT -->|"date, shift"| SR
DT -->|"date, shift"| SR
QT -->|"date, shift"| SR
style DETECT fill:#0f2a3d,stroke:#38bdf8,color:#e0f2fe
style TRACK fill:#1a3a4a,stroke:#f59e0b,color:#fef3c7
Machine State Events¤
Detect run/idle intervals and state transitions from boolean or string state signals.
from ts_shape.events.production.machine_state import MachineStateEvents
state = MachineStateEvents(df, run_state_uuid='machine_running')
# Run/idle intervals with minimum duration filter
intervals = state.detect_run_idle(min_duration='30s')
# State transitions (every change from one state to another)
transitions = state.transition_events()
# Detect rapid state changes (may indicate sensor noise)
rapid = state.detect_rapid_transitions(threshold='5s', min_count=3)
# Quality metrics for the state signal itself
metrics = state.state_quality_metrics()
print(f"Run/Idle ratio: {metrics['run_idle_ratio']:.2f}")
Line Throughput Events¤
Count parts per time window, check takt adherence, and detect throughput trends.
from ts_shape.events.production.line_throughput import LineThroughputEvents
throughput = LineThroughputEvents(df)
# Parts per minute
counts = throughput.count_parts(counter_uuid='part_counter', window='1m')
# Takt adherence — flag cycles exceeding takt time
violations = throughput.takt_adherence(
cycle_uuid='cycle_complete', takt_time='60s'
)
# Throughput trends with degradation detection
trends = throughput.throughput_trends(
counter_uuid='part_counter', window='1h', trend_window=24
)
Changeover Detection¤
Detect product/recipe changeovers and compute changeover windows.
from ts_shape.events.production.changeover import ChangeoverEvents
changeover = ChangeoverEvents(df)
# Detect changeovers (product signal changes)
changes = changeover.detect_changeover(product_uuid='product_signal', min_hold='5m')
# Fixed-duration changeover window
windows = changeover.changeover_window(
product_uuid='product_signal',
until='fixed_window',
config={'duration': '10m'}
)
# Stability-based changeover window (waits for process to settle)
windows = changeover.changeover_window(
product_uuid='product_signal',
until='stable_band',
config={
'metrics': [
{'uuid': 'temperature', 'band': 2.0, 'hold': '2m'},
{'uuid': 'pressure', 'band': 5.0, 'hold': '2m'},
],
'reference_method': 'ewma'
}
)
Flow Constraint Events¤
Detect blockages (downstream full) and starvation (upstream empty) between stations.
from ts_shape.events.production.flow_constraint import FlowConstraintEvents
flow = FlowConstraintEvents(df)
# Define station roles
roles = {
'upstream_run': 'station_1_running',
'downstream_run': 'station_2_running',
}
# Detect blocked events (station 1 running but station 2 stopped)
blocked = flow.blocked_events(roles=roles, tolerance='200ms', min_duration='5s')
# Detect starved events (station 2 running but station 1 stopped)
starved = flow.starved_events(roles=roles, tolerance='200ms', min_duration='5s')
# Full analytics with severity classification
analytics = flow.flow_constraint_analytics(
roles=roles, minor_threshold='5s', moderate_threshold='30s'
)
Part Production Tracking¤
Track production quantities by part number with time-based aggregation.
from ts_shape.events.production.part_tracking import PartProductionTracking
tracker = PartProductionTracking(df)
# Hourly production by part
hourly = tracker.production_by_part(
part_id_uuid='part_number_signal',
counter_uuid='counter_signal',
window='1h'
)
# Daily production summary
daily = tracker.daily_production_summary(
part_id_uuid='part_number_signal',
counter_uuid='counter_signal'
)
# Total production for a date range
totals = tracker.production_totals(
part_id_uuid='part_number_signal',
counter_uuid='counter_signal',
start_date='2024-01-01',
end_date='2024-01-31'
)
Cycle Time Tracking¤
Analyze cycle times with trend detection and slow cycle identification.
from ts_shape.events.production.cycle_time_tracking import CycleTimeTracking
tracker = CycleTimeTracking(df)
# Statistics by part (min, avg, max, std, median)
stats = tracker.cycle_time_statistics(
part_id_uuid='part_number_signal',
cycle_trigger_uuid='cycle_complete_signal'
)
# Detect slow cycles (>1.5x median)
slow = tracker.detect_slow_cycles(
part_id_uuid='part_number_signal',
cycle_trigger_uuid='cycle_complete_signal',
threshold_factor=1.5
)
# Trend analysis with rolling window
trend = tracker.cycle_time_trend(
part_id_uuid='part_number_signal',
cycle_trigger_uuid='cycle_complete_signal',
part_number='PART_A',
window_size=20
)
Downtime Tracking¤
Track machine downtime by shift, reason, and availability trends.
from ts_shape.events.production.downtime_tracking import DowntimeTracking
tracker = DowntimeTracking(df)
# Downtime per shift with availability
shift_downtime = tracker.downtime_by_shift(
state_uuid='machine_state', running_value='Running'
)
# date shift downtime_minutes uptime_minutes availability_pct
# 0 2024-01-01 shift_1 45.2 434.8 90.6
# Pareto of downtime reasons
reasons = tracker.downtime_by_reason(
state_uuid='machine_state',
reason_uuid='downtime_reason',
stopped_value='Stopped'
)
# Top 5 downtime reasons
top = tracker.top_downtime_reasons(
state_uuid='machine_state', reason_uuid='downtime_reason', top_n=5
)
# Availability trend
trend = tracker.availability_trend(
state_uuid='machine_state', running_value='Running', window='1D'
)
Quality Tracking (NOK/Scrap)¤
Track defective parts, first-pass yield, and defect reasons.
from ts_shape.events.production.quality_tracking import QualityTracking
tracker = QualityTracking(df, shift_definitions={
"day": ("06:00", "14:00"),
"afternoon": ("14:00", "22:00"),
"night": ("22:00", "06:00"),
})
# NOK parts per shift with first-pass yield
shift_quality = tracker.nok_by_shift(
ok_counter_uuid='good_parts', nok_counter_uuid='bad_parts'
)
# Quality by part number
part_quality = tracker.quality_by_part(
ok_counter_uuid='good_parts',
nok_counter_uuid='bad_parts',
part_id_uuid='part_number'
)
# Pareto of defect reasons
reasons = tracker.nok_by_reason(
nok_counter_uuid='bad_parts', defect_reason_uuid='defect_code'
)
Shift Reporting¤
Compare shift performance and track against targets.
from ts_shape.events.production.shift_reporting import ShiftReporting
reporter = ShiftReporting(df)
# Production per shift
shift_prod = reporter.shift_production(
counter_uuid='counter_signal', part_id_uuid='part_number_signal'
)
# Compare shifts (last 7 days)
comparison = reporter.shift_comparison(counter_uuid='counter_signal', days=7)
# Track against targets
targets = reporter.shift_targets(
counter_uuid='counter_signal',
targets={'shift_1': 450, 'shift_2': 450, 'shift_3': 400}
)
# Best and worst shifts
results = reporter.best_and_worst_shifts(counter_uuid='counter_signal')
How They Connect¤
| From | To | Merge Key | Purpose |
|---|---|---|---|
MachineStateEvents |
DowntimeTracking |
run/idle intervals | Calculate downtime from state changes |
LineThroughputEvents |
PartProductionTracking |
part counts per window | Aggregate to daily/shift totals |
ChangeoverEvents |
CycleTimeTracking |
product change timestamps | Exclude changeover time from cycle analysis |
DowntimeTracking |
ShiftReporting |
date, shift |
Feed downtime into shift report |
PartProductionTracking |
ShiftReporting |
date, shift |
Feed production counts into shift report |
QualityTracking |
ShiftReporting |
date, shift |
Feed quality metrics into shift report |
Next Steps¤
- OEE & Plant Analytics — Combine availability, performance, quality into OEE
- Shift Reports & KPIs — Performance loss, scrap, targets, handover reports
- API Reference — Full production API documentation