
ts_shape.loader.timeseries.azure_blob_loader ¤

Classes:

AzureBlobFlexibleFileLoader ¤

AzureBlobFlexibleFileLoader(container_name: str, *, connection_string: Optional[str] = None, account_url: Optional[str] = None, credential: Optional[object] = None, prefix: str = '', max_workers: int = 8, hour_pattern: str = '{Y}/{m}/{d}/{H}/')

Load arbitrary file types from Azure Blob Storage under time-structured folders.

Designed for containers with paths like prefix/YYYY/MM/DD/HH/file.ext. The class lists blobs by per-hour prefix, can filter them by extensions and/or basenames, and downloads the matches concurrently as raw bytes.

Methods:

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def __init__(
    self,
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
    hour_pattern: str = "{Y}/{m}/{d}/{H}/",
) -> None:
    try:
        from azure.storage.blob import ContainerClient  # type: ignore
    except Exception as exc:  # pragma: no cover - import guard
        raise ImportError(
            "azure-storage-blob is required for AzureBlobFlexibleFileLoader. "
            "Install with `pip install azure-storage-blob`."
        ) from exc

    # Prefer AAD credential path if account_url provided or credential is given
    if account_url or (credential is not None and not connection_string):
        if not account_url:
            raise ValueError("account_url must be provided when using AAD credential auth")
        if credential is None:
            raise ValueError("credential must be provided when using AAD credential auth")
        self.container_client = ContainerClient(account_url=account_url, container_name=container_name, credential=credential)
    else:
        if not connection_string:
            raise ValueError("Either connection_string or (account_url + credential) must be provided")
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string, container_name=container_name
        )
    self.prefix = prefix
    self.max_workers = max_workers if max_workers > 0 else 1
    # Pattern for hour-level subpath; tokens: {Y} {m} {d} {H}
    self.hour_pattern = hour_pattern
    # Initialize parsers lazily once per process
    if not AzureBlobFlexibleFileLoader._parsers_initialized:
        self._enable_builtin_parsers()
        AzureBlobFlexibleFileLoader._parsers_initialized = True
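
A minimal construction sketch, assuming the ts_shape package and azure-storage-blob are installed; the container name, prefix, and connection string below are placeholders.

from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobFlexibleFileLoader

# Connection-string auth; replace the placeholders with real values.
loader = AzureBlobFlexibleFileLoader(
    "my-container",
    connection_string="<azure-storage-connection-string>",
    prefix="raw/",
    max_workers=8,
)

# Alternatively, AAD auth: pass account_url plus a credential object
# (e.g. DefaultAzureCredential from the azure-identity package).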

fetch_files_by_time_range ¤

fetch_files_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, *, extensions: Optional[Iterable[str]] = None, parse: bool = False) -> Dict[str, Any]

Download files that match extensions within [start, end] hour prefixes. Returns a dict mapping blob_name -> parsed object (if parse=True and a parser exists), otherwise raw bytes.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def fetch_files_by_time_range(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False,
) -> Dict[str, Any]:
    """
    Download files that match extensions within [start, end] hour prefixes.
    Returns a dict mapping blob_name -> parsed object (if parse=True and a parser exists),
    otherwise raw bytes.
    """
    blob_names = self.list_files_by_time_range(start_timestamp, end_timestamp, extensions=extensions)
    if not blob_names:
        return {}
    results: Dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_bytes, n): n for n in blob_names}
        for fut in as_completed(future_to_name):
            name = future_to_name[fut]
            content = fut.result()
            if content is not None:
                results[name] = self._parse_bytes(name, content) if parse else content
    return results
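
A usage sketch, assuming loader is an AzureBlobFlexibleFileLoader constructed as in the example above; the timestamps and extension filter are placeholders.

# Download every JSON blob under the hourly folders in the range.
payloads = loader.fetch_files_by_time_range(
    "2024-01-01 00:00:00",
    "2024-01-01 06:00:00",
    extensions={"json"},
    parse=True,  # parsed objects where a parser exists, raw bytes otherwise
)
for blob_name, obj in payloads.items():
    print(blob_name, type(obj))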

fetch_files_by_time_range_and_basenames ¤

fetch_files_by_time_range_and_basenames(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, basenames: Iterable[str], *, extensions: Optional[Iterable[str]] = None, parse: bool = False) -> Dict[str, Any]

Download files whose basename (final path segment) is in basenames, optionally filtered by extensions, within [start, end] hour prefixes. Returns blob_name -> parsed object (if parse=True and a parser exists), otherwise raw bytes.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def fetch_files_by_time_range_and_basenames(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    basenames: Iterable[str],
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False,
) -> Dict[str, Any]:
    """
    Download files whose basename (final path segment) is in `basenames`,
    optionally filtered by extensions, within [start, end] hour prefixes.
    Returns blob_name -> parsed object (if parse=True and a parser exists), otherwise raw bytes.
    """
    base_set = {str(b).strip() for b in basenames if str(b).strip()}
    allowed_exts = self._normalize_exts(extensions)
    candidates: List[str] = []
    for pfx in (self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)):
        blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
        for b in blob_iter:  # type: ignore[attr-defined]
            name = str(b.name)
            base = name.rsplit('/', 1)[-1]
            if base not in base_set:
                continue
            if allowed_exts is not None and not any(name.lower().endswith(ext) for ext in allowed_exts):
                continue
            candidates.append(name)

    if not candidates:
        return {}

    results: Dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_bytes, n): n for n in candidates}
        for fut in as_completed(future_to_name):
            name = future_to_name[fut]
            content = fut.result()
            if content is not None:
                results[name] = self._parse_bytes(name, content) if parse else content
    return results
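
A usage sketch for basename filtering, assuming the same loader instance; the file names are placeholders.

# Fetch only specific files (by final path segment) within the time range.
named = loader.fetch_files_by_time_range_and_basenames(
    "2024-01-01 00:00:00",
    "2024-01-01 06:00:00",
    basenames=["metadata.json", "snapshot.bmp"],
    extensions={"json", "bmp"},  # optional extra filter
)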

iter_file_names_by_time_range ¤

iter_file_names_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, *, extensions: Optional[Iterable[str]] = None) -> Iterator[str]

Yield blob names under each hourly prefix within [start, end]. Uses server-side prefix listing and client-side extension filtering.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def iter_file_names_by_time_range(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
) -> Iterator[str]:
    """
    Yield blob names under each hourly prefix within [start, end].
    Uses server-side prefix listing and client-side extension filtering.
    """
    allowed_exts = self._normalize_exts(extensions)
    for pfx in (self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)):
        blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
        for b in blob_iter:  # type: ignore[attr-defined]
            name = str(b.name)
            if allowed_exts is not None:
                if not any(name.lower().endswith(ext) for ext in allowed_exts):
                    continue
            yield name
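
A usage sketch, assuming the same loader instance; nothing is downloaded, only blob names are listed.

# Lazily enumerate matching blob names, hour prefix by hour prefix.
for name in loader.iter_file_names_by_time_range(
    "2024-01-01 00:00:00", "2024-01-01 02:00:00", extensions={"csv"}
):
    print(name)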

list_files_by_time_range ¤

list_files_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, *, extensions: Optional[Iterable[str]] = None, limit: Optional[int] = None) -> List[str]

List blob names under each hourly prefix within [start, end].

Parameters:

  • extensions ¤

    (Optional[Iterable[str]], default: None ) –

    Optional set/list of file extensions (e.g., {"json", ".bmp"}). Case-insensitive.

  • limit ¤

    (Optional[int], default: None ) –

    Optional cap on number of files collected.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def list_files_by_time_range(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    limit: Optional[int] = None,
) -> List[str]:
    """
    List blob names under each hourly prefix within [start, end].

    Args:
        extensions: Optional set/list of file extensions (e.g., {"json", ".bmp"}). Case-insensitive.
        limit: Optional cap on number of files collected.
    """
    allowed_exts = self._normalize_exts(extensions)
    names: List[str] = []
    collected = 0
    for pfx in (self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)):
        blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
        for b in blob_iter:  # type: ignore[attr-defined]
            name = str(b.name)
            if allowed_exts is not None:
                lower_name = name.lower()
                if not any(lower_name.endswith(ext) for ext in allowed_exts):
                    continue
            names.append(name)
            collected += 1
            if limit is not None and collected >= limit:
                return names
    return names
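
A usage sketch, assuming the same loader instance; the limit keeps the listing cheap for a quick inspection.

# Collect at most 20 matching blob names across the hourly prefixes.
names = loader.list_files_by_time_range(
    "2024-01-01 00:00:00", "2024-01-02 00:00:00", extensions={".parquet"}, limit=20
)
print(len(names), names[:3])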

stream_files_by_time_range ¤

stream_files_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, *, extensions: Optional[Iterable[str]] = None, parse: bool = False) -> Iterator[Tuple[str, Any]]

Stream matching files as (blob_name, bytes-or-parsed) within [start, end]. Maintains up to max_workers concurrent downloads while yielding incrementally.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def stream_files_by_time_range(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False,
) -> Iterator[Tuple[str, Any]]:
    """
    Stream matching files as (blob_name, bytes-or-parsed) within [start, end].
    Maintains up to `max_workers` concurrent downloads while yielding incrementally.
    """
    names_iter = self.iter_file_names_by_time_range(start_timestamp, end_timestamp, extensions=extensions)

    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name: Dict[Any, str] = {}
        # initial fill
        try:
            while len(future_to_name) < self.max_workers:
                n = next(names_iter)
                future_to_name[executor.submit(self._download_bytes, n)] = n
        except StopIteration:
            pass

        while future_to_name:
            # Drain
            for fut in as_completed(list(future_to_name.keys())):
                name = future_to_name.pop(fut)
                try:
                    content = fut.result()
                except Exception:
                    content = None
                if content is not None:
                    yield (name, self._parse_bytes(name, content) if parse else content)

            # Refill
            try:
                while len(future_to_name) < self.max_workers:
                    n = next(names_iter)
                    future_to_name[executor.submit(self._download_bytes, n)] = n
            except StopIteration:
                pass
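
A streaming sketch, assuming the same loader instance; handle_file is a hypothetical consumer, not part of the library.

# At most max_workers downloads are in flight; results are yielded as they finish.
for name, content in loader.stream_files_by_time_range(
    "2024-01-01 00:00:00", "2024-01-01 06:00:00", extensions={"json"}, parse=True
):
    handle_file(name, content)  # hypothetical per-file consumer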

stream_files_by_time_range_and_basenames ¤

stream_files_by_time_range_and_basenames(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, basenames: Iterable[str], *, extensions: Optional[Iterable[str]] = None, parse: bool = False) -> Iterator[Tuple[str, Any]]

Stream files whose basename is in basenames within [start, end]. Yields (blob_name, bytes-or-parsed) incrementally with bounded concurrency.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def stream_files_by_time_range_and_basenames(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    basenames: Iterable[str],
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False,
) -> Iterator[Tuple[str, Any]]:
    """
    Stream files whose basename is in `basenames` within [start, end].
    Yields (blob_name, bytes-or-parsed) incrementally with bounded concurrency.
    """
    base_set = {str(b).strip() for b in basenames if str(b).strip()}
    allowed_exts = self._normalize_exts(extensions)

    def _names_iter() -> Iterator[str]:
        for pfx in (self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)):
            blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
            for b in blob_iter:  # type: ignore[attr-defined]
                name = str(b.name)
                base = name.rsplit('/', 1)[-1]
                if base not in base_set:
                    continue
                if allowed_exts is not None and not any(name.lower().endswith(ext) for ext in allowed_exts):
                    continue
                yield name

    names_iter = _names_iter()
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name: Dict[Any, str] = {}
        # initial fill
        try:
            while len(future_to_name) < self.max_workers:
                n = next(names_iter)
                future_to_name[executor.submit(self._download_bytes, n)] = n
        except StopIteration:
            pass

        while future_to_name:
            for fut in as_completed(list(future_to_name.keys())):
                name = future_to_name.pop(fut)
                try:
                    content = fut.result()
                except Exception:
                    content = None
                if content is not None:
                    yield (name, self._parse_bytes(name, content) if parse else content)

            try:
                while len(future_to_name) < self.max_workers:
                    n = next(names_iter)
                    future_to_name[executor.submit(self._download_bytes, n)] = n
            except StopIteration:
                pass
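
A streaming sketch restricted to known basenames, assuming the same loader instance; names and timestamps are placeholders.

for name, content in loader.stream_files_by_time_range_and_basenames(
    "2024-01-01 00:00:00",
    "2024-01-01 06:00:00",
    basenames=["metadata.json"],
    parse=True,
):
    print(name, type(content))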

AzureBlobParquetLoader ¤

AzureBlobParquetLoader(container_name: str, *, connection_string: Optional[str] = None, account_url: Optional[str] = None, credential: Optional[object] = None, prefix: str = '', max_workers: int = 8, hour_pattern: str = '{Y}/{m}/{d}/{H}/')

Load parquet files from an Azure Blob Storage container filtered by a list of UUIDs.

Optimized for speed by:

  • Using server-side prefix filtering when provided
  • Streaming blob listings and filtering client-side by UUID containment
  • Downloading and parsing parquet files concurrently

Parameters:

  • connection_string ¤

    (Optional[str], default: None ) –

    Azure Storage connection string.

  • container_name ¤

    (str) –

    Target container name.

  • prefix ¤

    (str, default: '' ) –

    Optional path prefix to narrow listing (e.g. "year/month/").

  • max_workers ¤

    (int, default: 8 ) –

    Max concurrent downloads/reads.

Methods:

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def __init__(
    self,
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
    hour_pattern: str = "{Y}/{m}/{d}/{H}/",
) -> None:
    """
    Initialize the loader with Azure connection details.

    Args:
        connection_string: Azure Storage connection string.
        container_name: Target container name.
        prefix: Optional path prefix to narrow listing (e.g. "year/month/").
        max_workers: Max concurrent downloads/reads.
    """
    try:
        from azure.storage.blob import ContainerClient  # type: ignore
    except Exception as exc:  # pragma: no cover - import guard
        raise ImportError(
            "azure-storage-blob is required for AzureBlobParquetLoader. "
            "Install with `pip install azure-storage-blob`."
        ) from exc

    self._ContainerClient = ContainerClient
    # Prefer AAD credential path if account_url provided or credential is given
    if account_url or (credential is not None and not connection_string):
        if not account_url:
            raise ValueError("account_url must be provided when using AAD credential auth")
        if credential is None:
            raise ValueError("credential must be provided when using AAD credential auth")
        self.container_client = ContainerClient(account_url=account_url, container_name=container_name, credential=credential)
    else:
        if not connection_string:
            raise ValueError("Either connection_string or (account_url + credential) must be provided")
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string, container_name=container_name
        )
    self.prefix = prefix
    self.max_workers = max_workers if max_workers > 0 else 1
    # Pattern for hour-level subpath; tokens: {Y} {m} {d} {H}
    # Default matches many data lake layouts: YYYY/MM/DD/HH/
    self.hour_pattern = hour_pattern
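
A minimal construction sketch, assuming ts_shape, azure-storage-blob, and a pandas parquet engine such as pyarrow are installed; all values are placeholders.

from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobParquetLoader

pq_loader = AzureBlobParquetLoader(
    "my-container",
    connection_string="<azure-storage-connection-string>",
    prefix="parquet/",
    max_workers=8,
)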

from_account_name classmethod ¤

from_account_name(account_name: str, container_name: str, *, credential: Optional[object] = None, endpoint_suffix: str = 'blob.core.windows.net', prefix: str = '', max_workers: int = 8) -> AzureBlobParquetLoader

Construct a loader using AAD credentials with an account name.

Parameters:

  • account_name ¤

    (str) –

    Storage account name.

  • container_name ¤

    (str) –

    Target container.

  • credential ¤

    (Optional[object], default: None ) –

    Azure credential for AAD auth (required; a ValueError is raised when None).

  • endpoint_suffix ¤

    (str, default: 'blob.core.windows.net' ) –

    DNS suffix for the blob endpoint (e.g., for sovereign clouds).

  • prefix ¤

    (str, default: '' ) –

    Optional listing prefix (e.g., "parquet/").

  • max_workers ¤

    (int, default: 8 ) –

    Concurrency for downloads.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
@classmethod
def from_account_name(
    cls,
    account_name: str,
    container_name: str,
    *,
    credential: Optional[object] = None,
    endpoint_suffix: str = "blob.core.windows.net",
    prefix: str = "",
    max_workers: int = 8,
) -> "AzureBlobParquetLoader":
    """
    Construct a loader using AAD credentials with an account name.

    Args:
        account_name: Storage account name.
        container_name: Target container.
        credential: Azure credential for AAD auth (required; raises ValueError if None).
        endpoint_suffix: DNS suffix for the blob endpoint (e.g., for sovereign clouds).
        prefix: Optional listing prefix (e.g., "parquet/").
        max_workers: Concurrency for downloads.
    """
    account_url = f"https://{account_name}.{endpoint_suffix}"
    if credential is None:
        raise ValueError("credential must be provided when using AAD credential auth")
    return cls(
        container_name=container_name,
        account_url=account_url,
        credential=credential,
        prefix=prefix,
        max_workers=max_workers,
    )
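
A usage sketch with AAD auth, assuming the azure-identity package is installed; the account and container names are placeholders. Note that a credential must be passed explicitly.

from azure.identity import DefaultAzureCredential
from ts_shape.loader.timeseries.azure_blob_loader import AzureBlobParquetLoader

pq_loader = AzureBlobParquetLoader.from_account_name(
    "mystorageaccount",
    "my-container",
    credential=DefaultAzureCredential(),
    prefix="parquet/",
)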

list_structure ¤

list_structure(parquet_only: bool = True, limit: Optional[int] = None) -> Dict[str, List[str]]

List folder prefixes (hours) and blob names under the configured prefix.

Parameters:

  • parquet_only ¤

    (bool, default: True ) –

    If True, only include blobs ending with .parquet.

  • limit ¤

    (Optional[int], default: None ) –

    Optional cap on number of files collected for quick inspection.

Returns:

  • Dict[str, List[str]] –

    A dict with:

    • folders: Sorted unique hour-level prefixes like 'parquet/YYYY/MM/DD/HH/'
    • files: Sorted blob names (full paths) matching the filter
Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def list_structure(self, parquet_only: bool = True, limit: Optional[int] = None) -> Dict[str, List[str]]:
    """
    List folder prefixes (hours) and blob names under the configured `prefix`.

    Args:
        parquet_only: If True, only include blobs ending with .parquet.
        limit: Optional cap on number of files collected for quick inspection.

    Returns:
        A dict with:
        - folders: Sorted unique hour-level prefixes like 'parquet/YYYY/MM/DD/HH/'
        - files: Sorted blob names (full paths) matching the filter
    """
    folders: Set[str] = set()
    files: List[str] = []
    collected = 0

    blob_iter = self.container_client.list_blobs(name_starts_with=self.prefix or None)
    for b in blob_iter:
        name = str(b.name)
        if parquet_only and not name.endswith(".parquet"):
            continue
        files.append(name)
        # Derive hour-level folder prefix
        if "/" in name:
            folders.add(name.rsplit("/", 1)[0].rstrip("/") + "/")
        collected += 1
        if limit is not None and collected >= limit:
            break

    return {
        "folders": sorted(folders),
        "files": sorted(files),
    }
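
A quick inspection sketch, assuming pq_loader is an AzureBlobParquetLoader constructed as above.

# Peek at the first 100 parquet blobs to see which hour folders exist.
structure = pq_loader.list_structure(parquet_only=True, limit=100)
print(structure["folders"][:5])
print(structure["files"][:5])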

load_all_files ¤

load_all_files() -> DataFrame

Load all parquet blobs in the container (optionally under prefix).

Returns:

  • DataFrame –

    A concatenated DataFrame of all parquet blobs. Returns an empty DataFrame if none are found.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def load_all_files(self) -> pd.DataFrame:
    """
    Load all parquet blobs in the container (optionally under `prefix`).

    Returns:
        A concatenated DataFrame of all parquet blobs. Returns an empty DataFrame
        if none are found.
    """
    # List all parquet blob names using optional prefix for server-side filtering
    blob_iter = self.container_client.list_blobs(name_starts_with=self.prefix or None)
    blob_names = [b.name for b in blob_iter if str(b.name).endswith(".parquet")]  # type: ignore[attr-defined]
    if not blob_names:
        return pd.DataFrame()

    frames: List[pd.DataFrame] = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_parquet, name): name for name in blob_names}
        for future in as_completed(future_to_name):
            df = future.result()
            if df is not None and not df.empty:
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
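
A usage sketch, assuming the same pq_loader instance; this downloads every parquet blob under the configured prefix, so use it only on small containers.

df = pq_loader.load_all_files()
print(df.shape)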

load_by_time_range ¤

load_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp) -> DataFrame

Load all parquet blobs under hourly folders within [start, end].

Assumes container structure prefix/year/month/day/hour/{file}.parquet. Listing is constrained per hour for speed.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def load_by_time_range(self, start_timestamp: str | pd.Timestamp, end_timestamp: str | pd.Timestamp) -> pd.DataFrame:
    """
    Load all parquet blobs under hourly folders within [start, end].

    Assumes container structure: prefix/year/month/day/hour/{file}.parquet
    Listing is constrained per-hour for speed.
    """
    hour_prefixes = [self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)]
    blob_names: List[str] = []
    for pfx in hour_prefixes:
        blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
        blob_names.extend([b.name for b in blob_iter if str(b.name).endswith(".parquet")])  # type: ignore[attr-defined]

    if not blob_names:
        return pd.DataFrame()

    frames: List[pd.DataFrame] = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_parquet, name): name for name in blob_names}
        for future in as_completed(future_to_name):
            df = future.result()
            if df is not None and not df.empty:
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
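
A usage sketch, assuming the same pq_loader instance and the hourly folder layout described above; timestamps are placeholders.

df = pq_loader.load_by_time_range("2024-01-01 00:00:00", "2024-01-01 23:00:00")
print(df.shape)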

load_files_by_time_range_and_uuids ¤

load_files_by_time_range_and_uuids(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, uuid_list: List[str]) -> DataFrame

Load parquet blobs for given UUIDs within [start, end] hours.

Strategy:

  1) Construct direct blob paths assuming pattern prefix/YYYY/MM/DD/HH/{uuid}.parquet (fast path, no listing).
  2) For robustness, also list each hour prefix and include any blob whose basename equals one of the requested UUID variants (handles case differences and extra subfolders below the hour level).

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def load_files_by_time_range_and_uuids(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    uuid_list: List[str],
) -> pd.DataFrame:
    """
    Load parquet blobs for given UUIDs within [start, end] hours.

    Strategy:
    1) Construct direct blob paths assuming pattern prefix/YYYY/MM/DD/HH/{uuid}.parquet
       (fast path, no listing).
    2) For robustness, also list each hour prefix and include any blob whose basename
       equals one of the requested UUID variants (handles case differences and extra
       subfolders below the hour level).
    """
    if not uuid_list:
        return pd.DataFrame()

    # Sanitize and deduplicate UUIDs while preserving order
    def _clean_uuid(u: object) -> str:
        s = str(u).strip().strip("{}").strip()
        return s

    raw = [_clean_uuid(u) for u in uuid_list]
    # Include lowercase variants to be tolerant of case differences in filenames
    variants_ordered: List[str] = []
    seen: Set[str] = set()
    for u in raw:
        for v in (u, u.lower()):
            if v and v not in seen:
                seen.add(v)
                variants_ordered.append(v)

    hour_prefixes = [self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)]

    # 1) Fast path: build direct blob names
    direct_names = [f"{pfx}{u}.parquet" for pfx in hour_prefixes for u in variants_ordered]

    # 2) Robust path: list each hour prefix and filter by basename match
    basenames = {f"{u}.parquet" for u in variants_ordered}
    listed_names: List[str] = []
    try:
        for pfx in hour_prefixes:
            blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
            for b in blob_iter:  # type: ignore[attr-defined]
                name = str(b.name)
                if not name.endswith(".parquet"):
                    continue
                base = name.rsplit("/", 1)[-1]
                if base in basenames:
                    listed_names.append(name)
    except Exception:
        # If listing fails for any reason, continue with direct names only
        pass

    # Merge and preserve order, avoid duplicates
    all_blob_names = list(dict.fromkeys([*direct_names, *listed_names]))

    if not all_blob_names:
        return pd.DataFrame()

    frames: List[pd.DataFrame] = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_name = {executor.submit(self._download_parquet, name): name for name in all_blob_names}
        for future in as_completed(future_to_name):
            df = future.result()
            if df is not None and not df.empty:
                frames.append(df)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
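
A usage sketch, assuming the same pq_loader instance; the UUID below is a placeholder for a real series identifier.

uuids = ["0b7e9f2c-1234-4a5b-8c9d-000000000001"]  # placeholder UUIDs
df = pq_loader.load_files_by_time_range_and_uuids(
    "2024-01-01 00:00:00", "2024-01-01 06:00:00", uuids
)
print(df.shape)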

stream_by_time_range ¤

stream_by_time_range(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp) -> Iterator[Tuple[str, DataFrame]]

Stream parquet DataFrames under hourly folders within [start, end].

Yields (blob_name, DataFrame) one by one to avoid holding everything in memory.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def stream_by_time_range(self, start_timestamp: str | pd.Timestamp, end_timestamp: str | pd.Timestamp) -> Iterator[Tuple[str, pd.DataFrame]]:
    """
    Stream parquet DataFrames under hourly folders within [start, end].

    Yields (blob_name, DataFrame) one by one to avoid holding everything in memory.
    """
    hour_prefixes = [self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)]

    def _names_iter() -> Iterator[str]:
        for pfx in hour_prefixes:
            blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
            for b in blob_iter:  # type: ignore[attr-defined]
                name = str(b.name)
                if name.endswith(".parquet"):
                    yield name

    names_iter = _names_iter()
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures: Dict[Any, str] = {}
        # initial fill
        try:
            while len(futures) < self.max_workers:
                n = next(names_iter)
                futures[executor.submit(self._download_parquet, n)] = n
        except StopIteration:
            pass

        while futures:
            # Drain current batch
            for fut in as_completed(list(futures.keys())):
                name = futures.pop(fut)
                try:
                    df = fut.result()
                except Exception:
                    df = None
                if df is not None and not df.empty:
                    yield (name, df)

            # Refill
            try:
                while len(futures) < self.max_workers:
                    n = next(names_iter)
                    futures[executor.submit(self._download_parquet, n)] = n
            except StopIteration:
                pass
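
A streaming sketch, assuming the same pq_loader instance; each DataFrame can be processed and released before later ones arrive.

for name, df in pq_loader.stream_by_time_range("2024-01-01 00:00:00", "2024-01-01 06:00:00"):
    print(name, len(df))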

stream_files_by_time_range_and_uuids ¤

stream_files_by_time_range_and_uuids(start_timestamp: str | Timestamp, end_timestamp: str | Timestamp, uuid_list: List[str]) -> Iterator[Tuple[str, DataFrame]]

Stream parquet DataFrames for given UUIDs within [start, end] hours.

Yields (blob_name, DataFrame) as they arrive. Uses direct names plus per-hour listing fallback.

Source code in src/ts_shape/loader/timeseries/azure_blob_loader.py
def stream_files_by_time_range_and_uuids(
    self,
    start_timestamp: str | pd.Timestamp,
    end_timestamp: str | pd.Timestamp,
    uuid_list: List[str],
) -> Iterator[Tuple[str, pd.DataFrame]]:
    """
    Stream parquet DataFrames for given UUIDs within [start, end] hours.

    Yields (blob_name, DataFrame) as they arrive. Uses direct names plus per-hour listing fallback.
    """
    if not uuid_list:
        return iter(())

    def _clean_uuid(u: object) -> str:
        return str(u).strip().strip("{}").strip()

    raw = [_clean_uuid(u) for u in uuid_list]
    variants_ordered: List[str] = []
    seen: Set[str] = set()
    for u in raw:
        for v in (u, u.lower()):
            if v and v not in seen:
                seen.add(v)
                variants_ordered.append(v)

    hour_prefixes = [self._hour_prefix(ts) for ts in self._hourly_slots(start_timestamp, end_timestamp)]
    direct_names = [f"{pfx}{u}.parquet" for pfx in hour_prefixes for u in variants_ordered]

    basenames = {f"{u}.parquet" for u in variants_ordered}

    def _names_iter() -> Iterator[str]:
        # yield direct first
        yielded: Set[str] = set()
        for n in direct_names:
            yielded.add(n)
            yield n
        # then list per-hour
        for pfx in hour_prefixes:
            blob_iter = self.container_client.list_blobs(name_starts_with=pfx)
            for b in blob_iter:  # type: ignore[attr-defined]
                name = str(b.name)
                if not name.endswith(".parquet"):
                    continue
                base = name.rsplit("/", 1)[-1]
                if base in basenames and name not in yielded:
                    yielded.add(name)
                    yield name

    names_iter = _names_iter()
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures: Dict[Any, str] = {}
        try:
            while len(futures) < self.max_workers:
                n = next(names_iter)
                futures[executor.submit(self._download_parquet, n)] = n
        except StopIteration:
            pass

        while futures:
            for fut in as_completed(list(futures.keys())):
                name = futures.pop(fut)
                try:
                    df = fut.result()
                except Exception:
                    df = None
                if df is not None and not df.empty:
                    yield (name, df)

            try:
                while len(futures) < self.max_workers:
                    n = next(names_iter)
                    futures[executor.submit(self._download_parquet, n)] = n
            except StopIteration:
                pass
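
A streaming sketch combining a time range with UUID filtering, assuming the same pq_loader instance; the UUID is a placeholder.

for name, df in pq_loader.stream_files_by_time_range_and_uuids(
    "2024-01-01 00:00:00", "2024-01-01 06:00:00", ["0b7e9f2c-1234-4a5b-8c9d-000000000001"]
):
    print(name, df.shape)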