"""LabFusion service adapters API.

A FastAPI application exposing adapter endpoints for Home Assistant, Frigate,
Immich and n8n, plus a small Redis-backed event list (``events``) that other
services can publish to and read from. The integration endpoints currently
return mock data; only the event endpoints talk to Redis.
"""

import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

import redis
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Path, Query
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# Load environment variables
load_dotenv()


# Pydantic models for request/response schemas
#
# The models are documented with comments rather than class docstrings so the
# generated OpenAPI schema descriptions stay exactly as they were.


# Status entry for one configured external service.
class ServiceStatus(BaseModel):
    enabled: bool = Field(..., description="Whether the service is enabled")
    url: str = Field(..., description="Service URL")
    status: str = Field(..., description="Service status")


# Optional display attributes attached to a Home Assistant entity.
class HAAttributes(BaseModel):
    unit_of_measurement: Optional[str] = Field(None, description="Unit of measurement")
    friendly_name: Optional[str] = Field(None, description="Friendly name")


# A single Home Assistant entity with its state and attributes.
class HAEntity(BaseModel):
    entity_id: str = Field(..., description="Entity ID")
    state: str = Field(..., description="Current state")
    attributes: HAAttributes = Field(..., description="Entity attributes")


# Response wrapper for a list of Home Assistant entities.
class HAEntitiesResponse(BaseModel):
    entities: List[HAEntity] = Field(..., description="List of Home Assistant entities")


# A single Frigate detection event; confidence is constrained to [0, 1].
class FrigateEvent(BaseModel):
    id: str = Field(..., description="Event ID")
    timestamp: str = Field(..., description="Event timestamp")
    camera: str = Field(..., description="Camera name")
    label: str = Field(..., description="Detection label")
    confidence: float = Field(..., ge=0, le=1, description="Detection confidence")


# Response wrapper for a list of Frigate events.
class FrigateEventsResponse(BaseModel):
    events: List[FrigateEvent] = Field(..., description="List of Frigate events")


# A single Immich asset with its tags and detected faces.
class ImmichAsset(BaseModel):
    id: str = Field(..., description="Asset ID")
    filename: str = Field(..., description="Filename")
    created_at: str = Field(..., description="Creation timestamp")
    tags: List[str] = Field(..., description="Asset tags")
    faces: List[str] = Field(..., description="Detected faces")


# Response wrapper for a list of Immich assets.
class ImmichAssetsResponse(BaseModel):
    assets: List[ImmichAsset] = Field(..., description="List of Immich assets")


# Request body for POST /publish-event; metadata defaults to an empty dict.
class EventData(BaseModel):
    service: str = Field(..., description="Service name")
    event_type: str = Field(..., description="Event type")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")


# Response for POST /publish-event, echoing the event that was stored.
class EventResponse(BaseModel):
    status: str = Field(..., description="Publication status")
    event: Dict[str, Any] = Field(..., description="Published event")


# An event as stored in Redis; metadata is kept as a JSON-encoded string.
class Event(BaseModel):
    timestamp: str = Field(..., description="Event timestamp")
    service: str = Field(..., description="Service name")
    event_type: str = Field(..., description="Event type")
    metadata: str = Field(..., description="Event metadata as JSON string")


# Response wrapper for a list of stored events.
class EventsResponse(BaseModel):
    events: List[Event] = Field(..., description="List of events")


# Response for GET /health.
class HealthResponse(BaseModel):
    status: str = Field(..., description="Service health status")
    timestamp: str = Field(..., description="Health check timestamp")


# Response for GET /.
class RootResponse(BaseModel):
    message: str = Field(..., description="API message")
    version: str = Field(..., description="API version")


app = FastAPI(
    title="LabFusion Service Adapters",
    description="Service integration adapters for Home Assistant, Frigate, Immich, and other homelab services",
    version="1.0.0",
    contact={
        "name": "LabFusion Team",
        "url": "https://github.com/labfusion/labfusion",
        "email": "team@labfusion.dev",
    },
    license_info={"name": "MIT License", "url": "https://opensource.org/licenses/MIT"},
    servers=[
        {"url": "http://localhost:8000", "description": "Development Server"},
        {"url": "https://adapters.labfusion.dev", "description": "Production Server"},
    ],
)

# CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive - confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Redis connection
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    # os.getenv returns strings, so keep the default a string as well.
    port=int(os.getenv("REDIS_PORT", "6379")),
    decode_responses=True,
)

# Service configurations
# An integration counts as enabled only when its credential / webhook
# environment variable is set to a non-empty value.
SERVICES = {
    "home_assistant": {
        "url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
        "token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
        "enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN")),
    },
    "frigate": {
        "url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
        "token": os.getenv("FRIGATE_TOKEN", ""),
        "enabled": bool(os.getenv("FRIGATE_TOKEN")),
    },
    "immich": {
        "url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
        "api_key": os.getenv("IMMICH_API_KEY", ""),
        "enabled": bool(os.getenv("IMMICH_API_KEY")),
    },
    "n8n": {
        "url": os.getenv("N8N_URL", "http://n8n.local:5678"),
        "webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
        "enabled": bool(os.getenv("N8N_WEBHOOK_URL")),
    },
}


def _require_service(service_key: str, display_name: str, env_var: str) -> None:
    """Raise a 503 ``HTTPException`` unless the given integration is enabled.

    Args:
        service_key: Key of the integration in ``SERVICES``.
        display_name: Human-readable service name used in the error message.
        env_var: Environment variable the caller is told to set.

    Raises:
        HTTPException: With status 503 when ``SERVICES[service_key]["enabled"]``
            is falsy.
    """
    if not SERVICES[service_key]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail=f"{display_name} integration not configured. Please set {env_var} environment variable.",
        )


@app.get(
    "/",
    response_model=RootResponse,
    summary="API Root",
    description="Get basic API information",
    tags=["General"],
)
async def root() -> RootResponse:
    """Get basic API information and version"""
    return RootResponse(message="LabFusion Service Adapters API", version="1.0.0")


@app.get(
    "/health",
    response_model=HealthResponse,
    summary="Health Check",
    description="Check service health status",
    tags=["General"],
)
async def health_check() -> HealthResponse:
    """Check the health status of the service adapters.

    Always reports ``"healthy"`` together with the current timestamp; no
    downstream dependency is probed.
    """
    return HealthResponse(status="healthy", timestamp=datetime.now().isoformat())


@app.get(
    "/services",
    response_model=Dict[str, ServiceStatus],
    summary="Get Service Status",
    description="Get status of all configured external services",
    tags=["Services"],
)
async def get_services() -> Dict[str, ServiceStatus]:
    """Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
    return {
        service_name: ServiceStatus(
            enabled=config["enabled"],
            url=config["url"],
            status="unknown",  # Would check actual service status
        )
        for service_name, config in SERVICES.items()
    }


@app.get(
    "/home-assistant/entities",
    response_model=HAEntitiesResponse,
    summary="Get Home Assistant Entities",
    description="Retrieve all entities from Home Assistant",
    responses={
        200: {"description": "Successfully retrieved entities"},
        503: {"description": "Home Assistant integration not configured"},
    },
    tags=["Home Assistant"],
)
async def get_ha_entities() -> HAEntitiesResponse:
    """Get Home Assistant entities including sensors, switches, and other devices.

    Raises:
        HTTPException: 503 when the Home Assistant integration is not enabled.
    """
    _require_service("home_assistant", "Home Assistant", "HOME_ASSISTANT_TOKEN")

    # This would make actual API calls to Home Assistant
    # For now, return mock data
    return HAEntitiesResponse(
        entities=[
            HAEntity(
                entity_id="sensor.cpu_usage",
                state="45.2",
                attributes=HAAttributes(
                    unit_of_measurement="%", friendly_name="CPU Usage"
                ),
            ),
            HAEntity(
                entity_id="sensor.memory_usage",
                state="2.1",
                attributes=HAAttributes(
                    unit_of_measurement="GB", friendly_name="Memory Usage"
                ),
            ),
        ]
    )


@app.get(
    "/frigate/events",
    response_model=FrigateEventsResponse,
    summary="Get Frigate Events",
    description="Retrieve detection events from Frigate NVR",
    responses={
        200: {"description": "Successfully retrieved events"},
        503: {"description": "Frigate integration not configured"},
    },
    tags=["Frigate"],
)
async def get_frigate_events() -> FrigateEventsResponse:
    """Get Frigate detection events including person, vehicle, and object detections.

    Raises:
        HTTPException: 503 when the Frigate integration is not enabled.
    """
    _require_service("frigate", "Frigate", "FRIGATE_TOKEN")

    # This would make actual API calls to Frigate
    # For now, return mock data
    return FrigateEventsResponse(
        events=[
            FrigateEvent(
                id="event_123",
                timestamp=datetime.now().isoformat(),
                camera="front_door",
                label="person",
                confidence=0.95,
            )
        ]
    )


@app.get(
    "/immich/assets",
    response_model=ImmichAssetsResponse,
    summary="Get Immich Assets",
    description="Retrieve photo assets from Immich",
    responses={
        200: {"description": "Successfully retrieved assets"},
        503: {"description": "Immich integration not configured"},
    },
    tags=["Immich"],
)
async def get_immich_assets() -> ImmichAssetsResponse:
    """Get Immich photo assets including metadata, tags, and face detection results.

    Raises:
        HTTPException: 503 when the Immich integration is not enabled.
    """
    _require_service("immich", "Immich", "IMMICH_API_KEY")

    # This would make actual API calls to Immich
    # For now, return mock data
    return ImmichAssetsResponse(
        assets=[
            ImmichAsset(
                id="asset_123",
                filename="photo_001.jpg",
                created_at=datetime.now().isoformat(),
                tags=["person", "outdoor"],
                faces=["Alice", "Bob"],
            )
        ]
    )


@app.post(
    "/publish-event",
    response_model=EventResponse,
    summary="Publish Event",
    description="Publish an event to the Redis message bus",
    responses={
        200: {"description": "Event published successfully"},
        500: {"description": "Failed to publish event"},
    },
    tags=["Events"],
)
async def publish_event(
    event_data: EventData, background_tasks: BackgroundTasks
) -> EventResponse:
    """Publish an event to the Redis message bus for consumption by other services.

    The event is stamped with the current time, its metadata is serialized to a
    JSON string, and the whole record is pushed onto the head of the ``events``
    list.

    Args:
        event_data: Service name, event type and free-form metadata.
        background_tasks: Currently unused; kept so the handler signature stays
            unchanged.

    Returns:
        The publication status together with the stored event.

    Raises:
        HTTPException: 500 with the error text when building or storing the
            event fails.
    """
    try:
        event = {
            "timestamp": datetime.now().isoformat(),
            "service": event_data.service,
            "event_type": event_data.event_type,
            "metadata": json.dumps(event_data.metadata),
        }

        # Publish to Redis. The client is synchronous, so run the call in the
        # thread pool instead of blocking the event loop.
        await run_in_threadpool(redis_client.lpush, "events", json.dumps(event))

        return EventResponse(status="published", event=event)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get(
    "/events",
    response_model=EventsResponse,
    summary="Get Events",
    description="Retrieve recent events from the message bus",
    responses={
        200: {"description": "Successfully retrieved events"},
        500: {"description": "Failed to retrieve events"},
    },
    tags=["Events"],
)
async def get_events(
    limit: int = Query(
        100, ge=1, le=1000, description="Maximum number of events to retrieve"
    )
) -> EventsResponse:
    """Get recent events from the Redis message bus.

    Reads up to ``limit`` entries from the head of the ``events`` list. Entries
    that cannot be decoded as JSON or do not fit the ``Event`` schema are
    skipped, so one bad record does not fail the whole request.

    Args:
        limit: Maximum number of events to retrieve (1-1000).

    Raises:
        HTTPException: 500 with the error text when reading from Redis fails.
    """
    try:
        # Synchronous Redis client: keep the blocking read off the event loop.
        raw_events = await run_in_threadpool(
            redis_client.lrange, "events", 0, limit - 1
        )

        parsed_events = []
        for raw_event in raw_events:
            try:
                parsed_events.append(Event(**json.loads(raw_event)))
            except (ValueError, TypeError):
                # ValueError covers both JSON decode errors and pydantic
                # validation errors; TypeError covers JSON that is not an
                # object (e.g. a list or a bare number).
                continue

        return EventsResponse(events=parsed_events)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get(
    "/home-assistant/entity/{entity_id}",
    response_model=HAEntity,
    summary="Get Specific HA Entity",
    description="Get a specific Home Assistant entity by ID",
    responses={
        200: {"description": "Successfully retrieved entity"},
        404: {"description": "Entity not found"},
        503: {"description": "Home Assistant integration not configured"},
    },
    tags=["Home Assistant"],
)
async def get_ha_entity(
    entity_id: str = Path(..., description="Entity ID")
) -> HAEntity:
    """Get a specific Home Assistant entity by its ID.

    Args:
        entity_id: Entity ID taken from the URL path.

    Raises:
        HTTPException: 503 when the Home Assistant integration is not enabled.
    """
    _require_service("home_assistant", "Home Assistant", "HOME_ASSISTANT_TOKEN")

    # This would make actual API calls to Home Assistant
    # For now, return mock data
    return HAEntity(
        entity_id=entity_id,
        state="unknown",
        attributes=HAAttributes(
            unit_of_measurement="", friendly_name=f"Entity {entity_id}"
        ),
    )


@app.get(
    "/frigate/cameras",
    summary="Get Frigate Cameras",
    description="Get list of Frigate cameras",
    responses={
        200: {"description": "Successfully retrieved cameras"},
        503: {"description": "Frigate integration not configured"},
    },
    tags=["Frigate"],
)
async def get_frigate_cameras():
    """Get list of available Frigate cameras.

    Raises:
        HTTPException: 503 when the Frigate integration is not enabled.
    """
    _require_service("frigate", "Frigate", "FRIGATE_TOKEN")

    # This would make actual API calls to Frigate
    # For now, return mock data
    return {
        "cameras": [
            {"name": "front_door", "enabled": True},
            {"name": "back_yard", "enabled": True},
            {"name": "garage", "enabled": False},
        ]
    }


@app.get(
    "/immich/albums",
    summary="Get Immich Albums",
    description="Get list of Immich albums",
    responses={
        200: {"description": "Successfully retrieved albums"},
        503: {"description": "Immich integration not configured"},
    },
    tags=["Immich"],
)
async def get_immich_albums():
    """Get list of Immich albums.

    Raises:
        HTTPException: 503 when the Immich integration is not enabled.
    """
    _require_service("immich", "Immich", "IMMICH_API_KEY")

    # This would make actual API calls to Immich
    # For now, return mock data
    return {
        "albums": [
            {"id": "album_1", "name": "Family Photos", "asset_count": 150},
            {"id": "album_2", "name": "Vacation 2024", "asset_count": 75},
        ]
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)