ts_shape.loader.timeseries.azure_blob_loader ¤

Classes:

  • AzureBlobParquetLoader

    Load parquet files from an Azure Blob Storage container filtered by a list of UUIDs.

AzureBlobParquetLoader ¤

AzureBlobParquetLoader(container_name: str, *, connection_string: Optional[str] = None, account_url: Optional[str] = None, credential: Optional[object] = None, prefix: str = '', max_workers: int = 8)

Load parquet files from an Azure Blob Storage container filtered by a list of UUIDs.

Optimized for speed by:

  • Using server-side prefix filtering when provided
  • Streaming blob listings and filtering client-side by UUID containment
  • Downloading and parsing parquet files concurrently

Parameters:

  • connection_string ¤

    (Optional[str], default: None ) –

    Azure Storage connection string.

  • container_name ¤

    (str) –

    Target container name.

  • account_url ¤

    (Optional[str], default: None ) –

    Blob service endpoint URL (e.g. "https://<account>.blob.core.windows.net"), used together with credential for AAD auth.

  • credential ¤

    (Optional[object], default: None ) –

    Azure credential object, used together with account_url for AAD auth.

  • prefix ¤

    (str, default: '' ) –

    Optional path prefix to narrow listing (e.g. "year/month/").

  • max_workers ¤

    (int, default: 8 ) –

    Max concurrent downloads/reads.

Methods:

  • from_account_name – Construct a loader using AAD credentials with an account name.

  • list_structure – List folder prefixes (hours) and blob names under the configured prefix.

  • load_all_files – Load all parquet blobs in the container (optionally under prefix).

  • load_by_time_range – Load all parquet blobs under hourly folders within [start, end].

  • load_files_by_time_range_and_uuids – Load parquet blobs for given UUIDs within [start, end] hours using direct paths.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def __init__(
    self,
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
) -> None:
    """
    Initialize the loader with Azure connection details.

    Args:
        connection_string: Azure Storage connection string.
        container_name: Target container name.
        account_url: Blob service endpoint URL for AAD auth (e.g. "https://<account>.blob.core.windows.net").
        credential: Azure credential object used together with account_url.
        prefix: Optional path prefix to narrow listing (e.g. "year/month/").
        max_workers: Max concurrent downloads/reads.
    """
    try:
        from azure.storage.blob import ContainerClient  # type: ignore
    except Exception as exc:  # pragma: no cover - import guard
        raise ImportError(
            "azure-storage-blob is required for AzureBlobParquetLoader. "
            "Install with `pip install azure-storage-blob`."
        ) from exc

    self._ContainerClient = ContainerClient
    # Prefer AAD credential path if account_url provided or credential is given
    if account_url or (credential is not None and not connection_string):
        if not account_url:
            raise ValueError("account_url must be provided when using AAD credential auth")
        if credential is None:
            raise ValueError("credential must be provided when using AAD credential auth")
        self.container_client = ContainerClient(account_url=account_url, container_name=container_name, credential=credential)
    else:
        if not connection_string:
            raise ValueError("Either connection_string or (account_url + credential) must be provided")
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string, container_name=container_name
        )
    self.prefix = prefix
    self.max_workers = max_workers if max_workers > 0 else 1
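
Example: constructing the loader. This is a minimal sketch; the container name, connection string, and account URL are placeholders, and DefaultAzureCredential requires the separate azure-identity package.

from azure.identity import DefaultAzureCredential  # assumes azure-identity is installed
from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobParquetLoader

# Option 1: connection-string auth (placeholder values)
loader = AzureBlobParquetLoader(
    container_name="my-container",
    connection_string="<your-connection-string>",
    prefix="parquet/",
    max_workers=8,
)

# Option 2: AAD auth via an explicit endpoint and credential
loader = AzureBlobParquetLoader(
    container_name="my-container",
    account_url="https://myaccount.blob.core.windows.net",
    credential=DefaultAzureCredential(),
    prefix="parquet/",
)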

from_account_name classmethod ¤

from_account_name(account_name: str, container_name: str, *, credential: Optional[object] = None, endpoint_suffix: str = 'blob.core.windows.net', prefix: str = '', max_workers: int = 8) -> AzureBlobParquetLoader

Construct a loader using AAD credentials with an account name.

Parameters:

  • account_name ¤

    (str) –

    Storage account name.

  • container_name ¤

    (str) –

    Target container.

  • credential ¤

    (Optional[object], default: None ) –

    Azure credential for AAD auth; required (a ValueError is raised if None).

  • endpoint_suffix ¤

    (str, default: 'blob.core.windows.net' ) –

    DNS suffix for the blob endpoint (e.g., for sovereign clouds).

  • prefix ¤

    (str, default: '' ) –

    Optional listing prefix (e.g., "parquet/").

  • max_workers ¤

    (int, default: 8 ) –

    Concurrency for downloads.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
@classmethod
def from_account_name(
    cls,
    account_name: str,
    container_name: str,
    *,
    credential: Optional[object] = None,
    endpoint_suffix: str = "blob.core.windows.net",
    prefix: str = "",
    max_workers: int = 8,
) -> "AzureBlobParquetLoader":
    """
    Construct a loader using AAD credentials with an account name.

    Args:
        account_name: Storage account name.
        container_name: Target container.
        credential: Azure credential for AAD auth; required (raises ValueError if None).
        endpoint_suffix: DNS suffix for the blob endpoint (e.g., for sovereign clouds).
        prefix: Optional listing prefix (e.g., "parquet/").
        max_workers: Concurrency for downloads.
    """
    account_url = f"https://{account_name}.{endpoint_suffix}"
    if credential is None:
        raise ValueError("credential must be provided when using AAD credential auth")
    return cls(
        container_name=container_name,
        account_url=account_url,
        credential=credential,
        prefix=prefix,
        max_workers=max_workers,
    )
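
Example: a usage sketch of from_account_name, assuming azure-identity is installed; the account and container names are placeholders.

from azure.identity import DefaultAzureCredential
from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobParquetLoader

loader = AzureBlobParquetLoader.from_account_name(
    account_name="myaccount",          # placeholder storage account
    container_name="my-container",     # placeholder container
    credential=DefaultAzureCredential(),
    prefix="parquet/",
    max_workers=8,
)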

list_structure ¤

list_structure(parquet_only: bool = True, limit: Optional[int] = None) -> Dict[str, List[str]]

List folder prefixes (hours) and blob names under the configured prefix.

Parameters:

  • parquet_only ¤

    (bool, default: True ) –

    If True, only include blobs ending with .parquet.

  • limit ¤

    (Optional[int], default: None ) –

    Optional cap on number of files collected for quick inspection.

Returns:

  • Dict[str, List[str]] –

    A dict with:

      • folders: Sorted unique hour-level prefixes like 'parquet/YYYY/MM/DD/HH/'
      • files: Sorted blob names (full paths) matching the filter

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def list_structure(self, parquet_only: bool = True, limit: Optional[int] = None) -> Dict[str, List[str]]:
    """
    List folder prefixes (hours) and blob names under the configured `prefix`.

    Args:
        parquet_only: If True, only include blobs ending with .parquet.
        limit: Optional cap on number of files collected for quick inspection.

    Returns:
        A dict with:
        - folders: Sorted unique hour-level prefixes like 'parquet/YYYY/MM/DD/HH/'
        - files: Sorted blob names (full paths) matching the filter
    """
    folders: Set[str] = set()
    files: List[str] = []
    collected = 0

    blob_iter = self.container_client.list_blobs(name_starts_with=self.prefix or None)
    for b in blob_iter:
        name = str(b.name)
        if parquet_only and not name.endswith(".parquet"):
            continue
        files.append(name)
        # Derive hour-level folder prefix
        if "/" in name:
            folders.add(name.rsplit("/", 1)[0].rstrip("/") + "/")
        collected += 1
        if limit is not None and collected >= limit:
            break

    return {
        "folders": sorted(folders),
        "files": sorted(files),
    }
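
Example: a quick inspection sketch using the loader constructed above; the limit value is arbitrary.

structure = loader.list_structure(parquet_only=True, limit=100)
print(structure["folders"][:5])  # first hour-level prefixes, e.g. 'parquet/2024/01/01/00/'
print(len(structure["files"]))   # number of parquet blob names collected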

load_all_files ¤

load_all_files() -> DataFrame

Load all parquet blobs in the container (optionally under prefix).

Returns:

  • DataFrame –

    A concatenated DataFrame of all parquet blobs. Returns an empty DataFrame
    if none are found.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def load_all_files(self) -> pd.DataFrame:
    """
    Load all parquet blobs in the container (optionally under `prefix`).

    Returns:
        A concatenated DataFrame of all parquet blobs. Returns an empty DataFrame
        if none are found.
    """
    # List all parquet blob names using optional prefix for server-side filtering
    blob_iter = self.container_client.list_blobs(name_starts_with=self.prefix or None)
    blob_names = [b.name for b in blob_iter if str(b.name).endswith(".parquet")]  # type: ignore[attr-defined]
    if not blob_names:
        return pd.DataFrame()

    frames: List[pd.DataFrame] = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_parquet, name): name for name in blob_names}
        for future in as_completed(future_to_name):
            df = future.result()
            if df is not None and not df.empty:
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
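
Example: loading everything under the configured prefix; on large containers this can be slow and memory-heavy, so the time-range loaders below are usually preferable.

df = loader.load_all_files()
print(df.shape)  # empty DataFrame if no parquet blobs were found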

load_by_time_range ¤

load_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp) -> DataFrame

Load all parquet blobs under hourly folders within [start, end].

Assumes container structure: prefix/year/month/day/hour/{file}.parquet. Listing is constrained per-hour for speed.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def load_by_time_range(self, start_timestamp: str | pd.Timestamp, end_timestamp: str | pd.Timestamp) -> pd.DataFrame:
    """
    Load all parquet blobs under hourly folders within [start, end].

    Assumes container structure: prefix/year/month/day/hour/{file}.parquet
    Listing is constrained per-hour for speed.
    """
    hour_prefixes = [self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)]
    blob_names: List[str] = []
    for pfx in hour_prefixes:
        blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
        blob_names.extend([b.name for b in blob_iter if str(b.name).endswith(".parquet")])  # type: ignore[attr-defined]

    if not blob_names:
        return pd.DataFrame()

    frames: List[pd.DataFrame] = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_parquet, name): name for name in blob_names}
        for future in as_completed(future_to_name):
            df = future.result()
            if df is not None and not df.empty:
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
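
Example: loading one six-hour window; the timestamps are placeholders and may be passed as strings or pandas Timestamps.

df = loader.load_by_time_range("2024-01-01 00:00:00", "2024-01-01 06:00:00")
print(df.shape)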

load_files_by_time_range_and_uuids ¤

load_files_by_time_range_and_uuids(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, uuid_list: List[str]) -> DataFrame

Load parquet blobs for given UUIDs within [start, end] hours using direct paths.

Assumes blob path pattern: prefix/YYYY/MM/DD/HH/{uuid}.parquet. Avoids container-wide listing for maximum speed.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def load_files_by_time_range_and_uuids(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    uuid_list: List[str],
) -> pd.DataFrame:
    """
    Load parquet blobs for given UUIDs within [start, end] hours using direct paths.

    Assumes blob path pattern: prefix/YYYY/MM/DD/HH/{uuid}.parquet
    Avoids container-wide listing for maximum speed.
    """
    if not uuid_list:
        return pd.DataFrame()

    uuids = list(dict.fromkeys(uuid_list))  # preserve order, dedupe
    hour_prefixes = [self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)]

    # Build full blob names deterministically
    blob_names = [f"{pfx}{u}.parquet" for pfx in hour_prefixes for u in uuids]

    frames: List[pd.DataFrame] = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_parquet, name): name for name in blob_names}
        for future in as_completed(future_to_name):
            df = future.result()
            if df is not None and not df.empty:
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
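
Example: loading specific UUIDs for a time window; the UUID and timestamps are placeholders. Since the result check above tolerates None, missing hour/UUID combinations are presumably skipped rather than raising.

df = loader.load_files_by_time_range_and_uuids(
    start_timestamp="2024-01-01 00:00:00",
    end_timestamp="2024-01-01 06:00:00",
    uuid_list=["11111111-2222-3333-4444-555555555555"],  # placeholder UUID
)
print(df.shape)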