fix: Clean up whitespace and improve code formatting across service adapters
Some checks failed
Integration Tests / integration-tests (push) Failing after 20s
Integration Tests / performance-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 24s
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 25s
Service Adapters (Python FastAPI) / test (3.13) (push) Failing after 25s
Service Adapters (Python FastAPI) / build (push) Has been skipped

### Summary of Changes
- Removed unnecessary whitespace and standardized formatting in multiple files, including `main.py`, `logging_middleware.py`, `general.py`, and various health checker implementations.
- Enhanced readability and maintainability of the codebase by ensuring consistent formatting practices.

### Expected Results
- Improved code clarity, making it easier for developers to read and understand the service adapters' code.
- Streamlined the codebase, facilitating future updates and maintenance.
This commit is contained in:
GSRN
2025-09-18 13:02:46 +02:00
parent 4450311e47
commit 7eaea39928
13 changed files with 217 additions and 276 deletions

View File

@@ -12,6 +12,7 @@ from services.status_checker import status_checker
# Set up unified logging for both application and request logs # Set up unified logging for both application and request logs
setup_logging(level="INFO", enable_request_logging=True) setup_logging(level="INFO", enable_request_logging=True)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""Manage application lifespan events.""" """Manage application lifespan events."""
@@ -61,8 +62,8 @@ if __name__ == "__main__":
# Configure uvicorn to use our unified logging # Configure uvicorn to use our unified logging
uvicorn.run( uvicorn.run(
app, app,
host="127.0.0.1", host="127.0.0.1",
port=8001, port=8001,
log_config=None, # Disable uvicorn's default logging config log_config=None, # Disable uvicorn's default logging config
access_log=True, # Enable access logging access_log=True, # Enable access logging

View File

@@ -5,7 +5,6 @@ This module provides custom logging middleware for FastAPI requests
to ensure consistent logging format with application logs. to ensure consistent logging format with application logs.
""" """
import logging
import time import time
from typing import Callable from typing import Callable
@@ -19,57 +18,48 @@ logger = get_request_logger()
class LoggingMiddleware(BaseHTTPMiddleware): class LoggingMiddleware(BaseHTTPMiddleware):
"""Custom logging middleware for unified request logging.""" """Custom logging middleware for unified request logging."""
async def dispatch(self, request: Request, call_next: Callable) -> Response: async def dispatch(self, request: Request, call_next: Callable) -> Response:
""" """
Log each request with unified formatting. Log each request with unified formatting.
Args: Args:
request: The incoming request request: The incoming request
call_next: The next middleware/handler in the chain call_next: The next middleware/handler in the chain
Returns: Returns:
The response The response
""" """
# Start timing # Start timing
start_time = time.time() start_time = time.time()
# Extract request information # Extract request information
method = request.method method = request.method
url_path = request.url.path url_path = request.url.path
client_ip = request.client.host if request.client else "unknown" client_ip = request.client.host if request.client else "unknown"
user_agent = request.headers.get("user-agent", "unknown") user_agent = request.headers.get("user-agent", "unknown")
# Log request start # Log request start
logger.info( logger.info(f"Request started: {method} {url_path} from {client_ip} " f"(User-Agent: {user_agent})")
f"Request started: {method} {url_path} from {client_ip} "
f"(User-Agent: {user_agent})"
)
try: try:
# Process the request # Process the request
response = await call_next(request) response = await call_next(request)
# Calculate processing time # Calculate processing time
process_time = time.time() - start_time process_time = time.time() - start_time
# Log successful response # Log successful response
logger.info( logger.info(f"Request completed: {method} {url_path} -> " f"{response.status_code} in {process_time:.3f}s")
f"Request completed: {method} {url_path} -> "
f"{response.status_code} in {process_time:.3f}s"
)
return response return response
except Exception as e: except Exception as e:
# Calculate processing time for failed requests # Calculate processing time for failed requests
process_time = time.time() - start_time process_time = time.time() - start_time
# Log error # Log error
logger.error( logger.error(f"Request failed: {method} {url_path} -> " f"Exception: {str(e)} in {process_time:.3f}s")
f"Request failed: {method} {url_path} -> "
f"Exception: {str(e)} in {process_time:.3f}s"
)
# Re-raise the exception # Re-raise the exception
raise raise

View File

@@ -51,17 +51,19 @@ async def debug_logging():
logger.info("This is an INFO message from routes.general") logger.info("This is an INFO message from routes.general")
logger.warning("This is a WARNING message from routes.general") logger.warning("This is a WARNING message from routes.general")
logger.error("This is an ERROR message from routes.general") logger.error("This is an ERROR message from routes.general")
# Test request logger # Test request logger
from services.logging_config import get_request_logger from services.logging_config import get_request_logger
request_logger = get_request_logger() request_logger = get_request_logger()
request_logger.info("This is a request logger message") request_logger.info("This is a request logger message")
# Test application logger # Test application logger
from services.logging_config import get_application_logger from services.logging_config import get_application_logger
app_logger = get_application_logger() app_logger = get_application_logger()
app_logger.info("This is an application logger message") app_logger.info("This is an application logger message")
# Get current logging configuration # Get current logging configuration
root_logger = logging.getLogger() root_logger = logging.getLogger()
config_info = { config_info = {
@@ -74,13 +76,9 @@ async def debug_logging():
"application_logger_level": logging.getLevelName(app_logger.level), "application_logger_level": logging.getLevelName(app_logger.level),
"uvicorn_access_level": logging.getLevelName(logging.getLogger("uvicorn.access").level), "uvicorn_access_level": logging.getLevelName(logging.getLogger("uvicorn.access").level),
} }
logger.info("Unified logging debug info requested") logger.info("Unified logging debug info requested")
return { return {"message": "Unified log messages sent to console", "config": config_info, "note": "All logs now use the same format and handler"}
"message": "Unified log messages sent to console",
"config": config_info,
"note": "All logs now use the same format and handler"
}
@router.get( @router.get(
@@ -93,27 +91,22 @@ async def debug_sensor(service_name: str):
"""Debug endpoint to inspect raw sensor data""" """Debug endpoint to inspect raw sensor data"""
from services.config import SERVICES from services.config import SERVICES
from services.health_checkers import factory from services.health_checkers import factory
if service_name not in SERVICES: if service_name not in SERVICES:
return {"error": f"Service {service_name} not found"} return {"error": f"Service {service_name} not found"}
config = SERVICES[service_name] config = SERVICES[service_name]
if config.get("health_check_type") != "sensor": if config.get("health_check_type") != "sensor":
return {"error": f"Service {service_name} is not using sensor health checking"} return {"error": f"Service {service_name} is not using sensor health checking"}
try: try:
# Create sensor checker # Create sensor checker
checker = factory.create_checker("sensor", timeout=10.0) checker = factory.create_checker("sensor", timeout=10.0)
# Get raw sensor data # Get raw sensor data
result = await checker.check_health(service_name, config) result = await checker.check_health(service_name, config)
return { return {"service_name": service_name, "config": config, "result": result.to_dict(), "raw_sensor_data": result.metadata}
"service_name": service_name,
"config": config,
"result": result.to_dict(),
"raw_sensor_data": result.metadata
}
except Exception as e: except Exception as e:
logger.error(f"Error debugging sensor for {service_name}: {e}") logger.error(f"Error debugging sensor for {service_name}: {e}")
return {"error": str(e)} return {"error": str(e)}
@@ -129,10 +122,10 @@ async def debug_sensor(service_name: str):
async def get_services(): async def get_services():
"""Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)""" """Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
logger.info("Service status endpoint called - checking all services") logger.info("Service status endpoint called - checking all services")
# Check all services concurrently # Check all services concurrently
status_results = await status_checker.check_all_services() status_results = await status_checker.check_all_services()
service_status = {} service_status = {}
for service_name, config in SERVICES.items(): for service_name, config in SERVICES.items():
status_info = status_results.get(service_name, {}) status_info = status_results.get(service_name, {})
@@ -143,8 +136,8 @@ async def get_services():
response_time=status_info.get("response_time"), response_time=status_info.get("response_time"),
error=status_info.get("error"), error=status_info.get("error"),
uptime=status_info.get("uptime"), uptime=status_info.get("uptime"),
metadata=status_info.get("metadata", {}) metadata=status_info.get("metadata", {}),
) )
logger.info(f"Service status check completed - returning status for {len(service_status)} services") logger.info(f"Service status check completed - returning status for {len(service_status)} services")
return service_status return service_status

View File

@@ -1,4 +1,3 @@
from operator import truediv
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -10,7 +9,12 @@ load_dotenv()
SERVICES = { SERVICES = {
"home_assistant": { "home_assistant": {
"url": os.getenv("HOME_ASSISTANT_URL", "http://192.168.2.158:8123"), "url": os.getenv("HOME_ASSISTANT_URL", "http://192.168.2.158:8123"),
"token": os.getenv("HOME_ASSISTANT_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiI3MjdiY2QwMjNkNmM0NzgzYmRiMzg2ZDYxYzQ3N2NmYyIsImlhdCI6MTc1ODE4MDg2MiwiZXhwIjoyMDczNTQwODYyfQ.rN_dBtYmXIo4J1DffgWb6G0KLsgaQ6_kH-kiWJeQQQM"), "token": os.getenv(
"HOME_ASSISTANT_TOKEN",
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
"eyJpc3MiOiI3MjdiY2QwMjNkNmM0NzgzYmRiMzg2ZDYxYzQ3N2NmYyIsImlhdCI6MTc1ODE4MDg2MiwiZXhwIjoyMDczNTQwODYyfQ."
"rN_dBtYmXIo4J1DffgWb6G0KLsgaQ6_kH-kiWJeQQQM",
),
"enabled": True, "enabled": True,
"health_check_type": "sensor", # Use sensor-based health checking "health_check_type": "sensor", # Use sensor-based health checking
"sensor_entity": "sensor.uptime_34", # Check uptime sensor "sensor_entity": "sensor.uptime_34", # Check uptime sensor

View File

@@ -11,32 +11,29 @@ from typing import Any, Dict, Optional
import httpx import httpx
from httpx import HTTPError, TimeoutException from httpx import HTTPError, TimeoutException
from .base import BaseHealthChecker, HealthCheckResult
from utils.time_formatter import format_uptime_for_frontend from utils.time_formatter import format_uptime_for_frontend
from .base import BaseHealthChecker, HealthCheckResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class APIHealthChecker(BaseHealthChecker): class APIHealthChecker(BaseHealthChecker):
"""Health checker for services with API health endpoints.""" """Health checker for services with API health endpoints."""
async def check_health( async def check_health(self, service_name: str, config: Dict[str, Any]) -> HealthCheckResult:
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
""" """
Check health via API endpoint. Check health via API endpoint.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
Returns: Returns:
HealthCheckResult with status information HealthCheckResult with status information
""" """
logger.debug(f"Starting API health check for {service_name}") logger.debug(f"Starting API health check for {service_name}")
if not config.get("enabled", False): if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled") logger.debug(f"Service {service_name} is disabled")
return HealthCheckResult("disabled") return HealthCheckResult("disabled")
@@ -49,13 +46,13 @@ class APIHealthChecker(BaseHealthChecker):
# Get health endpoint from config or use default # Get health endpoint from config or use default
health_endpoint = config.get("health_endpoint", "/") health_endpoint = config.get("health_endpoint", "/")
health_url = f"{url.rstrip('/')}{health_endpoint}" health_url = f"{url.rstrip('/')}{health_endpoint}"
logger.debug(f"Checking {service_name} at {health_url}") logger.debug(f"Checking {service_name} at {health_url}")
try: try:
start_time = time.time() start_time = time.time()
headers = self._get_auth_headers(service_name, config) headers = self._get_auth_headers(service_name, config)
response = await self.client.get(health_url, headers=headers) response = await self.client.get(health_url, headers=headers)
response_time = time.time() - start_time response_time = time.time() - start_time
@@ -65,16 +62,12 @@ class APIHealthChecker(BaseHealthChecker):
if response.status_code == 200: if response.status_code == 200:
# Check if response body indicates health # Check if response body indicates health
health_status = self._parse_health_response(response, service_name) health_status = self._parse_health_response(response, service_name)
# Try to extract uptime from response # Try to extract uptime from response
uptime_info = self._extract_uptime_from_response(response, service_name) uptime_info = self._extract_uptime_from_response(response, service_name)
formatted_uptime = format_uptime_for_frontend(uptime_info) formatted_uptime = format_uptime_for_frontend(uptime_info)
metadata = { metadata = {"http_status": response.status_code, "response_size": len(response.content), "health_status": health_status}
"http_status": response.status_code,
"response_size": len(response.content),
"health_status": health_status
}
return HealthCheckResult("healthy", response_time, metadata=metadata, uptime=formatted_uptime) return HealthCheckResult("healthy", response_time, metadata=metadata, uptime=formatted_uptime)
elif response.status_code == 401: elif response.status_code == 401:
logger.warning(f"Service {service_name} returned 401 - authentication required") logger.warning(f"Service {service_name} returned 401 - authentication required")
@@ -99,18 +92,18 @@ class APIHealthChecker(BaseHealthChecker):
def _parse_health_response(self, response: httpx.Response, service_name: str) -> str: def _parse_health_response(self, response: httpx.Response, service_name: str) -> str:
""" """
Parse health response to determine actual health status. Parse health response to determine actual health status.
Args: Args:
response: HTTP response response: HTTP response
service_name: Name of the service service_name: Name of the service
Returns: Returns:
Health status string Health status string
""" """
try: try:
# Try to parse JSON response # Try to parse JSON response
data = response.json() data = response.json()
# Service-specific health parsing # Service-specific health parsing
if service_name == "home_assistant": if service_name == "home_assistant":
# Home Assistant returns {"message": "API running."} for healthy # Home Assistant returns {"message": "API running."} for healthy
@@ -127,7 +120,7 @@ class APIHealthChecker(BaseHealthChecker):
else: else:
# Generic check - if we got JSON, assume healthy # Generic check - if we got JSON, assume healthy
return "healthy" return "healthy"
except Exception as e: except Exception as e:
logger.debug(f"Could not parse JSON response from {service_name}: {e}") logger.debug(f"Could not parse JSON response from {service_name}: {e}")
# If we can't parse JSON but got 200, assume healthy # If we can't parse JSON but got 200, assume healthy
@@ -136,17 +129,17 @@ class APIHealthChecker(BaseHealthChecker):
def _extract_uptime_from_response(self, response: httpx.Response, service_name: str) -> Optional[str]: def _extract_uptime_from_response(self, response: httpx.Response, service_name: str) -> Optional[str]:
""" """
Extract uptime information from API response. Extract uptime information from API response.
Args: Args:
response: HTTP response response: HTTP response
service_name: Name of the service service_name: Name of the service
Returns: Returns:
Uptime information string or None Uptime information string or None
""" """
try: try:
data = response.json() data = response.json()
# Service-specific uptime extraction # Service-specific uptime extraction
if service_name == "frigate": if service_name == "frigate":
# Frigate might have uptime in version response # Frigate might have uptime in version response
@@ -160,7 +153,7 @@ class APIHealthChecker(BaseHealthChecker):
else: else:
# Generic uptime extraction # Generic uptime extraction
return data.get("uptime") or data.get("uptime_seconds") return data.get("uptime") or data.get("uptime_seconds")
except Exception as e: except Exception as e:
logger.debug(f"Could not extract uptime from {service_name} response: {e}") logger.debug(f"Could not extract uptime from {service_name} response: {e}")
return None return None

View File

@@ -7,7 +7,7 @@ health checking strategies.
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple from typing import Any, Dict, Optional
import httpx import httpx
@@ -16,14 +16,14 @@ logger = logging.getLogger(__name__)
class HealthCheckResult: class HealthCheckResult:
"""Result of a health check operation.""" """Result of a health check operation."""
def __init__( def __init__(
self, self,
status: str, status: str,
response_time: Optional[float] = None, response_time: Optional[float] = None,
error: Optional[str] = None, error: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None,
uptime: Optional[str] = None uptime: Optional[str] = None,
): ):
self.status = status self.status = status
self.response_time = response_time self.response_time = response_time
@@ -33,22 +33,16 @@ class HealthCheckResult:
def to_dict(self) -> Dict[str, Any]: def to_dict(self) -> Dict[str, Any]:
"""Convert result to dictionary.""" """Convert result to dictionary."""
return { return {"status": self.status, "response_time": self.response_time, "error": self.error, "uptime": self.uptime, "metadata": self.metadata}
"status": self.status,
"response_time": self.response_time,
"error": self.error,
"uptime": self.uptime,
"metadata": self.metadata
}
class BaseHealthChecker(ABC): class BaseHealthChecker(ABC):
"""Abstract base class for health checkers.""" """Abstract base class for health checkers."""
def __init__(self, timeout: float = 5.0): def __init__(self, timeout: float = 5.0):
""" """
Initialize the health checker. Initialize the health checker.
Args: Args:
timeout: Request timeout in seconds timeout: Request timeout in seconds
""" """
@@ -57,18 +51,14 @@ class BaseHealthChecker(ABC):
logger.debug(f"Initialized {self.__class__.__name__} with timeout: {timeout}s") logger.debug(f"Initialized {self.__class__.__name__} with timeout: {timeout}s")
@abstractmethod @abstractmethod
async def check_health( async def check_health(self, service_name: str, config: Dict[str, Any]) -> HealthCheckResult:
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
""" """
Check the health of a service. Check the health of a service.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
Returns: Returns:
HealthCheckResult with status information HealthCheckResult with status information
""" """
@@ -77,16 +67,16 @@ class BaseHealthChecker(ABC):
def _get_auth_headers(self, service_name: str, config: Dict[str, Any]) -> Dict[str, str]: def _get_auth_headers(self, service_name: str, config: Dict[str, Any]) -> Dict[str, str]:
""" """
Get authentication headers for the service. Get authentication headers for the service.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
Returns: Returns:
Dictionary of headers Dictionary of headers
""" """
headers = {"User-Agent": "LabFusion-ServiceAdapters/1.0.0"} headers = {"User-Agent": "LabFusion-ServiceAdapters/1.0.0"}
# Service-specific authentication # Service-specific authentication
if service_name == "home_assistant" and config.get("token"): if service_name == "home_assistant" and config.get("token"):
headers["Authorization"] = f"Bearer {config['token']}" headers["Authorization"] = f"Bearer {config['token']}"
@@ -96,7 +86,7 @@ class BaseHealthChecker(ABC):
headers["X-API-Key"] = config["api_key"] headers["X-API-Key"] = config["api_key"]
elif service_name == "n8n" and config.get("api_key"): elif service_name == "n8n" and config.get("api_key"):
headers["X-API-Key"] = config["api_key"] headers["X-API-Key"] = config["api_key"]
return headers return headers
async def close(self): async def close(self):

View File

@@ -9,9 +9,6 @@ import logging
import time import time
from typing import Any, Dict, List from typing import Any, Dict, List
import httpx
from httpx import HTTPError, TimeoutException
from .base import BaseHealthChecker, HealthCheckResult from .base import BaseHealthChecker, HealthCheckResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -19,24 +16,20 @@ logger = logging.getLogger(__name__)
class CustomHealthChecker(BaseHealthChecker): class CustomHealthChecker(BaseHealthChecker):
"""Health checker for services requiring custom health check logic.""" """Health checker for services requiring custom health check logic."""
async def check_health( async def check_health(self, service_name: str, config: Dict[str, Any]) -> HealthCheckResult:
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
""" """
Check health using custom logic. Check health using custom logic.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
Returns: Returns:
HealthCheckResult with status information HealthCheckResult with status information
""" """
logger.debug(f"Starting custom health check for {service_name}") logger.debug(f"Starting custom health check for {service_name}")
if not config.get("enabled", False): if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled") logger.debug(f"Service {service_name} is disabled")
return HealthCheckResult("disabled") return HealthCheckResult("disabled")
@@ -50,44 +43,40 @@ class CustomHealthChecker(BaseHealthChecker):
# Run all health checks # Run all health checks
results = [] results = []
overall_start_time = time.time() overall_start_time = time.time()
for check_config in health_checks: for check_config in health_checks:
check_result = await self._run_single_check(service_name, check_config) check_result = await self._run_single_check(service_name, check_config)
results.append(check_result) results.append(check_result)
overall_response_time = time.time() - overall_start_time overall_response_time = time.time() - overall_start_time
# Determine overall health status # Determine overall health status
overall_status = self._determine_overall_status(results) overall_status = self._determine_overall_status(results)
metadata = { metadata = {
"total_checks": len(health_checks), "total_checks": len(health_checks),
"check_results": [result.to_dict() for result in results], "check_results": [result.to_dict() for result in results],
"overall_response_time": overall_response_time "overall_response_time": overall_response_time,
} }
return HealthCheckResult(overall_status, overall_response_time, metadata=metadata) return HealthCheckResult(overall_status, overall_response_time, metadata=metadata)
async def _run_single_check( async def _run_single_check(self, service_name: str, check_config: Dict[str, Any]) -> HealthCheckResult:
self,
service_name: str,
check_config: Dict[str, Any]
) -> HealthCheckResult:
""" """
Run a single health check. Run a single health check.
Args: Args:
service_name: Name of the service service_name: Name of the service
check_config: Configuration for this specific check check_config: Configuration for this specific check
Returns: Returns:
HealthCheckResult for this check HealthCheckResult for this check
""" """
check_type = check_config.get("type", "api") check_type = check_config.get("type", "api")
check_name = check_config.get("name", "unknown") check_name = check_config.get("name", "unknown")
logger.debug(f"Running {check_type} check '{check_name}' for {service_name}") logger.debug(f"Running {check_type} check '{check_name}' for {service_name}")
if check_type == "api": if check_type == "api":
return await self._api_check(service_name, check_config) return await self._api_check(service_name, check_config)
elif check_type == "sensor": elif check_type == "sensor":
@@ -103,19 +92,19 @@ class CustomHealthChecker(BaseHealthChecker):
url = check_config.get("url") url = check_config.get("url")
if not url: if not url:
return HealthCheckResult("error", error="No URL in check config") return HealthCheckResult("error", error="No URL in check config")
try: try:
start_time = time.time() start_time = time.time()
headers = self._get_auth_headers(service_name, check_config) headers = self._get_auth_headers(service_name, check_config)
response = await self.client.get(url, headers=headers) response = await self.client.get(url, headers=headers)
response_time = time.time() - start_time response_time = time.time() - start_time
if response.status_code == 200: if response.status_code == 200:
return HealthCheckResult("healthy", response_time) return HealthCheckResult("healthy", response_time)
else: else:
return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}") return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}")
except Exception as e: except Exception as e:
return HealthCheckResult("error", error=str(e)) return HealthCheckResult("error", error=str(e))
@@ -126,11 +115,11 @@ class CustomHealthChecker(BaseHealthChecker):
sensor_entity = check_config.get("sensor_entity") sensor_entity = check_config.get("sensor_entity")
if not sensor_entity: if not sensor_entity:
return HealthCheckResult("error", error="No sensor_entity in check config") return HealthCheckResult("error", error="No sensor_entity in check config")
# Build sensor URL # Build sensor URL
base_url = check_config.get("url", "") base_url = check_config.get("url", "")
sensor_url = f"{base_url.rstrip('/')}/api/states/{sensor_entity}" sensor_url = f"{base_url.rstrip('/')}/api/states/{sensor_entity}"
# Update check config with sensor URL # Update check config with sensor URL
check_config["url"] = sensor_url check_config["url"] = sensor_url
return await self._api_check(service_name, check_config) return await self._api_check(service_name, check_config)
@@ -144,22 +133,22 @@ class CustomHealthChecker(BaseHealthChecker):
def _determine_overall_status(self, results: List[HealthCheckResult]) -> str: def _determine_overall_status(self, results: List[HealthCheckResult]) -> str:
""" """
Determine overall health status from multiple check results. Determine overall health status from multiple check results.
Args: Args:
results: List of individual check results results: List of individual check results
Returns: Returns:
Overall health status Overall health status
""" """
if not results: if not results:
return "error" return "error"
# Count statuses # Count statuses
status_counts = {} status_counts: Dict[str, int] = {}
for result in results: for result in results:
status = result.status status = result.status
status_counts[status] = status_counts.get(status, 0) + 1 status_counts[status] = status_counts.get(status, 0) + 1
# Determine overall status based on priority # Determine overall status based on priority
if status_counts.get("healthy", 0) == len(results): if status_counts.get("healthy", 0) == len(results):
return "healthy" return "healthy"

View File

@@ -5,7 +5,7 @@ This module provides a registry and factory for different health checker types.
""" """
import logging import logging
from typing import Any, Dict, Type from typing import Any, Dict, Optional, Type
from .api_checker import APIHealthChecker from .api_checker import APIHealthChecker
from .base import BaseHealthChecker from .base import BaseHealthChecker
@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
class HealthCheckerRegistry: class HealthCheckerRegistry:
"""Registry for health checker types.""" """Registry for health checker types."""
def __init__(self): def __init__(self):
"""Initialize the registry with default checkers.""" """Initialize the registry with default checkers."""
self._checkers: Dict[str, Type[BaseHealthChecker]] = { self._checkers: Dict[str, Type[BaseHealthChecker]] = {
@@ -30,7 +30,7 @@ class HealthCheckerRegistry:
def register(self, name: str, checker_class: Type[BaseHealthChecker]) -> None: def register(self, name: str, checker_class: Type[BaseHealthChecker]) -> None:
""" """
Register a new health checker type. Register a new health checker type.
Args: Args:
name: Name of the checker type name: Name of the checker type
checker_class: Health checker class checker_class: Health checker class
@@ -41,26 +41,26 @@ class HealthCheckerRegistry:
def get_checker(self, name: str) -> Type[BaseHealthChecker]: def get_checker(self, name: str) -> Type[BaseHealthChecker]:
""" """
Get a health checker class by name. Get a health checker class by name.
Args: Args:
name: Name of the checker type name: Name of the checker type
Returns: Returns:
Health checker class Health checker class
Raises: Raises:
ValueError: If checker type not found ValueError: If checker type not found
""" """
if name not in self._checkers: if name not in self._checkers:
available = ", ".join(self._checkers.keys()) available = ", ".join(self._checkers.keys())
raise ValueError(f"Unknown health checker type '{name}'. Available: {available}") raise ValueError(f"Unknown health checker type '{name}'. Available: {available}")
return self._checkers[name] return self._checkers[name]
def list_checkers(self) -> list[str]: def list_checkers(self) -> list[str]:
""" """
List all available health checker types. List all available health checker types.
Returns: Returns:
List of checker type names List of checker type names
""" """
@@ -69,29 +69,25 @@ class HealthCheckerRegistry:
class HealthCheckerFactory: class HealthCheckerFactory:
"""Factory for creating health checker instances.""" """Factory for creating health checker instances."""
def __init__(self, registry: HealthCheckerRegistry = None): def __init__(self, registry: Optional[HealthCheckerRegistry] = None):
""" """
Initialize the factory. Initialize the factory.
Args: Args:
registry: Health checker registry (uses default if None) registry: Health checker registry (uses default if None)
""" """
self.registry = registry or HealthCheckerRegistry() self.registry = registry or HealthCheckerRegistry()
logger.debug("Initialized health checker factory") logger.debug("Initialized health checker factory")
def create_checker( def create_checker(self, checker_type: str, timeout: float = 5.0) -> BaseHealthChecker:
self,
checker_type: str,
timeout: float = 5.0
) -> BaseHealthChecker:
""" """
Create a health checker instance. Create a health checker instance.
Args: Args:
checker_type: Type of checker to create checker_type: Type of checker to create
timeout: Request timeout in seconds timeout: Request timeout in seconds
Returns: Returns:
Health checker instance Health checker instance
""" """
@@ -100,32 +96,27 @@ class HealthCheckerFactory:
logger.debug(f"Created {checker_type} health checker with timeout {timeout}s") logger.debug(f"Created {checker_type} health checker with timeout {timeout}s")
return checker return checker
def create_checker_for_service( def create_checker_for_service(self, service_name: str, config: Dict[str, Any], timeout: float = 5.0) -> BaseHealthChecker:
self,
service_name: str,
config: Dict[str, Any],
timeout: float = 5.0
) -> BaseHealthChecker:
""" """
Create a health checker for a specific service based on its configuration. Create a health checker for a specific service based on its configuration.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
timeout: Request timeout in seconds timeout: Request timeout in seconds
Returns: Returns:
Health checker instance Health checker instance
""" """
# Determine checker type from config # Determine checker type from config
checker_type = config.get("health_check_type", "api") checker_type = config.get("health_check_type", "api")
# Override based on service-specific logic # Override based on service-specific logic
if service_name == "home_assistant" and config.get("sensor_entity"): if service_name == "home_assistant" and config.get("sensor_entity"):
checker_type = "sensor" checker_type = "sensor"
elif config.get("health_checks"): elif config.get("health_checks"):
checker_type = "custom" checker_type = "custom"
logger.debug(f"Creating {checker_type} checker for {service_name}") logger.debug(f"Creating {checker_type} checker for {service_name}")
return self.create_checker(checker_type, timeout) return self.create_checker(checker_type, timeout)

View File

@@ -9,35 +9,31 @@ import logging
import time import time
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import httpx
from httpx import HTTPError, TimeoutException from httpx import HTTPError, TimeoutException
from .base import BaseHealthChecker, HealthCheckResult
from utils.time_formatter import format_uptime_for_frontend from utils.time_formatter import format_uptime_for_frontend
from .base import BaseHealthChecker, HealthCheckResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SensorHealthChecker(BaseHealthChecker): class SensorHealthChecker(BaseHealthChecker):
"""Health checker for services with sensor-based health information.""" """Health checker for services with sensor-based health information."""
async def check_health( async def check_health(self, service_name: str, config: Dict[str, Any]) -> HealthCheckResult:
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
""" """
Check health via sensor data. Check health via sensor data.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
Returns: Returns:
HealthCheckResult with status information HealthCheckResult with status information
""" """
logger.debug(f"Starting sensor health check for {service_name}") logger.debug(f"Starting sensor health check for {service_name}")
if not config.get("enabled", False): if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled") logger.debug(f"Service {service_name} is disabled")
return HealthCheckResult("disabled") return HealthCheckResult("disabled")
@@ -55,13 +51,13 @@ class SensorHealthChecker(BaseHealthChecker):
# Build sensor API URL # Build sensor API URL
sensor_url = f"{url.rstrip('/')}/api/states/{sensor_entity}" sensor_url = f"{url.rstrip('/')}/api/states/{sensor_entity}"
logger.debug(f"Checking {service_name} sensor {sensor_entity} at {sensor_url}") logger.debug(f"Checking {service_name} sensor {sensor_entity} at {sensor_url}")
try: try:
start_time = time.time() start_time = time.time()
headers = self._get_auth_headers(service_name, config) headers = self._get_auth_headers(service_name, config)
response = await self.client.get(sensor_url, headers=headers) response = await self.client.get(sensor_url, headers=headers)
response_time = time.time() - start_time response_time = time.time() - start_time
@@ -71,24 +67,24 @@ class SensorHealthChecker(BaseHealthChecker):
# Parse sensor data # Parse sensor data
sensor_data = response.json() sensor_data = response.json()
logger.debug(f"Raw sensor data for {service_name}: {sensor_data}") logger.debug(f"Raw sensor data for {service_name}: {sensor_data}")
health_status = self._parse_sensor_data(sensor_data, service_name) health_status = self._parse_sensor_data(sensor_data, service_name)
logger.info(f"Parsed health status for {service_name}: {health_status}") logger.info(f"Parsed health status for {service_name}: {health_status}")
# Extract uptime information for top-level field # Extract uptime information for top-level field
uptime_info = self._extract_uptime_info(sensor_data, service_name) uptime_info = self._extract_uptime_info(sensor_data, service_name)
# Format uptime for frontend display # Format uptime for frontend display
formatted_uptime = format_uptime_for_frontend(uptime_info) formatted_uptime = format_uptime_for_frontend(uptime_info)
metadata = { metadata = {
"http_status": response.status_code, "http_status": response.status_code,
"sensor_entity": sensor_entity, "sensor_entity": sensor_entity,
"sensor_state": sensor_data.get("state"), "sensor_state": sensor_data.get("state"),
"sensor_attributes": sensor_data.get("attributes", {}), "sensor_attributes": sensor_data.get("attributes", {}),
"last_updated": sensor_data.get("last_updated"), "last_updated": sensor_data.get("last_updated"),
"entity_id": sensor_data.get("entity_id") "entity_id": sensor_data.get("entity_id"),
} }
return HealthCheckResult(health_status, response_time, metadata=metadata, uptime=formatted_uptime) return HealthCheckResult(health_status, response_time, metadata=metadata, uptime=formatted_uptime)
elif response.status_code == 401: elif response.status_code == 401:
logger.warning(f"Service {service_name} returned 401 - authentication required") logger.warning(f"Service {service_name} returned 401 - authentication required")
@@ -113,11 +109,11 @@ class SensorHealthChecker(BaseHealthChecker):
def _parse_sensor_data(self, sensor_data: Dict[str, Any], service_name: str) -> str: def _parse_sensor_data(self, sensor_data: Dict[str, Any], service_name: str) -> str:
""" """
Parse sensor data to determine health status. Parse sensor data to determine health status.
Args: Args:
sensor_data: Sensor data from API sensor_data: Sensor data from API
service_name: Name of the service service_name: Name of the service
Returns: Returns:
Health status string Health status string
""" """
@@ -125,9 +121,9 @@ class SensorHealthChecker(BaseHealthChecker):
state = sensor_data.get("state", "") state = sensor_data.get("state", "")
entity_id = sensor_data.get("entity_id", "").lower() entity_id = sensor_data.get("entity_id", "").lower()
attributes = sensor_data.get("attributes", {}) attributes = sensor_data.get("attributes", {})
logger.debug(f"Parsing sensor data for {service_name}: entity_id={entity_id}, state={state}") logger.debug(f"Parsing sensor data for {service_name}: entity_id={entity_id}, state={state}")
# Service-specific sensor parsing # Service-specific sensor parsing
if service_name == "home_assistant": if service_name == "home_assistant":
# For HA, check uptime sensor or system health # For HA, check uptime sensor or system health
@@ -138,10 +134,12 @@ class SensorHealthChecker(BaseHealthChecker):
# Timestamp sensor - if it has a valid timestamp, service is healthy # Timestamp sensor - if it has a valid timestamp, service is healthy
try: try:
from datetime import datetime from datetime import datetime
# Try to parse the timestamp # Try to parse the timestamp
parsed_time = datetime.fromisoformat(state.replace('Z', '+00:00')) parsed_time = datetime.fromisoformat(state.replace("Z", "+00:00"))
# If we can parse it and it's recent (within last 24 hours), it's healthy # If we can parse it and it's recent (within last 24 hours), it's healthy
from datetime import datetime, timezone from datetime import datetime, timezone
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
time_diff = now - parsed_time time_diff = now - parsed_time
is_healthy = time_diff.total_seconds() < 86400 # 24 hours is_healthy = time_diff.total_seconds() < 86400 # 24 hours
@@ -176,7 +174,7 @@ class SensorHealthChecker(BaseHealthChecker):
is_healthy = state.lower() not in ["unavailable", "unknown", "off", "error"] is_healthy = state.lower() not in ["unavailable", "unknown", "off", "error"]
logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}") logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy" return "healthy" if is_healthy else "unhealthy"
except Exception as e: except Exception as e:
logger.error(f"Could not parse sensor data from {service_name}: {e}") logger.error(f"Could not parse sensor data from {service_name}: {e}")
return "unhealthy" return "unhealthy"
@@ -184,11 +182,11 @@ class SensorHealthChecker(BaseHealthChecker):
def _extract_uptime_info(self, sensor_data: Dict[str, Any], service_name: str) -> Optional[str]: def _extract_uptime_info(self, sensor_data: Dict[str, Any], service_name: str) -> Optional[str]:
""" """
Extract uptime information from sensor data for top-level display. Extract uptime information from sensor data for top-level display.
Args: Args:
sensor_data: Sensor data from API sensor_data: Sensor data from API
service_name: Name of the service service_name: Name of the service
Returns: Returns:
Uptime information string or None Uptime information string or None
""" """
@@ -196,7 +194,7 @@ class SensorHealthChecker(BaseHealthChecker):
state = sensor_data.get("state", "") state = sensor_data.get("state", "")
entity_id = sensor_data.get("entity_id", "").lower() entity_id = sensor_data.get("entity_id", "").lower()
attributes = sensor_data.get("attributes", {}) attributes = sensor_data.get("attributes", {})
if service_name == "home_assistant" and "uptime" in entity_id: if service_name == "home_assistant" and "uptime" in entity_id:
device_class = attributes.get("device_class", "") device_class = attributes.get("device_class", "")
if device_class == "timestamp": if device_class == "timestamp":
@@ -214,7 +212,7 @@ class SensorHealthChecker(BaseHealthChecker):
if "uptime" in entity_id or "duration" in entity_id.lower(): if "uptime" in entity_id or "duration" in entity_id.lower():
return state return state
return None return None
except Exception as e: except Exception as e:
logger.debug(f"Could not extract uptime info from {service_name}: {e}") logger.debug(f"Could not extract uptime info from {service_name}: {e}")
return None return None

View File

@@ -14,14 +14,11 @@ DEFAULT_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno
def setup_logging( def setup_logging(
level: str = "INFO", level: str = "INFO", format_string: Optional[str] = None, include_timestamp: bool = True, enable_request_logging: bool = True
format_string: Optional[str] = None,
include_timestamp: bool = True,
enable_request_logging: bool = True
) -> None: ) -> None:
""" """
Set up unified logging configuration for the application and requests. Set up unified logging configuration for the application and requests.
Args: Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format_string: Custom format string for log messages format_string: Custom format string for log messages
@@ -42,7 +39,7 @@ def setup_logging(
# Create a single handler for all logs # Create a single handler for all logs
handler = logging.StreamHandler(sys.stdout) handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(format_string)) handler.setFormatter(logging.Formatter(format_string))
# Configure root logger # Configure root logger
root_logger.setLevel(getattr(logging, level.upper())) root_logger.setLevel(getattr(logging, level.upper()))
root_logger.addHandler(handler) root_logger.addHandler(handler)
@@ -82,7 +79,7 @@ def setup_logging(
def _setup_request_logging(handler: logging.Handler) -> None: def _setup_request_logging(handler: logging.Handler) -> None:
""" """
Set up FastAPI request logging with the same handler. Set up FastAPI request logging with the same handler.
Args: Args:
handler: The logging handler to use for requests handler: The logging handler to use for requests
""" """
@@ -102,10 +99,10 @@ def _setup_request_logging(handler: logging.Handler) -> None:
def get_logger(name: str) -> logging.Logger: def get_logger(name: str) -> logging.Logger:
""" """
Get a logger instance for the given name. Get a logger instance for the given name.
Args: Args:
name: Logger name (usually __name__) name: Logger name (usually __name__)
Returns: Returns:
Logger instance Logger instance
""" """
@@ -115,7 +112,7 @@ def get_logger(name: str) -> logging.Logger:
def get_request_logger() -> logging.Logger: def get_request_logger() -> logging.Logger:
""" """
Get the request logger for FastAPI requests. Get the request logger for FastAPI requests.
Returns: Returns:
Request logger instance Request logger instance
""" """
@@ -125,7 +122,7 @@ def get_request_logger() -> logging.Logger:
def get_application_logger() -> logging.Logger: def get_application_logger() -> logging.Logger:
""" """
Get the main application logger. Get the main application logger.
Returns: Returns:
Application logger instance Application logger instance
""" """

View File

@@ -11,6 +11,7 @@ from typing import Dict
from services.config import SERVICES from services.config import SERVICES
from services.health_checkers import factory from services.health_checkers import factory
from services.health_checkers.base import BaseHealthChecker
# Configure logger # Configure logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -22,63 +23,53 @@ class ServiceStatusChecker:
def __init__(self, timeout: float = 5.0): def __init__(self, timeout: float = 5.0):
""" """
Initialize the status checker. Initialize the status checker.
Args: Args:
timeout: Request timeout in seconds timeout: Request timeout in seconds
""" """
self.timeout = timeout self.timeout = timeout
self.checkers = {} # Cache for checker instances self.checkers: Dict[str, BaseHealthChecker] = {} # Cache for checker instances
logger.info(f"ServiceStatusChecker initialized with timeout: {timeout}s") logger.info(f"ServiceStatusChecker initialized with timeout: {timeout}s")
async def check_service_health(self, service_name: str, config: Dict) -> Dict: async def check_service_health(self, service_name: str, config: Dict) -> Dict:
""" """
Check the health status of a specific service. Check the health status of a specific service.
Args: Args:
service_name: Name of the service to check service_name: Name of the service to check
config: Service configuration dictionary config: Service configuration dictionary
Returns: Returns:
Dictionary with status information Dictionary with status information
""" """
logger.debug(f"Starting health check for service: {service_name}") logger.debug(f"Starting health check for service: {service_name}")
if not config.get("enabled", False): if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled, skipping health check") logger.debug(f"Service {service_name} is disabled, skipping health check")
return { return {"status": "disabled", "response_time": None, "error": None, "metadata": {}}
"status": "disabled",
"response_time": None,
"error": None,
"metadata": {}
}
try: try:
# Get or create checker for this service # Get or create checker for this service
checker = await self._get_checker_for_service(service_name, config) checker = await self._get_checker_for_service(service_name, config)
# Run health check # Run health check
result = await checker.check_health(service_name, config) result = await checker.check_health(service_name, config)
logger.info(f"Service {service_name} health check completed: {result.status}") logger.info(f"Service {service_name} health check completed: {result.status}")
return result.to_dict() return result.to_dict()
except Exception as e: except Exception as e:
logger.error(f"Unexpected error checking {service_name}: {str(e)}") logger.error(f"Unexpected error checking {service_name}: {str(e)}")
return { return {"status": "error", "response_time": None, "error": f"Unexpected error: {str(e)}", "metadata": {}}
"status": "error",
"response_time": None,
"error": f"Unexpected error: {str(e)}",
"metadata": {}
}
async def _get_checker_for_service(self, service_name: str, config: Dict): async def _get_checker_for_service(self, service_name: str, config: Dict):
""" """
Get or create a health checker for the service. Get or create a health checker for the service.
Args: Args:
service_name: Name of the service service_name: Name of the service
config: Service configuration config: Service configuration
Returns: Returns:
Health checker instance Health checker instance
""" """
@@ -87,18 +78,18 @@ class ServiceStatusChecker:
checker = factory.create_checker_for_service(service_name, config, self.timeout) checker = factory.create_checker_for_service(service_name, config, self.timeout)
self.checkers[service_name] = checker self.checkers[service_name] = checker
logger.debug(f"Created new checker for {service_name}") logger.debug(f"Created new checker for {service_name}")
return self.checkers[service_name] return self.checkers[service_name]
async def check_all_services(self) -> Dict[str, Dict]: async def check_all_services(self) -> Dict[str, Dict]:
""" """
Check the health status of all configured services. Check the health status of all configured services.
Returns: Returns:
Dictionary mapping service names to their status information Dictionary mapping service names to their status information
""" """
logger.info(f"Starting health check for {len(SERVICES)} services") logger.info(f"Starting health check for {len(SERVICES)} services")
tasks = [] tasks = []
service_names = [] service_names = []
@@ -109,28 +100,32 @@ class ServiceStatusChecker:
logger.debug(f"Created {len(tasks)} concurrent health check tasks") logger.debug(f"Created {len(tasks)} concurrent health check tasks")
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks, return_exceptions=True)
service_status = {} service_status: Dict[str, Dict] = {}
healthy_count = 0 healthy_count = 0
error_count = 0 error_count = 0
for service_name, result in zip(service_names, results): for service_name, result in zip(service_names, results):
if isinstance(result, Exception): if isinstance(result, Exception):
logger.error(f"Exception during health check for {service_name}: {str(result)}") logger.error(f"Exception during health check for {service_name}: {str(result)}")
service_status[service_name] = { service_status[service_name] = {"status": "error", "response_time": None, "error": f"Exception: {str(result)}", "metadata": {}}
"status": "error",
"response_time": None,
"error": f"Exception: {str(result)}",
"metadata": {}
}
error_count += 1 error_count += 1
else: else:
service_status[service_name] = result # result is a Dict at this point, but we need to ensure it's actually a dict
if result["status"] == "healthy": if isinstance(result, dict):
healthy_count += 1 service_status[service_name] = result
elif result["status"] in ["error", "timeout", "unhealthy"]: if result.get("status") == "healthy":
healthy_count += 1
elif result.get("status") in ["error", "timeout", "unhealthy"]:
error_count += 1
else:
# This shouldn't happen, but handle it gracefully
logger.error(f"Unexpected result type for {service_name}: {type(result)}")
service_status[service_name] = {"status": "error", "response_time": None, "error": "Unexpected result type", "metadata": {}}
error_count += 1 error_count += 1
logger.info(f"Health check completed: {healthy_count} healthy, {error_count} errors, {len(SERVICES) - healthy_count - error_count} other statuses") logger.info(
f"Health check completed: {healthy_count} healthy, {error_count} errors, " f"{len(SERVICES) - healthy_count - error_count} other statuses"
)
return service_status return service_status
async def close(self): async def close(self):

View File

@@ -4,7 +4,7 @@ Utilities Package
This package contains utility functions for the service adapters. This package contains utility functions for the service adapters.
""" """
from .time_formatter import format_uptime_for_frontend, format_response_time from .time_formatter import format_response_time, format_uptime_for_frontend
__all__ = [ __all__ = [
"format_uptime_for_frontend", "format_uptime_for_frontend",

View File

@@ -13,36 +13,36 @@ from typing import Optional, Union
def format_uptime_for_frontend(uptime_value: Optional[str]) -> str: def format_uptime_for_frontend(uptime_value: Optional[str]) -> str:
""" """
Format uptime value for frontend display in "Xd Xh Xm" format. Format uptime value for frontend display in "Xd Xh Xm" format.
Args: Args:
uptime_value: Raw uptime value (timestamp, epoch, duration string, etc.) uptime_value: Raw uptime value (timestamp, epoch, duration string, etc.)
Returns: Returns:
Formatted uptime string like "2d 5h 30m" or "0d 0h" if invalid Formatted uptime string like "2d 5h 30m" or "0d 0h" if invalid
""" """
if not uptime_value: if not uptime_value:
return "0d 0h" return "0d 0h"
try: try:
# Try to parse as timestamp (ISO format) # Try to parse as timestamp (ISO format)
if _is_timestamp(uptime_value): if _is_timestamp(uptime_value):
return _format_timestamp_uptime(uptime_value) return _format_timestamp_uptime(uptime_value)
# Try to parse as epoch timestamp # Try to parse as epoch timestamp
if _is_epoch(uptime_value): if _is_epoch(uptime_value):
return _format_epoch_uptime(uptime_value) return _format_epoch_uptime(uptime_value)
# Try to parse as duration string (e.g., "2h 30m", "5d 2h 15m") # Try to parse as duration string (e.g., "2h 30m", "5d 2h 15m")
if _is_duration_string(uptime_value): if _is_duration_string(uptime_value):
return _format_duration_string(uptime_value) return _format_duration_string(uptime_value)
# Try to parse as numeric seconds # Try to parse as numeric seconds
if _is_numeric_seconds(uptime_value): if _is_numeric_seconds(uptime_value):
return _format_seconds_uptime(float(uptime_value)) return _format_seconds_uptime(float(uptime_value))
# If none of the above, return as-is or default # If none of the above, return as-is or default
return uptime_value if len(uptime_value) < 50 else "0d 0h" return uptime_value if len(uptime_value) < 50 else "0d 0h"
except Exception: except Exception:
return "0d 0h" return "0d 0h"
@@ -50,7 +50,7 @@ def format_uptime_for_frontend(uptime_value: Optional[str]) -> str:
def _is_timestamp(value: str) -> bool: def _is_timestamp(value: str) -> bool:
"""Check if value is an ISO timestamp.""" """Check if value is an ISO timestamp."""
try: try:
datetime.fromisoformat(value.replace('Z', '+00:00')) datetime.fromisoformat(value.replace("Z", "+00:00"))
return True return True
except (ValueError, AttributeError): except (ValueError, AttributeError):
return False return False
@@ -68,7 +68,7 @@ def _is_epoch(value: str) -> bool:
def _is_duration_string(value: str) -> bool: def _is_duration_string(value: str) -> bool:
"""Check if value is a duration string like '2h 30m' or '5d 2h 15m'.""" """Check if value is a duration string like '2h 30m' or '5d 2h 15m'."""
# Look for patterns like "2h 30m", "5d 2h 15m", "1d 2h 3m 4s" # Look for patterns like "2h 30m", "5d 2h 15m", "1d 2h 3m 4s"
pattern = r'^\d+[dhms]\s*(\d+[dhms]\s*)*$' pattern = r"^\d+[dhms]\s*(\d+[dhms]\s*)*$"
return bool(re.match(pattern, value.strip())) return bool(re.match(pattern, value.strip()))
@@ -85,14 +85,14 @@ def _format_timestamp_uptime(timestamp: str) -> str:
"""Format timestamp uptime (time since timestamp).""" """Format timestamp uptime (time since timestamp)."""
try: try:
# Parse timestamp # Parse timestamp
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
if dt.tzinfo is None: if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc) dt = dt.replace(tzinfo=timezone.utc)
# Calculate time difference # Calculate time difference
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
diff = now - dt diff = now - dt
return _format_timedelta(diff) return _format_timedelta(diff)
except Exception: except Exception:
return "0d 0h" return "0d 0h"
@@ -105,7 +105,7 @@ def _format_epoch_uptime(epoch_str: str) -> str:
dt = datetime.fromtimestamp(epoch, tz=timezone.utc) dt = datetime.fromtimestamp(epoch, tz=timezone.utc)
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
diff = now - dt diff = now - dt
return _format_timedelta(diff) return _format_timedelta(diff)
except Exception: except Exception:
return "0d 0h" return "0d 0h"
@@ -129,27 +129,27 @@ def _format_seconds_uptime(seconds: float) -> str:
def _parse_duration_string(duration: str) -> float: def _parse_duration_string(duration: str) -> float:
"""Parse duration string to total seconds.""" """Parse duration string to total seconds."""
total_seconds = 0 total_seconds = 0
# Extract days # Extract days
days_match = re.search(r'(\d+)d', duration) days_match = re.search(r"(\d+)d", duration)
if days_match: if days_match:
total_seconds += int(days_match.group(1)) * 86400 total_seconds += int(days_match.group(1)) * 86400
# Extract hours # Extract hours
hours_match = re.search(r'(\d+)h', duration) hours_match = re.search(r"(\d+)h", duration)
if hours_match: if hours_match:
total_seconds += int(hours_match.group(1)) * 3600 total_seconds += int(hours_match.group(1)) * 3600
# Extract minutes # Extract minutes
minutes_match = re.search(r'(\d+)m', duration) minutes_match = re.search(r"(\d+)m", duration)
if minutes_match: if minutes_match:
total_seconds += int(minutes_match.group(1)) * 60 total_seconds += int(minutes_match.group(1)) * 60
# Extract seconds # Extract seconds
seconds_match = re.search(r'(\d+)s', duration) seconds_match = re.search(r"(\d+)s", duration)
if seconds_match: if seconds_match:
total_seconds += int(seconds_match.group(1)) total_seconds += int(seconds_match.group(1))
return total_seconds return total_seconds
@@ -163,14 +163,14 @@ def _format_timedelta_from_seconds(total_seconds: Union[int, float]) -> str:
"""Format total seconds to "Xd Xh Xm" format.""" """Format total seconds to "Xd Xh Xm" format."""
if total_seconds < 0: if total_seconds < 0:
return "0d 0h" return "0d 0h"
# Convert to int to avoid decimal places # Convert to int to avoid decimal places
total_seconds = int(total_seconds) total_seconds = int(total_seconds)
days = total_seconds // 86400 days = total_seconds // 86400
hours = (total_seconds % 86400) // 3600 hours = (total_seconds % 86400) // 3600
minutes = (total_seconds % 3600) // 60 minutes = (total_seconds % 3600) // 60
# Only show days if > 0 # Only show days if > 0
if days > 0: if days > 0:
return f"{days}d {hours}h {minutes}m" return f"{days}d {hours}h {minutes}m"
@@ -183,16 +183,16 @@ def _format_timedelta_from_seconds(total_seconds: Union[int, float]) -> str:
def format_response_time(seconds: Optional[float]) -> str: def format_response_time(seconds: Optional[float]) -> str:
""" """
Format response time for display. Format response time for display.
Args: Args:
seconds: Response time in seconds seconds: Response time in seconds
Returns: Returns:
Formatted response time string Formatted response time string
""" """
if seconds is None: if seconds is None:
return "N/A" return "N/A"
if seconds < 1: if seconds < 1:
return f"{seconds * 1000:.0f}ms" return f"{seconds * 1000:.0f}ms"
else: else: