azure_blob_loader ¤

AzureBlobParquetLoader ¤

AzureBlobParquetLoader(
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
    hour_pattern: str = "{Y}/{m}/{d}/{H}/"
)

Load parquet files from an Azure Blob Storage container filtered by a list of UUIDs.

Optimized for speed by:

- Using server-side prefix filtering when provided
- Streaming blob listings and filtering client-side by UUID containment
- Downloading and parsing parquet files concurrently

Initialize the loader with Azure connection details.

Parameters:

    connection_string (Optional[str], default None): Azure Storage connection string.
    container_name (str, required): Target container name.
    prefix (str, default ""): Optional path prefix to narrow listing (e.g. "year/month/").
    max_workers (int, default 8): Max concurrent downloads/reads.
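
A minimal construction sketch. The import path, container name, prefix, and environment variable below are placeholders for illustration, not part of the documented API:

import os

from azure_blob_loader import AzureBlobParquetLoader

# Connection-string auth; the container name and prefix are illustrative.
loader = AzureBlobParquetLoader(
    "telemetry",
    connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
    prefix="parquet/",
    max_workers=16,
)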

from_account_name classmethod ¤

from_account_name(
    account_name: str,
    container_name: str,
    *,
    credential: Optional[object] = None,
    endpoint_suffix: str = "blob.core.windows.net",
    prefix: str = "",
    max_workers: int = 8
) -> AzureBlobParquetLoader

Construct a loader using AAD credentials with an account name.

Parameters:

    account_name (str, required): Storage account name.
    container_name (str, required): Target container.
    credential (Optional[object], default None): Optional Azure credential (DefaultAzureCredential if None).
    endpoint_suffix (str, default "blob.core.windows.net"): DNS suffix for the blob endpoint (e.g., for sovereign clouds).
    prefix (str, default ""): Optional listing prefix (e.g., "parquet/").
    max_workers (int, default 8): Concurrency for downloads.
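
A sketch of the AAD path. DefaultAzureCredential comes from the azure-identity package; the account and container names are placeholders:

from azure.identity import DefaultAzureCredential

from azure_blob_loader import AzureBlobParquetLoader

# Token-based auth against a storage account; passing credential=None would
# fall back to DefaultAzureCredential as documented above.
loader = AzureBlobParquetLoader.from_account_name(
    "mystorageaccount",
    "telemetry",
    credential=DefaultAzureCredential(),
    prefix="parquet/",
)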

load_all_files ¤

load_all_files() -> pd.DataFrame

Load all parquet blobs in the container (optionally under prefix).

Returns:

    DataFrame: A concatenated DataFrame of all parquet blobs. Returns an empty DataFrame if none are found.
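
For example, assuming a loader constructed as in the sketches above:

# Pull every parquet blob under the configured prefix into one DataFrame.
df = loader.load_all_files()
if df.empty:
    print("no parquet blobs found under the configured prefix")
else:
    print(f"loaded {len(df)} rows")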

load_by_time_range ¤

load_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
) -> pd.DataFrame

Load all parquet blobs under hourly folders within [start, end].

Assumes the container structure prefix/year/month/day/hour/{file}.parquet. Listing is constrained per-hour for speed.
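
A usage sketch; the timestamps are placeholders and may be passed as strings or pandas Timestamps:

import pandas as pd

# Load two hours of data; both accepted timestamp forms are shown.
df = loader.load_by_time_range(
    "2024-05-01 00:00:00",
    pd.Timestamp("2024-05-01 02:00:00"),
)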

stream_by_time_range ¤

stream_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
) -> Iterator[Tuple[str, pd.DataFrame]]

Stream parquet DataFrames under hourly folders within [start, end].

Yields (blob_name, DataFrame) one by one to avoid holding everything in memory.
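
A sketch of incremental processing over a day of data (placeholder timestamps):

# Handle one blob at a time instead of concatenating everything in memory.
for blob_name, df in loader.stream_by_time_range(
    "2024-05-01 00:00:00", "2024-05-01 23:00:00"
):
    print(blob_name, len(df))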

load_files_by_time_range_and_uuids ¤

load_files_by_time_range_and_uuids(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    uuid_list: List[str],
) -> pd.DataFrame

Load parquet blobs for given UUIDs within [start, end] hours.

Strategy:

1. Construct direct blob paths assuming the pattern prefix/YYYY/MM/DD/HH/{uuid}.parquet (fast path, no listing).
2. For robustness, also list each hour prefix and include any blob whose basename equals one of the requested UUID variants (handles case differences and extra subfolders below the hour level).
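
A sketch with placeholder UUIDs and timestamps:

# The UUIDs below are dummies; substitute the identifiers embedded in your blob names.
device_ids = [
    "00000000-0000-0000-0000-000000000001",
    "00000000-0000-0000-0000-000000000002",
]
df = loader.load_files_by_time_range_and_uuids(
    "2024-05-01 00:00:00",
    "2024-05-01 06:00:00",
    device_ids,
)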

stream_files_by_time_range_and_uuids ¤

stream_files_by_time_range_and_uuids(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    uuid_list: List[str],
) -> Iterator[Tuple[str, pd.DataFrame]]

Stream parquet DataFrames for given UUIDs within [start, end] hours.

Yields (blob_name, DataFrame) as they arrive. Uses direct names plus per-hour listing fallback.
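
The streaming variant of the same call, reusing the device_ids list from the previous sketch:

# Collect per-blob row counts without holding all frames at once.
row_counts = {}
for blob_name, df in loader.stream_files_by_time_range_and_uuids(
    "2024-05-01 00:00:00",
    "2024-05-01 06:00:00",
    device_ids,
):
    row_counts[blob_name] = len(df)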

list_structure ¤

list_structure(
    parquet_only: bool = True, limit: Optional[int] = None
) -> Dict[str, List[str]]

List folder prefixes (hours) and blob names under the configured prefix.

Parameters:

    parquet_only (bool, default True): If True, only include blobs ending with .parquet.
    limit (Optional[int], default None): Optional cap on number of files collected for quick inspection.

Returns:

    Dict[str, List[str]]: A dict with:

    - folders: sorted unique hour-level prefixes like 'parquet/YYYY/MM/DD/HH/'
    - files: sorted blob names (full paths) matching the filter

AzureBlobFlexibleFileLoader ¤

AzureBlobFlexibleFileLoader(
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
    hour_pattern: str = "{Y}/{m}/{d}/{H}/"
)

Load arbitrary file types from Azure Blob Storage under time-structured folders.

Designed for containers with paths like prefix/YYYY/MM/DD/HH/file.ext. This class lists by per-hour prefix and can filter by extensions and/or basenames, then downloads files concurrently as raw bytes.
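
A construction sketch; the account URL, container name, and prefix are placeholders, and DefaultAzureCredential again comes from azure-identity:

from azure.identity import DefaultAzureCredential

from azure_blob_loader import AzureBlobFlexibleFileLoader

# Token-based auth via an explicit account URL; values below are illustrative.
file_loader = AzureBlobFlexibleFileLoader(
    "raw-files",
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=DefaultAzureCredential(),
    prefix="captures/",
)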

list_files_by_time_range ¤

list_files_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    limit: Optional[int] = None
) -> List[str]

List blob names under each hourly prefix within [start, end].

Parameters:

    extensions (Optional[Iterable[str]], default None): Optional set/list of file extensions (e.g., {"json", ".bmp"}). Case-insensitive.
    limit (Optional[int], default None): Optional cap on number of files collected.
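
For example, listing one hour of JSON and bitmap files (placeholder timestamps):

# Extensions may be given with or without the leading dot; matching is case-insensitive.
names = file_loader.list_files_by_time_range(
    "2024-05-01 00:00:00",
    "2024-05-01 01:00:00",
    extensions={"json", ".bmp"},
    limit=100,
)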

iter_file_names_by_time_range ¤

iter_file_names_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None
) -> Iterator[str]

Yield blob names under each hourly prefix within [start, end]. Uses server-side prefix listing and client-side extension filtering.

fetch_files_by_time_range ¤

fetch_files_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Dict[str, Any]

Download files that match extensions within [start, end] hour prefixes. Returns a dict mapping blob_name -> parsed object (if parse=True and a parser exists), otherwise raw bytes.
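
A sketch fetching one hour of JSON files with parsing enabled (placeholder timestamps):

# With parse=True, values are parsed objects where a parser exists, raw bytes otherwise.
files = file_loader.fetch_files_by_time_range(
    "2024-05-01 00:00:00",
    "2024-05-01 01:00:00",
    extensions={"json"},
    parse=True,
)
for blob_name, payload in files.items():
    print(blob_name, type(payload))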

stream_files_by_time_range ¤

stream_files_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Iterator[Tuple[str, Any]]

Stream matching files as (blob_name, bytes-or-parsed) within [start, end]. Maintains up to max_workers concurrent downloads while yielding incrementally.

fetch_files_by_time_range_and_basenames ¤

fetch_files_by_time_range_and_basenames(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    basenames: Iterable[str],
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Dict[str, Any]

Download files whose basename (final path segment) is in basenames, optionally filtered by extensions, within [start, end] hour prefixes. Returns blob_name -> parsed object (if parse=True and a parser exists), otherwise raw bytes.
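
A sketch with placeholder basenames and timestamps:

# Fetch specific files by their final path segment within the hour folders.
files = file_loader.fetch_files_by_time_range_and_basenames(
    "2024-05-01 00:00:00",
    "2024-05-01 01:00:00",
    basenames=["capture_001.bmp", "capture_002.bmp"],
    extensions={"bmp"},
)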

stream_files_by_time_range_and_basenames ¤

stream_files_by_time_range_and_basenames(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    basenames: Iterable[str],
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Iterator[Tuple[str, Any]]

Stream files whose basename is in basenames within [start, end]. Yields (blob_name, bytes-or-parsed) incrementally with bounded concurrency.