Transform Operations Demo¤
Demonstrates data transformation: numeric filtering, string filtering, datetime filtering, and arithmetic calculations.
Run it: python examples/transform_operations_demo.py
Modules demonstrated: NumericFilter, StringFilter, DateTimeFilter, BooleanFilter, NumericCalc
Related guides: Signal Conditioning
#!/usr/bin/env python3
"""
Transform Operations Demo for ts-shape
========================================
Demonstrates all transform classes with practical examples:
Filters:
- IntegerFilter / DoubleFilter (numeric_filter)
- StringFilter (string_filter)
- BooleanFilter / IsDeltaFilter (boolean_filter)
- DateTimeFilter (datetime_filter)
Calculators:
- IntegerCalc (numeric_calc)
Functions:
- LambdaProcessor (lambda_func)
Time functions:
- TimezoneShift (timezone_shift)
- TimestampConverter (timestamp_converter)
Scenario: An industrial IoT gateway collects heterogeneous sensor data from a
packaging line. Each row carries one of several uuid-tagged signals.
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from ts_shape.transform.filter.numeric_filter import IntegerFilter, DoubleFilter
from ts_shape.transform.filter.string_filter import StringFilter
from ts_shape.transform.filter.boolean_filter import BooleanFilter, IsDeltaFilter
from ts_shape.transform.filter.datetime_filter import DateTimeFilter
from ts_shape.transform.calculator.numeric_calc import IntegerCalc
from ts_shape.transform.functions.lambda_func import LambdaProcessor
from ts_shape.transform.time_functions.timezone_shift import TimezoneShift
from ts_shape.transform.time_functions.timestamp_converter import TimestampConverter
# ---------------------------------------------------------------------------
# Helper: build a mixed-signal packaging-line DataFrame
# ---------------------------------------------------------------------------
def create_packaging_line_data(n_rows: int = 200) -> pd.DataFrame:
"""
Simulate heterogeneous IoT data from a packaging line.
Signals:
- 'conveyor_speed' : integer RPM values (value_integer)
- 'fill_weight' : double weight in grams (value_double)
- 'product_label' : string label identifier (value_string)
- 'lid_sealed' : boolean seal status (value_bool)
"""
np.random.seed(12)
start = datetime(2025, 7, 1, 8, 0, 0)
rows = []
for i in range(n_rows):
t = start + timedelta(seconds=15 * i)
# Conveyor speed (integer RPM, mostly 60, occasional 0 for stops)
rpm = 60 if np.random.random() > 0.05 else 0
rows.append({
"systime": t,
"uuid": "conveyor_speed",
"value_bool": None,
"value_integer": rpm,
"value_double": None,
"value_string": None,
"is_delta": True,
})
# Fill weight (double, target 500 g)
weight = 500.0 + np.random.normal(0, 2.5)
if i in (50, 120, 180):
weight = np.nan # simulate missing readings
rows.append({
"systime": t + timedelta(seconds=1),
"uuid": "fill_weight",
"value_bool": None,
"value_integer": None,
"value_double": round(weight, 3),
"value_string": None,
"is_delta": True,
})
# Product label string
labels = ["SKU-A100", "SKU-B200", "SKU-C300", "SKU-A100", "SKU-A100"]
label = np.random.choice(labels)
rows.append({
"systime": t + timedelta(seconds=2),
"uuid": "product_label",
"value_bool": None,
"value_integer": None,
"value_double": None,
"value_string": label,
"is_delta": True,
})
# Lid sealed boolean (True most of the time, occasional False)
sealed = True if np.random.random() > 0.08 else False
rows.append({
"systime": t + timedelta(seconds=3),
"uuid": "lid_sealed",
"value_bool": sealed,
"value_integer": None,
"value_double": None,
"value_string": None,
"is_delta": bool(i % 2 == 0), # alternate is_delta for demo
})
return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Demo functions
# ---------------------------------------------------------------------------
def demo_numeric_filters(df: pd.DataFrame):
"""Filter numeric columns (integer and double)."""
print("=" * 72)
print("1. NUMERIC FILTERS")
print("=" * 72)
# --- Integer filters ---
speed_df = df[df["uuid"] == "conveyor_speed"].copy()
print(f"\nConveyor speed rows: {len(speed_df)}")
# Match a specific RPM value
stopped = IntegerFilter.filter_value_integer_match(speed_df, column_name="value_integer", integer_value=0)
print(f" Conveyor stopped (RPM=0): {len(stopped)} rows")
running = IntegerFilter.filter_value_integer_not_match(speed_df, column_name="value_integer", integer_value=0)
print(f" Conveyor running (RPM!=0): {len(running)} rows")
# Range filter
in_range = IntegerFilter.filter_value_integer_between(speed_df, column_name="value_integer", min_value=50, max_value=70)
print(f" RPM between 50 and 70: {len(in_range)} rows")
# --- Double filters ---
weight_df = df[df["uuid"] == "fill_weight"].copy()
print(f"\nFill weight rows: {len(weight_df)}")
# Remove NaN weights
valid_weights = DoubleFilter.filter_nan_value_double(weight_df, column_name="value_double")
print(f" Valid (non-NaN) weights: {len(valid_weights)} rows")
# Filter weights within an acceptable range
in_spec = DoubleFilter.filter_value_double_between(valid_weights, column_name="value_double", min_value=495.0, max_value=505.0)
print(f" Weights in spec (495-505 g): {len(in_spec)} rows")
print()
def demo_string_filters(df: pd.DataFrame):
"""Filter and inspect string columns."""
print("=" * 72)
print("2. STRING FILTERS")
print("=" * 72)
label_df = df[df["uuid"] == "product_label"].copy()
print(f"\nProduct label rows: {len(label_df)}")
# Remove NAs
clean = StringFilter.filter_na_value_string(label_df, column_name="value_string")
print(f" Non-null labels: {len(clean)}")
# Exact match
sku_a = StringFilter.filter_value_string_match(clean, string_value="SKU-A100", column_name="value_string")
print(f" SKU-A100 rows: {len(sku_a)}")
# Not match
not_a = StringFilter.filter_value_string_not_match(clean, string_value="SKU-A100", column_name="value_string")
print(f" Not SKU-A100: {len(not_a)}")
# Substring contains
b_products = StringFilter.filter_string_contains(clean, substring="B200", column_name="value_string")
print(f" Labels containing 'B200': {len(b_products)}")
# Detect label changes (e.g., product changeover)
changes = StringFilter.detect_changes_in_string(clean, column_name="value_string")
print(f" Label change events: {len(changes)}")
# Regex clean: strip the alphabetic prefix to keep only the numeric part
cleaned = StringFilter.regex_clean_value_string(
clean.copy(),
column_name="value_string",
regex_pattern=r"SKU-[A-Z]",
replacement="ITEM-",
)
print(f" After regex replace (first 5): {cleaned['value_string'].head(5).tolist()}")
print()
def demo_boolean_filters(df: pd.DataFrame):
"""Filter boolean and is_delta columns."""
print("=" * 72)
print("3. BOOLEAN / IS_DELTA FILTERS")
print("=" * 72)
seal_df = df[df["uuid"] == "lid_sealed"].copy().reset_index(drop=True)
print(f"\nLid sealed rows: {len(seal_df)}")
# is_delta filters
delta_true = IsDeltaFilter.filter_is_delta_true(seal_df)
delta_false = IsDeltaFilter.filter_is_delta_false(seal_df)
print(f" is_delta=True: {len(delta_true)} rows")
print(f" is_delta=False: {len(delta_false)} rows")
# Boolean edge detection: rising (False -> True) and falling (True -> False)
falling = BooleanFilter.filter_falling_value_bool(seal_df.copy(), column_name="value_bool")
print(f" Falling edges (seal lost): {len(falling)} rows")
raising = BooleanFilter.filter_raising_value_bool(seal_df.copy(), column_name="value_bool")
print(f" Rising edges (seal restored): {len(raising)} rows")
print()
def demo_datetime_filters(df: pd.DataFrame):
"""Filter rows by datetime ranges."""
print("=" * 72)
print("4. DATETIME FILTERS")
print("=" * 72)
# Ensure systime is datetime
df = df.copy()
df["systime"] = pd.to_datetime(df["systime"])
print(f"\nTotal rows: {len(df)}")
print(f"Time range: {df['systime'].min()} to {df['systime'].max()}")
# Filter rows after a specific date/time
after = DateTimeFilter.filter_after_datetime(df, column_name="systime", datetime="2025-07-01 08:30:00")
print(f" After 08:30:00: {len(after)} rows")
# Filter rows before a specific date/time
before = DateTimeFilter.filter_before_datetime(df, column_name="systime", datetime="2025-07-01 08:15:00")
print(f" Before 08:15:00: {len(before)} rows")
# Filter between two datetimes
between = DateTimeFilter.filter_between_datetimes(
df,
column_name="systime",
start_datetime="2025-07-01 08:10:00",
end_datetime="2025-07-01 08:20:00",
)
print(f" Between 08:10 and 08:20: {len(between)} rows")
# Date-level filters
after_date = DateTimeFilter.filter_after_date(df, column_name="systime", date="2025-06-30")
print(f" After 2025-06-30: {len(after_date)} rows (should be all)")
before_date = DateTimeFilter.filter_before_date(df, column_name="systime", date="2025-07-02")
print(f" Before 2025-07-02: {len(before_date)} rows (should be all)")
between_dates = DateTimeFilter.filter_between_dates(
df, column_name="systime", start_date="2025-06-30", end_date="2025-07-02"
)
print(f" Between 2025-06-30 and 2025-07-02: {len(between_dates)} rows")
print()
def demo_numeric_calc(df: pd.DataFrame):
"""Demonstrate numeric calculations on integer columns."""
print("=" * 72)
print("5. NUMERIC CALCULATIONS (IntegerCalc)")
print("=" * 72)
speed_df = df[df["uuid"] == "conveyor_speed"][["systime", "uuid", "value_integer"]].copy()
original_sum = speed_df["value_integer"].sum()
print(f"\nOriginal RPM sum: {original_sum}")
# Scale: convert RPM to radians/sec (multiply by 2*pi/60)
scaled = IntegerCalc.scale_column(speed_df.copy(), column_name="value_integer", factor=2 * np.pi / 60)
print(f" After scale (RPM -> rad/s), first 3 values: {[round(v, 4) for v in scaled['value_integer'].head(3).tolist()]}")
# Offset: add a calibration offset of +2 RPM
offset = IntegerCalc.offset_column(
df[df["uuid"] == "conveyor_speed"][["systime", "uuid", "value_integer"]].copy(),
column_name="value_integer",
offset_value=2,
)
print(f" After offset +2, first 3 values: {offset['value_integer'].head(3).tolist()}")
# Divide: convert RPM to RPS
divided = IntegerCalc.divide_column(
df[df["uuid"] == "conveyor_speed"][["systime", "uuid", "value_integer"]].copy(),
column_name="value_integer",
divisor=60,
)
print(f" After divide by 60 (RPM -> RPS), first 3 values: {divided['value_integer'].head(3).tolist()}")
# Combined multiply + add (linear transform: y = mx + b)
combined = IntegerCalc.calculate_with_fixed_factors(
df[df["uuid"] == "conveyor_speed"][["systime", "uuid", "value_integer"]].copy(),
column_name="value_integer",
multiply_factor=1.05,
add_factor=-3,
)
print(f" After y=1.05x-3, first 3 values: {combined['value_integer'].head(3).tolist()}")
# Power: square the values
powered = IntegerCalc.power_column(
df[df["uuid"] == "conveyor_speed"][["systime", "uuid", "value_integer"]].copy(),
column_name="value_integer",
power_value=2,
)
print(f" After power(2), first 3 values: {powered['value_integer'].head(3).tolist()}")
# Modulus
modded = IntegerCalc.mod_column(
df[df["uuid"] == "conveyor_speed"][["systime", "uuid", "value_integer"]].copy(),
column_name="value_integer",
mod_value=7,
)
print(f" After mod 7, first 3 values: {modded['value_integer'].head(3).tolist()}")
print()
def demo_lambda_processor(df: pd.DataFrame):
"""Demonstrate custom lambda/function application."""
print("=" * 72)
print("6. LAMBDA PROCESSOR")
print("=" * 72)
weight_df = df[df["uuid"] == "fill_weight"][["systime", "uuid", "value_double"]].copy()
weight_df = weight_df.dropna(subset=["value_double"])
print(f"\nFill weight rows (non-null): {len(weight_df)}")
# Apply a lambda to convert grams to kilograms
kg_df = LambdaProcessor.apply_function(
weight_df.copy(),
column_name="value_double",
func=lambda x: x / 1000.0,
)
print(f" After g -> kg, first 3: {[round(v, 5) for v in kg_df['value_double'].head(3).tolist()]}")
# Apply numpy log transform
log_df = LambdaProcessor.apply_function(
weight_df.copy(),
column_name="value_double",
func=lambda x: np.log(x),
)
print(f" After log transform, first 3: {[round(v, 4) for v in log_df['value_double'].head(3).tolist()]}")
# Apply a clipping function
clipped = LambdaProcessor.apply_function(
weight_df.copy(),
column_name="value_double",
func=lambda x: np.clip(x, 497.0, 503.0),
)
print(f" After clip [497, 503], min={clipped['value_double'].min():.1f}, max={clipped['value_double'].max():.1f}")
print()
def demo_timezone_shift(df: pd.DataFrame):
"""Demonstrate timezone conversion and related utilities."""
print("=" * 72)
print("7. TIMEZONE SHIFT")
print("=" * 72)
# Work with a small slice
small = df.head(12).copy()
small["systime"] = pd.to_datetime(small["systime"])
print(f"\nOriginal timezone-aware? {TimezoneShift.detect_timezone_awareness(small, 'systime')}")
# Shift from UTC to US Eastern
shifted = TimezoneShift.shift_timezone(
small.copy(),
time_column="systime",
input_timezone="UTC",
target_timezone="America/New_York",
)
print(f" After shift to US/Eastern:")
print(f" First timestamp: {shifted['systime'].iloc[0]}")
print(f" Timezone-aware? {TimezoneShift.detect_timezone_awareness(shifted, 'systime')}")
# Add a second timezone column without modifying the original
dual = TimezoneShift.add_timezone_column(
small.copy(),
time_column="systime",
input_timezone="UTC",
target_timezone="Europe/Berlin",
)
tz_cols = [c for c in dual.columns if "Europe" in c]
if tz_cols:
print(f" Added column: {tz_cols[0]}")
print(f" Original: {dual['systime'].iloc[0]}")
print(f" Berlin: {dual[tz_cols[0]].iloc[0]}")
# Revert to UTC
reverted = TimezoneShift.revert_to_original_timezone(shifted.copy(), "systime", "UTC")
print(f" Reverted to UTC: {reverted['systime'].iloc[0]}")
# Calculate time difference between two columns
dual2 = dual.copy()
if tz_cols:
diff_seconds = TimezoneShift.calculate_time_difference(dual2, "systime", tz_cols[0])
print(f" Time diff (UTC vs Berlin), first 3 (seconds): {diff_seconds.head(3).tolist()}")
print()
def demo_timestamp_converter():
"""Demonstrate high-precision timestamp conversion."""
print("=" * 72)
print("8. TIMESTAMP CONVERTER")
print("=" * 72)
# Create a small DataFrame with epoch nanosecond timestamps
epoch_ns = [
1751356800000000000, # 2025-07-01 00:00:00 UTC
1751356860000000000, # +60 seconds
1751356920000000000, # +120 seconds
]
ts_df = pd.DataFrame({
"systime": epoch_ns,
"uuid": "test_sensor",
"value_double": [1.0, 2.0, 3.0],
})
print(f"\nRaw epoch nanoseconds (first): {ts_df['systime'].iloc[0]}")
# Convert from nanoseconds to datetime in UTC
converted_utc = TimestampConverter.convert_to_datetime(ts_df.copy(), columns=["systime"], unit="ns", timezone="UTC")
print(f" After ns -> UTC datetime: {converted_utc['systime'].iloc[0]}")
# Convert from nanoseconds directly to US/Pacific
converted_pac = TimestampConverter.convert_to_datetime(ts_df.copy(), columns=["systime"], unit="ns", timezone="US/Pacific")
print(f" After ns -> US/Pacific: {converted_pac['systime'].iloc[0]}")
# Demonstrate seconds-based conversion
epoch_s = [ts / 1e9 for ts in epoch_ns]
ts_df_s = pd.DataFrame({
"systime": epoch_s,
"uuid": "test_sensor",
"value_double": [1.0, 2.0, 3.0],
})
converted_s = TimestampConverter.convert_to_datetime(ts_df_s.copy(), columns=["systime"], unit="s", timezone="Europe/London")
print(f" After s -> Europe/London: {converted_s['systime'].iloc[0]}")
print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
df = create_packaging_line_data()
print(f"Generated {len(df)} rows of packaging-line IoT data\n")
demo_numeric_filters(df)
demo_string_filters(df)
demo_boolean_filters(df)
demo_datetime_filters(df)
demo_numeric_calc(df)
demo_lambda_processor(df)
demo_timezone_shift(df)
demo_timestamp_converter()
print("=" * 72)
print("Transform operations demo complete.")
print("=" * 72)