# azure_blob_loader

## AzureBlobParquetLoader
```python
AzureBlobParquetLoader(
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
    hour_pattern: str = "{Y}/{m}/{d}/{H}/"
)
```
Load parquet files from an Azure Blob Storage container filtered by a list of UUIDs.

Optimized for speed by:

- Using server-side prefix filtering when provided
- Streaming blob listings and filtering client-side by UUID containment
- Downloading and parsing parquet files concurrently

Initialize the loader with Azure connection details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `connection_string` | `Optional[str]` | Azure Storage connection string. | `None` |
| `container_name` | `str` | Target container name. | *required* |
| `prefix` | `str` | Optional path prefix to narrow listing (e.g. `"year/month/"`). | `''` |
| `max_workers` | `int` | Max concurrent downloads/reads. | `8` |
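A minimal construction sketch; the container name, connection string, and prefix are placeholders, and the import path is assumed:

```python
from azure_blob_loader import AzureBlobParquetLoader  # module path assumed

loader = AzureBlobParquetLoader(
    "telemetry",  # hypothetical container name
    connection_string="<your-storage-connection-string>",
    prefix="2024/",  # optional: narrows server-side listing
    max_workers=16,
)
```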
### from_account_name (classmethod)
```python
from_account_name(
    account_name: str,
    container_name: str,
    *,
    credential: Optional[object] = None,
    endpoint_suffix: str = "blob.core.windows.net",
    prefix: str = "",
    max_workers: int = 8
) -> AzureBlobParquetLoader
```
Construct a loader from an account name, authenticating with AAD credentials.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `account_name` | `str` | Storage account name. | *required* |
| `container_name` | `str` | Target container. | *required* |
| `credential` | `Optional[object]` | Optional Azure credential (`DefaultAzureCredential` if None). | `None` |
| `endpoint_suffix` | `str` | DNS suffix for the blob endpoint (e.g., for sovereign clouds). | `'blob.core.windows.net'` |
| `prefix` | `str` | Optional listing prefix (e.g., `"parquet/"`). | `''` |
| `max_workers` | `int` | Concurrency for downloads. | `8` |
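A sketch of the AAD path with placeholder account and container names; per the table above, `credential=None` falls back to `DefaultAzureCredential`, so passing one explicitly is optional:

```python
from azure.identity import DefaultAzureCredential
from azure_blob_loader import AzureBlobParquetLoader  # module path assumed

loader = AzureBlobParquetLoader.from_account_name(
    "mystorageaccount",  # hypothetical account
    "telemetry",         # hypothetical container
    credential=DefaultAzureCredential(),
    prefix="parquet/",
)
```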
### load_all_files

```python
load_all_files() -> pd.DataFrame
```
Load all parquet blobs in the container (optionally under prefix).
Returns:

| Type | Description |
|---|---|
| `DataFrame` | A concatenated DataFrame of all parquet blobs. Returns an empty DataFrame if none are found. |
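Usage is a single call; note that, since the result is one concatenated DataFrame, everything under the prefix is held in memory at once (a sketch, continuing the example above):

```python
df = loader.load_all_files()
print(df.shape)
```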
### load_by_time_range

```python
load_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
) -> pd.DataFrame
```
Load all parquet blobs under hourly folders within [start, end].
Assumes the container structure `prefix/year/month/day/hour/{file}.parquet`. Listing is constrained per hour for speed.
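A sketch of a bounded hourly load; the timestamps are illustrative, and given the `str | Timestamp` signature, any value pandas can coerce to a `Timestamp` should work:

```python
df = loader.load_by_time_range("2024-01-01 00:00", "2024-01-01 06:00")
```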
### stream_by_time_range

```python
stream_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
) -> Iterator[Tuple[str, pd.DataFrame]]
```
Stream parquet DataFrames under hourly folders within [start, end].
Yields (blob_name, DataFrame) one by one to avoid holding everything in memory.
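A streaming sketch that handles each hourly file as it arrives instead of concatenating:

```python
for blob_name, df in loader.stream_by_time_range("2024-01-01", "2024-01-02"):
    print(blob_name, len(df))  # replace with real per-file processing
```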
### load_files_by_time_range_and_uuids

```python
load_files_by_time_range_and_uuids(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    uuid_list: List[str],
) -> pd.DataFrame
```
Load parquet blobs for given UUIDs within [start, end] hours.
Strategy:

1. Construct direct blob paths assuming the pattern `prefix/YYYY/MM/DD/HH/{uuid}.parquet` (fast path, no listing).
2. For robustness, also list each hour prefix and include any blob whose basename equals one of the requested UUID variants (handles case differences and extra subfolders below the hour level).
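A sketch with fabricated UUIDs for illustration:

```python
uuids = [
    "0b5e9c1a-7d2f-4e3b-9a8c-1f6d5e4c3b2a",  # hypothetical UUIDs
    "9c8b7a6d-5e4f-3c2b-1a0d-f9e8d7c6b5a4",
]
df = loader.load_files_by_time_range_and_uuids(
    "2024-01-01 00:00", "2024-01-01 23:00", uuids
)
```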
### stream_files_by_time_range_and_uuids

```python
stream_files_by_time_range_and_uuids(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    uuid_list: List[str],
) -> Iterator[Tuple[str, pd.DataFrame]]
```
Stream parquet DataFrames for given UUIDs within [start, end] hours.
Yields `(blob_name, DataFrame)` pairs as they arrive. Uses direct names plus a per-hour listing fallback.
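The streaming counterpart of the previous call, reusing the hypothetical `uuids` list from above:

```python
for blob_name, df in loader.stream_files_by_time_range_and_uuids(
    "2024-01-01", "2024-01-02", uuids
):
    print(blob_name, df.shape)
```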
### list_structure

```python
list_structure(
    parquet_only: bool = True, limit: Optional[int] = None
) -> Dict[str, List[str]]
```
List folder prefixes (hours) and blob names under the configured prefix.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `parquet_only` | `bool` | If True, only include blobs ending with `.parquet`. | `True` |
| `limit` | `Optional[int]` | Optional cap on number of files collected for quick inspection. | `None` |
Returns:

| Type | Description |
|---|---|
| `Dict[str, List[str]]` | A dict with the folder prefixes (hours) and blob names found under the configured prefix. |
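A quick inspection sketch; the exact dict keys are not spelled out above, so this simply prints the result:

```python
structure = loader.list_structure(parquet_only=True, limit=50)
print(structure)
```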
## AzureBlobFlexibleFileLoader

```python
AzureBlobFlexibleFileLoader(
    container_name: str,
    *,
    connection_string: Optional[str] = None,
    account_url: Optional[str] = None,
    credential: Optional[object] = None,
    prefix: str = "",
    max_workers: int = 8,
    hour_pattern: str = "{Y}/{m}/{d}/{H}/"
)
```
Load arbitrary file types from Azure Blob Storage under time-structured folders.
Designed for containers with paths like `prefix/YYYY/MM/DD/HH/`.
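A construction sketch mirroring the parquet loader; the account URL and container name are placeholders, and the import path is assumed:

```python
from azure.identity import DefaultAzureCredential
from azure_blob_loader import AzureBlobFlexibleFileLoader  # module path assumed

files_loader = AzureBlobFlexibleFileLoader(
    "telemetry",  # hypothetical container
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=DefaultAzureCredential(),
    prefix="raw/",
)
```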
### list_files_by_time_range

```python
list_files_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    limit: Optional[int] = None
) -> List[str]
```
List blob names under each hourly prefix within [start, end].
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `extensions` | `Optional[Iterable[str]]` | Optional set/list of file extensions (e.g., `{"json", ".bmp"}`). Case-insensitive. | `None` |
| `limit` | `Optional[int]` | Optional cap on number of files collected. | `None` |
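A listing sketch; per the table above, extensions may be given with or without the leading dot:

```python
names = files_loader.list_files_by_time_range(
    "2024-01-01", "2024-01-01 12:00",
    extensions={"json", ".bmp"},
    limit=100,
)
```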
### iter_file_names_by_time_range

```python
iter_file_names_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None
) -> Iterator[str]
```
Yield blob names under each hourly prefix within [start, end]. Uses server-side prefix listing and client-side extension filtering.
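The lazy counterpart of the previous method, useful when the range may contain many blobs (a sketch):

```python
for name in files_loader.iter_file_names_by_time_range(
    "2024-01-01", "2024-01-02", extensions={"json"}
):
    print(name)
```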
### fetch_files_by_time_range

```python
fetch_files_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Dict[str, Any]
```
Download files that match extensions within [start, end] hour prefixes.

Returns a dict mapping `blob_name` to a parsed object (if `parse=True` and a parser exists), otherwise raw bytes.
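A fetch sketch; whether JSON payloads come back parsed depends on a parser existing for the extension, as noted above:

```python
payloads = files_loader.fetch_files_by_time_range(
    "2024-01-01", "2024-01-01 06:00",
    extensions={"json"},
    parse=True,
)
for blob_name, obj in payloads.items():
    print(blob_name, type(obj))
```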
### stream_files_by_time_range

```python
stream_files_by_time_range(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Iterator[Tuple[str, Any]]
```
Stream matching files as `(blob_name, bytes-or-parsed)` within [start, end].

Maintains up to `max_workers` concurrent downloads while yielding incrementally.
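A streaming sketch that writes raw bytes to local files by basename; with `parse=False`, each payload is bytes per the description above:

```python
for blob_name, payload in files_loader.stream_files_by_time_range(
    "2024-01-01", "2024-01-02", extensions={"bmp"}, parse=False
):
    local_name = blob_name.rsplit("/", 1)[-1]  # keep only the basename
    with open(local_name, "wb") as fh:
        fh.write(payload)
```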
### fetch_files_by_time_range_and_basenames

```python
fetch_files_by_time_range_and_basenames(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    basenames: Iterable[str],
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Dict[str, Any]
```
Download files whose basename (final path segment) is in `basenames`, optionally filtered by extensions, within [start, end] hour prefixes.

Returns a dict mapping `blob_name` to a parsed object (if `parse=True` and a parser exists), otherwise raw bytes.
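A targeted-fetch sketch; the basenames are hypothetical:

```python
wanted = ["sensor_a.json", "sensor_b.json"]  # hypothetical basenames
results = files_loader.fetch_files_by_time_range_and_basenames(
    "2024-01-01", "2024-01-01 12:00", wanted, parse=True
)
```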
### stream_files_by_time_range_and_basenames

```python
stream_files_by_time_range_and_basenames(
    start_timestamp: str | Timestamp,
    end_timestamp: str | Timestamp,
    basenames: Iterable[str],
    *,
    extensions: Optional[Iterable[str]] = None,
    parse: bool = False
) -> Iterator[Tuple[str, Any]]
```
Stream files whose basename is in `basenames` within [start, end].

Yields `(blob_name, bytes-or-parsed)` incrementally with bounded concurrency.
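The streaming counterpart of the previous call; with the default `parse=False`, each payload is raw bytes:

```python
for blob_name, payload in files_loader.stream_files_by_time_range_and_basenames(
    "2024-01-01", "2024-01-02",
    ["frame_0001.bmp"],  # hypothetical basename
    extensions={"bmp"},
):
    print(blob_name, len(payload))
```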