feat: Enhance frontend loading experience and service status handling
Some checks failed
Integration Tests / integration-tests (push) Failing after 20s
Integration Tests / performance-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 23s
Frontend (React) / test (20) (push) Failing after 1m3s
Frontend (React) / build (push) Has been skipped
Frontend (React) / lighthouse (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 23s
Service Adapters (Python FastAPI) / test (3.13) (push) Failing after 20s
Service Adapters (Python FastAPI) / build (push) Has been skipped

### Summary of Changes
- Removed proxy configuration in `rsbuild.config.js` as the API Gateway is not running.
- Added smooth transitions and gentle loading overlays in CSS for improved user experience during data loading.
- Updated `Dashboard` component to conditionally display loading spinner and gentle loading overlay based on data fetching state.
- Enhanced `useOfflineAwareServiceStatus` and `useOfflineAwareSystemData` hooks to manage loading states and service status more effectively.
- Increased refresh intervals for service status and system data to reduce API call frequency.

### Expected Results
- Improved user experience with smoother loading transitions and better feedback during data refreshes.
- Enhanced handling of service status checks, providing clearer information when services are unavailable.
- Streamlined code for managing loading states, making it easier to maintain and extend in the future.
This commit is contained in:
GSRN
2025-09-18 11:09:51 +02:00
parent 48c755dff3
commit 7373ccfa1d
30 changed files with 2402 additions and 89 deletions

View File

@@ -0,0 +1,280 @@
# Health Checking System
This document describes the generalized health checking system for LabFusion Service Adapters.
## Overview
The health checking system is designed to be flexible and extensible, supporting different types of health checks for different services. It uses a strategy pattern with pluggable health checkers.
## Architecture
### Core Components
1. **BaseHealthChecker**: Abstract base class for all health checkers
2. **HealthCheckResult**: Standardized result object
3. **HealthCheckerRegistry**: Registry for different checker types
4. **HealthCheckerFactory**: Factory for creating checker instances
5. **ServiceStatusChecker**: Main orchestrator
### Health Checker Types
#### 1. API Health Checker (`APIHealthChecker`)
- **Purpose**: Check services with HTTP health endpoints
- **Use Case**: Most REST APIs, microservices
- **Configuration**:
```python
{
"health_check_type": "api",
"health_endpoint": "/api/health",
"url": "https://service.example.com"
}
```
#### 2. Sensor Health Checker (`SensorHealthChecker`)
- **Purpose**: Check services via sensor data (e.g., Home Assistant entities)
- **Use Case**: Home Assistant, IoT devices, sensor-based monitoring
- **Configuration**:
```python
{
"health_check_type": "sensor",
"sensor_entity": "sensor.system_uptime",
"url": "https://homeassistant.example.com"
}
```
#### 3. Custom Health Checker (`CustomHealthChecker`)
- **Purpose**: Complex health checks with multiple validation steps
- **Use Case**: Services requiring multiple checks, custom logic
- **Configuration**:
```python
{
"health_check_type": "custom",
"health_checks": [
{
"type": "api",
"name": "main_api",
"url": "https://service.example.com/api/health"
},
{
"type": "sensor",
"name": "uptime_sensor",
"sensor_entity": "sensor.service_uptime"
}
]
}
```
## Configuration
### Service Configuration Structure
```python
SERVICES = {
"service_name": {
"url": "https://service.example.com",
"enabled": True,
"health_check_type": "api|sensor|custom",
# API-specific
"health_endpoint": "/api/health",
"token": "auth_token",
"api_key": "api_key",
# Sensor-specific
"sensor_entity": "sensor.entity_name",
# Custom-specific
"health_checks": [
{
"type": "api",
"name": "check_name",
"url": "https://endpoint.com/health"
}
]
}
}
```
### Environment Variables
```bash
# Service URLs
HOME_ASSISTANT_URL=https://ha.example.com
FRIGATE_URL=http://frigate.local:5000
IMMICH_URL=http://immich.local:2283
N8N_URL=http://n8n.local:5678
# Authentication
HOME_ASSISTANT_TOKEN=your_token
FRIGATE_TOKEN=your_token
IMMICH_API_KEY=your_key
N8N_API_KEY=your_key
```
## Usage Examples
### Basic API Health Check
```python
from services.health_checkers import factory
# Create API checker
checker = factory.create_checker("api", timeout=5.0)
# Check service
config = {
"url": "https://api.example.com",
"health_endpoint": "/health",
"enabled": True
}
result = await checker.check_health("example_service", config)
print(f"Status: {result.status}")
print(f"Response time: {result.response_time}s")
```
### Sensor-Based Health Check
```python
# Create sensor checker
checker = factory.create_checker("sensor", timeout=5.0)
# Check Home Assistant sensor
config = {
"url": "https://ha.example.com",
"sensor_entity": "sensor.system_uptime",
"token": "your_token",
"enabled": True
}
result = await checker.check_health("home_assistant", config)
print(f"Uptime: {result.metadata.get('sensor_state')}")
```
### Custom Health Check
```python
# Create custom checker
checker = factory.create_checker("custom", timeout=10.0)
# Check with multiple validations
config = {
"url": "https://service.example.com",
"enabled": True,
"health_checks": [
{
"type": "api",
"name": "main_api",
"url": "https://service.example.com/api/health"
},
{
"type": "api",
"name": "database",
"url": "https://service.example.com/api/db/health"
}
]
}
result = await checker.check_health("complex_service", config)
print(f"Overall status: {result.status}")
print(f"Individual checks: {result.metadata.get('check_results')}")
```
## Health Check Results
### HealthCheckResult Structure
```python
{
"status": "healthy|unhealthy|disabled|error|timeout|unauthorized|forbidden",
"response_time": 0.123, # seconds
"error": "Error message if applicable",
"metadata": {
"http_status": 200,
"response_size": 1024,
"sensor_state": "12345",
"last_updated": "2024-01-15T10:30:00Z"
}
}
```
### Status Values
- **healthy**: Service is responding normally
- **unhealthy**: Service responded but with error status
- **disabled**: Service is disabled in configuration
- **timeout**: Request timed out
- **unauthorized**: Authentication required (HTTP 401)
- **forbidden**: Access forbidden (HTTP 403)
- **error**: Network or other error occurred
## Extending the System
### Adding a New Health Checker
1. **Create the checker class**:
```python
from .base import BaseHealthChecker, HealthCheckResult
class MyCustomChecker(BaseHealthChecker):
async def check_health(self, service_name: str, config: Dict) -> HealthCheckResult:
# Implementation
pass
```
2. **Register the checker**:
```python
from services.health_checkers import registry
registry.register("my_custom", MyCustomChecker)
```
3. **Use in configuration**:
```python
{
"health_check_type": "my_custom",
"custom_param": "value"
}
```
### Service-Specific Logic
The factory automatically selects the appropriate checker based on:
1. `health_check_type` in configuration
2. Service name patterns
3. Configuration presence (e.g., `sensor_entity` → sensor checker)
## Performance Considerations
- **Concurrent Checking**: All services are checked simultaneously
- **Checker Caching**: Checkers are cached per service to avoid recreation
- **Timeout Management**: Configurable timeouts per checker type
- **Resource Cleanup**: Proper cleanup of HTTP clients
## Monitoring and Logging
- **Debug Logs**: Detailed operation logs for troubleshooting
- **Performance Metrics**: Response times and success rates
- **Error Tracking**: Comprehensive error logging with context
- **Health Summary**: Overall system health statistics
## Best Practices
1. **Choose Appropriate Checker**: Use the right checker type for your service
2. **Set Reasonable Timeouts**: Balance responsiveness with reliability
3. **Handle Errors Gracefully**: Always provide meaningful error messages
4. **Monitor Performance**: Track response times and success rates
5. **Test Thoroughly**: Verify health checks work in all scenarios
6. **Document Configuration**: Keep service configurations well-documented
## Troubleshooting
### Common Issues
1. **Timeout Errors**: Increase timeout or check network connectivity
2. **Authentication Failures**: Verify tokens and API keys
3. **Sensor Not Found**: Check entity names and permissions
4. **Configuration Errors**: Validate service configuration structure
### Debug Tools
- **Debug Endpoint**: `/debug/logging` to test logging configuration
- **Health Check Logs**: Detailed logs for each health check operation
- **Metadata Inspection**: Check metadata for additional context

View File

@@ -0,0 +1,148 @@
# Unified Logging Configuration
This document describes the unified logging setup and usage in the LabFusion Service Adapters.
## Overview
The service adapters use Python's built-in `logging` module with a centralized configuration system that provides **unified logging for both application logs and incoming request logs**. All logs use the same format, handler, and configuration for consistency and easier monitoring.
## Logging Levels
- **DEBUG**: Detailed information for debugging (status checker operations)
- **INFO**: General information about application flow
- **WARNING**: Warning messages for non-critical issues
- **ERROR**: Error messages for failed operations
- **CRITICAL**: Critical errors that may cause application failure
## Configuration
Logging is configured in `services/logging_config.py` with unified settings:
- **Root Level**: INFO
- **Status Checker**: DEBUG (detailed health check logging)
- **Routes**: INFO (API endpoint logging)
- **Request Logging**: INFO (unified with application logs)
- **HTTP Client**: WARNING (reduced verbosity)
- **Unified Handler**: Single handler for all log types
## Log Format
**Unified Format** (same for application and request logs):
```
2024-01-15 10:30:45,123 - services.status_checker - INFO - status_checker.py:140 - Starting health check for 4 services
2024-01-15 10:30:45,124 - uvicorn.access - INFO - logging_middleware.py:45 - Request started: GET /services from 192.168.1.100
2024-01-15 10:30:45,125 - routes.general - INFO - general.py:78 - Service status endpoint called - checking all services
2024-01-15 10:30:45,126 - uvicorn.access - INFO - logging_middleware.py:55 - Request completed: GET /services -> 200 in 0.123s
```
Format includes:
- Timestamp
- Logger name (unified across all log types)
- Log level
- Filename and line number
- Message
## Usage Examples
### Basic Logging
```python
import logging
from services.logging_config import get_logger
logger = get_logger(__name__)
logger.debug("Debug information")
logger.info("General information")
logger.warning("Warning message")
logger.error("Error occurred")
```
### Request Logging
```python
from services.logging_config import get_request_logger
request_logger = get_request_logger()
request_logger.info("Custom request log message")
```
### Application Logging
```python
from services.logging_config import get_application_logger
app_logger = get_application_logger()
app_logger.info("Application-level log message")
```
### Service Status Logging
The status checker automatically logs:
- Health check start/completion
- Individual service responses
- Response times
- Error conditions
- Authentication status
### API Endpoint Logging
Routes log:
- Endpoint calls
- Request processing
- Response generation
### Request Middleware Logging
The logging middleware automatically logs:
- Request start (method, path, client IP, user agent)
- Request completion (status code, processing time)
- Request errors (exceptions, processing time)
## Debug Endpoint
A debug endpoint is available at `/debug/logging` to:
- Test unified log levels across all logger types
- View current configuration
- Verify unified logging setup
- Test request, application, and route loggers
## Environment Variables
You can control logging behavior with environment variables:
```bash
# Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
export LOG_LEVEL=DEBUG
# Disable timestamps
export LOG_NO_TIMESTAMP=true
```
## Log Files
Currently, logs are output to stdout. For production, consider:
- File logging with rotation
- Structured logging (JSON)
- Log aggregation (ELK stack, Fluentd)
- Log levels per environment
## Troubleshooting
### No Logs Appearing
1. Check log level configuration
2. Verify logger names match module names
3. Ensure logging is initialized before use
### Too Many Logs
1. Increase log level to WARNING or ERROR
2. Disable DEBUG logging for specific modules
3. Use log filtering
### Performance Impact
1. Use appropriate log levels
2. Avoid logging in tight loops
3. Consider async logging for high-volume applications
## Best Practices
1. **Use appropriate levels**: DEBUG for development, INFO for production
2. **Include context**: Service names, request IDs, user information
3. **Structured messages**: Consistent format for parsing
4. **Avoid sensitive data**: No passwords, tokens, or personal information
5. **Performance**: Log asynchronously when possible
6. **Monitoring**: Set up alerts for ERROR and CRITICAL levels

View File

@@ -1,8 +1,28 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Import route modules
from middleware import LoggingMiddleware
from routes import events, frigate, general, home_assistant, immich
from services.logging_config import get_application_logger, setup_logging
from services.status_checker import status_checker
# Set up unified logging for both application and request logs
setup_logging(level="INFO", enable_request_logging=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan events."""
# Startup
logger = get_application_logger()
logger.info("LabFusion Service Adapters starting up")
yield
# Shutdown
logger.info("LabFusion Service Adapters shutting down")
await status_checker.close()
# Create FastAPI app
app = FastAPI(
@@ -14,8 +34,12 @@ app = FastAPI(
{"url": "http://localhost:8001", "description": "Development Server"},
{"url": "https://adapters.labfusion.dev", "description": "Production Server"},
],
lifespan=lifespan,
)
# Add custom logging middleware first (runs last in the chain)
app.add_middleware(LoggingMiddleware)
# CORS middleware
app.add_middleware(
CORSMiddleware,
@@ -35,4 +59,11 @@ app.include_router(events.router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001)
# Configure uvicorn to use our unified logging
uvicorn.run(
app,
host="127.0.0.1",
port=8001,
log_config=None, # Disable uvicorn's default logging config
access_log=True, # Enable access logging
)

View File

@@ -0,0 +1,9 @@
"""
Middleware Package
This package contains custom middleware for the service adapters.
"""
from .logging_middleware import LoggingMiddleware
__all__ = ["LoggingMiddleware"]

View File

@@ -0,0 +1,75 @@
"""
Logging Middleware
This module provides custom logging middleware for FastAPI requests
to ensure consistent logging format with application logs.
"""
import logging
import time
from typing import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from services.logging_config import get_request_logger
logger = get_request_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
"""Custom logging middleware for unified request logging."""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""
Log each request with unified formatting.
Args:
request: The incoming request
call_next: The next middleware/handler in the chain
Returns:
The response
"""
# Start timing
start_time = time.time()
# Extract request information
method = request.method
url_path = request.url.path
client_ip = request.client.host if request.client else "unknown"
user_agent = request.headers.get("user-agent", "unknown")
# Log request start
logger.info(
f"Request started: {method} {url_path} from {client_ip} "
f"(User-Agent: {user_agent})"
)
try:
# Process the request
response = await call_next(request)
# Calculate processing time
process_time = time.time() - start_time
# Log successful response
logger.info(
f"Request completed: {method} {url_path} -> "
f"{response.status_code} in {process_time:.3f}s"
)
return response
except Exception as e:
# Calculate processing time for failed requests
process_time = time.time() - start_time
# Log error
logger.error(
f"Request failed: {method} {url_path} -> "
f"Exception: {str(e)} in {process_time:.3f}s"
)
# Re-raise the exception
raise

View File

@@ -6,7 +6,11 @@ from pydantic import BaseModel, Field
class ServiceStatus(BaseModel):
enabled: bool = Field(..., description="Whether the service is enabled")
url: str = Field(..., description="Service URL")
status: str = Field(..., description="Service status")
status: str = Field(..., description="Service status (healthy, unhealthy, disabled, error, timeout, unauthorized, forbidden)")
response_time: Optional[float] = Field(None, description="Response time in seconds")
error: Optional[str] = Field(None, description="Error message if status is not healthy")
uptime: Optional[str] = Field(None, description="Service uptime information (for sensor-based checks)")
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional metadata from health check")
class HAAttributes(BaseModel):

View File

@@ -1,9 +1,14 @@
import logging
from datetime import datetime
from fastapi import APIRouter
from models.schemas import HealthResponse, RootResponse, ServiceStatus
from services.config import SERVICES
from services.status_checker import status_checker
# Configure logger
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -29,9 +34,91 @@ async def root():
)
async def health_check():
"""Check the health status of the service adapters"""
logger.debug("Health check endpoint called")
return HealthResponse(status="healthy", timestamp=datetime.now().isoformat())
@router.get(
"/debug/logging",
summary="Logging Debug Info",
description="Get current logging configuration and test log levels",
tags=["Debug"],
)
async def debug_logging():
"""Debug endpoint to test unified logging configuration"""
# Test different log levels
logger.debug("This is a DEBUG message from routes.general")
logger.info("This is an INFO message from routes.general")
logger.warning("This is a WARNING message from routes.general")
logger.error("This is an ERROR message from routes.general")
# Test request logger
from services.logging_config import get_request_logger
request_logger = get_request_logger()
request_logger.info("This is a request logger message")
# Test application logger
from services.logging_config import get_application_logger
app_logger = get_application_logger()
app_logger.info("This is an application logger message")
# Get current logging configuration
root_logger = logging.getLogger()
config_info = {
"root_level": logging.getLevelName(root_logger.level),
"handlers": [str(h) for h in root_logger.handlers],
"handler_count": len(root_logger.handlers),
"status_checker_level": logging.getLevelName(logging.getLogger("services.status_checker").level),
"general_level": logging.getLevelName(logging.getLogger("routes.general").level),
"request_logger_level": logging.getLevelName(request_logger.level),
"application_logger_level": logging.getLevelName(app_logger.level),
"uvicorn_access_level": logging.getLevelName(logging.getLogger("uvicorn.access").level),
}
logger.info("Unified logging debug info requested")
return {
"message": "Unified log messages sent to console",
"config": config_info,
"note": "All logs now use the same format and handler"
}
@router.get(
"/debug/sensor/{service_name}",
summary="Debug Sensor Data",
description="Get raw sensor data for debugging health check issues",
tags=["Debug"],
)
async def debug_sensor(service_name: str):
"""Debug endpoint to inspect raw sensor data"""
from services.config import SERVICES
from services.health_checkers import factory
if service_name not in SERVICES:
return {"error": f"Service {service_name} not found"}
config = SERVICES[service_name]
if config.get("health_check_type") != "sensor":
return {"error": f"Service {service_name} is not using sensor health checking"}
try:
# Create sensor checker
checker = factory.create_checker("sensor", timeout=10.0)
# Get raw sensor data
result = await checker.check_health(service_name, config)
return {
"service_name": service_name,
"config": config,
"result": result.to_dict(),
"raw_sensor_data": result.metadata
}
except Exception as e:
logger.error(f"Error debugging sensor for {service_name}: {e}")
return {"error": str(e)}
@router.get(
"/services",
response_model=dict,
@@ -41,11 +128,23 @@ async def health_check():
)
async def get_services():
"""Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
logger.info("Service status endpoint called - checking all services")
# Check all services concurrently
status_results = await status_checker.check_all_services()
service_status = {}
for service_name, config in SERVICES.items():
status_info = status_results.get(service_name, {})
service_status[service_name] = ServiceStatus(
enabled=config["enabled"],
url=config["url"],
status="unknown", # Would check actual service status
status=status_info.get("status", "unknown"),
response_time=status_info.get("response_time"),
error=status_info.get("error"),
uptime=status_info.get("uptime"),
metadata=status_info.get("metadata", {})
)
logger.info(f"Service status check completed - returning status for {len(service_status)} services")
return service_status

View File

@@ -1,3 +1,4 @@
from operator import truediv
import os
from dotenv import load_dotenv
@@ -8,23 +9,32 @@ load_dotenv()
# Service configurations
SERVICES = {
"home_assistant": {
"url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
"token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN")),
"url": os.getenv("HOME_ASSISTANT_URL", "http://192.168.2.158:8123"),
"token": os.getenv("HOME_ASSISTANT_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiI3MjdiY2QwMjNkNmM0NzgzYmRiMzg2ZDYxYzQ3N2NmYyIsImlhdCI6MTc1ODE4MDg2MiwiZXhwIjoyMDczNTQwODYyfQ.rN_dBtYmXIo4J1DffgWb6G0KLsgaQ6_kH-kiWJeQQQM"),
"enabled": True,
"health_check_type": "sensor", # Use sensor-based health checking
"sensor_entity": "sensor.uptime_34", # Check uptime sensor
"health_endpoint": "/api/", # Fallback API endpoint
},
"frigate": {
"url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
"token": os.getenv("FRIGATE_TOKEN", ""),
"enabled": bool(os.getenv("FRIGATE_TOKEN")),
"health_check_type": "api",
"health_endpoint": "/api/version",
},
"immich": {
"url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
"api_key": os.getenv("IMMICH_API_KEY", ""),
"enabled": bool(os.getenv("IMMICH_API_KEY")),
"health_check_type": "api",
"health_endpoint": "/api/server-info/ping",
},
"n8n": {
"url": os.getenv("N8N_URL", "http://n8n.local:5678"),
"webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
"enabled": bool(os.getenv("N8N_WEBHOOK_URL")),
"health_check_type": "api",
"health_endpoint": "/healthz",
},
}

View File

@@ -0,0 +1,23 @@
"""
Health Checkers Package
This package provides various health checking strategies for different service types.
"""
from .api_checker import APIHealthChecker
from .base import BaseHealthChecker, HealthCheckResult
from .custom_checker import CustomHealthChecker
from .registry import HealthCheckerFactory, HealthCheckerRegistry, factory, registry
from .sensor_checker import SensorHealthChecker
__all__ = [
"BaseHealthChecker",
"HealthCheckResult",
"APIHealthChecker",
"SensorHealthChecker",
"CustomHealthChecker",
"HealthCheckerRegistry",
"HealthCheckerFactory",
"registry",
"factory",
]

View File

@@ -0,0 +1,166 @@
"""
API Health Checker
This module provides health checking for services that expose health endpoints.
"""
import logging
import time
from typing import Any, Dict, Optional
import httpx
from httpx import HTTPError, TimeoutException
from .base import BaseHealthChecker, HealthCheckResult
from utils.time_formatter import format_uptime_for_frontend
logger = logging.getLogger(__name__)
class APIHealthChecker(BaseHealthChecker):
"""Health checker for services with API health endpoints."""
async def check_health(
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
"""
Check health via API endpoint.
Args:
service_name: Name of the service
config: Service configuration
Returns:
HealthCheckResult with status information
"""
logger.debug(f"Starting API health check for {service_name}")
if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled")
return HealthCheckResult("disabled")
url = config.get("url")
if not url:
logger.warning(f"Service {service_name} has no URL configured")
return HealthCheckResult("error", error="No URL configured")
# Get health endpoint from config or use default
health_endpoint = config.get("health_endpoint", "/")
health_url = f"{url.rstrip('/')}{health_endpoint}"
logger.debug(f"Checking {service_name} at {health_url}")
try:
start_time = time.time()
headers = self._get_auth_headers(service_name, config)
response = await self.client.get(health_url, headers=headers)
response_time = time.time() - start_time
logger.info(f"Service {service_name} responded with status {response.status_code} in {response_time:.3f}s")
# Determine health status based on response
if response.status_code == 200:
# Check if response body indicates health
health_status = self._parse_health_response(response, service_name)
# Try to extract uptime from response
uptime_info = self._extract_uptime_from_response(response, service_name)
formatted_uptime = format_uptime_for_frontend(uptime_info)
metadata = {
"http_status": response.status_code,
"response_size": len(response.content),
"health_status": health_status
}
return HealthCheckResult("healthy", response_time, metadata=metadata, uptime=formatted_uptime)
elif response.status_code == 401:
logger.warning(f"Service {service_name} returned 401 - authentication required")
return HealthCheckResult("unauthorized", response_time, "Authentication required")
elif response.status_code == 403:
logger.warning(f"Service {service_name} returned 403 - access forbidden")
return HealthCheckResult("forbidden", response_time, "Access forbidden")
else:
logger.warning(f"Service {service_name} returned {response.status_code}")
return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}")
except TimeoutException:
logger.error(f"Service {service_name} timed out after {self.timeout}s")
return HealthCheckResult("timeout", error=f"Request timed out after {self.timeout}s")
except HTTPError as e:
logger.error(f"HTTP error checking {service_name}: {str(e)}")
return HealthCheckResult("error", error=f"HTTP error: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error checking {service_name}: {str(e)}")
return HealthCheckResult("error", error=f"Unexpected error: {str(e)}")
def _parse_health_response(self, response: httpx.Response, service_name: str) -> str:
"""
Parse health response to determine actual health status.
Args:
response: HTTP response
service_name: Name of the service
Returns:
Health status string
"""
try:
# Try to parse JSON response
data = response.json()
# Service-specific health parsing
if service_name == "home_assistant":
# Home Assistant returns {"message": "API running."} for healthy
return "healthy" if data.get("message") == "API running." else "unhealthy"
elif service_name == "frigate":
# Frigate version endpoint returns version info
return "healthy" if "version" in data else "unhealthy"
elif service_name == "immich":
# Immich ping endpoint returns {"res": "pong"}
return "healthy" if data.get("res") == "pong" else "unhealthy"
elif service_name == "n8n":
# n8n health endpoint returns {"status": "ok"}
return "healthy" if data.get("status") == "ok" else "unhealthy"
else:
# Generic check - if we got JSON, assume healthy
return "healthy"
except Exception as e:
logger.debug(f"Could not parse JSON response from {service_name}: {e}")
# If we can't parse JSON but got 200, assume healthy
return "healthy"
def _extract_uptime_from_response(self, response: httpx.Response, service_name: str) -> Optional[str]:
"""
Extract uptime information from API response.
Args:
response: HTTP response
service_name: Name of the service
Returns:
Uptime information string or None
"""
try:
data = response.json()
# Service-specific uptime extraction
if service_name == "frigate":
# Frigate might have uptime in version response
return data.get("uptime")
elif service_name == "immich":
# Immich might have server info with uptime
return data.get("uptime")
elif service_name == "n8n":
# n8n health endpoint might have uptime
return data.get("uptime")
else:
# Generic uptime extraction
return data.get("uptime") or data.get("uptime_seconds")
except Exception as e:
logger.debug(f"Could not extract uptime from {service_name} response: {e}")
return None

View File

@@ -0,0 +1,105 @@
"""
Base Health Checker
This module provides the abstract base class and interfaces for different
health checking strategies.
"""
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple
import httpx
logger = logging.getLogger(__name__)
class HealthCheckResult:
"""Result of a health check operation."""
def __init__(
self,
status: str,
response_time: Optional[float] = None,
error: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
uptime: Optional[str] = None
):
self.status = status
self.response_time = response_time
self.error = error
self.metadata = metadata or {}
self.uptime = uptime
def to_dict(self) -> Dict[str, Any]:
"""Convert result to dictionary."""
return {
"status": self.status,
"response_time": self.response_time,
"error": self.error,
"uptime": self.uptime,
"metadata": self.metadata
}
class BaseHealthChecker(ABC):
"""Abstract base class for health checkers."""
def __init__(self, timeout: float = 5.0):
"""
Initialize the health checker.
Args:
timeout: Request timeout in seconds
"""
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout)
logger.debug(f"Initialized {self.__class__.__name__} with timeout: {timeout}s")
@abstractmethod
async def check_health(
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
"""
Check the health of a service.
Args:
service_name: Name of the service
config: Service configuration
Returns:
HealthCheckResult with status information
"""
pass
def _get_auth_headers(self, service_name: str, config: Dict[str, Any]) -> Dict[str, str]:
"""
Get authentication headers for the service.
Args:
service_name: Name of the service
config: Service configuration
Returns:
Dictionary of headers
"""
headers = {"User-Agent": "LabFusion-ServiceAdapters/1.0.0"}
# Service-specific authentication
if service_name == "home_assistant" and config.get("token"):
headers["Authorization"] = f"Bearer {config['token']}"
elif service_name == "frigate" and config.get("token"):
headers["X-API-Key"] = config["token"]
elif service_name == "immich" and config.get("api_key"):
headers["X-API-Key"] = config["api_key"]
elif service_name == "n8n" and config.get("api_key"):
headers["X-API-Key"] = config["api_key"]
return headers
async def close(self):
"""Close the HTTP client."""
await self.client.aclose()
logger.debug(f"Closed {self.__class__.__name__} HTTP client")

View File

@@ -0,0 +1,173 @@
"""
Custom Health Checker
This module provides health checking for services that require custom
health check logic or multiple checks.
"""
import logging
import time
from typing import Any, Dict, List
import httpx
from httpx import HTTPError, TimeoutException
from .base import BaseHealthChecker, HealthCheckResult
logger = logging.getLogger(__name__)
class CustomHealthChecker(BaseHealthChecker):
"""Health checker for services requiring custom health check logic."""
async def check_health(
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
"""
Check health using custom logic.
Args:
service_name: Name of the service
config: Service configuration
Returns:
HealthCheckResult with status information
"""
logger.debug(f"Starting custom health check for {service_name}")
if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled")
return HealthCheckResult("disabled")
# Get custom health check configuration
health_checks = config.get("health_checks", [])
if not health_checks:
logger.warning(f"Service {service_name} has no health_checks configured")
return HealthCheckResult("error", error="No health checks configured")
# Run all health checks
results = []
overall_start_time = time.time()
for check_config in health_checks:
check_result = await self._run_single_check(service_name, check_config)
results.append(check_result)
overall_response_time = time.time() - overall_start_time
# Determine overall health status
overall_status = self._determine_overall_status(results)
metadata = {
"total_checks": len(health_checks),
"check_results": [result.to_dict() for result in results],
"overall_response_time": overall_response_time
}
return HealthCheckResult(overall_status, overall_response_time, metadata=metadata)
async def _run_single_check(
self,
service_name: str,
check_config: Dict[str, Any]
) -> HealthCheckResult:
"""
Run a single health check.
Args:
service_name: Name of the service
check_config: Configuration for this specific check
Returns:
HealthCheckResult for this check
"""
check_type = check_config.get("type", "api")
check_name = check_config.get("name", "unknown")
logger.debug(f"Running {check_type} check '{check_name}' for {service_name}")
if check_type == "api":
return await self._api_check(service_name, check_config)
elif check_type == "sensor":
return await self._sensor_check(service_name, check_config)
elif check_type == "ping":
return await self._ping_check(service_name, check_config)
else:
logger.warning(f"Unknown check type '{check_type}' for {service_name}")
return HealthCheckResult("error", error=f"Unknown check type: {check_type}")
async def _api_check(self, service_name: str, check_config: Dict[str, Any]) -> HealthCheckResult:
"""Run an API-based health check."""
url = check_config.get("url")
if not url:
return HealthCheckResult("error", error="No URL in check config")
try:
start_time = time.time()
headers = self._get_auth_headers(service_name, check_config)
response = await self.client.get(url, headers=headers)
response_time = time.time() - start_time
if response.status_code == 200:
return HealthCheckResult("healthy", response_time)
else:
return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}")
except Exception as e:
return HealthCheckResult("error", error=str(e))
async def _sensor_check(self, service_name: str, check_config: Dict[str, Any]) -> HealthCheckResult:
"""Run a sensor-based health check."""
# This would be similar to the sensor checker logic
# For now, delegate to API check with sensor endpoint
sensor_entity = check_config.get("sensor_entity")
if not sensor_entity:
return HealthCheckResult("error", error="No sensor_entity in check config")
# Build sensor URL
base_url = check_config.get("url", "")
sensor_url = f"{base_url.rstrip('/')}/api/states/{sensor_entity}"
# Update check config with sensor URL
check_config["url"] = sensor_url
return await self._api_check(service_name, check_config)
async def _ping_check(self, service_name: str, check_config: Dict[str, Any]) -> HealthCheckResult:
"""Run a ping-based health check."""
# This would implement actual ping logic
# For now, just do a basic HTTP check
return await self._api_check(service_name, check_config)
def _determine_overall_status(self, results: List[HealthCheckResult]) -> str:
"""
Determine overall health status from multiple check results.
Args:
results: List of individual check results
Returns:
Overall health status
"""
if not results:
return "error"
# Count statuses
status_counts = {}
for result in results:
status = result.status
status_counts[status] = status_counts.get(status, 0) + 1
# Determine overall status based on priority
if status_counts.get("healthy", 0) == len(results):
return "healthy"
elif status_counts.get("unhealthy", 0) > 0:
return "unhealthy"
elif status_counts.get("timeout", 0) > 0:
return "timeout"
elif status_counts.get("error", 0) > 0:
return "error"
else:
return "unknown"

View File

@@ -0,0 +1,135 @@
"""
Health Checker Registry
This module provides a registry and factory for different health checker types.
"""
import logging
from typing import Any, Dict, Type
from .api_checker import APIHealthChecker
from .base import BaseHealthChecker
from .custom_checker import CustomHealthChecker
from .sensor_checker import SensorHealthChecker
logger = logging.getLogger(__name__)
class HealthCheckerRegistry:
"""Registry for health checker types."""
def __init__(self):
"""Initialize the registry with default checkers."""
self._checkers: Dict[str, Type[BaseHealthChecker]] = {
"api": APIHealthChecker,
"sensor": SensorHealthChecker,
"custom": CustomHealthChecker,
}
logger.debug(f"Initialized health checker registry with {len(self._checkers)} checkers")
def register(self, name: str, checker_class: Type[BaseHealthChecker]) -> None:
"""
Register a new health checker type.
Args:
name: Name of the checker type
checker_class: Health checker class
"""
self._checkers[name] = checker_class
logger.info(f"Registered health checker: {name}")
def get_checker(self, name: str) -> Type[BaseHealthChecker]:
"""
Get a health checker class by name.
Args:
name: Name of the checker type
Returns:
Health checker class
Raises:
ValueError: If checker type not found
"""
if name not in self._checkers:
available = ", ".join(self._checkers.keys())
raise ValueError(f"Unknown health checker type '{name}'. Available: {available}")
return self._checkers[name]
def list_checkers(self) -> list[str]:
"""
List all available health checker types.
Returns:
List of checker type names
"""
return list(self._checkers.keys())
class HealthCheckerFactory:
"""Factory for creating health checker instances."""
def __init__(self, registry: HealthCheckerRegistry = None):
"""
Initialize the factory.
Args:
registry: Health checker registry (uses default if None)
"""
self.registry = registry or HealthCheckerRegistry()
logger.debug("Initialized health checker factory")
def create_checker(
self,
checker_type: str,
timeout: float = 5.0
) -> BaseHealthChecker:
"""
Create a health checker instance.
Args:
checker_type: Type of checker to create
timeout: Request timeout in seconds
Returns:
Health checker instance
"""
checker_class = self.registry.get_checker(checker_type)
checker = checker_class(timeout=timeout)
logger.debug(f"Created {checker_type} health checker with timeout {timeout}s")
return checker
def create_checker_for_service(
self,
service_name: str,
config: Dict[str, Any],
timeout: float = 5.0
) -> BaseHealthChecker:
"""
Create a health checker for a specific service based on its configuration.
Args:
service_name: Name of the service
config: Service configuration
timeout: Request timeout in seconds
Returns:
Health checker instance
"""
# Determine checker type from config
checker_type = config.get("health_check_type", "api")
# Override based on service-specific logic
if service_name == "home_assistant" and config.get("sensor_entity"):
checker_type = "sensor"
elif config.get("health_checks"):
checker_type = "custom"
logger.debug(f"Creating {checker_type} checker for {service_name}")
return self.create_checker(checker_type, timeout)
# Global registry and factory instances
registry = HealthCheckerRegistry()
factory = HealthCheckerFactory(registry)

View File

@@ -0,0 +1,220 @@
"""
Sensor Health Checker
This module provides health checking for services that expose health information
via sensors (like Home Assistant entities).
"""
import logging
import time
from typing import Any, Dict, Optional
import httpx
from httpx import HTTPError, TimeoutException
from .base import BaseHealthChecker, HealthCheckResult
from utils.time_formatter import format_uptime_for_frontend
logger = logging.getLogger(__name__)
class SensorHealthChecker(BaseHealthChecker):
"""Health checker for services with sensor-based health information."""
async def check_health(
self,
service_name: str,
config: Dict[str, Any]
) -> HealthCheckResult:
"""
Check health via sensor data.
Args:
service_name: Name of the service
config: Service configuration
Returns:
HealthCheckResult with status information
"""
logger.debug(f"Starting sensor health check for {service_name}")
if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled")
return HealthCheckResult("disabled")
url = config.get("url")
if not url:
logger.warning(f"Service {service_name} has no URL configured")
return HealthCheckResult("error", error="No URL configured")
# Get sensor configuration
sensor_entity = config.get("sensor_entity")
if not sensor_entity:
logger.warning(f"Service {service_name} has no sensor_entity configured")
return HealthCheckResult("error", error="No sensor entity configured")
# Build sensor API URL
sensor_url = f"{url.rstrip('/')}/api/states/{sensor_entity}"
logger.debug(f"Checking {service_name} sensor {sensor_entity} at {sensor_url}")
try:
start_time = time.time()
headers = self._get_auth_headers(service_name, config)
response = await self.client.get(sensor_url, headers=headers)
response_time = time.time() - start_time
logger.info(f"Service {service_name} sensor responded with status {response.status_code} in {response_time:.3f}s")
if response.status_code == 200:
# Parse sensor data
sensor_data = response.json()
logger.debug(f"Raw sensor data for {service_name}: {sensor_data}")
health_status = self._parse_sensor_data(sensor_data, service_name)
logger.info(f"Parsed health status for {service_name}: {health_status}")
# Extract uptime information for top-level field
uptime_info = self._extract_uptime_info(sensor_data, service_name)
# Format uptime for frontend display
formatted_uptime = format_uptime_for_frontend(uptime_info)
metadata = {
"http_status": response.status_code,
"sensor_entity": sensor_entity,
"sensor_state": sensor_data.get("state"),
"sensor_attributes": sensor_data.get("attributes", {}),
"last_updated": sensor_data.get("last_updated"),
"entity_id": sensor_data.get("entity_id")
}
return HealthCheckResult(health_status, response_time, metadata=metadata, uptime=formatted_uptime)
elif response.status_code == 401:
logger.warning(f"Service {service_name} returned 401 - authentication required")
return HealthCheckResult("unauthorized", response_time, "Authentication required")
elif response.status_code == 404:
logger.warning(f"Service {service_name} sensor {sensor_entity} not found")
return HealthCheckResult("error", response_time, f"Sensor {sensor_entity} not found")
else:
logger.warning(f"Service {service_name} returned {response.status_code}")
return HealthCheckResult("unhealthy", response_time, f"HTTP {response.status_code}")
except TimeoutException:
logger.error(f"Service {service_name} timed out after {self.timeout}s")
return HealthCheckResult("timeout", error=f"Request timed out after {self.timeout}s")
except HTTPError as e:
logger.error(f"HTTP error checking {service_name}: {str(e)}")
return HealthCheckResult("error", error=f"HTTP error: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error checking {service_name}: {str(e)}")
return HealthCheckResult("error", error=f"Unexpected error: {str(e)}")
def _parse_sensor_data(self, sensor_data: Dict[str, Any], service_name: str) -> str:
"""
Parse sensor data to determine health status.
Args:
sensor_data: Sensor data from API
service_name: Name of the service
Returns:
Health status string
"""
try:
state = sensor_data.get("state", "")
entity_id = sensor_data.get("entity_id", "").lower()
attributes = sensor_data.get("attributes", {})
logger.debug(f"Parsing sensor data for {service_name}: entity_id={entity_id}, state={state}")
# Service-specific sensor parsing
if service_name == "home_assistant":
# For HA, check uptime sensor or system health
if "uptime" in entity_id:
# Check if this is a timestamp sensor (device_class: timestamp)
device_class = attributes.get("device_class", "")
if device_class == "timestamp":
# Timestamp sensor - if it has a valid timestamp, service is healthy
try:
from datetime import datetime
# Try to parse the timestamp
parsed_time = datetime.fromisoformat(state.replace('Z', '+00:00'))
# If we can parse it and it's recent (within last 24 hours), it's healthy
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
time_diff = now - parsed_time
is_healthy = time_diff.total_seconds() < 86400 # 24 hours
logger.debug(f"Timestamp sensor: {state}, time_diff: {time_diff}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
except (ValueError, TypeError) as e:
logger.warning(f"Could not parse timestamp '{state}': {e}")
return "unhealthy"
else:
# Numeric uptime sensor - check if it's a valid number
try:
uptime_seconds = float(state)
# If uptime > 0, service is healthy
is_healthy = uptime_seconds > 0
logger.debug(f"Uptime sensor: {uptime_seconds}s, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
except ValueError:
logger.warning(f"Uptime sensor state '{state}' is not a valid number")
return "unhealthy"
elif "system" in entity_id:
# System health sensor
is_healthy = state.lower() in ["ok", "healthy", "online"]
logger.debug(f"System sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
else:
# Generic sensor - check if state indicates health
is_healthy = state.lower() not in ["unavailable", "unknown", "off"]
logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
else:
# Generic sensor parsing
is_healthy = state.lower() not in ["unavailable", "unknown", "off", "error"]
logger.debug(f"Generic sensor: state={state}, healthy: {is_healthy}")
return "healthy" if is_healthy else "unhealthy"
except Exception as e:
logger.error(f"Could not parse sensor data from {service_name}: {e}")
return "unhealthy"
def _extract_uptime_info(self, sensor_data: Dict[str, Any], service_name: str) -> Optional[str]:
"""
Extract uptime information from sensor data for top-level display.
Args:
sensor_data: Sensor data from API
service_name: Name of the service
Returns:
Uptime information string or None
"""
try:
state = sensor_data.get("state", "")
entity_id = sensor_data.get("entity_id", "").lower()
attributes = sensor_data.get("attributes", {})
if service_name == "home_assistant" and "uptime" in entity_id:
device_class = attributes.get("device_class", "")
if device_class == "timestamp":
# For timestamp sensors, show the timestamp
return state
else:
# For numeric uptime sensors, show as duration
try:
uptime_seconds = float(state)
return f"{uptime_seconds:.0f} seconds"
except ValueError:
return state
else:
# For other sensors, show the state if it might be uptime-related
if "uptime" in entity_id or "duration" in entity_id.lower():
return state
return None
except Exception as e:
logger.debug(f"Could not extract uptime info from {service_name}: {e}")
return None

View File

@@ -0,0 +1,132 @@
"""
Logging Configuration
This module provides centralized logging configuration for the service adapters,
including both application logs and request logs with unified formatting.
"""
import logging
import sys
from typing import Optional
# Global format string for consistent logging
DEFAULT_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
def setup_logging(
level: str = "INFO",
format_string: Optional[str] = None,
include_timestamp: bool = True,
enable_request_logging: bool = True
) -> None:
"""
Set up unified logging configuration for the application and requests.
Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format_string: Custom format string for log messages
include_timestamp: Whether to include timestamp in log messages
enable_request_logging: Whether to enable FastAPI request logging
"""
if format_string is None:
if include_timestamp:
format_string = DEFAULT_FORMAT
else:
format_string = "%(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
# Clear any existing handlers to avoid duplicates
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Create a single handler for all logs
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(format_string))
# Configure root logger
root_logger.setLevel(getattr(logging, level.upper()))
root_logger.addHandler(handler)
# Set specific loggers with unified configuration
loggers = {
"services.status_checker": "DEBUG",
"services.health_checkers": "DEBUG",
"routes.general": "INFO",
"routes.home_assistant": "INFO",
"routes.frigate": "INFO",
"routes.immich": "INFO",
"routes.events": "INFO",
"httpx": "WARNING", # Reduce httpx verbosity
"uvicorn.access": "INFO" if enable_request_logging else "WARNING",
"uvicorn.error": "INFO",
"uvicorn": "INFO",
}
for logger_name, logger_level in loggers.items():
logger = logging.getLogger(logger_name)
logger.setLevel(getattr(logging, logger_level))
# Ensure all loggers use the same handler
logger.handlers = []
logger.addHandler(handler)
logger.propagate = False # Prevent duplicate logs
# Configure FastAPI request logging if enabled
if enable_request_logging:
_setup_request_logging(handler)
# Log the configuration
logger = logging.getLogger(__name__)
logger.info(f"Unified logging configured with level: {level}")
def _setup_request_logging(handler: logging.Handler) -> None:
"""
Set up FastAPI request logging with the same handler.
Args:
handler: The logging handler to use for requests
"""
# Configure uvicorn access logger for requests
access_logger = logging.getLogger("uvicorn.access")
access_logger.handlers = []
access_logger.addHandler(handler)
access_logger.propagate = False
# Configure uvicorn error logger
error_logger = logging.getLogger("uvicorn.error")
error_logger.handlers = []
error_logger.addHandler(handler)
error_logger.propagate = False
def get_logger(name: str) -> logging.Logger:
"""
Get a logger instance for the given name.
Args:
name: Logger name (usually __name__)
Returns:
Logger instance
"""
return logging.getLogger(name)
def get_request_logger() -> logging.Logger:
"""
Get the request logger for FastAPI requests.
Returns:
Request logger instance
"""
return logging.getLogger("uvicorn.access")
def get_application_logger() -> logging.Logger:
"""
Get the main application logger.
Returns:
Application logger instance
"""
return logging.getLogger("labfusion.service_adapters")

View File

@@ -0,0 +1,149 @@
"""
Service Status Checker
This module provides functionality to check the health status of external services
using a generalized health checking system.
"""
import asyncio
import logging
from typing import Dict
from services.config import SERVICES
from services.health_checkers import factory
# Configure logger
logger = logging.getLogger(__name__)
class ServiceStatusChecker:
"""Handles health checks for external services using generalized checkers."""
def __init__(self, timeout: float = 5.0):
"""
Initialize the status checker.
Args:
timeout: Request timeout in seconds
"""
self.timeout = timeout
self.checkers = {} # Cache for checker instances
logger.info(f"ServiceStatusChecker initialized with timeout: {timeout}s")
async def check_service_health(self, service_name: str, config: Dict) -> Dict:
"""
Check the health status of a specific service.
Args:
service_name: Name of the service to check
config: Service configuration dictionary
Returns:
Dictionary with status information
"""
logger.debug(f"Starting health check for service: {service_name}")
if not config.get("enabled", False):
logger.debug(f"Service {service_name} is disabled, skipping health check")
return {
"status": "disabled",
"response_time": None,
"error": None,
"metadata": {}
}
try:
# Get or create checker for this service
checker = await self._get_checker_for_service(service_name, config)
# Run health check
result = await checker.check_health(service_name, config)
logger.info(f"Service {service_name} health check completed: {result.status}")
return result.to_dict()
except Exception as e:
logger.error(f"Unexpected error checking {service_name}: {str(e)}")
return {
"status": "error",
"response_time": None,
"error": f"Unexpected error: {str(e)}",
"metadata": {}
}
async def _get_checker_for_service(self, service_name: str, config: Dict):
"""
Get or create a health checker for the service.
Args:
service_name: Name of the service
config: Service configuration
Returns:
Health checker instance
"""
# Use service name as cache key
if service_name not in self.checkers:
checker = factory.create_checker_for_service(service_name, config, self.timeout)
self.checkers[service_name] = checker
logger.debug(f"Created new checker for {service_name}")
return self.checkers[service_name]
async def check_all_services(self) -> Dict[str, Dict]:
"""
Check the health status of all configured services.
Returns:
Dictionary mapping service names to their status information
"""
logger.info(f"Starting health check for {len(SERVICES)} services")
tasks = []
service_names = []
for service_name, config in SERVICES.items():
tasks.append(self.check_service_health(service_name, config))
service_names.append(service_name)
logger.debug(f"Created {len(tasks)} concurrent health check tasks")
results = await asyncio.gather(*tasks, return_exceptions=True)
service_status = {}
healthy_count = 0
error_count = 0
for service_name, result in zip(service_names, results):
if isinstance(result, Exception):
logger.error(f"Exception during health check for {service_name}: {str(result)}")
service_status[service_name] = {
"status": "error",
"response_time": None,
"error": f"Exception: {str(result)}",
"metadata": {}
}
error_count += 1
else:
service_status[service_name] = result
if result["status"] == "healthy":
healthy_count += 1
elif result["status"] in ["error", "timeout", "unhealthy"]:
error_count += 1
logger.info(f"Health check completed: {healthy_count} healthy, {error_count} errors, {len(SERVICES) - healthy_count - error_count} other statuses")
return service_status
async def close(self):
"""Close all health checker instances."""
logger.info("Closing ServiceStatusChecker and all health checkers")
for service_name, checker in self.checkers.items():
try:
await checker.close()
logger.debug(f"Closed checker for {service_name}")
except Exception as e:
logger.warning(f"Error closing checker for {service_name}: {e}")
self.checkers.clear()
# Global status checker instance
status_checker = ServiceStatusChecker()

View File

@@ -0,0 +1,12 @@
"""
Utilities Package
This package contains utility functions for the service adapters.
"""
from .time_formatter import format_uptime_for_frontend, format_response_time
__all__ = [
"format_uptime_for_frontend",
"format_response_time",
]

View File

@@ -0,0 +1,199 @@
"""
Time Formatting Utilities
This module provides utilities for formatting time durations and timestamps
into human-readable formats for the frontend.
"""
import re
from datetime import datetime, timezone
from typing import Optional, Union
def format_uptime_for_frontend(uptime_value: Optional[str]) -> str:
"""
Format uptime value for frontend display in "Xd Xh Xm" format.
Args:
uptime_value: Raw uptime value (timestamp, epoch, duration string, etc.)
Returns:
Formatted uptime string like "2d 5h 30m" or "0d 0h" if invalid
"""
if not uptime_value:
return "0d 0h"
try:
# Try to parse as timestamp (ISO format)
if _is_timestamp(uptime_value):
return _format_timestamp_uptime(uptime_value)
# Try to parse as epoch timestamp
if _is_epoch(uptime_value):
return _format_epoch_uptime(uptime_value)
# Try to parse as duration string (e.g., "2h 30m", "5d 2h 15m")
if _is_duration_string(uptime_value):
return _format_duration_string(uptime_value)
# Try to parse as numeric seconds
if _is_numeric_seconds(uptime_value):
return _format_seconds_uptime(float(uptime_value))
# If none of the above, return as-is or default
return uptime_value if len(uptime_value) < 50 else "0d 0h"
except Exception:
return "0d 0h"
def _is_timestamp(value: str) -> bool:
"""Check if value is an ISO timestamp."""
try:
datetime.fromisoformat(value.replace('Z', '+00:00'))
return True
except (ValueError, AttributeError):
return False
def _is_epoch(value: str) -> bool:
"""Check if value is an epoch timestamp."""
try:
float(value)
return len(value) >= 10 and float(value) > 1000000000 # Reasonable epoch range
except (ValueError, TypeError):
return False
def _is_duration_string(value: str) -> bool:
"""Check if value is a duration string like '2h 30m' or '5d 2h 15m'."""
# Look for patterns like "2h 30m", "5d 2h 15m", "1d 2h 3m 4s"
pattern = r'^\d+[dhms]\s*(\d+[dhms]\s*)*$'
return bool(re.match(pattern, value.strip()))
def _is_numeric_seconds(value: str) -> bool:
"""Check if value is numeric seconds."""
try:
float(value)
return True
except (ValueError, TypeError):
return False
def _format_timestamp_uptime(timestamp: str) -> str:
"""Format timestamp uptime (time since timestamp)."""
try:
# Parse timestamp
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
# Calculate time difference
now = datetime.now(timezone.utc)
diff = now - dt
return _format_timedelta(diff)
except Exception:
return "0d 0h"
def _format_epoch_uptime(epoch_str: str) -> str:
"""Format epoch timestamp uptime."""
try:
epoch = float(epoch_str)
dt = datetime.fromtimestamp(epoch, tz=timezone.utc)
now = datetime.now(timezone.utc)
diff = now - dt
return _format_timedelta(diff)
except Exception:
return "0d 0h"
def _format_duration_string(duration: str) -> str:
"""Format duration string to standardized format."""
try:
# Parse duration string like "2h 30m" or "5d 2h 15m"
total_seconds = _parse_duration_string(duration)
return _format_seconds_uptime(total_seconds)
except Exception:
return "0d 0h"
def _format_seconds_uptime(seconds: float) -> str:
"""Format seconds to "Xd Xh Xm" format."""
return _format_timedelta_from_seconds(seconds)
def _parse_duration_string(duration: str) -> float:
"""Parse duration string to total seconds."""
total_seconds = 0
# Extract days
days_match = re.search(r'(\d+)d', duration)
if days_match:
total_seconds += int(days_match.group(1)) * 86400
# Extract hours
hours_match = re.search(r'(\d+)h', duration)
if hours_match:
total_seconds += int(hours_match.group(1)) * 3600
# Extract minutes
minutes_match = re.search(r'(\d+)m', duration)
if minutes_match:
total_seconds += int(minutes_match.group(1)) * 60
# Extract seconds
seconds_match = re.search(r'(\d+)s', duration)
if seconds_match:
total_seconds += int(seconds_match.group(1))
return total_seconds
def _format_timedelta(td) -> str:
"""Format timedelta to "Xd Xh Xm" format."""
total_seconds = int(td.total_seconds())
return _format_timedelta_from_seconds(total_seconds)
def _format_timedelta_from_seconds(total_seconds: Union[int, float]) -> str:
"""Format total seconds to "Xd Xh Xm" format."""
if total_seconds < 0:
return "0d 0h"
# Convert to int to avoid decimal places
total_seconds = int(total_seconds)
days = total_seconds // 86400
hours = (total_seconds % 86400) // 3600
minutes = (total_seconds % 3600) // 60
# Only show days if > 0
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
def format_response_time(seconds: Optional[float]) -> str:
"""
Format response time for display.
Args:
seconds: Response time in seconds
Returns:
Formatted response time string
"""
if seconds is None:
return "N/A"
if seconds < 1:
return f"{seconds * 1000:.0f}ms"
else:
return f"{seconds:.2f}s"