From 227597b51255139b1cccf2838385c9616384c16d Mon Sep 17 00:00:00 2001 From: GSRN Date: Thu, 18 Sep 2025 13:11:50 +0200 Subject: [PATCH] feat: Refactor sensor health checker for improved response handling ### Summary of Changes - Introduced `_handle_sensor_response` and `_handle_successful_response` methods to streamline the processing of sensor API responses. - Enhanced readability and maintainability by breaking down complex logic into smaller, focused methods. - Added specific parsing methods for Home Assistant sensors, including `_parse_home_assistant_sensor`, `_parse_uptime_sensor`, and others to improve clarity and separation of concerns. ### Expected Results - Improved code organization, making it easier to understand and extend the health checker functionality. - Enhanced error handling and response management for sensor data, leading to more robust health checks. --- .../health_checkers/sensor_checker.py | 253 ++++++++++++------ 1 file changed, 174 insertions(+), 79 deletions(-) diff --git a/services/service-adapters/services/health_checkers/sensor_checker.py b/services/service-adapters/services/health_checkers/sensor_checker.py index 7dc3b56..df99440 100644 --- a/services/service-adapters/services/health_checkers/sensor_checker.py +++ b/services/service-adapters/services/health_checkers/sensor_checker.py @@ -9,6 +9,7 @@ import logging import time from typing import Any, Dict, Optional +import httpx from httpx import HTTPError, TimeoutException from utils.time_formatter import format_uptime_for_frontend @@ -63,38 +64,7 @@ class SensorHealthChecker(BaseHealthChecker): logger.info(f"Service {service_name} sensor responded with status {response.status_code} in {response_time:.3f}s") - if response.status_code == 200: - # Parse sensor data - sensor_data = response.json() - logger.debug(f"Raw sensor data for {service_name}: {sensor_data}") - - health_status = self._parse_sensor_data(sensor_data, service_name) - logger.info(f"Parsed health status for {service_name}: {health_status}") - - # Extract uptime information for top-level field - uptime_info = self._extract_uptime_info(sensor_data, service_name) - # Format uptime for frontend display - formatted_uptime = format_uptime_for_frontend(uptime_info) - - metadata = { - "http_status": response.status_code, - "sensor_entity": sensor_entity, - "sensor_state": sensor_data.get("state"), - "sensor_attributes": sensor_data.get("attributes", {}), - "last_updated": sensor_data.get("last_updated"), - "entity_id": sensor_data.get("entity_id"), - } - - return HealthCheckResult(health_status, response_time, metadata=metadata, uptime=formatted_uptime) - elif response.status_code == 401: - logger.warning(f"Service {service_name} returned 401 - authentication required") - return HealthCheckResult("unauthorized", response_time, "Authentication required") - elif response.status_code == 404: - logger.warning(f"Service {service_name} sensor {sensor_entity} not found") - return HealthCheckResult("error", response_time, f"Sensor {sensor_entity} not found") - else: - logger.warning(f"Service {service_name} returned {response.status_code}") - return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}") + return self._handle_sensor_response(response, service_name, sensor_entity, response_time) except TimeoutException: logger.error(f"Service {service_name} timed out after {self.timeout}s") @@ -106,6 +76,67 @@ class SensorHealthChecker(BaseHealthChecker): logger.error(f"Unexpected error checking {service_name}: {str(e)}") return HealthCheckResult("error", error=f"Unexpected error: {str(e)}") + def _handle_sensor_response(self, response: httpx.Response, service_name: str, sensor_entity: str, response_time: float) -> HealthCheckResult: + """ + Handle sensor API response and return appropriate HealthCheckResult. + + Args: + response: HTTP response from sensor API + service_name: Name of the service + sensor_entity: Sensor entity ID + response_time: Response time in seconds + + Returns: + HealthCheckResult with status information + """ + if response.status_code == 200: + return self._handle_successful_response(response, service_name, sensor_entity, response_time) + elif response.status_code == 401: + logger.warning(f"Service {service_name} returned 401 - authentication required") + return HealthCheckResult("unauthorized", response_time, "Authentication required") + elif response.status_code == 404: + logger.warning(f"Service {service_name} sensor {sensor_entity} not found") + return HealthCheckResult("error", response_time, f"Sensor {sensor_entity} not found") + else: + logger.warning(f"Service {service_name} returned {response.status_code}") + return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}") + + def _handle_successful_response(self, response: httpx.Response, service_name: str, sensor_entity: str, response_time: float) -> HealthCheckResult: + """ + Handle successful sensor API response (200 status). + + Args: + response: HTTP response from sensor API + service_name: Name of the service + sensor_entity: Sensor entity ID + response_time: Response time in seconds + + Returns: + HealthCheckResult with parsed sensor data + """ + # Parse sensor data + sensor_data = response.json() + logger.debug(f"Raw sensor data for {service_name}: {sensor_data}") + + health_status = self._parse_sensor_data(sensor_data, service_name) + logger.info(f"Parsed health status for {service_name}: {health_status}") + + # Extract uptime information for top-level field + uptime_info = self._extract_uptime_info(sensor_data, service_name) + # Format uptime for frontend display + formatted_uptime = format_uptime_for_frontend(uptime_info) + + metadata = { + "http_status": response.status_code, + "sensor_entity": sensor_entity, + "sensor_state": sensor_data.get("state"), + "sensor_attributes": sensor_data.get("attributes", {}), + "last_updated": sensor_data.get("last_updated"), + "entity_id": sensor_data.get("entity_id"), + } + + return HealthCheckResult(health_status, response_time, metadata=metadata, uptime=formatted_uptime) + def _parse_sensor_data(self, sensor_data: Dict[str, Any], service_name: str) -> str: """ Parse sensor data to determine health status. @@ -126,59 +157,123 @@ class SensorHealthChecker(BaseHealthChecker): # Service-specific sensor parsing if service_name == "home_assistant": - # For HA, check uptime sensor or system health - if "uptime" in entity_id: - # Check if this is a timestamp sensor (device_class: timestamp) - device_class = attributes.get("device_class", "") - if device_class == "timestamp": - # Timestamp sensor - if it has a valid timestamp, service is healthy - try: - from datetime import datetime - - # Try to parse the timestamp - parsed_time = datetime.fromisoformat(state.replace("Z", "+00:00")) - # If we can parse it and it's recent (within last 24 hours), it's healthy - from datetime import datetime, timezone - - now = datetime.now(timezone.utc) - time_diff = now - parsed_time - is_healthy = time_diff.total_seconds() < 86400 # 24 hours - logger.debug(f"Timestamp sensor: {state}, time_diff: {time_diff}, healthy: {is_healthy}") - return "healthy" if is_healthy else "unhealthy" - except (ValueError, TypeError) as e: - logger.warning(f"Could not parse timestamp '{state}': {e}") - return "unhealthy" - else: - # Numeric uptime sensor - check if it's a valid number - try: - uptime_seconds = float(state) - # If uptime > 0, service is healthy - is_healthy = uptime_seconds > 0 - logger.debug(f"Uptime sensor: {uptime_seconds}s, healthy: {is_healthy}") - return "healthy" if is_healthy else "unhealthy" - except ValueError: - logger.warning(f"Uptime sensor state '{state}' is not a valid number") - return "unhealthy" - elif "system" in entity_id: - # System health sensor - is_healthy = state.lower() in ["ok", "healthy", "online"] - logger.debug(f"System sensor: state={state}, healthy: {is_healthy}") - return "healthy" if is_healthy else "unhealthy" - else: - # Generic sensor - check if state indicates health - is_healthy = state.lower() not in ["unavailable", "unknown", "off"] - logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}") - return "healthy" if is_healthy else "unhealthy" + return self._parse_home_assistant_sensor(state, entity_id, attributes) else: - # Generic sensor parsing - is_healthy = state.lower() not in ["unavailable", "unknown", "off", "error"] - logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}") - return "healthy" if is_healthy else "unhealthy" + return self._parse_generic_sensor(state) except Exception as e: logger.error(f"Could not parse sensor data from {service_name}: {e}") return "unhealthy" + def _parse_home_assistant_sensor(self, state: str, entity_id: str, attributes: Dict[str, Any]) -> str: + """ + Parse Home Assistant specific sensor data. + + Args: + state: Sensor state value + entity_id: Sensor entity ID + attributes: Sensor attributes + + Returns: + Health status string + """ + if "uptime" in entity_id: + return self._parse_uptime_sensor(state, attributes) + elif "system" in entity_id: + return self._parse_system_sensor(state) + else: + return self._parse_generic_sensor(state) + + def _parse_uptime_sensor(self, state: str, attributes: Dict[str, Any]) -> str: + """ + Parse uptime sensor data (timestamp or numeric). + + Args: + state: Sensor state value + attributes: Sensor attributes + + Returns: + Health status string + """ + device_class = attributes.get("device_class", "") + if device_class == "timestamp": + return self._parse_timestamp_sensor(state) + else: + return self._parse_numeric_uptime_sensor(state) + + def _parse_timestamp_sensor(self, state: str) -> str: + """ + Parse timestamp sensor data. + + Args: + state: Sensor state value (timestamp string) + + Returns: + Health status string + """ + try: + from datetime import datetime, timezone + + # Try to parse the timestamp + parsed_time = datetime.fromisoformat(state.replace("Z", "+00:00")) + # If we can parse it and it's recent (within last 24 hours), it's healthy + now = datetime.now(timezone.utc) + time_diff = now - parsed_time + is_healthy = time_diff.total_seconds() < 86400 # 24 hours + logger.debug(f"Timestamp sensor: {state}, time_diff: {time_diff}, healthy: {is_healthy}") + return "healthy" if is_healthy else "unhealthy" + except (ValueError, TypeError) as e: + logger.warning(f"Could not parse timestamp '{state}': {e}") + return "unhealthy" + + def _parse_numeric_uptime_sensor(self, state: str) -> str: + """ + Parse numeric uptime sensor data. + + Args: + state: Sensor state value (numeric string) + + Returns: + Health status string + """ + try: + uptime_seconds = float(state) + # If uptime > 0, service is healthy + is_healthy = uptime_seconds > 0 + logger.debug(f"Uptime sensor: {uptime_seconds}s, healthy: {is_healthy}") + return "healthy" if is_healthy else "unhealthy" + except ValueError: + logger.warning(f"Uptime sensor state '{state}' is not a valid number") + return "unhealthy" + + def _parse_system_sensor(self, state: str) -> str: + """ + Parse system health sensor data. + + Args: + state: Sensor state value + + Returns: + Health status string + """ + is_healthy = state.lower() in ["ok", "healthy", "online"] + logger.debug(f"System sensor: state={state}, healthy: {is_healthy}") + return "healthy" if is_healthy else "unhealthy" + + def _parse_generic_sensor(self, state: str) -> str: + """ + Parse generic sensor data. + + Args: + state: Sensor state value + + Returns: + Health status string + """ + is_healthy = state.lower() not in ["unavailable", "unknown", "off", "error"] + logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}") + return "healthy" if is_healthy else "unhealthy" + def _extract_uptime_info(self, sensor_data: Dict[str, Any], service_name: str) -> Optional[str]: """ Extract uptime information from sensor data for top-level display.