timeseries_shaper.loader.metadata.metadata_db_loader
import json
import os
from typing import Dict, List, Optional

import pandas as pd
import psycopg2


class DatapointDB:
    """
    Class for accessing datapoints via a database.

    Creating an instance connects to the ``config_repository`` PostgreSQL
    database, loads the data point metadata of every requested device and
    writes one JSON file per device into ``output_path``.
    """

    # Columns returned by the metadata query, in SELECT order.
    _COLUMNS = ["uuid", "label", "config"]

    def __init__(
        self,
        device_names: List[str],
        db_user: str,
        db_pass: str,
        db_host: str,
        output_path: str = "data",
        required_uuid_list: Optional[List[str]] = None,
        filter_enabled: bool = True,
    ):
        """
        Initialize the DatapointDB class.

        The constructor stores its arguments and then immediately queries the
        database via ``_db_access``.

        :param device_names: List of device names to retrieve metadata for.
        :param db_user: Database user.
        :param db_pass: Database password.
        :param db_host: Database host.
        :param output_path: Directory to save JSON files.
        :param required_uuid_list: List of UUIDs to filter the metadata (optional).
        :param filter_enabled: Whether to filter metadata by "enabled == True" and "archived == False" (default is True).
        """
        self.device_names = device_names
        self.db_user = db_user
        self.db_pass = db_pass
        self.db_host = db_host
        self.output_path = output_path
        self.required_uuid_list = required_uuid_list or []
        self.filter_enabled = filter_enabled
        self.device_metadata: Dict[str, pd.DataFrame] = {}  # Store metadata for each device
        self.device_uuids: Dict[str, List[str]] = {}  # Store UUIDs for each device
        self._db_access()

    def _db_access(self) -> None:
        """
        Connect to the database and retrieve metadata for each device.

        For every name in ``device_names`` this fills ``device_metadata`` and
        ``device_uuids`` and exports the records to a JSON file. The connection
        is closed even when a query or an export raises.
        """
        # The statement does not depend on the device, so build it once.
        query = """
            SELECT dp.uuid, dp.label, dp.config
            FROM data_points dp
            INNER JOIN devices dev ON dev.id = dp.device_id
            WHERE dev.name = %s
        """
        if self.filter_enabled:
            query += " AND dp.enabled = true AND dp.archived = false"

        conn = psycopg2.connect(
            dbname="config_repository",
            user=self.db_user,
            password=self.db_pass,
            host=self.db_host,
            port=5432,
        )
        try:
            # The cursor context manager closes the cursor on exit.
            with conn.cursor() as cursor:
                for device_name in self.device_names:
                    cursor.execute(query, (device_name,))
                    data_points = [dict(zip(self._COLUMNS, row)) for row in cursor.fetchall()]

                    # Passing the columns explicitly keeps the "uuid" column
                    # present when the device has no data points; without it
                    # the lookups below raise KeyError on an empty result.
                    metadata_df = pd.DataFrame(data_points, columns=self._COLUMNS)
                    if self.required_uuid_list:
                        metadata_df = metadata_df[metadata_df["uuid"].isin(self.required_uuid_list)]

                    # Store metadata and UUIDs for the device
                    self.device_metadata[device_name] = metadata_df
                    self.device_uuids[device_name] = metadata_df["uuid"].tolist()

                    # Export to JSON file
                    self._export_json(metadata_df.to_dict(orient="records"), device_name)
        finally:
            conn.close()

    def _export_json(self, data_points: List[Dict[str, str]], device_name: str) -> None:
        """
        Export data points to a JSON file for the specified device.

        The file is named ``<device_name>_data_points.json`` (spaces replaced
        by underscores) and is written into ``output_path``, which is created
        if it does not exist yet.

        :param data_points: Records to serialize.
        :param device_name: Device the records belong to.
        """
        os.makedirs(self.output_path, exist_ok=True)
        file_name = os.path.join(self.output_path, f"{device_name.replace(' ', '_')}_data_points.json")
        with open(file_name, "w", encoding="utf-8") as f:
            json.dump(data_points, f, indent=2)

    def get_all_uuids(self) -> Dict[str, List[str]]:
        """Return a dictionary of UUIDs for each device."""
        return self.device_uuids

    def get_all_metadata(self) -> Dict[str, List[Dict[str, str]]]:
        """Return a dictionary of metadata for each device."""
        return {device: metadata.to_dict(orient="records") for device, metadata in self.device_metadata.items()}

    def display_dataframe(self, device_name: Optional[str] = None, aggregate: bool = False) -> None:
        """
        Display metadata as a DataFrame for a specific device or all devices.

        :param device_name: Name of the device to display metadata for (optional).
        :param aggregate: If True, combine metadata from all devices into a single DataFrame.
        """
        if aggregate:
            # pd.concat raises ValueError when given nothing to concatenate.
            if not self.device_metadata:
                print("No metadata available to aggregate.")
                return
            combined_df = pd.concat(
                list(self.device_metadata.values()),
                keys=list(self.device_metadata.keys()),
            )
            print("Aggregated metadata for all devices:")
            print(combined_df)
        elif device_name:
            if device_name in self.device_metadata:
                print(f"Metadata for device: {device_name}")
                print(self.device_metadata[device_name])
            else:
                print(f"No metadata found for device: {device_name}")
        else:
            for device, metadata in self.device_metadata.items():
                print(f"\nMetadata for device: {device}")
                print(metadata)
class
DatapointDB:
class DatapointDB:
    """
    Class for accessing datapoints via a database.

    Creating an instance connects to the ``config_repository`` PostgreSQL
    database, loads the data point metadata of every requested device and
    writes one JSON file per device into ``output_path``.
    """

    # Columns returned by the metadata query, in SELECT order.
    _COLUMNS = ["uuid", "label", "config"]

    def __init__(
        self,
        device_names: List[str],
        db_user: str,
        db_pass: str,
        db_host: str,
        output_path: str = "data",
        required_uuid_list: List[str] = None,
        filter_enabled: bool = True,
    ):
        """
        Initialize the DatapointDB class.

        The constructor stores its arguments and then immediately queries the
        database via ``_db_access``.

        :param device_names: List of device names to retrieve metadata for.
        :param db_user: Database user.
        :param db_pass: Database password.
        :param db_host: Database host.
        :param output_path: Directory to save JSON files.
        :param required_uuid_list: List of UUIDs to filter the metadata (optional).
        :param filter_enabled: Whether to filter metadata by "enabled == True" and "archived == False" (default is True).
        """
        self.device_names = device_names
        self.db_user = db_user
        self.db_pass = db_pass
        self.db_host = db_host
        self.output_path = output_path
        self.required_uuid_list = required_uuid_list or []
        self.filter_enabled = filter_enabled
        self.device_metadata: Dict[str, pd.DataFrame] = {}  # Store metadata for each device
        self.device_uuids: Dict[str, List[str]] = {}  # Store UUIDs for each device
        self._db_access()

    def _db_access(self) -> None:
        """
        Connect to the database and retrieve metadata for each device.

        For every name in ``device_names`` this fills ``device_metadata`` and
        ``device_uuids`` and exports the records to a JSON file. The connection
        is closed even when a query or an export raises.
        """
        # The statement does not depend on the device, so build it once.
        query = """
            SELECT dp.uuid, dp.label, dp.config
            FROM data_points dp
            INNER JOIN devices dev ON dev.id = dp.device_id
            WHERE dev.name = %s
        """
        if self.filter_enabled:
            query += " AND dp.enabled = true AND dp.archived = false"

        conn = psycopg2.connect(
            dbname="config_repository",
            user=self.db_user,
            password=self.db_pass,
            host=self.db_host,
            port=5432,
        )
        try:
            # The cursor context manager closes the cursor on exit.
            with conn.cursor() as cursor:
                for device_name in self.device_names:
                    cursor.execute(query, (device_name,))
                    data_points = [dict(zip(self._COLUMNS, row)) for row in cursor.fetchall()]

                    # Passing the columns explicitly keeps the "uuid" column
                    # present when the device has no data points; without it
                    # the lookups below raise KeyError on an empty result.
                    metadata_df = pd.DataFrame(data_points, columns=self._COLUMNS)
                    if self.required_uuid_list:
                        metadata_df = metadata_df[metadata_df["uuid"].isin(self.required_uuid_list)]

                    # Store metadata and UUIDs for the device
                    self.device_metadata[device_name] = metadata_df
                    self.device_uuids[device_name] = metadata_df["uuid"].tolist()

                    # Export to JSON file
                    self._export_json(metadata_df.to_dict(orient="records"), device_name)
        finally:
            conn.close()

    def _export_json(self, data_points: List[Dict[str, str]], device_name: str) -> None:
        """
        Export data points to a JSON file for the specified device.

        The file is named ``<device_name>_data_points.json`` (spaces replaced
        by underscores) and is written into ``output_path``, which is created
        if it does not exist yet.

        :param data_points: Records to serialize.
        :param device_name: Device the records belong to.
        """
        # Imported locally so this class does not rely on a module-level
        # ``import os`` being present in the file.
        import os

        os.makedirs(self.output_path, exist_ok=True)
        file_name = os.path.join(self.output_path, f"{device_name.replace(' ', '_')}_data_points.json")
        with open(file_name, "w", encoding="utf-8") as f:
            json.dump(data_points, f, indent=2)

    def get_all_uuids(self) -> Dict[str, List[str]]:
        """Return a dictionary of UUIDs for each device."""
        return self.device_uuids

    def get_all_metadata(self) -> Dict[str, List[Dict[str, str]]]:
        """Return a dictionary of metadata for each device."""
        return {device: metadata.to_dict(orient="records") for device, metadata in self.device_metadata.items()}

    def display_dataframe(self, device_name: str = None, aggregate: bool = False) -> None:
        """
        Display metadata as a DataFrame for a specific device or all devices.

        :param device_name: Name of the device to display metadata for (optional).
        :param aggregate: If True, combine metadata from all devices into a single DataFrame.
        """
        if aggregate:
            # pd.concat raises ValueError when given nothing to concatenate.
            if not self.device_metadata:
                print("No metadata available to aggregate.")
                return
            combined_df = pd.concat(
                list(self.device_metadata.values()),
                keys=list(self.device_metadata.keys()),
            )
            print("Aggregated metadata for all devices:")
            print(combined_df)
        elif device_name:
            if device_name in self.device_metadata:
                print(f"Metadata for device: {device_name}")
                print(self.device_metadata[device_name])
            else:
                print(f"No metadata found for device: {device_name}")
        else:
            for device, metadata in self.device_metadata.items():
                print(f"\nMetadata for device: {device}")
                print(metadata)
Class for accessing datapoints via a database.
DatapointDB( device_names: List[str], db_user: str, db_pass: str, db_host: str, output_path: str = 'data', required_uuid_list: List[str] = None, filter_enabled: bool = True)
def __init__(
    self,
    device_names: List[str],
    db_user: str,
    db_pass: str,
    db_host: str,
    output_path: str = "data",
    required_uuid_list: List[str] = None,
    filter_enabled: bool = True,
):
    """
    Set up a DatapointDB instance and load its metadata.

    All arguments are stored on the instance, the per-device result
    containers are created empty, and ``_db_access`` is then called to
    populate them.

    :param device_names: Names of the devices whose metadata is retrieved.
    :param db_user: User name for the database connection.
    :param db_pass: Password for the database connection.
    :param db_host: Host of the database server.
    :param output_path: Directory the JSON files are written to.
    :param required_uuid_list: UUIDs the metadata is restricted to (optional; a falsy value is stored as an empty list).
    :param filter_enabled: Whether to keep only data points with "enabled == True" and "archived == False" (default is True).
    """
    # What to fetch and how to filter it.
    self.device_names = device_names
    self.filter_enabled = filter_enabled
    self.required_uuid_list = required_uuid_list if required_uuid_list else []

    # Connection credentials.
    self.db_host = db_host
    self.db_user = db_user
    self.db_pass = db_pass

    # Destination directory for the exported JSON files.
    self.output_path = output_path

    # Result containers, keyed by device name.
    self.device_metadata: Dict[str, pd.DataFrame] = {}
    self.device_uuids: Dict[str, List[str]] = {}

    # Everything is in place: run the queries.
    self._db_access()
Initialize the DatapointDB class.
Parameters
- device_names: List of device names to retrieve metadata for.
- db_user: Database user.
- db_pass: Database password.
- db_host: Database host.
- output_path: Directory to save JSON files.
- required_uuid_list: List of UUIDs to filter the metadata (optional).
- filter_enabled: Whether to filter metadata by "enabled == True" and "archived == False" (default is True).
def
get_all_uuids(self) -> Dict[str, List[str]]:
def get_all_uuids(self) -> Dict[str, List[str]]:
    """
    Return the UUIDs collected for every device.

    :return: The instance's ``device_uuids`` dictionary (the same object, not a copy), mapping device name to its list of UUIDs.
    """
    uuids_by_device = self.device_uuids
    return uuids_by_device
Return a dictionary of UUIDs for each device.
def
get_all_metadata(self) -> Dict[str, List[Dict[str, str]]]:
def get_all_metadata(self) -> Dict[str, List[Dict[str, str]]]:
    """
    Return the metadata of every device as plain records.

    :return: A new dictionary mapping each device name to the rows of its metadata DataFrame, converted with ``to_dict(orient="records")``.
    """
    records_by_device = {}
    for device, frame in self.device_metadata.items():
        records_by_device[device] = frame.to_dict(orient="records")
    return records_by_device
Return a dictionary of metadata for each device.
def
display_dataframe(self, device_name: str = None, aggregate: bool = False) -> None:
def display_dataframe(self, device_name: str = None, aggregate: bool = False) -> None:
    """
    Display metadata as a DataFrame for a specific device or all devices.

    ``aggregate`` takes precedence over ``device_name``. With neither given,
    each device's metadata is printed in turn.

    :param device_name: Name of the device to display metadata for (optional).
    :param aggregate: If True, combine metadata from all devices into a single DataFrame.
    """
    if aggregate:
        # pd.concat raises ValueError when given nothing to concatenate,
        # which happens when no device metadata has been loaded.
        if not self.device_metadata:
            print("No metadata available to aggregate.")
            return
        combined_df = pd.concat(
            list(self.device_metadata.values()),
            keys=list(self.device_metadata.keys()),
        )
        print("Aggregated metadata for all devices:")
        print(combined_df)
    elif device_name:
        if device_name in self.device_metadata:
            print(f"Metadata for device: {device_name}")
            print(self.device_metadata[device_name])
        else:
            print(f"No metadata found for device: {device_name}")
    else:
        for device, metadata in self.device_metadata.items():
            print(f"\nMetadata for device: {device}")
            print(metadata)
Display metadata as a DataFrame for a specific device or all devices.
Parameters
- device_name: Name of the device to display metadata for (optional).
- aggregate: If True, combine metadata from all devices into a single DataFrame.