Skip to content

ts_shape.events.engineering ¤

Engineering Events

Detectors for engineering-related patterns over shaped timeseries.

  • SetpointChangeEvents: Detect setpoint changes and compute response KPIs.
  • detect_setpoint_steps: Point events where |Δsetpoint| ≥ min_delta and holds for min_hold.
  • detect_setpoint_ramps: Intervals where |dS/dt| ≥ min_rate for at least min_duration.
  • detect_setpoint_changes: Unified table of steps and ramps with standardized columns.
  • time_to_settle: Time until |actual − setpoint| ≤ tol for a hold duration within a window.
  • overshoot_metrics: Peak overshoot magnitude/percent and time-to-peak after a change.

  • StartupDetectionEvents: Detect startup intervals from thresholds or slope.

  • detect_startup_by_threshold: Rising threshold crossing with minimum dwell above threshold.
  • detect_startup_by_slope: Intervals with sustained positive slope ≥ min_slope for min_duration.

Modules:

Classes:

SetpointChangeEvents ¤

SetpointChangeEvents(dataframe: DataFrame, setpoint_uuid: str, *, event_uuid: str = 'setpoint_change_event', value_column: str = 'value_double', time_column: str = 'systime')

Bases: Base

Detect step/ramp changes on a setpoint signal and compute follow-up KPIs like time-to-settle and overshoot based on an actual (process) value.

Schema assumptions (columns): - uuid, sequence_number, systime, plctime, is_delta - value_integer, value_string, value_double, value_bool, value_bytes

Methods:

Source code in src/ts_shape/events/engineering/setpoint_events.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    dataframe: pd.DataFrame,
    setpoint_uuid: str,
    *,
    event_uuid: str = "setpoint_change_event",
    value_column: str = "value_double",
    time_column: str = "systime",
) -> None:
    """Prepare the setpoint series used by all detectors.

    Args:
        dataframe: Shaped timeseries containing at least `uuid`, the value
            columns, and the time column.
        setpoint_uuid: `uuid` value identifying the setpoint signal rows.
        event_uuid: `uuid` stamped onto every emitted event row.
        value_column: Column holding the numeric setpoint value.
        time_column: Column holding the sample timestamps.
    """
    super().__init__(dataframe, column_name=time_column)
    self.setpoint_uuid = setpoint_uuid
    self.event_uuid = event_uuid
    self.value_column = value_column
    self.time_column = time_column

    # Isolate the setpoint series. Convert timestamps BEFORE sorting so the
    # order is chronological even when the raw column holds strings; a
    # lexicographic sort of mixed-format timestamp strings can misorder rows.
    self.sp = self.dataframe[self.dataframe["uuid"] == self.setpoint_uuid].copy()
    self.sp[self.time_column] = pd.to_datetime(self.sp[self.time_column])
    self.sp = self.sp.sort_values(self.time_column)

detect_setpoint_changes ¤

detect_setpoint_changes(*, min_delta: float = 0.0, min_rate: Optional[float] = None, min_hold: str = '0s', min_duration: str = '0s') -> DataFrame

Unified setpoint change table (steps + ramps) with standardized columns.

Source code in src/ts_shape/events/engineering/setpoint_events.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def detect_setpoint_changes(
    self,
    *,
    min_delta: float = 0.0,
    min_rate: Optional[float] = None,
    min_hold: str = "0s",
    min_duration: str = "0s",
) -> pd.DataFrame:
    """
    Unified setpoint change table (steps + ramps) with standardized columns.

    Args:
        min_delta: Minimum absolute step size forwarded to step detection.
        min_rate: Minimum absolute rate for ramp detection; when ``None``,
            ramp detection is skipped entirely.
        min_hold: Minimum hold duration for steps (pandas offset string).
        min_duration: Minimum ramp duration (pandas offset string).

    Returns:
        DataFrame with columns: start, end, uuid, is_delta, change_type,
        magnitude, prev_level, new_level, avg_rate, delta. Step rows carry
        None in the ramp-only columns and vice versa.
    """
    # Single source of truth for the unified schema (was previously repeated
    # three times verbatim, inviting drift between the copies).
    columns = [
        "start",
        "end",
        "uuid",
        "is_delta",
        "change_type",
        "magnitude",
        "prev_level",
        "new_level",
        "avg_rate",
        "delta",
    ]

    steps = self.detect_setpoint_steps(min_delta=min_delta, min_hold=min_hold)
    if min_rate is not None:
        ramps = self.detect_setpoint_ramps(min_rate=min_rate, min_duration=min_duration)
    else:
        ramps = pd.DataFrame(columns=["start", "end", "uuid", "is_delta", "change_type", "avg_rate", "delta"])

    # Normalize both frames to the unified schema; columns that don't apply
    # to a given change type are filled with None.
    if not steps.empty:
        steps = steps.assign(avg_rate=None, delta=None)[columns]
    if not ramps.empty:
        ramps = ramps.assign(magnitude=None, prev_level=None, new_level=None)[columns]

    frames = [df for df in (steps, ramps) if not df.empty]
    if not frames:
        return pd.DataFrame(columns=columns)
    combined = pd.concat(frames, ignore_index=True)
    return combined.sort_values(["start", "end"])

detect_setpoint_ramps ¤

detect_setpoint_ramps(min_rate: float, min_duration: str = '0s') -> DataFrame

Interval events where |dS/dt| >= min_rate for at least min_duration.

Returns:

  • DataFrame

    DataFrame with columns: start, end, uuid, is_delta, change_type='ramp',

  • DataFrame

    avg_rate, delta.

Source code in src/ts_shape/events/engineering/setpoint_events.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def detect_setpoint_ramps(self, min_rate: float, min_duration: str = "0s") -> pd.DataFrame:
    """
    Interval events where |dS/dt| >= min_rate for at least `min_duration`.

    Args:
        min_rate: Minimum absolute per-second rate of change.
        min_duration: Minimum ramp duration (pandas offset string).

    Returns:
        DataFrame with columns: start, end, uuid, is_delta, change_type='ramp',
        avg_rate, delta. Empty (but with the declared columns) when no
        ramp qualifies.
    """
    columns = ["start", "end", "uuid", "is_delta", "change_type", "avg_rate", "delta"]
    if self.sp.empty:
        return pd.DataFrame(columns=columns)

    sp = self.sp[[self.time_column, self.value_column]].copy()
    sp["dt_s"] = sp[self.time_column].diff().dt.total_seconds()
    sp["dv"] = sp[self.value_column].diff()
    sp["rate"] = sp["dv"] / sp["dt_s"]
    rate_mask = sp["rate"].abs() >= float(min_rate)

    # Group contiguous True segments: the cumsum increments every time the
    # mask flips, so each run of equal mask values shares one group id.
    group_id = (rate_mask != rate_mask.shift()).cumsum()
    events: List[Dict[str, Any]] = []
    min_d = pd.to_timedelta(min_duration)
    for _, seg in sp.groupby(group_id):
        seg_mask_true = rate_mask.loc[seg.index]
        if not seg_mask_true.any():
            continue
        # segment boundaries (times of first/last fast-enough sample)
        start_time = seg.loc[seg_mask_true, self.time_column].iloc[0]
        end_time = seg.loc[seg_mask_true, self.time_column].iloc[-1]
        if (end_time - start_time) < min_d:
            continue
        avg_rate = seg.loc[seg_mask_true, "rate"].mean()
        delta = seg.loc[seg_mask_true, "dv"].sum()
        events.append(
            {
                "start": start_time,
                "end": end_time,
                "uuid": self.event_uuid,
                "is_delta": True,
                "change_type": "ramp",
                "avg_rate": float(avg_rate) if pd.notna(avg_rate) else None,
                "delta": float(delta) if pd.notna(delta) else None,
            }
        )

    # Always preserve the declared schema: a bare ``pd.DataFrame([])`` (the
    # previous no-event return) had no columns at all.
    return pd.DataFrame(events, columns=columns)

detect_setpoint_steps ¤

detect_setpoint_steps(min_delta: float, min_hold: str = '0s') -> DataFrame

Point events at times where the setpoint changes by >= min_delta and the new level holds for at least min_hold (no subsequent change within that time).

Returns:

  • DataFrame

    DataFrame with columns: start, end (== start), uuid, is_delta,

  • DataFrame

    change_type='step', magnitude, prev_level, new_level.

Source code in src/ts_shape/events/engineering/setpoint_events.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def detect_setpoint_steps(self, min_delta: float, min_hold: str = "0s") -> pd.DataFrame:
    """
    Point events at times where the setpoint changes by >= `min_delta` and the
    new level holds for at least `min_hold` (no subsequent qualifying change
    within that time).

    Args:
        min_delta: Minimum absolute change to count as a step.
        min_hold: Minimum dwell at the new level (pandas offset string).

    Returns:
        DataFrame with columns: start, end (== start), uuid, is_delta,
        change_type='step', magnitude, prev_level, new_level. Empty (but
        with the declared columns) when no step qualifies.
    """
    columns = [
        "start",
        "end",
        "uuid",
        "is_delta",
        "change_type",
        "magnitude",
        "prev_level",
        "new_level",
    ]
    if self.sp.empty:
        return pd.DataFrame(columns=columns)

    sp = self.sp[[self.time_column, self.value_column]].copy()
    sp["prev"] = sp[self.value_column].shift(1)
    sp["delta"] = sp[self.value_column] - sp["prev"]
    change_mask = sp["delta"].abs() >= float(min_delta)

    # hold condition: the next qualifying change must come after min_hold
    change_times = sp.loc[change_mask, self.time_column]
    min_hold_td = pd.to_timedelta(min_hold)
    next_change_times = change_times.shift(-1)
    hold_ok = (next_change_times - change_times >= min_hold_td) | next_change_times.isna()

    # Iterate qualifying rows directly by index. The previous implementation
    # re-scanned the frame with a timestamp-equality filter per event, which
    # was O(n^2) and picked the wrong row when timestamps were duplicated.
    rows: List[Dict[str, Any]] = []
    for idx in change_times[hold_ok].index:
        row = sp.loc[idx]
        t = row[self.time_column]
        rows.append(
            {
                "start": t,
                "end": t,
                "uuid": self.event_uuid,
                "is_delta": True,
                "change_type": "step",
                "magnitude": float(row["delta"]),
                "prev_level": float(row["prev"]) if pd.notna(row["prev"]) else None,
                "new_level": float(row[self.value_column]),
            }
        )

    return pd.DataFrame(rows, columns=columns)

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Return the DataFrame managed by this instance (no copy is made)."""
    frame = self.dataframe
    return frame

overshoot_metrics ¤

overshoot_metrics(actual_uuid: str, *, window: str = '10m') -> DataFrame

For each change, compute peak overshoot relative to the new setpoint within a lookahead window.

Returns:

  • DataFrame

    DataFrame with columns: start, uuid, is_delta, overshoot_abs,

  • DataFrame

    overshoot_pct, t_peak_seconds.

Source code in src/ts_shape/events/engineering/setpoint_events.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def overshoot_metrics(
    self,
    actual_uuid: str,
    *,
    window: str = "10m",
) -> pd.DataFrame:
    """
    For each setpoint change, compute the peak overshoot of the process value
    relative to the new setpoint within a lookahead window.

    Args:
        actual_uuid: `uuid` identifying the actual (process) signal rows.
        window: Lookahead horizon after each change (pandas offset string).

    Returns:
        DataFrame with columns: start, uuid, is_delta, overshoot_abs,
        overshoot_pct, t_peak_seconds.
    """
    if self.sp.empty:
        return pd.DataFrame(columns=["start", "uuid", "is_delta", "overshoot_abs", "overshoot_pct", "t_peak_seconds"])

    # Process-value series for the lookahead windows.
    process = self.dataframe[self.dataframe["uuid"] == actual_uuid].copy()
    process = process.sort_values(self.time_column)
    process[self.time_column] = pd.to_datetime(process[self.time_column])
    horizon = pd.to_timedelta(window)

    # Every nonzero setpoint movement counts as a change.
    setpoint = self.sp[[self.time_column, self.value_column]].copy()
    setpoint["prev"] = setpoint[self.value_column].shift(1)
    setpoint["delta"] = setpoint[self.value_column] - setpoint["prev"]
    changed = setpoint["delta"].abs() > 0
    changes = setpoint.loc[changed, [self.time_column, self.value_column, "delta"]]

    records: List[Dict[str, Any]] = []
    for _, change in changes.iterrows():
        t_change = change[self.time_column]
        new_sp = float(change[self.value_column])
        step = 0.0 if pd.isna(change["delta"]) else float(change["delta"])
        in_window = (process[self.time_column] >= t_change) & (process[self.time_column] <= t_change + horizon)
        win = process[in_window]
        base = {"start": t_change, "uuid": self.event_uuid, "is_delta": True}
        if win.empty:
            # No process samples to evaluate: emit a row with null metrics.
            records.append({**base, "overshoot_abs": None, "overshoot_pct": None, "t_peak_seconds": None})
            continue
        error = win[self.value_column] - new_sp
        if step >= 0:
            peak_val, peak_idx = error.max(), error.idxmax()
        else:
            # Downward step: report the magnitude of the lowest excursion.
            peak_val, peak_idx = -error.min(), error.idxmin()
        t_peak = win.loc[peak_idx, self.time_column]
        abs_over = None if pd.isna(peak_val) else float(peak_val)
        pct_over = None
        if step != 0 and abs_over is not None:
            pct_over = float(abs_over / abs(step))
        records.append(
            {
                **base,
                "overshoot_abs": abs_over,
                "overshoot_pct": pct_over,
                "t_peak_seconds": (t_peak - t_change).total_seconds() if pd.notna(t_peak) else None,
            }
        )

    return pd.DataFrame(records)

time_to_settle ¤

time_to_settle(actual_uuid: str, *, tol: float, hold: str = '0s', lookahead: str = '10m') -> DataFrame

For each setpoint change (any change), compute time until the actual signal is within ±tol of the new setpoint for a continuous duration of hold.

Returns:

  • DataFrame

    DataFrame with columns: start, uuid, is_delta, t_settle_seconds, settled.

Source code in src/ts_shape/events/engineering/setpoint_events.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def time_to_settle(
    self,
    actual_uuid: str,
    *,
    tol: float,
    hold: str = "0s",
    lookahead: str = "10m",
) -> pd.DataFrame:
    """
    For each setpoint change (any change), compute the time until the actual
    signal is within ±`tol` of the new setpoint for a continuous duration of
    `hold`.

    Args:
        actual_uuid: `uuid` identifying the actual (process) signal rows.
        tol: Absolute tolerance band around the new setpoint.
        hold: Required continuous in-band duration (pandas offset string).
        lookahead: Search horizon after each change (pandas offset string).

    Returns:
        DataFrame with columns: start, uuid, is_delta, t_settle_seconds,
        settled. `t_settle_seconds` is the time to FIRST entry into the band
        (hold requirement not applied); `settled` reflects the hold check.
    """
    if self.sp.empty:
        return pd.DataFrame(columns=["start", "uuid", "is_delta", "t_settle_seconds", "settled"])

    process = self.dataframe[self.dataframe["uuid"] == actual_uuid].copy().sort_values(self.time_column)
    process[self.time_column] = pd.to_datetime(process[self.time_column])
    hold_td = pd.to_timedelta(hold)
    horizon = pd.to_timedelta(lookahead)

    # change instants (any nonzero setpoint movement)
    setpoint = self.sp[[self.time_column, self.value_column]].copy()
    setpoint["prev"] = setpoint[self.value_column].shift(1)
    setpoint["delta"] = setpoint[self.value_column] - setpoint["prev"]
    changed = setpoint["delta"].abs() > 0
    change_rows = setpoint.loc[changed, [self.time_column, self.value_column]].reset_index(drop=True)

    results: List[Dict[str, Any]] = []
    for _, change in change_rows.iterrows():
        t_change = change[self.time_column]
        target = float(change[self.value_column])
        in_window = (process[self.time_column] >= t_change) & (process[self.time_column] <= t_change + horizon)
        win = process[in_window]
        record: Dict[str, Any] = {"start": t_change, "uuid": self.event_uuid, "is_delta": True}
        if win.empty:
            record.update(t_settle_seconds=None, settled=False)
            results.append(record)
            continue

        inside = (win[self.value_column] - target).abs() <= tol

        # First instant within tolerance (hold requirement ignored here).
        t_enter = win.loc[inside[inside].index[0], self.time_column] if inside.any() else None

        # Settled iff some contiguous in-tolerance run spans at least `hold`.
        settled = False
        if inside.any():
            run_id = inside.ne(inside.shift()).cumsum()
            for _, run in win.groupby(run_id):
                flags = inside.loc[run.index]
                if not flags.iloc[0]:
                    continue
                span = run[self.time_column].iloc[-1] - run[self.time_column].iloc[0]
                if span >= hold_td:
                    settled = True
                    break

        record.update(
            t_settle_seconds=(t_enter - t_change).total_seconds() if t_enter is not None else None,
            settled=bool(settled),
        )
        results.append(record)

    return pd.DataFrame(results)

StartupDetectionEvents ¤

StartupDetectionEvents(dataframe: DataFrame, target_uuid: str, *, event_uuid: str = 'startup_event', value_column: str = 'value_double', time_column: str = 'systime')

Bases: Base

Detect equipment startup intervals based on threshold crossings or sustained positive slope in a numeric metric (speed, temperature, etc.).

Schema assumptions (columns): - uuid, sequence_number, systime, plctime, is_delta - value_integer, value_string, value_double, value_bool, value_bytes

Methods:

Source code in src/ts_shape/events/engineering/startup_events.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(
    self,
    dataframe: pd.DataFrame,
    target_uuid: str,
    *,
    event_uuid: str = "startup_event",
    value_column: str = "value_double",
    time_column: str = "systime",
) -> None:
    """Prepare the monitored metric series used by the startup detectors.

    Args:
        dataframe: Shaped timeseries containing at least `uuid`, the value
            columns, and the time column.
        target_uuid: `uuid` value identifying the monitored signal rows.
        event_uuid: `uuid` stamped onto every emitted event row.
        value_column: Column holding the numeric metric value.
        time_column: Column holding the sample timestamps.
    """
    super().__init__(dataframe, column_name=time_column)
    self.target_uuid = target_uuid
    self.event_uuid = event_uuid
    self.value_column = value_column
    self.time_column = time_column

    # Convert timestamps BEFORE sorting so the order is chronological even
    # when the raw column holds strings; a lexicographic sort of
    # mixed-format timestamp strings can misorder rows.
    self.series = self.dataframe[self.dataframe["uuid"] == self.target_uuid].copy()
    self.series[self.time_column] = pd.to_datetime(self.series[self.time_column])
    self.series = self.series.sort_values(self.time_column)

detect_startup_by_slope ¤

detect_startup_by_slope(*, min_slope: float, slope_window: str = '0s', min_duration: str = '0s') -> DataFrame

Startup intervals where per-second slope >= min_slope for at least min_duration. slope_window is accepted for API completeness but the current implementation uses instantaneous slope between samples.

Returns:

  • DataFrame

    DataFrame with columns: start, end, uuid, is_delta, method, min_slope, avg_slope.

Source code in src/ts_shape/events/engineering/startup_events.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def detect_startup_by_slope(
    self,
    *,
    min_slope: float,
    slope_window: str = "0s",
    min_duration: str = "0s",
) -> pd.DataFrame:
    """
    Startup intervals where per-second slope >= `min_slope` for at least
    `min_duration`. `slope_window` is accepted for API completeness but the
    current implementation uses instantaneous slope between samples.

    Args:
        min_slope: Minimum per-second positive slope.
        slope_window: Unused; kept for API completeness.
        min_duration: Minimum interval duration (pandas offset string).

    Returns:
        DataFrame with columns: start, end, uuid, is_delta, method, min_slope,
        avg_slope. Empty (but with the declared columns) when nothing qualifies.
    """
    columns = ["start", "end", "uuid", "is_delta", "method", "min_slope", "avg_slope"]
    if self.series.empty:
        return pd.DataFrame(columns=columns)

    s = self.series[[self.time_column, self.value_column]].copy()
    s["dt_s"] = s[self.time_column].diff().dt.total_seconds()
    s["dv"] = s[self.value_column].diff()
    s["slope"] = s["dv"] / s["dt_s"]
    mask = s["slope"] >= float(min_slope)

    # Group contiguous runs of the slope condition: the cumsum increments
    # every time the mask flips, so each run shares one group id.
    gid = (mask != mask.shift()).cumsum()
    min_d = pd.to_timedelta(min_duration)
    events: List[Dict[str, Any]] = []
    for _, seg in s.groupby(gid):
        seg_mask = mask.loc[seg.index]
        if not seg_mask.any():
            continue
        start_t = seg.loc[seg_mask, self.time_column].iloc[0]
        end_t = seg.loc[seg_mask, self.time_column].iloc[-1]
        if (end_t - start_t) < min_d:
            continue
        avg_slope = seg.loc[seg_mask, "slope"].mean()
        events.append(
            {
                "start": start_t,
                "end": end_t,
                "uuid": self.event_uuid,
                "is_delta": True,
                "method": "slope",
                "min_slope": float(min_slope),
                "avg_slope": float(avg_slope) if pd.notna(avg_slope) else None,
            }
        )

    # Always preserve the declared schema: a bare ``pd.DataFrame([])`` (the
    # previous no-event return) had no columns at all.
    return pd.DataFrame(events, columns=columns)

detect_startup_by_threshold ¤

detect_startup_by_threshold(*, threshold: float, hysteresis: tuple[float, float] | None = None, min_above: str = '0s') -> DataFrame

Startup begins at first crossing above threshold (or hysteresis enter) and is valid only if the metric stays above the (exit) threshold for at least min_above.

Returns:

  • DataFrame

    DataFrame with columns: start, end, uuid, is_delta, method, threshold.

Source code in src/ts_shape/events/engineering/startup_events.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def detect_startup_by_threshold(
    self,
    *,
    threshold: float,
    hysteresis: tuple[float, float] | None = None,
    min_above: str = "0s",
) -> pd.DataFrame:
    """
    Startup begins at first crossing above `threshold` (or hysteresis enter)
    and is valid only if the metric stays above the (exit) threshold for at
    least `min_above`.

    Returns:
        DataFrame with columns: start, end, uuid, is_delta, method, threshold.
    """
    if self.series.empty:
        return pd.DataFrame(columns=["start", "end", "uuid", "is_delta", "method", "threshold"])

    enter_thr = threshold if hysteresis is None else hysteresis[0]
    exit_thr = threshold if hysteresis is None else hysteresis[1]
    min_above_td = pd.to_timedelta(min_above)

    s = self.series[[self.time_column, self.value_column]].copy()
    above_enter = s[self.value_column] >= enter_thr
    rising = (~above_enter.shift(fill_value=False)) & above_enter
    rise_times = s.loc[rising, self.time_column]

    events: List[Dict[str, Any]] = []
    for t0 in rise_times:
        # ensure dwell above exit threshold for min_above
        win = s[(s[self.time_column] >= t0) & (s[self.time_column] <= t0 + min_above_td)]
        if win.empty:
            continue
        if (win[self.value_column] >= exit_thr).all():
            events.append(
                {
                    "start": t0,
                    "end": t0 + min_above_td,
                    "uuid": self.event_uuid,
                    "is_delta": True,
                    "method": "threshold",
                    "threshold": float(threshold),
                }
            )

    return pd.DataFrame(events)

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Expose the underlying DataFrame held by this instance."""
    result = self.dataframe
    return result