Data Acquisition
Connect to plant historians, data lakes, and metadata stores. Every loader returns a pandas DataFrame in the same long format (uuid | systime | value_*), sketched after the diagram below.
Data Sources
flowchart LR
subgraph SOURCES["<b>Plant Data Sources</b>"]
direction TB
P["Parquet Files<br/><i>local / network</i>"]
S3["S3 Storage<br/><i>MinIO / AWS</i>"]
AZ["Azure Blob<br/><i>container layout</i>"]
TS["TimescaleDB<br/><i>SQL timeseries</i>"]
MD["Metadata JSON<br/><i>signal config</i>"]
end
subgraph LOAD["<b>ts-shape Loaders</b>"]
PL["ParquetLoader"]
S3L["S3ProxyParquetLoader"]
AZL["AzureBlobParquetLoader"]
TSL["TimescaleLoader"]
MDL["MetadataLoader"]
end
subgraph OUT["<b>Output</b>"]
DF["Pandas DataFrame<br/><code>uuid | systime | value_*</code>"]
end
P --> PL --> DF
S3 --> S3L --> DF
AZ --> AZL --> DF
TS --> TSL --> DF
MD --> MDL --> DF
style SOURCES fill:#0f2a3d,stroke:#38bdf8,color:#e0f2fe
style LOAD fill:#1a3a4a,stroke:#2dd4bf,color:#e0f2fe
style OUT fill:#1a3a4a,stroke:#f59e0b,color:#fef3c7
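All loaders converge on the frame shape shown in the Output node above, so downstream steps never need to care which source the data came from. A minimal illustration of that shape (column names taken from the diagram; real frames may carry additional value_* columns depending on your signal types):
import pandas as pd

# Illustrative only: the long format every loader returns
frame = pd.DataFrame(
    {
        "uuid": ["temperature", "temperature", "pressure"],
        "systime": pd.to_datetime(
            [
                "2024-01-01 00:00:00+00:00",
                "2024-01-01 00:01:00+00:00",
                "2024-01-01 00:00:00+00:00",
            ]
        ),
        "value_double": [23.5, 23.7, 1013.2],
    }
)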
Loading from Parquet Files
The most common pattern for offline analysis — load all parquet files from a directory.
from ts_shape.loader.timeseries.parquet_loader import ParquetLoader
# Load all parquet files from a directory
df = ParquetLoader.load_all_files("data/sensors/")
print(df.head())
# uuid systime value_double
# 0 temperature 2024-01-01 00:00:00+00:00 23.5
# 1 temperature 2024-01-01 00:01:00+00:00 23.7
# 2 pressure 2024-01-01 00:00:00+00:00 1013.2
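The result is an ordinary pandas DataFrame, so standard operations apply directly; for example (plain pandas, nothing ts-shape specific):
# Rows per signal and the observed time span
print(df.groupby("uuid").size())
print(df["systime"].min(), df["systime"].max())

# Restrict to one signal, ordered by time
temp = df[df["uuid"] == "temperature"].sort_values("systime")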
Loading from Azure Blob Storage
Three authentication methods are supported. Pick whichever matches your plant's IT setup.
Connect with a SAS URL (simplest)
from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobParquetLoader
loader = AzureBlobParquetLoader(
sas_url="https://myaccount.blob.core.windows.net/timeseries?sv=2021-06-08&st=...&se=...&sr=c&sp=rl&sig=...",
prefix="parquet/",
)
Connect with a connection string
loader = AzureBlobParquetLoader(
connection_string="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...",
container_name="timeseries",
prefix="parquet/",
)
Connect with AAD credential
from azure.identity import DefaultAzureCredential
loader = AzureBlobParquetLoader(
account_url="https://myaccount.blob.core.windows.net",
container_name="timeseries",
credential=DefaultAzureCredential(),
prefix="parquet/",
)
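DefaultAzureCredential walks the standard azure-identity chain (environment variables, managed identity, Azure CLI login, and so on). Other azure.identity credentials should work in its place as well; for example, a dedicated service principal (tenant, client, and secret values below are placeholders):
from azure.identity import ClientSecretCredential

# Service-principal credential instead of the default chain
credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<app-client-id>",
    client_secret="<app-secret>",
)

loader = AzureBlobParquetLoader(
    account_url="https://myaccount.blob.core.windows.net",
    container_name="timeseries",
    credential=credential,
    prefix="parquet/",
)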
Explore container structure
structure = loader.list_structure(limit=20)
print("Folders:", structure["folders"])
print("Files:", structure["files"])
Load by time range
# Requires time-structured folders: prefix/YYYY/MM/DD/HH/
df = loader.load_by_time_range("2024-01-15 08:00", "2024-01-15 12:00")
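Under that convention, each hour of data lives in its own folder, so a timestamp maps to a blob prefix roughly as follows (an illustration of the layout only, not part of the loader API):
from datetime import datetime

# prefix/YYYY/MM/DD/HH/ -- how an hourly folder is derived from a timestamp
ts = datetime(2024, 1, 15, 8)
print(f"parquet/{ts:%Y/%m/%d/%H}/")  # parquet/2024/01/15/08/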
Load by time range and specific UUIDs
df = loader.load_files_by_time_range_and_uuids(
start_timestamp="2024-01-15 08:00",
end_timestamp="2024-01-15 12:00",
uuid_list=["temperature", "pressure", "humidity"],
)
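When several UUIDs are pulled together, a common next step is to pivot the long frame into one column per signal (plain pandas):
# One column per signal, indexed by time
wide = df.pivot_table(index="systime", columns="uuid", values="value_double")
print(wide.head())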
Stream results (low memory)
for blob_name, chunk_df in loader.stream_by_time_range("2024-01-15 08:00", "2024-01-15 12:00"):
print(f"{blob_name}: {len(chunk_df)} rows")
process(chunk_df)
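Each chunk is an independent DataFrame, so aggregates can be accumulated without ever holding the full range in memory. A minimal sketch:
from collections import Counter

# Running per-signal row counts across streamed chunks
counts = Counter()
for blob_name, chunk_df in loader.stream_by_time_range("2024-01-15 08:00", "2024-01-15 12:00"):
    counts.update(chunk_df["uuid"].value_counts().to_dict())
print(dict(counts))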
Non-parquet files (CSV, JSON, XML)
from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobFlexibleFileLoader
loader = AzureBlobFlexibleFileLoader(
sas_url="https://myaccount.blob.core.windows.net/rawdata?sv=...&sig=...",
prefix="incoming/",
)
# List files by time range and extension
names = loader.list_files_by_time_range(
start_timestamp="2024-01-15 08:00",
end_timestamp="2024-01-15 12:00",
extensions=[".csv", ".json"],
)
# Download and auto-parse
results = loader.fetch_files_by_time_range(
start_timestamp="2024-01-15 08:00",
end_timestamp="2024-01-15 12:00",
extensions=[".csv"],
parse=True,
)
Loading from S3-Compatible Storage
from ts_shape.loader.timeseries.s3proxy_parquet_loader import S3ProxyParquetLoader
loader = S3ProxyParquetLoader(
endpoint_url="https://s3.example.com",
bucket="data-lake",
prefix="timeseries/"
)
df = loader.fetch_data_as_dataframe()
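From here the data is again a regular DataFrame; downsampling each signal to hourly means, for instance, needs only standard pandas:
# Hourly mean per signal on the fetched frame
hourly = (
    df.set_index("systime")
      .groupby("uuid")["value_double"]
      .resample("1h")
      .mean()
)
print(hourly.head())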
Loading Metadata
Signal metadata (names, units, configuration) is stored in JSON files and loaded as a DataFrame keyed by uuid.
from ts_shape.loader.metadata.metadata_json_loader import MetadataLoader
meta = MetadataLoader("config/signals.json").to_df()
print(meta)
# uuid label unit
# 0 temperature Temperature Celsius
# 1 pressure Pressure hPa
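The metadata frame also doubles as a lookup table, for example when labelling plots or annotating results:
# uuid -> unit / label lookups built from the metadata frame
units = meta.set_index("uuid")["unit"].to_dict()
labels = meta.set_index("uuid")["label"].to_dict()
print(units["temperature"])  # Celsius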
Combining Timeseries with Metadata
Join signal data with metadata for enriched analysis.
from ts_shape.loader.combine.integrator import DataIntegratorHybrid
combined = DataIntegratorHybrid.combine_data(
timeseries_sources=[ts_df],
metadata_sources=[meta_df],
join_key="uuid",
merge_how="left"
)
print(combined.head())
# uuid systime value_double label unit
# 0 temperature 2024-01-01 00:00:00+00:00 23.5 Temperature Celsius
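With labels and units merged in, per-signal summaries read naturally; for example:
# Per-signal overview using the merged metadata columns
summary = combined.groupby(["label", "unit"])["value_double"].agg(["count", "mean", "min", "max"])
print(summary)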
Filter by specific signals
combined = DataIntegratorHybrid.combine_data(
timeseries_sources=[ts_df],
metadata_sources=[meta_df],
uuids=["temperature", "humidity"],
join_key="uuid"
)
Next Steps
- Signal Conditioning — Clean and filter the loaded data
- API Reference — Full loader API documentation