Usage Examples¤

Practical examples for common timeseries data tasks.

Loading Data¤

From Parquet Files¤

from ts_shape.loader.timeseries.parquet_loader import ParquetLoader

# Load all parquet files from a directory
df = ParquetLoader.load_all_files("data/sensors/")

# Preview the data
print(df.head())
#          uuid                   systime  value_double
# 0  temperature  2024-01-01 00:00:00+00:00         23.5
# 1  temperature  2024-01-01 00:01:00+00:00         23.7
# 2     pressure  2024-01-01 00:00:00+00:00       1013.2

From Azure Blob Storage¤

from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobLoader

loader = AzureBlobLoader(
    connection_string="DefaultEndpointsProtocol=https;...",
    container_name="timeseries",
    base_path="sensors/"
)

df = loader.fetch_data_as_dataframe(
    start_date="2024-01-01",
    end_date="2024-01-31"
)

From S3-Compatible Storage¤

from ts_shape.loader.timeseries.s3proxy_parquet_loader import S3ProxyParquetLoader

loader = S3ProxyParquetLoader(
    endpoint_url="https://s3.example.com",
    bucket="data-lake",
    prefix="timeseries/"
)

df = loader.fetch_data_as_dataframe()

Loading Metadata¤

from ts_shape.loader.metadata.metadata_json_loader import MetadataLoader

# Load signal metadata from JSON
meta = MetadataLoader("config/signals.json").to_df()

print(meta)
#          uuid         label    unit
# 0  temperature  Temperature  Celsius
# 1     pressure     Pressure     hPa

Combining Data¤

Merge Timeseries with Metadata¤

from ts_shape.loader.combine.integrator import DataIntegratorHybrid

# Combine timeseries with signal metadata
combined = DataIntegratorHybrid.combine_data(
    timeseries_sources=[ts_df],
    metadata_sources=[meta_df],
    join_key="uuid",
    merge_how="left"
)

print(combined.head())
#          uuid                   systime  value_double        label     unit
# 0  temperature  2024-01-01 00:00:00+00:00         23.5  Temperature  Celsius

Filter by Specific Signals¤

# Only load specific UUIDs
combined = DataIntegratorHybrid.combine_data(
    timeseries_sources=[ts_df],
    metadata_sources=[meta_df],
    uuids=["temperature", "humidity"],
    join_key="uuid"
)

Filtering Data¤

By Numeric Range¤

from ts_shape.transform.filter.numeric_filter import NumericFilter

# Keep values between 0 and 100
df = NumericFilter.filter_value_in_range(df, "value_double", min_value=0, max_value=100)

# Remove null values
df = NumericFilter.filter_not_null(df, "value_double")

# Keep values above threshold
df = NumericFilter.filter_greater_than(df, "value_double", threshold=50)

By Time Range¤

from ts_shape.transform.filter.datetime_filter import DateTimeFilter

# Filter to specific date range
df = DateTimeFilter.filter_between(
    df,
    column="systime",
    start_date="2024-01-01",
    end_date="2024-01-31"
)

# Keep only data after a date
df = DateTimeFilter.filter_after(df, "systime", "2024-06-01")

# Filter by hour of day (e.g., business hours)
df = DateTimeFilter.filter_by_hour_range(df, "systime", start_hour=9, end_hour=17)

By String Pattern¤

from ts_shape.transform.filter.string_filter import StringFilter

# Filter by exact match
df = StringFilter.filter_equals(df, "uuid", "temperature")

# Filter by pattern
df = StringFilter.filter_contains(df, "uuid", "sensor_")

# Filter by list of values
df = StringFilter.filter_in_list(df, "uuid", ["temp_1", "temp_2", "temp_3"])

By Boolean Flag¤

from ts_shape.transform.filter.boolean_filter import IsDeltaFilter

# Keep only delta values
df = IsDeltaFilter.filter_is_delta_true(df)

# Keep only absolute values
df = IsDeltaFilter.filter_is_delta_false(df)

Transforming Data¤

Timezone Conversion¤

from ts_shape.transform.time_functions.timezone_shift import TimezoneShift
from ts_shape.transform.time_functions.timestamp_converter import TimestampConverter

# Convert Unix timestamps to datetime
df = TimestampConverter.convert_to_datetime(
    df,
    columns=["systime"],
    unit="ns",
    timezone="UTC"
)

# Shift to local timezone
df = TimezoneShift.shift_timezone(
    df,
    time_column="systime",
    input_timezone="UTC",
    target_timezone="Europe/Berlin"
)
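
For orientation, the same shift can be written directly in pandas; this is only a rough sketch of the equivalent operation, not how TimezoneShift works internally.

import pandas as pd

# Plain-pandas sketch: treat "systime" as UTC (naive values are localized to UTC)
# and convert it to Europe/Berlin
df["systime"] = pd.to_datetime(df["systime"], utc=True).dt.tz_convert("Europe/Berlin")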

Numeric Calculations¤

from ts_shape.transform.calculator.numeric_calc import NumericCalc

# Add rolling average
df = NumericCalc.add_rolling_mean(df, "value_double", window=10)

# Add difference from previous value
df = NumericCalc.add_diff(df, "value_double")

# Normalize values (0-1 scale)
df = NumericCalc.normalize(df, "value_double")

Computing Statistics¤

Numeric Statistics¤

from ts_shape.features.stats.numeric_stats import NumericStatistics

stats = NumericStatistics(df, "value_double")

print(f"Count: {stats.count()}")
print(f"Mean: {stats.mean():.2f}")
print(f"Std: {stats.std():.2f}")
print(f"Min: {stats.min():.2f}")
print(f"Max: {stats.max():.2f}")
print(f"Median: {stats.median():.2f}")

# Get percentiles
print(f"P95: {stats.percentile(95):.2f}")
print(f"P99: {stats.percentile(99):.2f}")

Time Coverage Statistics¤

from ts_shape.features.stats.timestamp_stats import TimestampStatistics

time_stats = TimestampStatistics(df, "systime")

print(f"First: {time_stats.first()}")
print(f"Last: {time_stats.last()}")
print(f"Duration: {time_stats.duration()}")
print(f"Count: {time_stats.count()}")

String Value Counts¤

from ts_shape.features.stats.string_stats import StringStatistics

str_stats = StringStatistics(df, "uuid")

# Get value frequency
print(str_stats.value_counts())
#          uuid  count
# 0  temperature   1440
# 1     pressure   1440
# 2     humidity   1440

Detecting Events¤

Outlier Detection¤

from ts_shape.events.quality.outlier_detection import OutlierDetection

# Z-score based outliers (values more than 3 standard deviations from the mean)
outliers = OutlierDetection.detect_zscore_outliers(
    df,
    column="value_double",
    threshold=3.0
)

print(f"Found {len(outliers)} outliers")

# IQR-based outliers
outliers = OutlierDetection.detect_iqr_outliers(
    df,
    column="value_double",
    multiplier=1.5
)
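
Both detectors follow the standard textbook rules; here is a minimal plain-pandas sketch of what the threshold and multiplier parameters control (an illustration, not the OutlierDetection internals).

# Z-score rule: flag points more than `threshold` standard deviations from the mean
z = (df["value_double"] - df["value_double"].mean()) / df["value_double"].std()
zscore_outliers = df[z.abs() > 3.0]

# IQR rule: flag points outside [Q1 - multiplier * IQR, Q3 + multiplier * IQR]
q1, q3 = df["value_double"].quantile([0.25, 0.75])
iqr = q3 - q1
iqr_outliers = df[(df["value_double"] < q1 - 1.5 * iqr) | (df["value_double"] > q3 + 1.5 * iqr)]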

Statistical Process Control¤

from ts_shape.events.quality.statistical_process_control import StatisticalProcessControl

spc = StatisticalProcessControl(df, value_column="value_double")

# Detect control limit violations
violations = spc.detect_control_violations(
    ucl=100,  # Upper control limit
    lcl=0     # Lower control limit
)

# Detect Western Electric rules violations
we_violations = spc.detect_western_electric_rules()

Tolerance Deviations¤

from ts_shape.events.quality.tolerance_deviation import ToleranceDeviation

# Find values outside specification limits
deviations = ToleranceDeviation.detect_out_of_tolerance(
    df,
    column="value_double",
    upper_limit=100,
    lower_limit=0
)

Complete Pipeline Example¤

import pandas as pd
from ts_shape.loader.timeseries.parquet_loader import ParquetLoader
from ts_shape.loader.metadata.metadata_json_loader import MetadataLoader
from ts_shape.loader.combine.integrator import DataIntegratorHybrid
from ts_shape.transform.filter.datetime_filter import DateTimeFilter
from ts_shape.transform.filter.numeric_filter import NumericFilter
from ts_shape.features.stats.numeric_stats import NumericStatistics
from ts_shape.events.quality.outlier_detection import OutlierDetection

# 1. Load data
print("Loading data...")
ts_df = ParquetLoader.load_all_files("data/sensors/")
meta_df = MetadataLoader("config/signals.json").to_df()

# 2. Combine with metadata
print("Combining with metadata...")
df = DataIntegratorHybrid.combine_data(
    timeseries_sources=[ts_df],
    metadata_sources=[meta_df],
    join_key="uuid"
)
print(f"  Total records: {len(df)}")

# 3. Filter to analysis period
print("Filtering...")
df = DateTimeFilter.filter_between(df, "systime", "2024-01-01", "2024-03-31")
df = NumericFilter.filter_not_null(df, "value_double")
print(f"  After filtering: {len(df)}")

# 4. Detect and remove outliers
print("Detecting outliers...")
outliers = OutlierDetection.detect_zscore_outliers(df, "value_double", threshold=3.0)
clean_df = df[~df.index.isin(outliers.index)]
print(f"  Outliers removed: {len(outliers)}")

# 5. Compute statistics per signal
print("\nStatistics by signal:")
for uuid in clean_df["uuid"].unique():
    signal_df = clean_df[clean_df["uuid"] == uuid]
    stats = NumericStatistics(signal_df, "value_double")
    print(f"  {uuid}:")
    print(f"    Count: {stats.count()}")
    print(f"    Mean: {stats.mean():.2f}")
    print(f"    Std: {stats.std():.2f}")
    print(f"    Range: [{stats.min():.2f}, {stats.max():.2f}]")

# 6. Export results
clean_df.to_parquet("output/clean_data.parquet")
print("\nExported to output/clean_data.parquet")

Output:

Loading data...
Combining with metadata...
  Total records: 125000
Filtering...
  After filtering: 98500
Detecting outliers...
  Outliers removed: 127

Statistics by signal:
  temperature:
    Count: 32850
    Mean: 23.45
    Std: 2.31
    Range: [18.20, 28.70]
  pressure:
    Count: 32800
    Mean: 1013.25
    Std: 5.67
    Range: [995.00, 1030.00]
  humidity:
    Count: 32723
    Mean: 65.30
    Std: 12.45
    Range: [35.00, 95.00]

Exported to output/clean_data.parquet


Production Traceability¤

Part Production Tracking¤

Track production quantities by part number with time-based aggregation.

from ts_shape.events.production.part_tracking import PartProductionTracking

tracker = PartProductionTracking(df)

# Production by part with hourly windows
hourly = tracker.production_by_part(
    part_id_uuid='part_number_signal',
    counter_uuid='counter_signal',
    window='1h'
)
#     window_start         part_number  quantity  first_count  last_count
# 0   2024-01-01 08:00:00  PART_A       150       1000        1150

# Daily production summary
daily = tracker.daily_production_summary(
    part_id_uuid='part_number_signal',
    counter_uuid='counter_signal'
)

# Total production for date range
totals = tracker.production_totals(
    part_id_uuid='part_number_signal',
    counter_uuid='counter_signal',
    start_date='2024-01-01',
    end_date='2024-01-31'
)
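
In the sample output above, quantity appears to be the counter delta per window (last_count minus first_count). A rough plain-pandas sketch of that idea, assuming a hypothetical pivoted frame with a DatetimeIndex and one column per signal uuid (not the PartProductionTracking internals):

# Hourly counter deltas from a hypothetical pivoted frame
hourly = pivoted["counter_signal"].resample("1h").agg(["first", "last"])
hourly["quantity"] = hourly["last"] - hourly["first"]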

Quality Tracking (NOK/Scrap)¤

Track defective parts, first-pass yield, and defect reasons.

from ts_shape.events.production.quality_tracking import QualityTracking

tracker = QualityTracking(df, shift_definitions={
    "day": ("06:00", "14:00"),
    "afternoon": ("14:00", "22:00"),
    "night": ("22:00", "06:00"),
})

# NOK parts per shift with first-pass yield
shift_quality = tracker.nok_by_shift(
    ok_counter_uuid='good_parts',
    nok_counter_uuid='bad_parts'
)
#     date        shift    ok_parts  nok_parts  nok_rate_pct  first_pass_yield_pct
# 0   2024-01-01  day      450       12         2.6           97.4

# Quality by part number
part_quality = tracker.quality_by_part(
    ok_counter_uuid='good_parts',
    nok_counter_uuid='bad_parts',
    part_id_uuid='part_number'
)

# Pareto analysis of defect reasons
reasons = tracker.nok_by_reason(
    nok_counter_uuid='bad_parts',
    defect_reason_uuid='defect_code'
)

Cycle Time Tracking¤

Analyze cycle times with trend detection and slow cycle identification.

from ts_shape.events.production.cycle_time_tracking import CycleTimeTracking

tracker = CycleTimeTracking(df)

# Cycle times per part
cycles = tracker.cycle_time_by_part(
    part_id_uuid='part_number_signal',
    cycle_trigger_uuid='cycle_complete_signal'
)

# Statistics by part (min, avg, max, std, median)
stats = tracker.cycle_time_statistics(
    part_id_uuid='part_number_signal',
    cycle_trigger_uuid='cycle_complete_signal'
)

# Detect slow cycles (>1.5x median)
slow = tracker.detect_slow_cycles(
    part_id_uuid='part_number_signal',
    cycle_trigger_uuid='cycle_complete_signal',
    threshold_factor=1.5
)

# Trend analysis for specific part
trend = tracker.cycle_time_trend(
    part_id_uuid='part_number_signal',
    cycle_trigger_uuid='cycle_complete_signal',
    part_number='PART_A',
    window_size=20
)

Downtime Tracking¤

Track machine downtime by shift and by reason, and monitor availability trends.

from ts_shape.events.production.downtime_tracking import DowntimeTracking

tracker = DowntimeTracking(df)

# Downtime per shift with availability
shift_downtime = tracker.downtime_by_shift(
    state_uuid='machine_state',
    running_value='Running'
)
#     date        shift    downtime_minutes  uptime_minutes  availability_pct
# 0   2024-01-01  shift_1  45.2             434.8           90.6

# Downtime by reason (Pareto analysis)
reasons = tracker.downtime_by_reason(
    state_uuid='machine_state',
    reason_uuid='downtime_reason',
    stopped_value='Stopped'
)

# Top 5 downtime reasons
top_reasons = tracker.top_downtime_reasons(
    state_uuid='machine_state',
    reason_uuid='downtime_reason',
    top_n=5
)

# Availability trend over time
trend = tracker.availability_trend(
    state_uuid='machine_state',
    running_value='Running',
    window='1D'
)

Shift Reporting¤

Compare shift performance and track against targets.

from ts_shape.events.production.shift_reporting import ShiftReporting

reporter = ShiftReporting(df)

# Production per shift
shift_prod = reporter.shift_production(
    counter_uuid='counter_signal',
    part_id_uuid='part_number_signal'
)

# Compare shifts (last 7 days)
comparison = reporter.shift_comparison(counter_uuid='counter_signal', days=7)

# Track against targets
targets = reporter.shift_targets(
    counter_uuid='counter_signal',
    targets={'shift_1': 450, 'shift_2': 450, 'shift_3': 400}
)

# Best and worst shifts
results = reporter.best_and_worst_shifts(counter_uuid='counter_signal')

Machine State Events¤

Detect run/idle intervals and state transitions.

from ts_shape.events.production.machine_state import MachineStateEvents

state = MachineStateEvents(df, run_state_uuid='machine_running')

# Run/idle intervals with minimum duration
intervals = state.detect_run_idle(min_duration='30s')

# State transitions
transitions = state.transition_events()

# Detect rapid state changes (suspicious)
rapid = state.detect_rapid_transitions(threshold='5s', min_count=3)

# Quality metrics
metrics = state.state_quality_metrics()
print(f"Run/Idle ratio: {metrics['run_idle_ratio']:.2f}")

Changeover Detection¤

Detect product/recipe changes and compute changeover windows.

from ts_shape.events.production.changeover import ChangeoverEvents

changeover = ChangeoverEvents(df)

# Detect changeovers
changes = changeover.detect_changeover(
    product_uuid='product_signal',
    min_hold='5m'
)

# Compute changeover windows (fixed duration)
windows = changeover.changeover_window(
    product_uuid='product_signal',
    until='fixed_window',
    config={'duration': '10m'}
)

# Compute changeover windows (stable band - waits for process stability)
windows = changeover.changeover_window(
    product_uuid='product_signal',
    until='stable_band',
    config={
        'metrics': [
            {'uuid': 'temperature', 'band': 2.0, 'hold': '2m'},
            {'uuid': 'pressure', 'band': 5.0, 'hold': '2m'},
        ],
        'reference_method': 'ewma'
    }
)
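
With the stable_band option, the window ends once every listed metric stays inside its band around a reference signal for the hold period. Below is a minimal single-metric sketch of that stability check in plain pandas (an illustration with an assumed 60-second EWMA halflife, not the ChangeoverEvents implementation).

# "temperature" counts as stable once it stays within +/- 2.0 of an EWMA reference
# for a full 2-minute window
temp = (
    df[df["uuid"] == "temperature"]
    .set_index("systime")["value_double"]
    .sort_index()
)
reference = temp.ewm(halflife="60s", times=temp.index).mean()
inside_band = (temp - reference).abs() <= 2.0
stable = inside_band.astype(float).rolling("2min").min() == 1.0
stable_since = stable.idxmax() if stable.any() else None  # first fully stable timestamp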

Engineering Events¤

Setpoint Change Analysis¤

Detect setpoint step and ramp changes and compute control-quality KPIs such as settling time, overshoot, and rise time.

from ts_shape.events.engineering.setpoint_events import SetpointChangeEvents

setpoint = SetpointChangeEvents(df, setpoint_uuid='temperature_setpoint')

# Detect step changes
steps = setpoint.detect_setpoint_steps(min_delta=5.0, min_hold='30s')

# Detect ramp changes
ramps = setpoint.detect_setpoint_ramps(min_rate=0.1, min_duration='10s')

# Time to settle
settling = setpoint.time_to_settle(
    actual_uuid='temperature_actual',
    tol=1.0,
    hold='10s',
    lookahead='5m'
)

# Overshoot/undershoot metrics
overshoot = setpoint.overshoot_metrics(actual_uuid='temperature_actual')

# Rise time (10% to 90%)
rise = setpoint.rise_time(actual_uuid='temperature_actual')

# Comprehensive control quality metrics (all-in-one)
quality = setpoint.control_quality_metrics(
    actual_uuid='temperature_actual',
    tol=1.0,
    hold='10s'
)
# Returns: t_settle, rise_time, overshoot, undershoot, oscillations, decay_rate
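
As a rough reference for how such KPIs are usually defined (an illustration, not the SetpointChangeEvents internals; old_sp, new_sp, tol, and response are assumed names for one step change and the measured response):

# Hypothetical sketch for an upward step from old_sp to new_sp; `response` is a
# pandas Series of the actual value indexed by time, `tol` is the settling band
step = new_sp - old_sp
overshoot_pct = max(response.max() - new_sp, 0) / abs(step) * 100

inside_tol = (response - new_sp).abs() <= tol
# settling time: first timestamp after which the response never leaves the band again
stays_inside = inside_tol.astype(int)[::-1].cumprod()[::-1].astype(bool)
t_settle = stays_inside.idxmax() if stays_inside.any() else None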

Advanced Features¤

Cycle Extraction¤

ts-shape includes cycle detection for industrial processes, with several extraction strategies plus built-in validation and overlap handling.

from ts_shape.features.cycles.cycles_extractor import CycleExtractor

# Initialize extractor with start/end signals
extractor = CycleExtractor(
    dataframe=df,
    start_uuid="cycle_start_signal",
    end_uuid="cycle_end_signal",
    value_change_threshold=0.1
)

# Get recommendations for best extraction method
suggestions = extractor.suggest_method()
print(f"Recommended: {suggestions['recommended_methods']}")
print(f"Reason: {suggestions['reasoning']}")

# Extract cycles using the recommended method
if 'process_persistent_cycle' in suggestions['recommended_methods']:
    cycles = extractor.process_persistent_cycle()
elif 'process_step_sequence' in suggestions['recommended_methods']:
    cycles = extractor.process_step_sequence(start_step=1, end_step=10)
else:
    cycles = extractor.process_value_change_cycle()

# Validate cycles
validated = extractor.validate_cycles(
    cycles,
    min_duration='1s',
    max_duration='1h'
)

# Detect and resolve overlapping cycles
clean_cycles = extractor.detect_overlapping_cycles(
    validated,
    resolve='keep_longest'
)

# Get extraction statistics
stats = extractor.get_extraction_stats()
print(f"Total: {stats['total_cycles']}, Complete: {stats['complete_cycles']}")

Advanced Outlier Detection¤

Detect outliers with several robust methods beyond the basic z-score: IQR, MAD, and IsolationForest.

from ts_shape.events.quality.outlier_detection import OutlierDetectionEvents

detector = OutlierDetectionEvents(
    dataframe=df,
    value_column="value_double",
    event_uuid="outlier_event",
    time_threshold="5min"
)

# Z-score method (standard)
outliers_zscore = detector.detect_outliers_zscore(threshold=3.0)

# IQR method (resistant to extreme values)
outliers_iqr = detector.detect_outliers_iqr(threshold=(1.5, 1.5))

# MAD method (most robust to outliers)
outliers_mad = detector.detect_outliers_mad(threshold=3.5)

# IsolationForest (machine learning based)
outliers_ml = detector.detect_outliers_isolation_forest(
    contamination=0.1,
    random_state=42
)

# All methods return severity scores
print(outliers_mad[['systime', 'value_double', 'severity_score']])
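
Because the result is a regular DataFrame, the severity score can be used to rank or trim the findings, for example:

# Keep only the ten most severe findings for review
worst = outliers_mad.nlargest(10, "severity_score")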

Statistical Process Control (SPC)¤

Apply the full set of Western Electric rules and detect small process shifts with CUSUM.

from ts_shape.events.quality.statistical_process_control import StatisticalProcessControlRuleBased

spc = StatisticalProcessControlRuleBased(
    dataframe=df,
    value_column="value_double",
    tolerance_uuid="control_limits",
    actual_uuid="measurements",
    event_uuid="spc_violation"
)

# Calculate control limits
limits = spc.calculate_control_limits()
print(f"Mean: {limits['mean'][0]:.2f}")
print(f"UCL (3σ): {limits['3sigma_upper'][0]:.2f}")
print(f"LCL (3σ): {limits['3sigma_lower'][0]:.2f}")

# Dynamic control limits (adapts over time)
dynamic_limits = spc.calculate_dynamic_control_limits(
    method='ewma',  # or 'moving_range'
    window=20
)

# Apply Western Electric Rules (all 8 rules)
violations = spc.apply_rules_vectorized()

# Or select specific rules
violations = spc.apply_rules_vectorized(
    selected_rules=['rule_1', 'rule_2', 'rule_3']
)

# Get human-readable interpretations
interpreted = spc.interpret_violations(violations)
print(interpreted[['systime', 'rule', 'interpretation', 'recommendation']])

# CUSUM shift detection (sensitive to small shifts)
shifts = spc.detect_cusum_shifts(
    k=0.5,    # Slack parameter
    h=5.0     # Decision threshold
)
print(shifts[['systime', 'shift_direction', 'severity']])
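
For intuition on k and h (a textbook sketch of tabular CUSUM, not necessarily the library's exact implementation): deviations beyond the slack k are accumulated, and a shift is signaled once the running sum exceeds the decision threshold h.

import numpy as np

# Hypothetical one-sided (upper) tabular CUSUM on standardized values
x = df["value_double"].to_numpy(dtype=float)
z = (x - x.mean()) / x.std()

k, h = 0.5, 5.0                                     # slack and decision threshold, as above
s_pos = np.zeros(len(z))
for i in range(1, len(z)):
    s_pos[i] = max(0.0, s_pos[i - 1] + z[i] - k)    # accumulate excess above the slack
upward_shift_idx = np.flatnonzero(s_pos > h)        # positions where an upward shift is signaled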

Process Capability Indices¤

Calculate Cp, Cpk, Pp, Ppk for quality assessment.

from ts_shape.events.quality.tolerance_deviation import ToleranceDeviationEvents

# With separate upper/lower tolerances
tolerance_checker = ToleranceDeviationEvents(
    dataframe=df,
    tolerance_column="value_double",
    actual_column="value_double",
    upper_tolerance_uuid="upper_spec_limit",
    lower_tolerance_uuid="lower_spec_limit",
    actual_uuid="measurements",
    event_uuid="deviation_event",
    warning_threshold=0.8  # 80% of tolerance = warning zone
)

# Calculate capability indices
capability = tolerance_checker.compute_capability_indices()
print(f"Cp:  {capability['Cp']:.3f}")   # Potential capability
print(f"Cpk: {capability['Cpk']:.3f}")  # Actual capability
print(f"Mean: {capability['mean']:.2f}")
print(f"Std:  {capability['std']:.2f}")

# Interpret results
if capability['Cpk'] >= 1.33:
    print("Process is capable")
elif capability['Cpk'] >= 1.0:
    print("Process needs improvement")
else:
    print("Process is not capable")

Custom Filtering with Query Syntax¤

Use pandas query syntax for flexible filtering.

from ts_shape.transform.filter.custom_filter import CustomFilter

# Complex multi-condition filtering
df = CustomFilter.filter_custom_conditions(
    df,
    "value_double > 50 and value_double < 100 and uuid == 'temperature'"
)

# With computed expressions
df = CustomFilter.filter_custom_conditions(
    df,
    "value_double > value_double.mean() * 1.5"
)

# Multiple OR conditions
df = CustomFilter.filter_custom_conditions(
    df,
    "uuid == 'temp_1' or uuid == 'temp_2' or uuid == 'temp_3'"
)

Lambda Processing¤

Apply custom transformations to columns.

from ts_shape.transform.functions.lambda_func import LambdaProcessor
import numpy as np

# Apply custom transformations
df = LambdaProcessor.apply_function(
    df,
    "value_double",
    lambda x: np.log1p(x)  # Log transform
)

# Scale values
df = LambdaProcessor.apply_function(
    df,
    "value_double",
    lambda x: (x - x.mean()) / x.std()  # Z-score normalization
)

# Clip extreme values
df = LambdaProcessor.apply_function(
    df,
    "value_double",
    lambda x: np.clip(x, 0, 100)
)

Next Steps¤