
ts_shape.events.production.flow_constraints ¤

Classes:

  • FlowConstraintEvents – Production: Flow Constraints

FlowConstraintEvents ¤

FlowConstraintEvents(dataframe: DataFrame, *, time_column: str = 'systime', event_uuid: str = 'prod:flow')

Bases: Base

Production: Flow Constraints

  • blocked_events: upstream running while downstream not consuming.
  • starved_events: downstream idle due to lack of upstream supply.

Methods:

  • blocked_events – Blocked: upstream_run=True while downstream_run=False.
  • get_dataframe – Returns the processed DataFrame.
  • starved_events – Starved: downstream_run=True while upstream_run=False.

Source code in src/ts_shape/events/production/flow_constraints.py
def __init__(
    self,
    dataframe: pd.DataFrame,
    *,
    time_column: str = "systime",
    event_uuid: str = "prod:flow",
) -> None:
    super().__init__(dataframe, column_name=time_column)
    self.time_column = time_column
    self.event_uuid = event_uuid
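
Example (illustrative, not part of the library source): the page does not show the expected input layout or the internal _align_bool helper, so the long-format columns below (uuid, value_bool, systime) are assumptions; adapt them to whatever schema your Base/_align_bool setup actually consumes.

import pandas as pd

from ts_shape.events.production.flow_constraints import FlowConstraintEvents

# Two boolean "running" signals sampled once per second (assumed schema).
times = pd.date_range("2024-01-01 08:00:00", periods=5, freq="1s")
df = pd.DataFrame(
    {
        "uuid": ["press-01"] * 5 + ["pack-01"] * 5,
        "value_bool": [True] * 5 + [True, True, False, False, False],
        "systime": list(times) * 2,
    }
)

events = FlowConstraintEvents(df, time_column="systime", event_uuid="prod:flow")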

blocked_events ¤

blocked_events(*, roles: Dict[str, str], tolerance: str = '200ms', min_duration: str = '0s') -> DataFrame

Blocked: upstream_run=True while downstream_run=False.

roles = {'upstream_run': uuid, 'downstream_run': uuid}

Source code in src/ts_shape/events/production/flow_constraints.py
def blocked_events(
    self,
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    min_duration: str = "0s",
) -> pd.DataFrame:
    """Blocked: upstream_run=True while downstream_run=False.

    roles = {'upstream_run': uuid, 'downstream_run': uuid}
    """
    up = self._align_bool(roles["upstream_run"])  # time, state
    dn = self._align_bool(roles["downstream_run"])  # time, state
    if up.empty or dn.empty:
        return pd.DataFrame(
            columns=["start", "end", "uuid", "source_uuid", "is_delta", "type"]
        )
    tol = pd.to_timedelta(tolerance)
    merged = pd.merge_asof(up, dn, on=self.time_column, suffixes=("_up", "_dn"), tolerance=tol, direction="nearest")
    cond = merged["state_up"] & (~merged["state_dn"].fillna(False))
    gid = (cond.ne(cond.shift())).cumsum()
    min_td = pd.to_timedelta(min_duration)
    rows: List[Dict[str, Any]] = []
    for _, seg in merged.groupby(gid):
        m = cond.loc[seg.index]
        if not m.any():
            continue
        start = seg[self.time_column].iloc[0]
        end = seg[self.time_column].iloc[-1]
        if (end - start) < min_td:
            continue
        rows.append(
            {
                "start": start,
                "end": end,
                "uuid": self.event_uuid,
                "source_uuid": roles["upstream_run"],
                "is_delta": True,
                "type": "blocked",
            }
        )
    return pd.DataFrame(rows)
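
Usage sketch, continuing the illustrative construction above (the press-01 / pack-01 uuids are placeholders). The returned frame carries the start, end, uuid, source_uuid, is_delta and type columns built in the method body.

blocked = events.blocked_events(
    roles={
        "upstream_run": "press-01",    # uuid of the upstream running signal
        "downstream_run": "pack-01",   # uuid of the downstream running signal
    },
    tolerance="200ms",   # max systime gap when pairing the two signals
    min_duration="1s",   # drop blocked intervals shorter than this
)
# One row per blocked interval: type == "blocked", source_uuid == "press-01".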

get_dataframe ¤

get_dataframe() -> DataFrame

Returns the processed DataFrame.

Source code in src/ts_shape/utils/base.py
def get_dataframe(self) -> pd.DataFrame:
    """Returns the processed DataFrame."""
    return self.dataframe
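
Usage sketch (whether Base copies or re-sorts the frame passed at construction is not shown on this page):

frame = events.get_dataframe()
frame.head()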

starved_events ¤

starved_events(*, roles: Dict[str, str], tolerance: str = '200ms', min_duration: str = '0s') -> DataFrame

Starved: downstream_run=True while upstream_run=False.

roles = {'upstream_run': uuid, 'downstream_run': uuid}

Source code in src/ts_shape/events/production/flow_constraints.py
def starved_events(
    self,
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    min_duration: str = "0s",
) -> pd.DataFrame:
    """Starved: downstream_run=True while upstream_run=False.

    roles = {'upstream_run': uuid, 'downstream_run': uuid}
    """
    up = self._align_bool(roles["upstream_run"])  # time, state
    dn = self._align_bool(roles["downstream_run"])  # time, state
    if up.empty or dn.empty:
        return pd.DataFrame(
            columns=["start", "end", "uuid", "source_uuid", "is_delta", "type"]
        )
    tol = pd.to_timedelta(tolerance)
    merged = pd.merge_asof(dn, up, on=self.time_column, suffixes=("_dn", "_up"), tolerance=tol, direction="nearest")
    cond = merged["state_dn"] & (~merged["state_up"].fillna(False))
    gid = (cond.ne(cond.shift())).cumsum()
    min_td = pd.to_timedelta(min_duration)
    rows: List[Dict[str, Any]] = []
    for _, seg in merged.groupby(gid):
        m = cond.loc[seg.index]
        if not m.any():
            continue
        start = seg[self.time_column].iloc[0]
        end = seg[self.time_column].iloc[-1]
        if (end - start) < min_td:
            continue
        rows.append(
            {
                "start": start,
                "end": end,
                "uuid": self.event_uuid,
                "source_uuid": roles["downstream_run"],
                "is_delta": True,
                "type": "starved",
            }
        )
    return pd.DataFrame(rows)
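
Usage sketch, mirroring blocked_events and again continuing the illustrative construction above: source_uuid is taken from the downstream role and type is "starved".

starved = events.starved_events(
    roles={"upstream_run": "press-01", "downstream_run": "pack-01"},
    tolerance="200ms",
    min_duration="0s",
)
# Intervals where the downstream signal is True while the nearest upstream
# sample within the tolerance is False (or missing, treated as False).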