Skip to content

ts_shape.events.production ¤

Production Events

Detectors for production events in long-form timeseries (uuid-per-signal).

  • MachineStateEvents: Run/idle intervals and transition points from a boolean state signal.
  • detect_run_idle: Intervalize run/idle with optional min duration.
  • transition_events: Point events on idle→run and run→idle changes.

  • LineThroughputEvents: Throughput metrics and takt adherence.

  • count_parts: Parts per fixed window from a counter uuid.
  • takt_adherence: Cycle time violations vs. a takt time.

  • ChangeoverEvents: Product/recipe changes and end-of-changeover derivation.

  • detect_changeover: Point events at product value changes.
  • changeover_window: End via fixed window or stable band metrics.

  • FlowConstraintEvents: Blocked/starved intervals between upstream/downstream run signals.

  • blocked_events: Upstream running while downstream not consuming.
  • starved_events: Downstream running while upstream not supplying.

Modules:

Classes:

ChangeoverEvents ¤

ChangeoverEvents(dataframe: DataFrame, *, event_uuid: str = 'prod:changeover', time_column: str = 'systime')

Bases: Base

Production: Changeover

Detect product/recipe changes and compute changeover windows without requiring a dedicated 'first good' signal.

Methods: - detect_changeover: point events when product/recipe changes. - changeover_window: derive an end time via fixed window or 'stable_band' metrics.

Methods:

Source code in src/ts_shape/events/production/changeover.py
18
19
20
21
22
23
24
25
26
27
def __init__(
    self,
    dataframe: pd.DataFrame,
    *,
    event_uuid: str = "prod:changeover",
    time_column: str = "systime",
) -> None:
    """Store the long-form timeseries and event metadata.

    Args:
        dataframe: Long-form (uuid-per-signal) timeseries data.
        event_uuid: Identifier stamped onto emitted changeover events.
        time_column: Name of the timestamp column in `dataframe`.
    """
    # Base is initialized first; column_name is the timestamp column.
    super().__init__(dataframe, column_name=time_column)
    self.event_uuid = event_uuid
    self.time_column = time_column

changeover_window ¤

changeover_window(product_uuid: str, *, value_column: str = 'value_string', start_time: Optional[Timestamp] = None, until: str = 'fixed_window', config: Optional[Dict[str, Any]] = None, fallback: Optional[Dict[str, Any]] = None) -> DataFrame

Compute changeover windows per product change.

until
  • fixed_window: end = start + config['duration'] (e.g., '10m')
  • stable_band: end when all metrics stabilize within band for hold: config = { 'metrics': [ {'uuid': 'm1', 'value_column': 'value_double', 'band': 0.2, 'hold': '2m'}, ... ] }

fallback: {'default_duration': '10m', 'completed': False}

Source code in src/ts_shape/events/production/changeover.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def changeover_window(
    self,
    product_uuid: str,
    *,
    value_column: str = "value_string",
    start_time: Optional[pd.Timestamp] = None,
    until: str = "fixed_window",
    config: Optional[Dict[str, Any]] = None,
    fallback: Optional[Dict[str, Any]] = None,
) -> pd.DataFrame:
    """Compute changeover windows per product change.

    One interval row is produced per change detected by ``detect_changeover``;
    the window start is the change timestamp and the end depends on ``until``:

    until:
      - fixed_window: end = start + config['duration'] (e.g., '10m')
      - stable_band: end when all metrics stabilize within band for hold:
            config = {
              'metrics': [
                {'uuid': 'm1', 'value_column': 'value_double', 'band': 0.2, 'hold': '2m'},
                ...
              ]
            }
    fallback: {'default_duration': '10m', 'completed': False} — used when the
    chosen method cannot determine an end (e.g., not every metric stabilized,
    or ``until`` is an unrecognized mode).

    Returns columns: start, end, uuid, source_uuid, is_delta, method, completed.
    """
    config = config or {}
    fallback = fallback or {"default_duration": "10m", "completed": False}

    # min_hold filters out changes that do not persist (debounce).
    changes = self.detect_changeover(product_uuid, value_column=value_column, min_hold=config.get("min_hold", "0s"))
    if start_time is not None:
        changes = changes[changes["systime"] >= pd.to_datetime(start_time)]
    if changes.empty:
        return pd.DataFrame(
            columns=["start", "end", "uuid", "source_uuid", "is_delta", "method", "completed"]
        )

    rows: List[Dict[str, Any]] = []
    for _, r in changes.iterrows():
        t0 = pd.to_datetime(r["systime"])
        if until == "fixed_window":
            # End is simply start + configured duration; always completed.
            duration = pd.to_timedelta(config.get("duration", "10m"))
            end = t0 + duration
            rows.append(
                {
                    "start": t0,
                    "end": end,
                    "uuid": self.event_uuid,
                    "source_uuid": product_uuid,
                    "is_delta": True,
                    "method": "fixed_window",
                    "completed": True,
                }
            )
            continue

        if until == "stable_band":
            metric_defs = config.get("metrics", [])
            # One stabilization timestamp per metric that settled.
            metric_ends: List[pd.Timestamp] = []
            for mdef in metric_defs:
                uid = mdef["uuid"]
                vcol = mdef.get("value_column", "value_double")
                band = float(mdef.get("band", 0.0))
                hold_td = pd.to_timedelta(mdef.get("hold", "0s"))
                s = (
                    self.dataframe[self.dataframe["uuid"] == uid]
                    .copy()
                    .sort_values(self.time_column)
                )
                s[self.time_column] = pd.to_datetime(s[self.time_column])
                # Only samples after the product change are relevant.
                s = s[s[self.time_column] >= t0]
                if s.empty:
                    continue
                # Rolling median reference and band mask
                # Use expanding median to be robust soon after change
                ref = s[vcol].expanding(min_periods=3).median()
                inside = (s[vcol] - ref).abs() <= band
                if not inside.any():
                    continue
                # Group consecutive runs of identical in/out-of-band status.
                gid = (inside.ne(inside.shift())).cumsum()
                end_found: Optional[pd.Timestamp] = None
                for _, seg in s.groupby(gid):
                    seg_inside = inside.loc[seg.index]
                    if not seg_inside.iloc[0]:
                        continue
                    start_seg = seg[self.time_column].iloc[0]
                    end_seg = seg[self.time_column].iloc[-1]
                    if (end_seg - start_seg) >= hold_td:
                        # Stabilization is dated at the *start* of the first
                        # in-band run that lasts at least `hold`.
                        end_found = start_seg
                        break
                if end_found is not None:
                    metric_ends.append(end_found)
            # Completed only if *every* configured metric stabilized; the
            # window end is the latest stabilization time across metrics.
            if metric_defs and len(metric_ends) == len(metric_defs):
                end = max(metric_ends)
                rows.append(
                    {
                        "start": t0,
                        "end": end,
                        "uuid": self.event_uuid,
                        "source_uuid": product_uuid,
                        "is_delta": True,
                        "method": "stable_band",
                        "completed": True,
                    }
                )
                continue

        # fallback
        end = t0 + pd.to_timedelta(fallback.get("default_duration", "10m"))
        rows.append(
            {
                "start": t0,
                "end": end,
                "uuid": self.event_uuid,
                "source_uuid": product_uuid,
                "is_delta": True,
                "method": until,
                "completed": bool(fallback.get("completed", False)),
            }
        )

    return pd.DataFrame(rows)

detect_changeover ¤

detect_changeover(product_uuid: str, *, value_column: str = 'value_string', min_hold: str = '0s') -> DataFrame

Emit point events when the product/recipe changes value.

Uses a hold check: the new product must persist for at least min_hold until the next change.

Source code in src/ts_shape/events/production/changeover.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def detect_changeover(
    self,
    product_uuid: str,
    *,
    value_column: str = "value_string",
    min_hold: str = "0s",
) -> pd.DataFrame:
    """Emit point events whenever the product/recipe value changes.

    The first observed value also counts as a change (it differs from the
    absent predecessor). A hold check is applied: a new value must persist
    for at least ``min_hold`` until the next change; the final change is
    always kept.
    """
    out_cols = ["systime", "uuid", "source_uuid", "is_delta", "new_value"]
    prod = self.dataframe[self.dataframe["uuid"] == product_uuid].copy()
    prod = prod.sort_values(self.time_column)
    if prod.empty:
        return pd.DataFrame(columns=out_cols)
    prod[self.time_column] = pd.to_datetime(prod[self.time_column])
    values = prod[value_column]
    # A change is any sample that differs from its predecessor.
    is_change = values.ne(values.shift())
    change_ts = prod.loc[is_change, self.time_column]
    hold_td = pd.to_timedelta(min_hold)
    following = change_ts.shift(-1)
    # Keep changes that held long enough; the last one has no successor.
    keep = following.isna() | ((following - change_ts) >= hold_td)
    change_ts = change_ts[keep]
    events = prod[prod[self.time_column].isin(change_ts)][
        [self.time_column, value_column]
    ].rename(columns={self.time_column: "systime", value_column: "new_value"})
    events["uuid"] = self.event_uuid
    events["source_uuid"] = product_uuid
    events["is_delta"] = True
    return events[out_cols]

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Return the DataFrame currently held by this instance."""
    return self.dataframe

FlowConstraintEvents ¤

FlowConstraintEvents(dataframe: DataFrame, *, time_column: str = 'systime', event_uuid: str = 'prod:flow')

Bases: Base

Production: Flow Constraints

  • blocked_events: upstream running while downstream not consuming.
  • starved_events: downstream running while upstream not supplying.

Methods:

Source code in src/ts_shape/events/production/flow_constraints.py
14
15
16
17
18
19
20
21
22
23
def __init__(
    self,
    dataframe: pd.DataFrame,
    *,
    time_column: str = "systime",
    event_uuid: str = "prod:flow",
) -> None:
    """Store the long-form timeseries and event metadata.

    Args:
        dataframe: Long-form (uuid-per-signal) timeseries data.
        time_column: Name of the timestamp column in `dataframe`.
        event_uuid: Identifier stamped onto emitted flow events.
    """
    # Base is initialized first; column_name is the timestamp column.
    super().__init__(dataframe, column_name=time_column)
    self.time_column = time_column
    self.event_uuid = event_uuid

blocked_events ¤

blocked_events(*, roles: Dict[str, str], tolerance: str = '200ms', min_duration: str = '0s') -> DataFrame

Blocked: upstream_run=True while downstream_run=False.

roles = {'upstream_run': uuid, 'downstream_run': uuid}

Source code in src/ts_shape/events/production/flow_constraints.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def blocked_events(
    self,
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    min_duration: str = "0s",
) -> pd.DataFrame:
    """Detect 'blocked' intervals: upstream_run=True while downstream_run=False.

    roles = {'upstream_run': uuid, 'downstream_run': uuid}
    """
    out_cols = ["start", "end", "uuid", "source_uuid", "is_delta", "type"]
    upstream = self._align_bool(roles["upstream_run"])  # columns: time, state
    downstream = self._align_bool(roles["downstream_run"])  # columns: time, state
    if upstream.empty or downstream.empty:
        return pd.DataFrame(columns=out_cols)
    # Align the two signals on timestamp within the given tolerance.
    joined = pd.merge_asof(
        upstream,
        downstream,
        on=self.time_column,
        suffixes=("_up", "_dn"),
        tolerance=pd.to_timedelta(tolerance),
        direction="nearest",
    )
    # Unmatched downstream samples (NaN) are treated as "not running".
    blocked = joined["state_up"] & (~joined["state_dn"].fillna(False))
    segment_id = blocked.ne(blocked.shift()).cumsum()
    shortest = pd.to_timedelta(min_duration)
    events: List[Dict[str, Any]] = []
    for _, segment in joined.groupby(segment_id):
        if not blocked.loc[segment.index].any():
            continue
        t_start = segment[self.time_column].iloc[0]
        t_end = segment[self.time_column].iloc[-1]
        if (t_end - t_start) >= shortest:
            events.append(
                {
                    "start": t_start,
                    "end": t_end,
                    "uuid": self.event_uuid,
                    "source_uuid": roles["upstream_run"],
                    "is_delta": True,
                    "type": "blocked",
                }
            )
    return pd.DataFrame(events)

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Return the DataFrame currently held by this instance."""
    return self.dataframe

starved_events ¤

starved_events(*, roles: Dict[str, str], tolerance: str = '200ms', min_duration: str = '0s') -> DataFrame

Starved: downstream_run=True while upstream_run=False.

roles = {'upstream_run': uuid, 'downstream_run': uuid}

Source code in src/ts_shape/events/production/flow_constraints.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def starved_events(
    self,
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    min_duration: str = "0s",
) -> pd.DataFrame:
    """Detect 'starved' intervals: downstream_run=True while upstream_run=False.

    roles = {'upstream_run': uuid, 'downstream_run': uuid}
    """
    out_cols = ["start", "end", "uuid", "source_uuid", "is_delta", "type"]
    upstream = self._align_bool(roles["upstream_run"])  # columns: time, state
    downstream = self._align_bool(roles["downstream_run"])  # columns: time, state
    if upstream.empty or downstream.empty:
        return pd.DataFrame(columns=out_cols)
    # Align the two signals on timestamp within the given tolerance.
    joined = pd.merge_asof(
        downstream,
        upstream,
        on=self.time_column,
        suffixes=("_dn", "_up"),
        tolerance=pd.to_timedelta(tolerance),
        direction="nearest",
    )
    # Unmatched upstream samples (NaN) are treated as "not running".
    starved = joined["state_dn"] & (~joined["state_up"].fillna(False))
    segment_id = starved.ne(starved.shift()).cumsum()
    shortest = pd.to_timedelta(min_duration)
    events: List[Dict[str, Any]] = []
    for _, segment in joined.groupby(segment_id):
        if not starved.loc[segment.index].any():
            continue
        t_start = segment[self.time_column].iloc[0]
        t_end = segment[self.time_column].iloc[-1]
        if (t_end - t_start) >= shortest:
            events.append(
                {
                    "start": t_start,
                    "end": t_end,
                    "uuid": self.event_uuid,
                    "source_uuid": roles["downstream_run"],
                    "is_delta": True,
                    "type": "starved",
                }
            )
    return pd.DataFrame(events)

LineThroughputEvents ¤

LineThroughputEvents(dataframe: DataFrame, *, event_uuid: str = 'prod:throughput', time_column: str = 'systime')

Bases: Base

Production: Line Throughput

Methods: - count_parts: Part counts per fixed window from a monotonically increasing counter. - takt_adherence: Cycle time violations against a takt time from step/boolean triggers.

Methods:

Source code in src/ts_shape/events/production/line_throughput.py
15
16
17
18
19
20
21
22
23
24
def __init__(
    self,
    dataframe: pd.DataFrame,
    *,
    event_uuid: str = "prod:throughput",
    time_column: str = "systime",
) -> None:
    """Store the long-form timeseries and event metadata.

    Args:
        dataframe: Long-form (uuid-per-signal) timeseries data.
        event_uuid: Identifier stamped onto emitted throughput events.
        time_column: Name of the timestamp column in `dataframe`.
    """
    # Base is initialized first; column_name is the timestamp column.
    super().__init__(dataframe, column_name=time_column)
    self.event_uuid = event_uuid
    self.time_column = time_column

count_parts ¤

count_parts(counter_uuid: str, *, value_column: str = 'value_integer', window: str = '1m') -> DataFrame

Compute parts per window for a counter uuid.

Returns columns: window_start, uuid, source_uuid, is_delta, count

Source code in src/ts_shape/events/production/line_throughput.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def count_parts(
    self,
    counter_uuid: str,
    *,
    value_column: str = "value_integer",
    window: str = "1m",
) -> pd.DataFrame:
    """Compute parts per window for a counter uuid.

    Returns columns: window_start, uuid, source_uuid, is_delta, count
    """
    c = (
        self.dataframe[self.dataframe["uuid"] == counter_uuid]
        .copy()
        .sort_values(self.time_column)
    )
    if c.empty:
        return pd.DataFrame(
            columns=["window_start", "uuid", "source_uuid", "is_delta", "count"]
        )
    c[self.time_column] = pd.to_datetime(c[self.time_column])
    c = c.set_index(self.time_column)
    # take diff of last values within each window
    grp = c[value_column].resample(window)
    counts = grp.max().fillna(method="ffill").diff().fillna(0).clip(lower=0)
    out = counts.to_frame("count").reset_index().rename(columns={self.time_column: "window_start"})
    out["uuid"] = self.event_uuid
    out["source_uuid"] = counter_uuid
    out["is_delta"] = True
    return out

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Return the DataFrame currently held by this instance."""
    return self.dataframe

takt_adherence ¤

takt_adherence(cycle_uuid: str, *, value_column: str = 'value_bool', takt_time: str = '60s', min_violation: str = '0s') -> DataFrame

Flag cycles whose durations exceed the takt_time.

For boolean triggers: detect True rising edges as cycle boundaries. For integer steps: detect value changes between consecutive samples as cycle boundaries.

Returns: systime (at boundary), uuid, source_uuid, is_delta, cycle_time_seconds, violation

Source code in src/ts_shape/events/production/line_throughput.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def takt_adherence(
    self,
    cycle_uuid: str,
    *,
    value_column: str = "value_bool",
    takt_time: str = "60s",
    min_violation: str = "0s",
) -> pd.DataFrame:
    """Flag cycles whose durations exceed the takt_time.

    Cycle boundaries come from the signal identified by ``cycle_uuid``:
    for ``value_bool`` the True rising edges are used; for any other
    column a change between consecutive samples marks a boundary.

    Returns: systime (at boundary), uuid, source_uuid, is_delta,
    cycle_time_seconds, violation
    """
    result_cols = [
        "systime",
        "uuid",
        "source_uuid",
        "is_delta",
        "cycle_time_seconds",
        "violation",
    ]
    signal = (
        self.dataframe[self.dataframe["uuid"] == cycle_uuid]
        .copy()
        .sort_values(self.time_column)
    )
    if signal.empty:
        return pd.DataFrame(columns=result_cols)
    signal[self.time_column] = pd.to_datetime(signal[self.time_column])
    if value_column == "value_bool":
        # Rising edge: previous sample False (or absent), current True.
        previous = signal[value_column].shift(fill_value=False)
        boundary = (~previous) & signal[value_column].fillna(False)
    else:
        # Any change between consecutive samples marks a boundary.
        previous = signal[value_column].shift(1)
        boundary = signal[value_column].fillna(0) != previous.fillna(0)
    times = signal.loc[boundary, self.time_column].reset_index(drop=True)
    if len(times) < 2:
        return pd.DataFrame(columns=result_cols)
    # Cycle time = gap between consecutive boundaries, in seconds.
    durations = times.diff().dt.total_seconds().iloc[1:].reset_index(drop=True)
    takt_s = pd.to_timedelta(takt_time).total_seconds()
    slack_s = pd.to_timedelta(min_violation).total_seconds()
    return pd.DataFrame(
        {
            "systime": times.iloc[1:].reset_index(drop=True),
            "uuid": self.event_uuid,
            "source_uuid": cycle_uuid,
            "is_delta": True,
            "cycle_time_seconds": durations,
            "violation": (durations - takt_s) >= slack_s,
        }
    )

MachineStateEvents ¤

MachineStateEvents(dataframe: DataFrame, run_state_uuid: str, *, event_uuid: str = 'prod:run_idle', value_column: str = 'value_bool', time_column: str = 'systime')

Bases: Base

Production: Machine State

Detect run/idle transitions and intervals from a boolean state signal.

  • MachineStateEvents: Run/idle state intervals and transitions.
  • detect_run_idle: Intervalize run/idle states with optional min duration filter.
  • transition_events: Point events on state changes (idle->run, run->idle).

Methods:

Source code in src/ts_shape/events/production/machine_state.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(
    self,
    dataframe: pd.DataFrame,
    run_state_uuid: str,
    *,
    event_uuid: str = "prod:run_idle",
    value_column: str = "value_bool",
    time_column: str = "systime",
) -> None:
    """Bind the boolean run-state signal and pre-slice its rows.

    Args:
        dataframe: Long-form (uuid-per-signal) timeseries data.
        run_state_uuid: uuid of the boolean run/idle signal.
        event_uuid: Identifier stamped onto emitted events.
        value_column: Column holding the boolean state.
        time_column: Name of the timestamp column.
    """
    super().__init__(dataframe, column_name=time_column)
    self.run_state_uuid = run_state_uuid
    self.event_uuid = event_uuid
    self.value_column = value_column
    self.time_column = time_column
    # Pre-select and time-sort the run-state rows once, for reuse by
    # detect_run_idle / transition_events.
    selected = self.dataframe.loc[self.dataframe["uuid"] == run_state_uuid].copy()
    selected = selected.sort_values(time_column)
    selected[time_column] = pd.to_datetime(selected[time_column])
    self.series = selected

detect_run_idle ¤

detect_run_idle(min_duration: str = '0s') -> DataFrame

Return intervals labeled as 'run' or 'idle'.

  • min_duration: discard intervals shorter than this duration. Columns: start, end, uuid, source_uuid, is_delta, state
Source code in src/ts_shape/events/production/machine_state.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def detect_run_idle(self, min_duration: str = "0s") -> pd.DataFrame:
    """Intervalize the boolean run signal into 'run'/'idle' spans.

    - min_duration: drop spans shorter than this duration.
    Columns: start, end, uuid, source_uuid, is_delta, state
    """
    out_cols = ["start", "end", "uuid", "source_uuid", "is_delta", "state"]
    if self.series.empty:
        return pd.DataFrame(columns=out_cols)
    frame = self.series[[self.time_column, self.value_column]].copy()
    # Missing samples are treated as "not running".
    frame["state"] = frame[self.value_column].fillna(False).astype(bool)
    # Consecutive rows with the same state share a segment id.
    seg_id = frame["state"].ne(frame["state"].shift()).cumsum()
    shortest = pd.to_timedelta(min_duration)
    intervals: List[Dict[str, Any]] = []
    for _, seg in frame.groupby(seg_id):
        first_ts = seg[self.time_column].iloc[0]
        last_ts = seg[self.time_column].iloc[-1]
        if (last_ts - first_ts) < shortest:
            continue
        intervals.append(
            {
                "start": first_ts,
                "end": last_ts,
                "uuid": self.event_uuid,
                "source_uuid": self.run_state_uuid,
                "is_delta": True,
                "state": "run" if bool(seg["state"].iloc[0]) else "idle",
            }
        )
    return pd.DataFrame(intervals)

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Return the DataFrame currently held by this instance."""
    return self.dataframe

transition_events ¤

transition_events() -> DataFrame

Return point events at state transitions.

Columns: systime, uuid, source_uuid, is_delta, transition ('idle_to_run'|'run_to_idle')

Source code in src/ts_shape/events/production/machine_state.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def transition_events(self) -> pd.DataFrame:
    """Return point events at state transitions.

    Columns: systime, uuid, source_uuid, is_delta, transition
    ('idle_to_run'|'run_to_idle')
    """
    empty = pd.DataFrame(
        columns=["systime", "uuid", "source_uuid", "is_delta", "transition"]
    )
    if self.series.empty:
        return empty
    s = self.series[[self.time_column, self.value_column]].copy()
    # Missing samples are treated as "not running".
    s["state"] = s[self.value_column].fillna(False).astype(bool)
    s["prev"] = s["state"].shift()
    changes = s[s["state"] != s["prev"]].dropna(subset=["prev"])  # ignore first row
    if changes.empty:
        return empty
    changes = changes.rename(columns={self.time_column: "systime"})
    # At a change row state != prev, so the new state alone determines the
    # direction. Map by value rather than `is` identity — identity checks
    # against True/False silently fail on numpy bools — and avoid a
    # per-row apply.
    transition = changes["state"].map({True: "idle_to_run", False: "run_to_idle"})
    return pd.DataFrame(
        {
            "systime": changes["systime"],
            "uuid": self.event_uuid,
            "source_uuid": self.run_state_uuid,
            "is_delta": True,
            "transition": transition,
        }
    )