Skip to content

Loader Usage Demo

Demonstrates loading data from Parquet files, enriching it with metadata, and integrating the two.

Run it: python examples/loader_usage_demo.py

Modules demonstrated: ParquetLoader, MetadataJsonLoader, DataIntegratorHybrid

Related guides: Data Acquisition


#!/usr/bin/env python3
"""
Demonstration of data loader utilities in ts-shape.

This script shows how to use:
1. ParquetLoader (constructor and usage patterns -- no actual files needed)
2. MetadataJsonLoader (load metadata from JSON strings)
3. DataIntegratorHybrid (combine timeseries + metadata DataFrames)
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import sys
import os

# Add parent directory to path to import ts_shape
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

from ts_shape.loader.timeseries.parquet_loader import ParquetLoader
from ts_shape.loader.metadata.metadata_json_loader import MetadataJsonLoader
from ts_shape.loader.combine.integrator import DataIntegratorHybrid


def create_timeseries_dataframe():
    """Build a synthetic timeseries DataFrame standing in for loaded parquet data.

    Three sensors each get 50 samples spaced 5 minutes apart, starting at
    2024-01-01 08:00. ``value_double`` is drawn from a normal distribution with
    standard deviation 5 and mean 100 for the temperature sensor, 50 otherwise.

    Note: this seeds NumPy's *global* random generator with 42. The output is
    therefore reproducible, and later ``np.random`` calls in the process are
    affected as well.

    Returns:
        pd.DataFrame with columns systime, uuid, value_double, value_string,
        value_bool and is_delta (150 rows, grouped by sensor).
    """
    np.random.seed(42)
    origin = datetime(2024, 1, 1, 8, 0, 0)
    sensor_ids = ('sensor-temp-001', 'sensor-press-002', 'sensor-flow-003')

    # Sensor-major ordering: all 50 rows (and random draws) for one sensor
    # before moving to the next.
    records = [
        {
            'systime': origin + timedelta(minutes=5 * step),
            'uuid': sensor_id,
            'value_double': np.random.normal(100 if 'temp' in sensor_id else 50, 5),
            'value_string': None,
            'value_bool': None,
            'is_delta': True,
        }
        for sensor_id in sensor_ids
        for step in range(50)
    ]
    return pd.DataFrame(records)


def create_metadata_json_string():
    """Return sample sensor metadata serialized as column-oriented JSON.

    The top-level keys are ``uuid``, ``label`` and ``config``. Each maps a
    stringified row position ("0", "1", "2") to that sensor's value. ``config``
    values are nested objects holding unit, location, min_range and max_range.
    The string is produced with ``json.dumps(..., indent=2)``.
    """
    sensors = [
        ("sensor-temp-001", "Temperature Sensor A",
         {"unit": "celsius", "location": "reactor_1", "min_range": 0, "max_range": 200}),
        ("sensor-press-002", "Pressure Sensor B",
         {"unit": "bar", "location": "pipeline_3", "min_range": 0, "max_range": 10}),
        ("sensor-flow-003", "Flow Meter C",
         {"unit": "m3/h", "location": "inlet_valve", "min_range": 0, "max_range": 500}),
    ]

    # Insertion order of these keys fixes the key order in the emitted JSON.
    columns = {"uuid": {}, "label": {}, "config": {}}
    for position, (sensor_uuid, sensor_label, sensor_config) in enumerate(sensors):
        row_key = str(position)
        columns["uuid"][row_key] = sensor_uuid
        columns["label"][row_key] = sensor_label
        columns["config"][row_key] = sensor_config

    return json.dumps(columns, indent=2)


def demo_parquet_loader():
    """Demo 1: print ParquetLoader call patterns, then a synthetic stand-in result.

    No parquet files are read. The API examples are printed as text, and the
    "output" section is built from ``create_timeseries_dataframe()`` so the
    reader can see what a loaded frame looks like.
    """
    banner = "=" * 70
    print("\n" + banner)
    print("DEMO 1: ParquetLoader (Usage Patterns)")
    print(banner)

    usage_text = """
    ParquetLoader loads timeseries data from Parquet files organized in a
    directory structure: base_path/YYYY/MM/DD/HH/<uuid>.parquet

    NOTE: This demo shows the API patterns without loading actual files.

    --- Constructor ---
        loader = ParquetLoader(base_path='/data/timeseries/')

    --- Load All Files ---
        df = ParquetLoader.load_all_files('/data/timeseries/')
        # Returns a single DataFrame with all parquet data concatenated

    --- Load by Time Range ---
        df = ParquetLoader.load_by_time_range(
            base_path='/data/timeseries/',
            start_time=pd.Timestamp('2024-01-01'),
            end_time=pd.Timestamp('2024-01-02'),
        )
        # Filters files whose directory path (YYYY/MM/DD/HH) falls in range

    --- Load by UUID List ---
        df = ParquetLoader.load_by_uuid_list(
            base_path='/data/timeseries/',
            uuid_list=['sensor-temp-001', 'sensor-press-002'],
        )
        # Filters files whose filenames contain one of the specified UUIDs

    --- Load by Time Range + UUIDs ---
        df = ParquetLoader.load_files_by_time_range_and_uuids(
            base_path='/data/timeseries/',
            start_time=pd.Timestamp('2024-01-01'),
            end_time=pd.Timestamp('2024-01-02'),
            uuid_list=['sensor-temp-001'],
        )
        # Combines both filters: time range AND uuid matching
    """
    print(usage_text)

    # Preview the kind of frame a real load would hand back.
    print("--- Simulated Output (using synthetic data) ---")
    sample = create_timeseries_dataframe()
    first_ts, last_ts = sample['systime'].min(), sample['systime'].max()
    print(f"\n  Shape: {sample.shape}")
    print(f"  UUIDs: {sample['uuid'].unique().tolist()}")
    print(f"  Time range: {first_ts} to {last_ts}")
    print("\n  Sample rows:")
    print(sample.head(6).to_string(index=False))


def _print_lookup_result(title, record):
    """Print ``record``'s key/value pairs under ``title``; print nothing if it is falsy."""
    if not record:
        return
    print(f"  {title}:")
    for field, field_value in record.items():
        print(f"    {field}: {field_value}")


def demo_metadata_json_loader():
    """Demo 2: exercise MetadataJsonLoader.

    The sample JSON from ``create_metadata_json_string()`` is loaded via
    ``MetadataJsonLoader.from_str``. The demo then prints:

    - the resulting DataFrame and the UUID/label listings;
    - single-record lookups by UUID and by label;
    - UUID and label filters.

    Finally it shows construction from a list of records and from column-wise
    lists.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("DEMO 2: MetadataJsonLoader")
    print(rule)

    raw_json = create_metadata_json_string()
    print("\n--- Input JSON ---")
    print(raw_json)

    print("\n--- Loading from JSON string ---")
    meta = MetadataJsonLoader.from_str(raw_json)

    print("\n--- Metadata DataFrame ---")
    print(meta.to_df().to_string())

    print("\n--- UUIDs ---")
    print(f"  {meta.list_uuids()}")

    print("\n--- Labels ---")
    print(f"  {meta.list_labels()}")

    # Single-record lookups (printed only when the loader returns something truthy).
    print("\n--- Lookup by UUID ---")
    _print_lookup_result('sensor-temp-001', meta.get_by_uuid('sensor-temp-001'))

    print("\n--- Lookup by Label ---")
    _print_lookup_result('Pressure Sensor B', meta.get_by_label('Pressure Sensor B'))

    # Subset selection.
    print("\n--- Filter by UUIDs ---")
    print(meta.filter_by_uuid(['sensor-temp-001', 'sensor-flow-003']).to_string())

    print("\n--- Filter by Labels ---")
    print(meta.filter_by_label(['Temperature Sensor A']).to_string())

    # The constructor is also fed row-wise records ...
    print("\n--- Alternative: Load from list of records ---")
    record_list = [
        {"uuid": "valve-001", "label": "Inlet Valve", "config": {"type": "butterfly", "size": "DN100"}},
        {"uuid": "valve-002", "label": "Outlet Valve", "config": {"type": "gate", "size": "DN150"}},
    ]
    print(MetadataJsonLoader(record_list).to_df().to_string())

    # ... and column-wise lists.
    print("\n--- Alternative: Load from column lists ---")
    column_lists = {
        "uuid": ["pump-01", "pump-02"],
        "label": ["Main Pump", "Backup Pump"],
        "config": [{"power_kw": 75, "type": "centrifugal"}, {"power_kw": 55, "type": "positive_displacement"}],
    }
    print(MetadataJsonLoader(column_lists).to_df().to_string())


def demo_data_integrator():
    """Demo 3: merge timeseries and metadata with DataIntegratorHybrid.combine_data.

    Every scenario below calls ``combine_data`` with ``join_key='uuid'`` and
    ``merge_how='left'``:
      1. one timeseries source plus a hand-built metadata DataFrame;
      2. the same inputs, restricted to a UUID subset via ``uuids=``;
      3. two timeseries sources passed together;
      4. metadata produced by MetadataJsonLoader instead of a hand-built frame.
    """
    print("\n" + "=" * 70)
    print("DEMO 3: DataIntegratorHybrid")
    print("=" * 70)

    # Create timeseries data
    ts_df = create_timeseries_dataframe()
    print("\n--- Timeseries Data ---")
    print(f"  Shape: {ts_df.shape}")
    print(f"  UUIDs: {ts_df['uuid'].unique().tolist()}")

    # Hand-built metadata: one row per sensor, keyed by the same uuid values.
    metadata_df = pd.DataFrame({
        'uuid': ['sensor-temp-001', 'sensor-press-002', 'sensor-flow-003'],
        'label': ['Temperature Sensor A', 'Pressure Sensor B', 'Flow Meter C'],
        'unit': ['celsius', 'bar', 'm3/h'],
        'location': ['reactor_1', 'pipeline_3', 'inlet_valve'],
    })
    print("\n--- Metadata ---")
    print(metadata_df.to_string(index=False))

    # Combine timeseries + metadata
    print("\n--- Combined Data (left join on uuid) ---")
    combined = DataIntegratorHybrid.combine_data(
        timeseries_sources=[ts_df],
        metadata_sources=[metadata_df],
        join_key='uuid',
        merge_how='left',
    )
    print(f"  Combined shape: {combined.shape}")
    print(f"  Columns: {list(combined.columns)}")
    print("\n  Sample rows:")
    print(combined.head(6).to_string(index=False))

    # Restrict the result to a subset of sensors via ``uuids=``.
    print("\n--- Combined + Filtered by UUIDs ---")
    filtered = DataIntegratorHybrid.combine_data(
        timeseries_sources=[ts_df],
        metadata_sources=[metadata_df],
        uuids=['sensor-temp-001'],
        join_key='uuid',
        merge_how='left',
    )
    print(f"  Filtered shape: {filtered.shape}")
    print(f"  UUIDs in result: {filtered['uuid'].unique().tolist()}")
    print(filtered.head(5).to_string(index=False))

    # Combine multiple timeseries sources
    print("\n--- Combining Multiple Timeseries Sources ---")
    # A second batch for one sensor on the following day. It draws from NumPy's
    # global RNG, which create_timeseries_dataframe() seeded, so it is repeatable.
    extra_rows = 10
    ts_df_2 = pd.DataFrame({
        'systime': pd.to_datetime([datetime(2024, 1, 2, 8, 0) + timedelta(minutes=i * 5) for i in range(extra_rows)]),
        'uuid': 'sensor-temp-001',
        'value_double': np.random.normal(105, 3, extra_rows),
        'is_delta': True,
    })
    multi_combined = DataIntegratorHybrid.combine_data(
        timeseries_sources=[ts_df, ts_df_2],
        metadata_sources=[metadata_df],
        join_key='uuid',
        merge_how='left',
    )
    print(f"  Combined from 2 sources: {multi_combined.shape}")
    temp_data = multi_combined[multi_combined['uuid'] == 'sensor-temp-001']
    # Derive the per-source breakdown from the inputs rather than hard-coding it,
    # so the message stays accurate if the synthetic data sizes change.
    from_source_1 = int((ts_df['uuid'] == 'sensor-temp-001').sum())
    from_source_2 = int((ts_df_2['uuid'] == 'sensor-temp-001').sum())
    print(
        f"  sensor-temp-001 rows: {len(temp_data)} "
        f"({from_source_1} from source 1 + {from_source_2} from source 2)"
    )

    # Integration with MetadataJsonLoader
    print("\n--- Integration with MetadataJsonLoader ---")
    json_str = create_metadata_json_string()
    loader = MetadataJsonLoader.from_str(json_str)
    # to_df() is assumed to index rows by uuid (TODO confirm against the loader).
    # reset_index() moves that index back into a column so it can act as the join key.
    meta_from_loader = loader.to_df().reset_index()

    combined_with_loader = DataIntegratorHybrid.combine_data(
        timeseries_sources=[ts_df],
        metadata_sources=[meta_from_loader],
        join_key='uuid',
        merge_how='left',
    )
    print(f"  Shape: {combined_with_loader.shape}")
    print(f"  Columns: {list(combined_with_loader.columns)}")
    print(combined_with_loader.head(3).to_string(index=False))


def main():
    """Run every loader demonstration in order.

    Returns:
        0 when all demos finish; 1 if any of them raises. In the failure case
        the error message and full traceback are printed before returning.
    """
    divider = "=" * 70
    print("\n" + divider)
    print("Loader Usage Demonstration")
    print(divider)

    try:
        for run_demo in (demo_parquet_loader, demo_metadata_json_loader, demo_data_integrator):
            run_demo()

        print("\n" + divider)
        print("All demonstrations completed successfully!")
        print(divider)
    except Exception as exc:
        # Top-level boundary of a demo script: report and signal failure via exit code.
        print(f"\nError during demonstration: {exc}")
        import traceback
        traceback.print_exc()
        return 1

    return 0


if __name__ == "__main__":
    # Use sys.exit rather than the builtin exit(). exit() is injected by the
    # `site` module and is missing under `python -S` and in some frozen or
    # embedded interpreters.
    sys.exit(main())