feature_pipeline

FeaturePipeline
FeaturePipeline(
dataframe: DataFrame,
time_column: str = "systime",
uuid_column: str = "uuid",
value_column: str = "value_double",
)
Flexible pipeline builder for chaining any ts-shape class.
ts-shape has two class patterns. Choosing the wrong add_* method is
the most common mistake — the pipeline validates your choice at
registration time and tells you which method to use instead.
Pattern 1 — Stateless classmethods (add_step)
The vast majority of ts-shape classes. The class is a namespace; every
method is a @classmethod whose first argument is a DataFrame::
# No object creation — call directly on the class:
result = DoubleFilter.filter_nan_value_double(df)
Classes that follow this pattern:

- Filters: DoubleFilter, IntegerFilter, StringFilter, BooleanFilter, IsDeltaFilter, DateTimeFilter, CustomFilter
- Calculators: IntegerCalc
- Functions: LambdaProcessor
- Time: TimestampConverter, TimezoneShift
- Segment analysis: SegmentExtractor, SegmentProcessor, TimeWindowedFeatureTable, ProfileComparison
- Pattern recognition: PatternRecognition
- Statistics: NumericStatistics, BooleanStatistics, StringStatistics, TimestampStatistics, TimeGroupedStatistics
- Context: ValueMapper
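The registration-time validation mentioned above could be sketched roughly like this (a hypothetical illustration, not ts-shape's actual code): a classmethod accessed on its class is bound to the class itself, which distinguishes it from a plain function or an instance method::

```python
import inspect

class DoubleFilter:
    """Stand-in for a Pattern 1 ts-shape class."""
    @classmethod
    def filter_nan_value_double(cls, df):
        return df

def is_stateless_classmethod(method):
    # A classmethod accessed on its class has __self__ set to the class
    # itself; plain functions and unbound instance methods do not.
    return inspect.isclass(getattr(method, "__self__", None))

print(is_stateless_classmethod(DoubleFilter.filter_nan_value_double))  # True
```

A check like this is enough to reject a Pattern 2 instance method at registration time and point the caller at the other add_* method.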
Pattern 2 — Stateful instance classes (add_instance_step)
Classes that must be instantiated with a DataFrame first. The constructor stores configuration (column names, UUIDs, thresholds) and methods operate on internal state::
# Must create an object first:
harmonizer = DataHarmonizer(df, time_column='systime')
result = harmonizer.resample_to_uniform(freq='1s')
Classes that follow this pattern:

- DataHarmonizer — harmonize, resample, fill gaps
- CrossSignalAnalytics — lead-lag, Granger causality, synchronization
- CycleExtractor — extract production cycles
- CycleDataProcessor — split/merge data by cycle
- DescriptiveFeatures — per-group feature tables
- OEECalculator — OEE availability/performance/quality
- Events (all 60+ classes): ThresholdMonitoringEvents, MachineStateEvents, SteadyStateDetectionEvents, OutlierDetectionEvents, StatisticalProcessControlRuleBased, DegradationDetectionEvents, EnergyConsumptionEvents, etc.
Pattern 3 — Custom functions (add_lambda_step)
For one-off transformations that don't map to a ts-shape class::
    pipe.add_lambda_step(
        lambda df: df[df['uuid'].isin(['temperature', 'pressure'])],
        name='select_signals',
    )
Special references (sentinels)
Any keyword argument value can use these string sentinels:
- '$prev' — the output of the previous step (available from step 2+).
- '$input' — the original DataFrame passed to the constructor.
These let you wire steps that need more than one DataFrame.
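Conceptually, sentinel resolution can be pictured like this (a hypothetical sketch of the behaviour described above, not the library's implementation)::

```python
def resolve_sentinels(kwargs, prev_df, input_df):
    """Replace '$prev'/'$input' string values before a step is called."""
    mapping = {"$prev": prev_df, "$input": input_df}
    resolved = {}
    for key, value in kwargs.items():
        if isinstance(value, str) and value.startswith("$"):
            if value not in mapping:
                raise ValueError(f"unknown sentinel {value!r}")
            resolved[key] = mapping[value]
        else:
            resolved[key] = value
    return resolved

# 'time_ranges' is wired to the previous step's output; 'freq' passes through:
resolve_sentinels({"time_ranges": "$prev", "freq": "1s"},
                  prev_df="RANGES", input_df="RAW")
```

This is why any keyword argument, in any step, can receive another DataFrame: the substitution happens uniformly before the step runs.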
Full example::
from ts_shape.transform.filter.numeric_filter import DoubleFilter
from ts_shape.transform.filter.datetime_filter import DateTimeFilter
from ts_shape.transform.harmonization import DataHarmonizer
from ts_shape.features.segment_analysis.segment_extractor import SegmentExtractor
from ts_shape.features.segment_analysis.segment_processor import SegmentProcessor
from ts_shape.features.segment_analysis.time_windowed_features import TimeWindowedFeatureTable
    result = (
        FeaturePipeline(df)
        .add_step(DateTimeFilter.filter_between_dates,
                  start_date='2024-01-01', end_date='2024-01-31')
        .add_step(DoubleFilter.filter_nan_value_double)
        .add_instance_step(DataHarmonizer,
                           call='resample_to_uniform', freq='1s')
        .add_step(SegmentExtractor.extract_time_ranges,
                  segment_uuid='order_number')
        .add_step(SegmentProcessor.apply_ranges,
                  dataframe='$input', time_ranges='$prev')
        .add_step(TimeWindowedFeatureTable.compute, freq='1min')
        .run()
    )
Debugging::
# Preview the pipeline before running:
print(pipe.describe())
# Get intermediate DataFrames for each step:
intermediates = pipe.run_steps()
intermediates['input'] # original
intermediates['DoubleFilter.filter_nan_value_double'] # after filtering
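One convenient way to use the intermediates is to compare row counts step by step, which quickly shows where data is dropped (a small helper sketch; `shape_report` is not part of ts-shape)::

```python
def shape_report(intermediates):
    """Summarise row counts per step and the change from the previous step."""
    lines = []
    prev_rows = None
    for name, frame in intermediates.items():
        rows = len(frame)
        delta = "" if prev_rows is None else f" ({rows - prev_rows:+d})"
        lines.append(f"{name}: {rows} rows{delta}")
        prev_rows = rows
    return "\n".join(lines)

# Works with any ordered mapping of step name -> DataFrame
# (plain lists shown here for brevity):
print(shape_report({"input": [1, 2, 3, 4], "drop_nan": [1, 2, 3]}))
```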
Initialize the pipeline with input data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dataframe | DataFrame | Input DataFrame to process. | required |
| time_column | str | Name of the timestamp column. Automatically passed to instance-step constructors that accept it. | 'systime' |
| uuid_column | str | Name of the UUID/signal identifier column. | 'uuid' |
| value_column | str | Name of the numeric value column. | 'value_double' |
Raises:

| Type | Description |
|---|---|
| TypeError | If dataframe is not a pandas DataFrame. |
add_step
add_step(
method: Callable[..., DataFrame], **kwargs: Any
) -> FeaturePipeline
Add a stateless classmethod step (Pattern 1).
The pipeline passes the current DataFrame as the first positional
argument automatically. If you explicitly provide the first
parameter by name (e.g. dataframe='$input'), the pipeline
uses your value instead.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| method | Callable[..., DataFrame] | A classmethod reference, e.g. DoubleFilter.filter_nan_value_double. | required |
| **kwargs | Any | Keyword arguments forwarded to the method. Use the '$prev' / '$input' sentinels to reference other DataFrames. | {} |
Returns:

| Type | Description |
|---|---|
| FeaturePipeline | self, for method chaining. |
Raises:

| Type | Description |
|---|---|
| TypeError | If method is not callable. |
| TypeError | If method belongs to a Pattern 2 instance class (the error suggests add_instance_step instead). |
| ValueError | If kwargs contain an invalid sentinel. |
Example::
    pipe.add_step(DoubleFilter.filter_nan_value_double)
    pipe.add_step(IntegerCalc.scale_column,
                  column_name='value_double', factor=2)

    # Wire two DataFrames into one step:
    pipe.add_step(SegmentProcessor.apply_ranges,
                  dataframe='$input', time_ranges='$prev')
add_instance_step
add_instance_step(
cls: Type,
call: str,
init_kwargs: Optional[Dict[str, Any]] = None,
**method_kwargs: Any
) -> FeaturePipeline
Add a stateful instance-class step (Pattern 2).
The pipeline automatically:

- Instantiates cls with the current DataFrame.
- Passes time_column, uuid_column, value_column to the constructor if it accepts those parameters.
- Calls the method named by call.
- If the method returns a DataFrame, it becomes the new pipeline state. Otherwise a warning is logged and the pipeline continues with the previous DataFrame.
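The column-forwarding rule can be sketched with inspect.signature (a hypothetical illustration of the behaviour, not ts-shape's code; the stand-in classes are invented for the demo)::

```python
import inspect

def build_instance(cls, df, time_column="systime", **init_kwargs):
    """Instantiate cls with df, forwarding time_column only if accepted."""
    params = inspect.signature(cls).parameters
    kwargs = dict(init_kwargs)
    if "time_column" in params:
        kwargs["time_column"] = time_column
    return cls(df, **kwargs)

class Harmonizer:  # stand-in: constructor accepts time_column
    def __init__(self, df, time_column="systime"):
        self.df, self.time_column = df, time_column

class Extractor:  # stand-in: constructor does not accept time_column
    def __init__(self, df):
        self.df = df

build_instance(Harmonizer, [], time_column="ts")  # gets time_column='ts'
build_instance(Extractor, [])                     # built without it
```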
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| cls | Type | The class to instantiate, e.g. DataHarmonizer. | required |
| call | str | Name of the instance method to invoke, e.g. 'resample_to_uniform'. | required |
| init_kwargs | Optional[Dict[str, Any]] | Extra keyword arguments for the constructor (beyond the DataFrame and column names). | None |
| **method_kwargs | Any | Keyword arguments forwarded to the method. Use the '$prev' / '$input' sentinels to reference other DataFrames. | {} |
Returns:

| Type | Description |
|---|---|
| FeaturePipeline | self, for method chaining. |
Raises:

| Type | Description |
|---|---|
| TypeError | If cls is a Pattern 1 classmethod namespace (the error suggests add_step instead). |
| AttributeError | If cls has no instance method named call. |
| ValueError | If kwargs contain an invalid sentinel. |
Example::
    pipe.add_instance_step(DataHarmonizer,
                           call='resample_to_uniform', freq='1s')
    pipe.add_instance_step(CrossSignalAnalytics,
                           call='lead_lag_matrix', max_lag=10)
    pipe.add_instance_step(CycleExtractor,
                           call='process_persistent_cycle',
                           init_kwargs={'start_uuid': 'cycle_start'})
add_lambda_step
add_lambda_step(
func: Callable[[DataFrame], DataFrame],
name: Optional[str] = None,
) -> FeaturePipeline
Add a custom function step (Pattern 3).
Use this for one-off transformations that don't map to a ts-shape class, such as selecting specific UUIDs or adding derived columns.
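For instance, a derived-column step might look like this (a pandas sketch; the column names are illustrative, not part of ts-shape)::

```python
import pandas as pd

df = pd.DataFrame({"uuid": ["temp", "temp"],
                   "value_double": [32.0, 212.0]})

# Fahrenheit-to-Celsius as a one-off lambda step body:
to_celsius = lambda d: d.assign(
    value_celsius=(d["value_double"] - 32) * 5 / 9)

to_celsius(df)["value_celsius"].tolist()  # [0.0, 100.0]
```

In a pipeline this would be registered with pipe.add_lambda_step(to_celsius, name='to_celsius').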
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[[DataFrame], DataFrame] | A callable that takes a DataFrame and returns a DataFrame. | required |
| name | Optional[str] | Optional label for logging and :meth:`describe` output. | None |
Returns:

| Type | Description |
|---|---|
| FeaturePipeline | self, for method chaining. |
Raises:

| Type | Description |
|---|---|
| TypeError | If func is not callable. |
Example::
    pipe.add_lambda_step(
        lambda df: df[df['uuid'].isin(['temperature', 'pressure'])],
        name='select_signals',
    )
describe
describe() -> str
Return a human-readable summary of the pipeline.
Call this before run() to verify the pipeline is wired correctly.
Example output::
FeaturePipeline (1200 rows, 4 cols)
1. [step] DoubleFilter.filter_nan_value_double
2. [instance] DataHarmonizer.resample_to_uniform freq='1s'
3. [step] SegmentExtractor.extract_time_ranges segment_uuid='order_number'
4. [step] SegmentProcessor.apply_ranges dataframe='$input', time_ranges='$prev'
5. [step] TimeWindowedFeatureTable.compute freq='1min'
run
run() -> pd.DataFrame
Execute all steps sequentially and return the final DataFrame.
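Conceptually, run() applies the registered steps in order, wrapping any failure with step context, roughly like this (a hypothetical sketch, not the library's code)::

```python
def run_pipeline(steps, df):
    """Apply (name, fn) steps in order; wrap failures with step context."""
    for number, (name, fn) in enumerate(steps, start=1):
        try:
            df = fn(df)
        except Exception as exc:
            raise RuntimeError(
                f"step {number} ({name}) failed: {exc}") from exc
    return df

steps = [("double", lambda xs: [x * 2 for x in xs]),
         ("head", lambda xs: xs[:2])]
run_pipeline(steps, [1, 2, 3])  # [2, 4]
```

Chaining from the original exception (`from exc`) keeps the full traceback available while the RuntimeError message carries the step number and name.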
Raises:

| Type | Description |
|---|---|
| RuntimeError | If any step fails. The error message includes the step number, name, DataFrame shape before the failure, available columns, and the original exception. |
run_steps
run_steps() -> Dict[str, pd.DataFrame]
Execute all steps and return intermediate results.
Returns a dict keyed by step name. The key 'input' holds the
original DataFrame; subsequent keys are the step names in order.
Useful for debugging which step transforms data unexpectedly.
Raises:

| Type | Description |
|---|---|
| RuntimeError | If any step fails (same as :meth:`run`). |