Files
labFusion/services/service-adapters/main_old.py
glenn schrooyen 8d1755fd52
Some checks failed
API Docs (Node.js Express) / test (16) (push) Failing after 5m29s
API Docs (Node.js Express) / test (18) (push) Failing after 5m25s
API Docs (Node.js Express) / test (20) (push) Failing after 1m4s
API Docs (Node.js Express) / build (push) Has been skipped
API Docs (Node.js Express) / security (push) Has been skipped
LabFusion CI/CD Pipeline / api-gateway (push) Failing after 4m52s
LabFusion CI/CD Pipeline / service-adapters (push) Failing after 5m1s
LabFusion CI/CD Pipeline / api-docs (push) Failing after 5m12s
LabFusion CI/CD Pipeline / frontend (push) Failing after 6m39s
LabFusion CI/CD Pipeline / integration-tests (push) Has been skipped
LabFusion CI/CD Pipeline / security-scan (push) Has been skipped
Docker Build and Push / build-and-push (push) Failing after 34s
Docker Build and Push / security-scan (push) Has been skipped
Integration Tests / integration-tests (push) Failing after 1m33s
Integration Tests / performance-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.1) (push) Failing after 35s
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 5m20s
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 5m27s
Service Adapters (Python FastAPI) / test (3.9) (push) Failing after 5m50s
Docker Build and Push / deploy-staging (push) Has been skipped
Service Adapters (Python FastAPI) / build (push) Has been skipped
Service Adapters (Python FastAPI) / security (push) Has been skipped
Docker Build and Push / deploy-production (push) Has been skipped
Refactor API Docs CI workflow to correct ESLint command syntax and enhance Python formatting in CI pipelines; update progress tracking documentation
2025-09-12 17:44:23 +02:00

456 lines
14 KiB
Python

import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import redis
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Path, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# Load environment variables
load_dotenv()
# Pydantic models for request/response schemas
class ServiceStatus(BaseModel):
enabled: bool = Field(..., description="Whether the service is enabled")
url: str = Field(..., description="Service URL")
status: str = Field(..., description="Service status")
class HAAttributes(BaseModel):
unit_of_measurement: Optional[str] = Field(None, description="Unit of measurement")
friendly_name: Optional[str] = Field(None, description="Friendly name")
class HAEntity(BaseModel):
entity_id: str = Field(..., description="Entity ID")
state: str = Field(..., description="Current state")
attributes: HAAttributes = Field(..., description="Entity attributes")
class HAEntitiesResponse(BaseModel):
entities: List[HAEntity] = Field(..., description="List of Home Assistant entities")
class FrigateEvent(BaseModel):
id: str = Field(..., description="Event ID")
timestamp: str = Field(..., description="Event timestamp")
camera: str = Field(..., description="Camera name")
label: str = Field(..., description="Detection label")
confidence: float = Field(..., ge=0, le=1, description="Detection confidence")
class FrigateEventsResponse(BaseModel):
events: List[FrigateEvent] = Field(..., description="List of Frigate events")
class ImmichAsset(BaseModel):
id: str = Field(..., description="Asset ID")
filename: str = Field(..., description="Filename")
created_at: str = Field(..., description="Creation timestamp")
tags: List[str] = Field(..., description="Asset tags")
faces: List[str] = Field(..., description="Detected faces")
class ImmichAssetsResponse(BaseModel):
assets: List[ImmichAsset] = Field(..., description="List of Immich assets")
class EventData(BaseModel):
service: str = Field(..., description="Service name")
event_type: str = Field(..., description="Event type")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")
class EventResponse(BaseModel):
status: str = Field(..., description="Publication status")
event: Dict[str, Any] = Field(..., description="Published event")
class Event(BaseModel):
timestamp: str = Field(..., description="Event timestamp")
service: str = Field(..., description="Service name")
event_type: str = Field(..., description="Event type")
metadata: str = Field(..., description="Event metadata as JSON string")
class EventsResponse(BaseModel):
events: List[Event] = Field(..., description="List of events")
class HealthResponse(BaseModel):
status: str = Field(..., description="Service health status")
timestamp: str = Field(..., description="Health check timestamp")
class RootResponse(BaseModel):
message: str = Field(..., description="API message")
version: str = Field(..., description="API version")
app = FastAPI(
title="LabFusion Service Adapters",
description="Service integration adapters for Home Assistant, Frigate, Immich, and other homelab services",
version="1.0.0",
contact={
"name": "LabFusion Team",
"url": "https://github.com/labfusion/labfusion",
"email": "team@labfusion.dev",
},
license_info={"name": "MIT License", "url": "https://opensource.org/licenses/MIT"},
servers=[
{"url": "http://localhost:8000", "description": "Development Server"},
{"url": "https://adapters.labfusion.dev", "description": "Production Server"},
],
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Redis connection
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
decode_responses=True,
)
# Service configurations
SERVICES = {
"home_assistant": {
"url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
"token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN")),
},
"frigate": {
"url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
"token": os.getenv("FRIGATE_TOKEN", ""),
"enabled": bool(os.getenv("FRIGATE_TOKEN")),
},
"immich": {
"url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
"api_key": os.getenv("IMMICH_API_KEY", ""),
"enabled": bool(os.getenv("IMMICH_API_KEY")),
},
"n8n": {
"url": os.getenv("N8N_URL", "http://n8n.local:5678"),
"webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
"enabled": bool(os.getenv("N8N_WEBHOOK_URL")),
},
}
@app.get(
"/",
response_model=RootResponse,
summary="API Root",
description="Get basic API information",
tags=["General"],
)
async def root():
"""Get basic API information and version"""
return RootResponse(message="LabFusion Service Adapters API", version="1.0.0")
@app.get(
"/health",
response_model=HealthResponse,
summary="Health Check",
description="Check service health status",
tags=["General"],
)
async def health_check():
"""Check the health status of the service adapters"""
return HealthResponse(status="healthy", timestamp=datetime.now().isoformat())
@app.get(
"/services",
response_model=Dict[str, ServiceStatus],
summary="Get Service Status",
description="Get status of all configured external services",
tags=["Services"],
)
async def get_services():
"""Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
service_status = {}
for service_name, config in SERVICES.items():
service_status[service_name] = ServiceStatus(
enabled=config["enabled"],
url=config["url"],
status="unknown", # Would check actual service status
)
return service_status
@app.get(
"/home-assistant/entities",
response_model=HAEntitiesResponse,
summary="Get Home Assistant Entities",
description="Retrieve all entities from Home Assistant",
responses={
200: {"description": "Successfully retrieved entities"},
503: {"description": "Home Assistant integration not configured"},
},
tags=["Home Assistant"],
)
async def get_ha_entities():
"""Get Home Assistant entities including sensors, switches, and other devices"""
if not SERVICES["home_assistant"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable.",
)
# This would make actual API calls to Home Assistant
# For now, return mock data
return HAEntitiesResponse(
entities=[
HAEntity(
entity_id="sensor.cpu_usage",
state="45.2",
attributes=HAAttributes(
unit_of_measurement="%", friendly_name="CPU Usage"
),
),
HAEntity(
entity_id="sensor.memory_usage",
state="2.1",
attributes=HAAttributes(
unit_of_measurement="GB", friendly_name="Memory Usage"
),
),
]
)
@app.get(
"/frigate/events",
response_model=FrigateEventsResponse,
summary="Get Frigate Events",
description="Retrieve detection events from Frigate NVR",
responses={
200: {"description": "Successfully retrieved events"},
503: {"description": "Frigate integration not configured"},
},
tags=["Frigate"],
)
async def get_frigate_events():
"""Get Frigate detection events including person, vehicle, and object detections"""
if not SERVICES["frigate"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable.",
)
# This would make actual API calls to Frigate
# For now, return mock data
return FrigateEventsResponse(
events=[
FrigateEvent(
id="event_123",
timestamp=datetime.now().isoformat(),
camera="front_door",
label="person",
confidence=0.95,
)
]
)
@app.get(
"/immich/assets",
response_model=ImmichAssetsResponse,
summary="Get Immich Assets",
description="Retrieve photo assets from Immich",
responses={
200: {"description": "Successfully retrieved assets"},
503: {"description": "Immich integration not configured"},
},
tags=["Immich"],
)
async def get_immich_assets():
"""Get Immich photo assets including metadata, tags, and face detection results"""
if not SERVICES["immich"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable.",
)
# This would make actual API calls to Immich
# For now, return mock data
return ImmichAssetsResponse(
assets=[
ImmichAsset(
id="asset_123",
filename="photo_001.jpg",
created_at=datetime.now().isoformat(),
tags=["person", "outdoor"],
faces=["Alice", "Bob"],
)
]
)
@app.post(
"/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"},
},
tags=["Events"],
)
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
"""Publish an event to the Redis message bus for consumption by other services"""
try:
event = {
"timestamp": datetime.now().isoformat(),
"service": event_data.service,
"event_type": event_data.event_type,
"metadata": json.dumps(event_data.metadata),
}
# Publish to Redis
redis_client.lpush("events", json.dumps(event))
return EventResponse(status="published", event=event)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get(
"/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"},
},
tags=["Events"],
)
async def get_events(
limit: int = Query(
100, ge=1, le=1000, description="Maximum number of events to retrieve"
)
):
"""Get recent events from the Redis message bus"""
try:
events = redis_client.lrange("events", 0, limit - 1)
parsed_events = []
for event in events:
try:
event_data = json.loads(event)
parsed_events.append(Event(**event_data))
except json.JSONDecodeError:
continue
return EventsResponse(events=parsed_events)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get(
"/home-assistant/entity/{entity_id}",
response_model=HAEntity,
summary="Get Specific HA Entity",
description="Get a specific Home Assistant entity by ID",
responses={
200: {"description": "Successfully retrieved entity"},
404: {"description": "Entity not found"},
503: {"description": "Home Assistant integration not configured"},
},
tags=["Home Assistant"],
)
async def get_ha_entity(entity_id: str = Path(..., description="Entity ID")):
"""Get a specific Home Assistant entity by its ID"""
if not SERVICES["home_assistant"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable.",
)
# This would make actual API calls to Home Assistant
# For now, return mock data
return HAEntity(
entity_id=entity_id,
state="unknown",
attributes=HAAttributes(
unit_of_measurement="", friendly_name=f"Entity {entity_id}"
),
)
@app.get(
"/frigate/cameras",
summary="Get Frigate Cameras",
description="Get list of Frigate cameras",
responses={
200: {"description": "Successfully retrieved cameras"},
503: {"description": "Frigate integration not configured"},
},
tags=["Frigate"],
)
async def get_frigate_cameras():
"""Get list of available Frigate cameras"""
if not SERVICES["frigate"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable.",
)
# This would make actual API calls to Frigate
# For now, return mock data
return {
"cameras": [
{"name": "front_door", "enabled": True},
{"name": "back_yard", "enabled": True},
{"name": "garage", "enabled": False},
]
}
@app.get(
"/immich/albums",
summary="Get Immich Albums",
description="Get list of Immich albums",
responses={
200: {"description": "Successfully retrieved albums"},
503: {"description": "Immich integration not configured"},
},
tags=["Immich"],
)
async def get_immich_albums():
"""Get list of Immich albums"""
if not SERVICES["immich"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable.",
)
# This would make actual API calls to Immich
# For now, return mock data
return {
"albums": [
{"id": "album_1", "name": "Family Photos", "asset_count": 150},
{"id": "album_2", "name": "Vacation 2024", "asset_count": 75},
]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)