feature_pipeline

FeaturePipeline

FeaturePipeline(
    dataframe: DataFrame,
    time_column: str = "systime",
    uuid_column: str = "uuid",
    value_column: str = "value_double",
)

Flexible pipeline builder for chaining any ts-shape class.

ts-shape has two class patterns. Choosing the wrong add_* method is the most common mistake — the pipeline validates your choice at registration time and tells you which method to use instead.

Pattern 1 — Stateless classmethods (add_step)

Most ts-shape classes follow this pattern. The class is a namespace; every method is a @classmethod whose first argument is a DataFrame::

# No object creation — call directly on the class:
result = DoubleFilter.filter_nan_value_double(df)

Classes that follow this pattern:

  • Filters: DoubleFilter, IntegerFilter, StringFilter, BooleanFilter, IsDeltaFilter, DateTimeFilter, CustomFilter
  • Calculators: IntegerCalc
  • Functions: LambdaProcessor
  • Time: TimestampConverter, TimezoneShift
  • Segment analysis: SegmentExtractor, SegmentProcessor, TimeWindowedFeatureTable, ProfileComparison
  • Pattern recognition: PatternRecognition
  • Statistics: NumericStatistics, BooleanStatistics, StringStatistics, TimestampStatistics, TimeGroupedStatistics
  • Context: ValueMapper

Pattern 2 — Stateful instance classes (add_instance_step)

Classes that must be instantiated with a DataFrame first. The constructor stores configuration (column names, UUIDs, thresholds) and methods operate on internal state::

# Must create an object first:
harmonizer = DataHarmonizer(df, time_column='systime')
result = harmonizer.resample_to_uniform(freq='1s')

Classes that follow this pattern:

  • DataHarmonizer — harmonize, resample, fill gaps
  • CrossSignalAnalytics — lead-lag, Granger causality, synchronization
  • CycleExtractor — extract production cycles
  • CycleDataProcessor — split/merge data by cycle
  • DescriptiveFeatures — per-group feature tables
  • OEECalculator — OEE availability/performance/quality
  • Events (all 60+ classes): ThresholdMonitoringEvents, MachineStateEvents, SteadyStateDetectionEvents, OutlierDetectionEvents, StatisticalProcessControlRuleBased, DegradationDetectionEvents, EnergyConsumptionEvents, etc.
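
The registration-time check that steers you between add_step and add_instance_step could plausibly rely on introspection: a @classmethod looked up on its class is already bound, with the class itself as __self__, while an instance method looked up on the class is a plain function. A minimal sketch of such a check (illustrative only, not the library's actual validation)::

```python
import inspect

class Stateless:
    # Pattern 1: a namespace of classmethods.
    @classmethod
    def filter(cls, df):
        return df

class Stateful:
    # Pattern 2: must be instantiated with a DataFrame first.
    def __init__(self, df):
        self.df = df

    def resample(self):
        return self.df

def is_stateless_step(method):
    # A @classmethod accessed on the class is a bound method whose
    # __self__ is the class itself; an instance method accessed on
    # the class is just an unbound function.
    return inspect.ismethod(method) and inspect.isclass(method.__self__)

print(is_stateless_step(Stateless.filter))   # True  -> use add_step
print(is_stateless_step(Stateful.resample))  # False -> use add_instance_step
```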

Pattern 3 — Custom functions (add_lambda_step)

For one-off transformations that don't map to a ts-shape class::

pipe.add_lambda_step(
    lambda df: df[df['uuid'].isin(['temperature', 'pressure'])],
    name='select_signals',
)

Special references (sentinels)

Any keyword argument value can use these string sentinels:

  • '$prev' — the output of the previous step (available from step 2+).
  • '$input' — the original DataFrame passed to the constructor.

These let you wire steps that need more than one DataFrame.
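
Internally, sentinel handling amounts to substituting DataFrames for the marker strings before a step runs. A sketch of how that substitution could look (illustrative, not the library's code; note that the markers are case-sensitive, so a typo like '$PREV' is rejected)::

```python
def resolve_sentinel(value, prev_df, input_df):
    # Replace the documented markers with the corresponding DataFrames.
    if value == "$prev":
        return prev_df
    if value == "$input":
        return input_df
    # Anything else starting with '$' is treated as a typo, e.g. '$PREV'.
    if isinstance(value, str) and value.startswith("$"):
        raise ValueError(f"invalid sentinel: {value!r}")
    return value

# Stand-in strings in place of real DataFrames:
kwargs = {"time_ranges": "$prev", "dataframe": "$input", "freq": "1min"}
resolved = {k: resolve_sentinel(v, "prev-frame", "input-frame")
            for k, v in kwargs.items()}
```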

Full example::

from ts_shape.transform.filter.numeric_filter import DoubleFilter
from ts_shape.transform.filter.datetime_filter import DateTimeFilter
from ts_shape.transform.harmonization import DataHarmonizer
from ts_shape.features.segment_analysis.segment_extractor import SegmentExtractor
from ts_shape.features.segment_analysis.segment_processor import SegmentProcessor
from ts_shape.features.segment_analysis.time_windowed_features import TimeWindowedFeatureTable

result = (
    FeaturePipeline(df)
    .add_step(DateTimeFilter.filter_between_dates,
              start_date='2024-01-01', end_date='2024-01-31')
    .add_step(DoubleFilter.filter_nan_value_double)
    .add_instance_step(DataHarmonizer,
                       call='resample_to_uniform', freq='1s')
    .add_step(SegmentExtractor.extract_time_ranges,
              segment_uuid='order_number')
    .add_step(SegmentProcessor.apply_ranges,
              dataframe='$input', time_ranges='$prev')
    .add_step(TimeWindowedFeatureTable.compute, freq='1min')
    .run()
)

Debugging::

# Preview the pipeline before running:
print(pipe.describe())

# Get intermediate DataFrames for each step:
intermediates = pipe.run_steps()
intermediates['input']          # original
intermediates['DoubleFilter.filter_nan_value_double']  # after filtering

Initialize the pipeline with input data.

Parameters:

  • dataframe (DataFrame, required): Input DataFrame to process.
  • time_column (str, default 'systime'): Name of the timestamp column. Automatically passed to instance-step constructors that accept it.
  • uuid_column (str, default 'uuid'): Name of the UUID/signal identifier column.
  • value_column (str, default 'value_double'): Name of the numeric value column.

Raises:

  • TypeError: If dataframe is not a pandas DataFrame.
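
The TypeError above guards against passing, say, a list of records instead of a frame. A sketch of the kind of check involved (illustrative, not the library's code), together with a frame using the default column names::

```python
import pandas as pd

def check_input(dataframe):
    # Mirror the documented constraint: only pandas DataFrames are accepted.
    if not isinstance(dataframe, pd.DataFrame):
        raise TypeError(
            f"dataframe must be a pandas DataFrame, got {type(dataframe).__name__}"
        )
    return dataframe

# A frame matching the default column names.
df = check_input(pd.DataFrame({
    "systime": pd.to_datetime(["2024-01-01 00:00:00", "2024-01-01 00:00:01"]),
    "uuid": ["temperature", "temperature"],
    "value_double": [21.5, 21.7],
}))
```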

steps property

steps: List[str]

Return the ordered list of registered step names.

add_step

add_step(
    method: Callable[..., DataFrame], **kwargs: Any
) -> FeaturePipeline

Add a stateless classmethod step (Pattern 1).

The pipeline passes the current DataFrame as the first positional argument automatically. If you explicitly provide the first parameter by name (e.g. dataframe='$input'), the pipeline uses your value instead.

Parameters:

  • method (Callable[..., DataFrame], required): A classmethod reference, e.g. DoubleFilter.filter_nan_value_double.
  • **kwargs (Any): Keyword arguments forwarded to the method. Use '$prev' or '$input' as values to reference other DataFrames.

Returns:

  • FeaturePipeline: self, for method chaining.

Raises:

  • TypeError: If method is an instance method (use add_instance_step instead).
  • TypeError: If method is not callable.
  • ValueError: If kwargs contain an invalid sentinel (e.g. '$PREV' instead of '$prev').

Example::

pipe.add_step(DoubleFilter.filter_nan_value_double)
pipe.add_step(IntegerCalc.scale_column,
              column_name='value_double', factor=2)

# Wire two DataFrames into one step:
pipe.add_step(SegmentProcessor.apply_ranges,
              dataframe='$input', time_ranges='$prev')

add_instance_step

add_instance_step(
    cls: Type,
    call: str,
    init_kwargs: Optional[Dict[str, Any]] = None,
    **method_kwargs: Any
) -> FeaturePipeline

Add a stateful instance-class step (Pattern 2).

The pipeline automatically:

  1. Instantiates cls with the current DataFrame.
  2. Passes time_column, uuid_column, value_column to the constructor if it accepts those parameters.
  3. Calls the method named by call.
  4. If the method returns a DataFrame, it becomes the new pipeline state. Otherwise a warning is logged and the pipeline continues with the previous DataFrame.

Parameters:

  • cls (Type, required): The class to instantiate, e.g. DataHarmonizer.
  • call (str, required): Name of the instance method to invoke, e.g. 'resample_to_uniform'.
  • init_kwargs (Optional[Dict[str, Any]], default None): Extra keyword arguments for the constructor (beyond the DataFrame and column names).
  • **method_kwargs (Any): Keyword arguments forwarded to the method. Use '$prev' or '$input' as sentinel values.

Returns:

  • FeaturePipeline: self, for method chaining.

Raises:

  • TypeError: If cls is not a class.
  • AttributeError: If cls does not have a method named call.
  • ValueError: If kwargs contain an invalid sentinel.

Example::

pipe.add_instance_step(DataHarmonizer,
                       call='resample_to_uniform', freq='1s')
pipe.add_instance_step(CrossSignalAnalytics,
                       call='lead_lag_matrix', max_lag=10)
pipe.add_instance_step(CycleExtractor,
                       call='process_persistent_cycle',
                       init_kwargs={'start_uuid': 'cycle_start'})

add_lambda_step

add_lambda_step(
    func: Callable[[DataFrame], DataFrame],
    name: Optional[str] = None,
) -> FeaturePipeline

Add a custom function step (Pattern 3).

Use this for one-off transformations that don't map to a ts-shape class, such as selecting specific UUIDs or adding derived columns.

Parameters:

  • func (Callable[[DataFrame], DataFrame], required): A callable (DataFrame) -> DataFrame.
  • name (Optional[str], default None): Optional label for logging and describe(). Defaults to the function's __name__.

Returns:

  • FeaturePipeline: self, for method chaining.

Raises:

  • TypeError: If func is not callable.

Example::

pipe.add_lambda_step(
    lambda df: df[df['uuid'].isin(['temperature', 'pressure'])],
    name='select_signals',
)

describe

describe() -> str

Return a human-readable summary of the pipeline.

Call this before run() to verify the pipeline is wired correctly.

Example output::

FeaturePipeline (1200 rows, 4 cols)
  1. [step]     DoubleFilter.filter_nan_value_double
  2. [instance] DataHarmonizer.resample_to_uniform  freq='1s'
  3. [step]     SegmentExtractor.extract_time_ranges  segment_uuid='order_number'
  4. [step]     SegmentProcessor.apply_ranges  dataframe='$input', time_ranges='$prev'
  5. [step]     TimeWindowedFeatureTable.compute  freq='1min'

run

run() -> pd.DataFrame

Execute all steps sequentially and return the final DataFrame.

Raises:

  • RuntimeError: If any step fails. The error message includes the step number, name, DataFrame shape before the failure, available columns, and the original exception.
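
Because every failure surfaces as RuntimeError, a single except clause is enough to capture the enriched message before re-raising. A small wrapper sketch (illustrative; works with any object exposing a run() method)::

```python
def run_with_logging(pipeline, log=print):
    # Surface the enriched error message (step number, shape, columns)
    # before re-raising, so the failure is visible in logs.
    try:
        return pipeline.run()
    except RuntimeError as exc:
        log(f"pipeline failed: {exc}")
        raise
```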

run_steps

run_steps() -> Dict[str, pd.DataFrame]

Execute all steps and return intermediate results.

Returns a dict keyed by step name. The key 'input' holds the original DataFrame; subsequent keys are the step names in order. Useful for debugging which step transforms data unexpectedly.

Raises:

  • RuntimeError: If any step fails (same as run()).
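
Comparing row counts across the returned dict quickly locates the step where data disappears. A sketch using a hand-built dict of the same shape run_steps() returns (the frames here are stand-ins)::

```python
import pandas as pd

# Stand-in for the dict returned by run_steps().
intermediates = {
    "input": pd.DataFrame({"value_double": [1.0, float("nan"), 3.0]}),
    "DoubleFilter.filter_nan_value_double": pd.DataFrame({"value_double": [1.0, 3.0]}),
}

# Print row counts and per-step deltas to spot unexpected drops.
prev = None
for name, frame in intermediates.items():
    delta = "" if prev is None else f" ({len(frame) - prev:+d})"
    print(f"{name}: {len(frame)} rows{delta}")
    prev = len(frame)
```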