ts_shape.loader.timeseries.s3proxy_parquet_loader

Classes:

  • S3ProxyDataAccess

    A class to access timeseries data via an S3 proxy. This class retrieves data for specified UUIDs within a defined time range.

S3ProxyDataAccess

S3ProxyDataAccess(start_timestamp: str, end_timestamp: str, uuids: List[str], s3_config: Dict[str, str])

A class to access timeseries data via an S3 proxy. This class retrieves data for specified UUIDs within a defined time range, with the option to output data as Parquet files or as a single combined DataFrame.

:param start_timestamp: Start timestamp in "Year-Month-Day Hour:Minute:Second" format.
:param end_timestamp: End timestamp in "Year-Month-Day Hour:Minute:Second" format.
:param uuids: List of UUIDs to retrieve data for.
:param s3_config: Configuration dictionary for S3 connection.
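The timestamp format described above appears to correspond to the strptime pattern "%Y-%m-%d %H:%M:%S". A minimal sketch, assuming that mapping, for checking a time range before passing it to the constructor. The helper validate_time_range is hypothetical and not part of ts_shape:

```python
from datetime import datetime
from typing import Tuple

# Assumed strptime equivalent of "Year-Month-Day Hour:Minute:Second"
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def validate_time_range(start_timestamp: str, end_timestamp: str) -> Tuple[datetime, datetime]:
    """Hypothetical helper: parse both timestamps and check their order."""
    start = datetime.strptime(start_timestamp, TIMESTAMP_FORMAT)
    end = datetime.strptime(end_timestamp, TIMESTAMP_FORMAT)
    if start > end:
        raise ValueError("start_timestamp must not be later than end_timestamp")
    return start, end


start, end = validate_time_range("2024-01-15 08:00:00", "2024-01-15 12:00:00")
print(end - start)  # 4:00:00
```

Checking the range up front surfaces a malformed or reversed timestamp before any S3 request is made.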

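Methods:

  • fetch_data_as_dataframe

    Retrieves timeseries data from S3 and returns it as a single DataFrame.

  • fetch_data_as_parquet

    Retrieves timeseries data from S3 and saves it as Parquet files.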

Source code in src/ts_shape/loader/timeseries/s3proxy_parquet_loader.py
def __init__(self, start_timestamp: str, end_timestamp: str, uuids: List[str], s3_config: Dict[str, str]):
    """
    Initialize the S3ProxyDataAccess object.
    :param start_timestamp: Start timestamp in "Year-Month-Day Hour:Minute:Second" format.
    :param end_timestamp: End timestamp in "Year-Month-Day Hour:Minute:Second" format.
    :param uuids: List of UUIDs to retrieve data for.
    :param s3_config: Configuration dictionary for S3 connection.
    """
    self.start_timestamp = start_timestamp
    self.end_timestamp = end_timestamp
    self.uuids = uuids
    self.s3_config = s3_config

    # Establish connection to S3 using provided configuration
    self.s3 = s3fs.S3FileSystem(
        endpoint_url=s3_config["endpoint_url"],
        key=s3_config["key"],
        secret=s3_config["secret"],
        use_ssl=s3_config["use_ssl"],
        version_aware=s3_config["version_aware"]
    )
    self.s3_path_base = s3_config["s3_path_base"]
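The constructor reads six keys from s3_config and raises KeyError if any is absent. A minimal sketch of a complete configuration with a pre-flight check follows. The helper missing_s3_config_keys is hypothetical, and every value is a placeholder. use_ssl and version_aware are shown as booleans on the assumption that s3fs treats them as flags, even though the annotation is Dict[str, str]:

```python
from typing import Any, Dict, List

# Keys read by __init__ in the listing above
REQUIRED_KEYS = ("endpoint_url", "key", "secret", "use_ssl", "version_aware", "s3_path_base")


def missing_s3_config_keys(s3_config: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return required keys that are absent from s3_config."""
    return [k for k in REQUIRED_KEYS if k not in s3_config]


# All values below are placeholders, not real endpoints or credentials
s3_config = {
    "endpoint_url": "https://s3-proxy.example.com",
    "key": "EXAMPLE_KEY",
    "secret": "EXAMPLE_SECRET",
    "use_ssl": True,
    "version_aware": False,
    "s3_path_base": "example-bucket/timeseries/",
}

missing = missing_s3_config_keys(s3_config)
if missing:
    raise KeyError(f"s3_config is missing: {missing}")
```

With a complete configuration, the object would then be created as S3ProxyDataAccess(start_timestamp, end_timestamp, uuids, s3_config).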

fetch_data_as_dataframe

fetch_data_as_dataframe() -> DataFrame

Retrieves timeseries data from S3 and returns it as a single DataFrame.

:return: A combined DataFrame with data for all specified UUIDs and time slots.
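As a rough illustration of how the results are combined: one fetch is issued per (time slot, unique UUID) pair, and missing results are skipped before concatenation. The sketch below mimics that flow with plain lists in place of DataFrames. FAKE_STORE, fake_fetch, and the slot names are hypothetical stand-ins, not part of ts_shape:

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for the S3 store: (uuid, timeslot) -> rows
FAKE_STORE: Dict[Tuple[str, str], List[dict]] = {
    ("uuid-a", "slot-1"): [{"uuid": "uuid-a", "value": 1.0}],
    ("uuid-b", "slot-2"): [{"uuid": "uuid-b", "value": 2.0}],
}


def fake_fetch(uuid: str, timeslot: str) -> Optional[List[dict]]:
    """Hypothetical stand-in for _fetch_parquet: rows, or None if nothing is stored."""
    return FAKE_STORE.get((uuid, timeslot))


def combine(uuids: List[str], timeslots: List[str]) -> List[dict]:
    # One fetch per (time slot, unique UUID) pair; duplicate UUIDs are collapsed.
    # sorted() is used here only to make the order deterministic.
    fetched = [fake_fetch(u, t) for t in timeslots for u in sorted(set(uuids))]
    # Skip missing results, then flatten (the real method concatenates DataFrames)
    return [row for rows in fetched if rows is not None for row in rows]


rows = combine(["uuid-a", "uuid-a", "uuid-b"], ["slot-1", "slot-2"])
print(len(rows))  # 2
```

Passing the same UUID twice does not duplicate rows, because the UUID list is reduced to a set before fetching.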

Source code in src/ts_shape/loader/timeseries/s3proxy_parquet_loader.py
def fetch_data_as_dataframe(self) -> pd.DataFrame:
    """
    Retrieves timeseries data from S3 and returns it as a single DataFrame.
    :return: A combined DataFrame with data for all specified UUIDs and time slots.
    """
    data_frames = [self._fetch_parquet(uuid, timeslot_dir)
                   for timeslot_dir in self._generate_timeslot_paths()
                   for uuid in set(self.uuids)]
    # Drop None results before the emptiness check; pd.concat raises ValueError on an empty list
    data_frames = [df for df in data_frames if df is not None]
    return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()

fetch_data_as_parquet

fetch_data_as_parquet(output_dir: str)

Retrieves timeseries data from S3 and saves it as Parquet files. Each file is written to output_dir/<time-slot directory>/<uuid>.parquet, where the time-slot directory follows the year/month/day/hour structure named in the docstring.

:param output_dir: Base directory to save the Parquet files.
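To make the output layout concrete, the sketch below reproduces the path join the method performs for a single file. The function parquet_output_path is hypothetical, and "2024/01/15/08" is an assumed year/month/day/hour time-slot directory. PurePosixPath is used instead of Path only so that the printed result is the same on every platform:

```python
from pathlib import PurePosixPath


def parquet_output_path(output_dir: str, timeslot_dir: str, uuid: str) -> PurePosixPath:
    """Mirror how the method joins output_dir, the time-slot directory, and the UUID."""
    return PurePosixPath(output_dir, timeslot_dir) / f"{uuid}.parquet"


# "2024/01/15/08" is an assumed time-slot directory, "sensor-1" a placeholder UUID
print(parquet_output_path("exports", "2024/01/15/08", "sensor-1"))
# exports/2024/01/15/08/sensor-1.parquet
```

Under this layout, files for different UUIDs in the same hour share a directory, and the UUID appears only in the file name.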

Source code in src/ts_shape/loader/timeseries/s3proxy_parquet_loader.py
def fetch_data_as_parquet(self, output_dir: str):
    """
    Retrieves timeseries data from S3 and saves it as Parquet files.
    Each file is written to output_dir/<timeslot_dir>/<uuid>.parquet.
    :param output_dir: Base directory to save the Parquet files.
    """
    for timeslot_dir in self._generate_timeslot_paths():
        for uuid in set(self.uuids):
            df = self._fetch_parquet(uuid, timeslot_dir)
            if df is not None:
                output_path = Path(output_dir, timeslot_dir)
                output_path.mkdir(parents=True, exist_ok=True)
                df.to_parquet(output_path / f"{uuid}.parquet")