Skip to content

ts_shape.events.production.line_throughput ¤

Classes:

LineThroughputEvents ¤

LineThroughputEvents(dataframe: DataFrame, *, event_uuid: str = 'prod:throughput', time_column: str = 'systime')

Bases: Base

Production: Line Throughput

Methods:

- count_parts: Part counts per fixed window from a monotonically increasing counter.
- takt_adherence: Cycle time violations against a takt time from step/boolean triggers.

Methods:

Source code in src/ts_shape/events/production/line_throughput.py
15
16
17
18
19
20
21
22
23
24
def __init__(
    self,
    dataframe: pd.DataFrame,
    *,
    event_uuid: str = "prod:throughput",
    time_column: str = "systime",
) -> None:
    """Set up the throughput event extractor.

    Args:
        dataframe: Input data, forwarded to the base-class initializer.
        event_uuid: Identifier stored on the instance; the methods of this
            class write it into the ``uuid`` column of their results.
        time_column: Name of the timestamp column. It is forwarded to the
            base class as ``column_name`` and also kept on the instance.
    """
    # Let the base class do its own setup first, keyed on the time column.
    super().__init__(dataframe, column_name=time_column)
    self.time_column = time_column
    self.event_uuid = event_uuid

count_parts ¤

count_parts(counter_uuid: str, *, value_column: str = 'value_integer', window: str = '1m') -> DataFrame

Compute parts per window for a counter uuid.

Returns columns: window_start, uuid, source_uuid, is_delta, count

Source code in src/ts_shape/events/production/line_throughput.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def count_parts(
    self,
    counter_uuid: str,
    *,
    value_column: str = "value_integer",
    window: str = "1m",
) -> pd.DataFrame:
    """Compute parts per window for a counter uuid.

    Rows whose ``uuid`` equals ``counter_uuid`` are resampled on the time
    column into ``window``-sized bins. For each bin the maximum counter
    value is taken, bins without samples inherit the previous bin's value,
    and the count is the difference between consecutive bins. Negative
    differences (e.g. a counter reset) are clipped to 0. The first bin has
    no predecessor, so its count is always 0.

    Args:
        counter_uuid: Value of the ``uuid`` column identifying the counter.
        value_column: Column holding the counter readings.
        window: Frequency string passed unchanged to ``resample``.
            NOTE(review): pandas spells minutes as ``"min"``; confirm that
            the default ``"1m"`` is interpreted as intended by the pandas
            version in use.

    Returns:
        DataFrame with columns: window_start, uuid, source_uuid, is_delta,
        count. It is empty (same columns) when no row matches
        ``counter_uuid``.
    """
    columns = ["window_start", "uuid", "source_uuid", "is_delta", "count"]
    c = self.dataframe[self.dataframe["uuid"] == counter_uuid].copy()
    if c.empty:
        return pd.DataFrame(columns=columns)
    # Convert before sorting so string timestamps are ordered chronologically
    # rather than lexically.
    c[self.time_column] = pd.to_datetime(c[self.time_column])
    c = c.sort_values(self.time_column).set_index(self.time_column)
    # Highest counter reading seen in each window; NaN for windows without
    # any sample.
    per_window_max = c[value_column].resample(window).max()
    # ``.ffill()`` replaces the deprecated ``fillna(method="ffill")``.
    counts = per_window_max.ffill().diff().fillna(0).clip(lower=0)
    out = (
        counts.to_frame("count")
        .reset_index()
        .rename(columns={self.time_column: "window_start"})
    )
    out["uuid"] = self.event_uuid
    out["source_uuid"] = counter_uuid
    out["is_delta"] = True
    # Same column order as the documented contract and the empty result.
    return out[columns]

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
34
35
36
def get_dataframe(self) -> pd.DataFrame:
    """Return the DataFrame held by this instance.

    Returns:
        The object stored in ``self.dataframe``, handed back as-is
        (no copy is made).
    """
    frame = self.dataframe
    return frame

takt_adherence ¤

takt_adherence(cycle_uuid: str, *, value_column: str = 'value_bool', takt_time: str = '60s', min_violation: str = '0s') -> DataFrame

Flag cycles whose durations exceed the takt_time.

For boolean triggers: detect True rising edges as cycle boundaries. For integer steps: detect increments as cycle boundaries.

Returns: systime (at boundary), uuid, source_uuid, is_delta, cycle_time_seconds, violation

Source code in src/ts_shape/events/production/line_throughput.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def takt_adherence(
    self,
    cycle_uuid: str,
    *,
    value_column: str = "value_bool",
    takt_time: str = "60s",
    min_violation: str = "0s",
) -> pd.DataFrame:
    """Flag cycles whose durations reach or exceed the takt_time.

    Cycle boundaries are detected on the rows whose ``uuid`` equals
    ``cycle_uuid``:

    * ``value_column == "value_bool"``: a boundary is a rising edge, i.e. a
      True sample whose predecessor is not True. Missing values are treated
      as False, and the first sample counts as an edge when it is True.
    * any other ``value_column``: a boundary is a sample whose value differs
      from its predecessor, with missing values (and the non-existent
      predecessor of the first sample) treated as 0.

    The cycle time is the gap between consecutive boundaries. A cycle is a
    violation when ``cycle_time - takt_time >= min_violation``.

    Args:
        cycle_uuid: Value of the ``uuid`` column identifying the trigger.
        value_column: Column holding the trigger values.
        takt_time: Target cycle duration, parsed with ``pd.to_timedelta``.
        min_violation: Minimum overshoot that counts as a violation,
            parsed with ``pd.to_timedelta``.

    Returns:
        DataFrame with columns: systime (at the boundary closing the cycle),
        uuid, source_uuid, is_delta, cycle_time_seconds, violation. It is
        empty (same columns) when no row matches ``cycle_uuid`` or fewer
        than two boundaries are found.
    """
    columns = [
        "systime",
        "uuid",
        "source_uuid",
        "is_delta",
        "cycle_time_seconds",
        "violation",
    ]
    s = (
        self.dataframe[self.dataframe["uuid"] == cycle_uuid]
        .copy()
        .sort_values(self.time_column)
    )
    if s.empty:
        return pd.DataFrame(columns=columns)
    s[self.time_column] = pd.to_datetime(s[self.time_column])
    if value_column == "value_bool":
        # Build a plain bool series first: ``eq(True)`` maps None/NaN to
        # False, so the later ``~`` never runs on object-dtype data (which
        # raises TypeError when the previous sample is missing).
        current = s[value_column].eq(True).fillna(False).astype(bool)
        previous = current.shift(fill_value=False)
        is_boundary = current & ~previous
    else:
        current = s[value_column].fillna(0)
        previous = s[value_column].shift(1).fillna(0)
        is_boundary = current != previous
    times = s.loc[is_boundary, self.time_column].reset_index(drop=True)
    if len(times) < 2:
        # At least two boundaries are needed to measure one cycle.
        return pd.DataFrame(columns=columns)
    # The first diff is NaT (no previous boundary), so drop it.
    cycle_times = times.diff().dt.total_seconds().iloc[1:].reset_index(drop=True)
    min_violation_seconds = pd.to_timedelta(min_violation).total_seconds()
    takt_seconds = pd.to_timedelta(takt_time).total_seconds()
    viol = (cycle_times - takt_seconds) >= min_violation_seconds
    return pd.DataFrame(
        {
            "systime": times.iloc[1:].reset_index(drop=True),
            "uuid": self.event_uuid,
            "source_uuid": cycle_uuid,
            "is_delta": True,
            "cycle_time_seconds": cycle_times,
            "violation": viol,
        }
    )