feat: Refactor sensor health checker for improved response handling
Some checks failed
Integration Tests / integration-tests (push) Failing after 19s
Integration Tests / performance-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 55s
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 1m0s
Service Adapters (Python FastAPI) / test (3.13) (push) Failing after 58s
Service Adapters (Python FastAPI) / build (push) Has been skipped

### Summary of Changes
- Introduced `_handle_sensor_response` and `_handle_successful_response` methods to streamline the processing of sensor API responses.
- Enhanced readability and maintainability by breaking down complex logic into smaller, focused methods.
- Added specific parsing methods for Home Assistant sensors, including `_parse_home_assistant_sensor`, `_parse_uptime_sensor`, and others to improve clarity and separation of concerns.

### Expected Results
- Improved code organization, making it easier to understand and extend the health checker functionality.
- Enhanced error handling and response management for sensor data, leading to more robust health checks.
This commit is contained in:
GSRN
2025-09-18 13:11:50 +02:00
parent 7eaea39928
commit 227597b512

View File

@@ -9,6 +9,7 @@ import logging
import time import time
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import httpx
from httpx import HTTPError, TimeoutException from httpx import HTTPError, TimeoutException
from utils.time_formatter import format_uptime_for_frontend from utils.time_formatter import format_uptime_for_frontend
@@ -63,38 +64,7 @@ class SensorHealthChecker(BaseHealthChecker):
logger.info(f"Service {service_name} sensor responded with status {response.status_code} in {response_time:.3f}s") logger.info(f"Service {service_name} sensor responded with status {response.status_code} in {response_time:.3f}s")
if response.status_code == 200: return self._handle_sensor_response(response, service_name, sensor_entity, response_time)
# Parse sensor data
sensor_data = response.json()
logger.debug(f"Raw sensor data for {service_name}: {sensor_data}")
health_status = self._parse_sensor_data(sensor_data, service_name)
logger.info(f"Parsed health status for {service_name}: {health_status}")
# Extract uptime information for top-level field
uptime_info = self._extract_uptime_info(sensor_data, service_name)
# Format uptime for frontend display
formatted_uptime = format_uptime_for_frontend(uptime_info)
metadata = {
"http_status": response.status_code,
"sensor_entity": sensor_entity,
"sensor_state": sensor_data.get("state"),
"sensor_attributes": sensor_data.get("attributes", {}),
"last_updated": sensor_data.get("last_updated"),
"entity_id": sensor_data.get("entity_id"),
}
return HealthCheckResult(health_status, response_time, metadata=metadata, uptime=formatted_uptime)
elif response.status_code == 401:
logger.warning(f"Service {service_name} returned 401 - authentication required")
return HealthCheckResult("unauthorized", response_time, "Authentication required")
elif response.status_code == 404:
logger.warning(f"Service {service_name} sensor {sensor_entity} not found")
return HealthCheckResult("error", response_time, f"Sensor {sensor_entity} not found")
else:
logger.warning(f"Service {service_name} returned {response.status_code}")
return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}")
except TimeoutException: except TimeoutException:
logger.error(f"Service {service_name} timed out after {self.timeout}s") logger.error(f"Service {service_name} timed out after {self.timeout}s")
@@ -106,6 +76,67 @@ class SensorHealthChecker(BaseHealthChecker):
logger.error(f"Unexpected error checking {service_name}: {str(e)}") logger.error(f"Unexpected error checking {service_name}: {str(e)}")
return HealthCheckResult("error", error=f"Unexpected error: {str(e)}") return HealthCheckResult("error", error=f"Unexpected error: {str(e)}")
def _handle_sensor_response(self, response: httpx.Response, service_name: str, sensor_entity: str, response_time: float) -> HealthCheckResult:
    """
    Translate a sensor API response into a HealthCheckResult by HTTP status.

    A 200 response is delegated to ``_handle_successful_response``; every
    other status is logged as a warning and mapped to a fixed result.

    Args:
        response: HTTP response from sensor API
        service_name: Name of the service (used in log messages)
        sensor_entity: Sensor entity ID (used in the 404 message)
        response_time: Response time in seconds

    Returns:
        HealthCheckResult with status "unauthorized" for 401, "error" for
        404, "unhealthy" for any other non-200 status, or whatever
        ``_handle_successful_response`` returns for 200.
    """
    status_code = response.status_code

    # Only a 200 carries a sensor payload worth parsing.
    if status_code == 200:
        return self._handle_successful_response(response, service_name, sensor_entity, response_time)

    if status_code == 401:
        logger.warning(f"Service {service_name} returned 401 - authentication required")
        return HealthCheckResult("unauthorized", response_time, "Authentication required")

    if status_code == 404:
        logger.warning(f"Service {service_name} sensor {sensor_entity} not found")
        return HealthCheckResult("error", response_time, f"Sensor {sensor_entity} not found")

    # Any remaining status code is reported as a generic unhealthy result.
    logger.warning(f"Service {service_name} returned {response.status_code}")
    return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}")
def _handle_successful_response(self, response: httpx.Response, service_name: str, sensor_entity: str, response_time: float) -> HealthCheckResult:
    """
    Build the HealthCheckResult for a 200 response from the sensor API.

    The JSON body is decoded, turned into a health status via
    ``_parse_sensor_data``, and its uptime is extracted and formatted for
    the frontend. Selected fields of the payload are echoed back as metadata.

    Args:
        response: HTTP response from sensor API
        service_name: Name of the service
        sensor_entity: Sensor entity ID
        response_time: Response time in seconds

    Returns:
        HealthCheckResult carrying the parsed status, the response time,
        a metadata dict and the formatted uptime.
    """
    sensor_data = response.json()
    logger.debug(f"Raw sensor data for {service_name}: {sensor_data}")

    # Derive the health status first so it is logged before uptime handling.
    health_status = self._parse_sensor_data(sensor_data, service_name)
    logger.info(f"Parsed health status for {service_name}: {health_status}")

    # Uptime is surfaced as a top-level field, formatted for frontend display.
    raw_uptime = self._extract_uptime_info(sensor_data, service_name)
    display_uptime = format_uptime_for_frontend(raw_uptime)

    return HealthCheckResult(
        health_status,
        response_time,
        metadata={
            "http_status": response.status_code,
            "sensor_entity": sensor_entity,
            "sensor_state": sensor_data.get("state"),
            "sensor_attributes": sensor_data.get("attributes", {}),
            "last_updated": sensor_data.get("last_updated"),
            "entity_id": sensor_data.get("entity_id"),
        },
        uptime=display_uptime,
    )
def _parse_sensor_data(self, sensor_data: Dict[str, Any], service_name: str) -> str: def _parse_sensor_data(self, sensor_data: Dict[str, Any], service_name: str) -> str:
""" """
Parse sensor data to determine health status. Parse sensor data to determine health status.
@@ -126,59 +157,123 @@ class SensorHealthChecker(BaseHealthChecker):
# Service-specific sensor parsing # Service-specific sensor parsing
if service_name == "home_assistant": if service_name == "home_assistant":
# For HA, check uptime sensor or system health return self._parse_home_assistant_sensor(state, entity_id, attributes)
if "uptime" in entity_id:
# Check if this is a timestamp sensor (device_class: timestamp)
device_class = attributes.get("device_class", "")
if device_class == "timestamp":
# Timestamp sensor - if it has a valid timestamp, service is healthy
try:
from datetime import datetime
# Try to parse the timestamp
parsed_time = datetime.fromisoformat(state.replace("Z", "+00:00"))
# If we can parse it and it's recent (within last 24 hours), it's healthy
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
time_diff = now - parsed_time
is_healthy = time_diff.total_seconds() < 86400 # 24 hours
logger.debug(f"Timestamp sensor: {state}, time_diff: {time_diff}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
except (ValueError, TypeError) as e:
logger.warning(f"Could not parse timestamp '{state}': {e}")
return "unhealthy"
else:
# Numeric uptime sensor - check if it's a valid number
try:
uptime_seconds = float(state)
# If uptime > 0, service is healthy
is_healthy = uptime_seconds > 0
logger.debug(f"Uptime sensor: {uptime_seconds}s, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
except ValueError:
logger.warning(f"Uptime sensor state '{state}' is not a valid number")
return "unhealthy"
elif "system" in entity_id:
# System health sensor
is_healthy = state.lower() in ["ok", "healthy", "online"]
logger.debug(f"System sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
else:
# Generic sensor - check if state indicates health
is_healthy = state.lower() not in ["unavailable", "unknown", "off"]
logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
else: else:
# Generic sensor parsing return self._parse_generic_sensor(state)
is_healthy = state.lower() not in ["unavailable", "unknown", "off", "error"]
logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
except Exception as e: except Exception as e:
logger.error(f"Could not parse sensor data from {service_name}: {e}") logger.error(f"Could not parse sensor data from {service_name}: {e}")
return "unhealthy" return "unhealthy"
def _parse_home_assistant_sensor(self, state: str, entity_id: str, attributes: Dict[str, Any]) -> str:
    """
    Route a Home Assistant sensor to the parser matching its entity ID.

    The entity ID is checked by substring: "uptime" takes precedence over
    "system"; anything else falls back to the generic parser.

    Args:
        state: Sensor state value
        entity_id: Sensor entity ID
        attributes: Sensor attributes (only forwarded to the uptime parser)

    Returns:
        Health status string produced by the selected parser
    """
    # "uptime" is tested first, so an ID containing both substrings is
    # handled as an uptime sensor.
    if "uptime" in entity_id:
        return self._parse_uptime_sensor(state, attributes)

    if "system" in entity_id:
        return self._parse_system_sensor(state)

    return self._parse_generic_sensor(state)
def _parse_uptime_sensor(self, state: str, attributes: Dict[str, Any]) -> str:
    """
    Parse an uptime sensor, choosing between timestamp and numeric handling.

    Args:
        state: Sensor state value
        attributes: Sensor attributes; "device_class" selects the parser

    Returns:
        Health status string
    """
    # A device_class of "timestamp" means the state is a timestamp string;
    # every other value (including a missing key) is treated as numeric.
    if attributes.get("device_class", "") == "timestamp":
        return self._parse_timestamp_sensor(state)

    return self._parse_numeric_uptime_sensor(state)
def _parse_timestamp_sensor(self, state: str, max_age_seconds: float = 86400) -> str:
    """
    Parse timestamp sensor data.

    The state is parsed as an ISO 8601 timestamp and compared with the
    current UTC time; the sensor is healthy when the timestamp is less than
    ``max_age_seconds`` old.

    Args:
        state: Sensor state value (timestamp string)
        max_age_seconds: Maximum age of the timestamp, in seconds, that is
            still considered healthy. Defaults to 86400 (24 hours).

    Returns:
        "healthy" if the timestamp parses and is newer than the limit,
        otherwise "unhealthy" (including when it cannot be parsed).
    """
    from datetime import datetime, timezone

    try:
        # Normalise a trailing "Z" to an explicit UTC offset so that
        # fromisoformat() accepts it on Python versions before 3.11.
        parsed_time = datetime.fromisoformat(state.replace("Z", "+00:00"))
    except (ValueError, TypeError, AttributeError) as e:
        # AttributeError covers a non-string state (e.g. None) that has no
        # .replace(); it is reported the same way as a malformed string.
        logger.warning(f"Could not parse timestamp '{state}': {e}")
        return "unhealthy"

    # A timestamp without an offset cannot be subtracted from an aware
    # "now" (TypeError). Interpret it as UTC instead of misreporting it as
    # unparseable.
    if parsed_time.utcoffset() is None:
        parsed_time = parsed_time.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    time_diff = now - parsed_time

    # NOTE(review): if this sensor reports the time the service started, an
    # instance running longer than the window would be marked unhealthy -
    # confirm the intended semantics of the age limit.
    is_healthy = time_diff.total_seconds() < max_age_seconds
    logger.debug(f"Timestamp sensor: {state}, time_diff: {time_diff}, healthy: {is_healthy}")
    return "healthy" if is_healthy else "unhealthy"
def _parse_numeric_uptime_sensor(self, state: str) -> str:
    """
    Parse numeric uptime sensor data.

    Args:
        state: Sensor state value (numeric string)

    Returns:
        "healthy" if the state converts to a number greater than zero,
        otherwise "unhealthy" (including when it is not a valid number).
    """
    try:
        uptime_seconds = float(state)
    except (ValueError, TypeError):
        # ValueError: non-numeric string such as "unavailable".
        # TypeError: non-string, non-numeric state such as None. Handled
        # here so it gets the same specific warning instead of escaping.
        logger.warning(f"Uptime sensor state '{state}' is not a valid number")
        return "unhealthy"

    # Any positive uptime means the service is running.
    is_healthy = uptime_seconds > 0
    logger.debug(f"Uptime sensor: {uptime_seconds}s, healthy: {is_healthy}")
    return "healthy" if is_healthy else "unhealthy"
def _parse_system_sensor(self, state: str) -> str:
    """
    Parse system health sensor data.

    Args:
        state: Sensor state value

    Returns:
        "healthy" when the lower-cased state is "ok", "healthy" or
        "online"; "unhealthy" for any other value.
    """
    # Allow-list: only these states count as healthy (case-insensitive).
    healthy_states = ("ok", "healthy", "online")
    is_healthy = state.lower() in healthy_states
    logger.debug(f"System sensor: state={state}, healthy: {is_healthy}")

    if is_healthy:
        return "healthy"
    return "unhealthy"
def _parse_generic_sensor(self, state: str) -> str:
    """
    Parse generic sensor data.

    Args:
        state: Sensor state value

    Returns:
        "unhealthy" when the lower-cased state is "unavailable", "unknown",
        "off" or "error"; "healthy" for any other value.
    """
    # Deny-list: every state not listed here counts as healthy
    # (case-insensitive).
    unhealthy_states = ("unavailable", "unknown", "off", "error")
    normalized = state.lower()
    is_healthy = normalized not in unhealthy_states
    logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}")

    if is_healthy:
        return "healthy"
    return "unhealthy"
def _extract_uptime_info(self, sensor_data: Dict[str, Any], service_name: str) -> Optional[str]: def _extract_uptime_info(self, sensor_data: Dict[str, Any], service_name: str) -> Optional[str]:
""" """
Extract uptime information from sensor data for top-level display. Extract uptime information from sensor data for top-level display.