"""LabFusion service adapters.

FastAPI application exposing adapter endpoints for Home Assistant, Frigate,
Immich and n8n, plus a small Redis-backed event list ("events") that other
services can publish to and read from. The integration endpoints currently
return mock data; see the inline comments on each handler.
"""

import asyncio
import json
import os
from datetime import datetime
from typing import List, Optional, Dict, Any

import redis
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query, Path
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ValidationError

# Load environment variables
load_dotenv()


# Pydantic models for request/response schemas
#
# NOTE: models are documented with comments rather than class docstrings so
# that the generated OpenAPI schema descriptions stay exactly as they were.

# Status entry for one configured external service (see GET /services).
class ServiceStatus(BaseModel):
    enabled: bool = Field(..., description="Whether the service is enabled")
    url: str = Field(..., description="Service URL")
    status: str = Field(..., description="Service status")


# Optional display attributes attached to a Home Assistant entity.
class HAAttributes(BaseModel):
    unit_of_measurement: Optional[str] = Field(None, description="Unit of measurement")
    friendly_name: Optional[str] = Field(None, description="Friendly name")


# A single Home Assistant entity with its state and attributes.
class HAEntity(BaseModel):
    entity_id: str = Field(..., description="Entity ID")
    state: str = Field(..., description="Current state")
    attributes: HAAttributes = Field(..., description="Entity attributes")


# Response wrapper for GET /home-assistant/entities.
class HAEntitiesResponse(BaseModel):
    entities: List[HAEntity] = Field(..., description="List of Home Assistant entities")


# A single Frigate detection event; confidence is constrained to [0, 1].
class FrigateEvent(BaseModel):
    id: str = Field(..., description="Event ID")
    timestamp: str = Field(..., description="Event timestamp")
    camera: str = Field(..., description="Camera name")
    label: str = Field(..., description="Detection label")
    confidence: float = Field(..., ge=0, le=1, description="Detection confidence")


# Response wrapper for GET /frigate/events.
class FrigateEventsResponse(BaseModel):
    events: List[FrigateEvent] = Field(..., description="List of Frigate events")


# A single Immich photo asset with its tags and detected faces.
class ImmichAsset(BaseModel):
    id: str = Field(..., description="Asset ID")
    filename: str = Field(..., description="Filename")
    created_at: str = Field(..., description="Creation timestamp")
    tags: List[str] = Field(..., description="Asset tags")
    faces: List[str] = Field(..., description="Detected faces")


# Response wrapper for GET /immich/assets.
class ImmichAssetsResponse(BaseModel):
    assets: List[ImmichAsset] = Field(..., description="List of Immich assets")


# Request body for POST /publish-event.
class EventData(BaseModel):
    service: str = Field(..., description="Service name")
    event_type: str = Field(..., description="Event type")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")


# Response body for POST /publish-event; echoes the event that was stored.
class EventResponse(BaseModel):
    status: str = Field(..., description="Publication status")
    event: Dict[str, Any] = Field(..., description="Published event")


# An event as stored in Redis; metadata is kept as a JSON-encoded string.
class Event(BaseModel):
    timestamp: str = Field(..., description="Event timestamp")
    service: str = Field(..., description="Service name")
    event_type: str = Field(..., description="Event type")
    metadata: str = Field(..., description="Event metadata as JSON string")


# Response wrapper for GET /events.
class EventsResponse(BaseModel):
    events: List[Event] = Field(..., description="List of events")


# Response body for GET /health.
class HealthResponse(BaseModel):
    status: str = Field(..., description="Service health status")
    timestamp: str = Field(..., description="Health check timestamp")


# Response body for GET /.
class RootResponse(BaseModel):
    message: str = Field(..., description="API message")
    version: str = Field(..., description="API version")


app = FastAPI(
    title="LabFusion Service Adapters",
    description="Service integration adapters for Home Assistant, Frigate, Immich, and other homelab services",
    version="1.0.0",
    contact={
        "name": "LabFusion Team",
        "url": "https://github.com/labfusion/labfusion",
        "email": "team@labfusion.dev"
    },
    license_info={
        "name": "MIT License",
        "url": "https://opensource.org/licenses/MIT"
    },
    servers=[
        {
            "url": "http://localhost:8000",
            "description": "Development Server"
        },
        {
            "url": "https://adapters.labfusion.dev",
            "description": "Production Server"
        }
    ]
)

# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider restricting allow_origins before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Redis connection
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    # Default is given as a string so os.getenv always yields str before int().
    port=int(os.getenv("REDIS_PORT", "6379")),
    decode_responses=True
)

# Service configurations
# A service counts as "enabled" only when its credential/webhook variable is
# set to a non-empty value.
SERVICES = {
    "home_assistant": {
        "url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
        "token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
        "enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN"))
    },
    "frigate": {
        "url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
        "token": os.getenv("FRIGATE_TOKEN", ""),
        "enabled": bool(os.getenv("FRIGATE_TOKEN"))
    },
    "immich": {
        "url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
        "api_key": os.getenv("IMMICH_API_KEY", ""),
        "enabled": bool(os.getenv("IMMICH_API_KEY"))
    },
    "n8n": {
        "url": os.getenv("N8N_URL", "http://n8n.local:5678"),
        "webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
        "enabled": bool(os.getenv("N8N_WEBHOOK_URL"))
    }
}


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor and return its result.

    The Redis client used here is synchronous; calling it directly from an
    ``async def`` handler would stall the event loop for the duration of the
    network round trip.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


@app.get("/",
         response_model=RootResponse,
         summary="API Root",
         description="Get basic API information",
         tags=["General"])
async def root():
    """Get basic API information and version"""
    return RootResponse(
        message="LabFusion Service Adapters API",
        version="1.0.0"
    )


@app.get("/health",
         response_model=HealthResponse,
         summary="Health Check",
         description="Check service health status",
         tags=["General"])
async def health_check():
    """Check the health status of the service adapters"""
    return HealthResponse(
        status="healthy",
        timestamp=datetime.now().isoformat()
    )


@app.get("/services",
         response_model=Dict[str, ServiceStatus],
         summary="Get Service Status",
         description="Get status of all configured external services",
         tags=["Services"])
async def get_services():
    """Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
    service_status = {}
    for service_name, config in SERVICES.items():
        service_status[service_name] = ServiceStatus(
            enabled=config["enabled"],
            url=config["url"],
            status="unknown"  # Would check actual service status
        )
    return service_status


@app.get("/home-assistant/entities",
         response_model=HAEntitiesResponse,
         summary="Get Home Assistant Entities",
         description="Retrieve all entities from Home Assistant",
         responses={
             200: {"description": "Successfully retrieved entities"},
             503: {"description": "Home Assistant integration not configured"}
         },
         tags=["Home Assistant"])
async def get_ha_entities():
    """Get Home Assistant entities including sensors, switches, and other devices"""
    if not SERVICES["home_assistant"]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
        )

    # This would make actual API calls to Home Assistant
    # For now, return mock data
    return HAEntitiesResponse(
        entities=[
            HAEntity(
                entity_id="sensor.cpu_usage",
                state="45.2",
                attributes=HAAttributes(
                    unit_of_measurement="%",
                    friendly_name="CPU Usage"
                )
            ),
            HAEntity(
                entity_id="sensor.memory_usage",
                state="2.1",
                attributes=HAAttributes(
                    unit_of_measurement="GB",
                    friendly_name="Memory Usage"
                )
            )
        ]
    )


@app.get("/frigate/events",
         response_model=FrigateEventsResponse,
         summary="Get Frigate Events",
         description="Retrieve detection events from Frigate NVR",
         responses={
             200: {"description": "Successfully retrieved events"},
             503: {"description": "Frigate integration not configured"}
         },
         tags=["Frigate"])
async def get_frigate_events():
    """Get Frigate detection events including person, vehicle, and object detections"""
    if not SERVICES["frigate"]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
        )

    # This would make actual API calls to Frigate
    # For now, return mock data
    return FrigateEventsResponse(
        events=[
            FrigateEvent(
                id="event_123",
                timestamp=datetime.now().isoformat(),
                camera="front_door",
                label="person",
                confidence=0.95
            )
        ]
    )


@app.get("/immich/assets",
         response_model=ImmichAssetsResponse,
         summary="Get Immich Assets",
         description="Retrieve photo assets from Immich",
         responses={
             200: {"description": "Successfully retrieved assets"},
             503: {"description": "Immich integration not configured"}
         },
         tags=["Immich"])
async def get_immich_assets():
    """Get Immich photo assets including metadata, tags, and face detection results"""
    if not SERVICES["immich"]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
        )

    # This would make actual API calls to Immich
    # For now, return mock data
    return ImmichAssetsResponse(
        assets=[
            ImmichAsset(
                id="asset_123",
                filename="photo_001.jpg",
                created_at=datetime.now().isoformat(),
                tags=["person", "outdoor"],
                faces=["Alice", "Bob"]
            )
        ]
    )


@app.post("/publish-event",
          response_model=EventResponse,
          summary="Publish Event",
          description="Publish an event to the Redis message bus",
          responses={
              200: {"description": "Event published successfully"},
              500: {"description": "Failed to publish event"}
          },
          tags=["Events"])
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
    """Publish an event to the Redis message bus for consumption by other services.

    The event is stamped with the current local time, its metadata is
    JSON-encoded, and the whole record is pushed onto the head of the
    ``events`` Redis list.

    Args:
        event_data: Service name, event type and free-form metadata.
        background_tasks: Injected by FastAPI; currently unused but kept so
            the handler signature stays stable.

    Returns:
        EventResponse with status ``"published"`` and the stored event.

    Raises:
        HTTPException: 500 if building or storing the event fails.
    """
    try:
        event = {
            "timestamp": datetime.now().isoformat(),
            "service": event_data.service,
            "event_type": event_data.event_type,
            "metadata": json.dumps(event_data.metadata)
        }

        # Publish to Redis (off the event loop; the client is synchronous)
        await _run_blocking(redis_client.lpush, "events", json.dumps(event))

        return EventResponse(
            status="published",
            event=event
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/events",
         response_model=EventsResponse,
         summary="Get Events",
         description="Retrieve recent events from the message bus",
         responses={
             200: {"description": "Successfully retrieved events"},
             500: {"description": "Failed to retrieve events"}
         },
         tags=["Events"])
async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum number of events to retrieve")):
    """Get recent events from the Redis message bus.

    Reads up to ``limit`` entries from the head of the ``events`` Redis list.
    Entries that are not valid JSON, or that do not match the ``Event``
    schema, are skipped so that one bad record cannot fail the whole request.

    Args:
        limit: Maximum number of list entries to read (1-1000).

    Returns:
        EventsResponse containing the entries that parsed successfully.

    Raises:
        HTTPException: 500 if the Redis read (or anything else) fails.
    """
    try:
        raw_events = await _run_blocking(redis_client.lrange, "events", 0, limit - 1)
        parsed_events = []
        for raw_event in raw_events:
            try:
                event_data = json.loads(raw_event)
                parsed_events.append(Event(**event_data))
            except (json.JSONDecodeError, ValidationError, TypeError):
                # JSONDecodeError: not JSON; ValidationError: wrong fields;
                # TypeError: valid JSON but not an object (e.g. a list).
                continue

        return EventsResponse(events=parsed_events)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/home-assistant/entity/{entity_id}",
         response_model=HAEntity,
         summary="Get Specific HA Entity",
         description="Get a specific Home Assistant entity by ID",
         responses={
             200: {"description": "Successfully retrieved entity"},
             404: {"description": "Entity not found"},
             503: {"description": "Home Assistant integration not configured"}
         },
         tags=["Home Assistant"])
async def get_ha_entity(entity_id: str = Path(..., description="Entity ID")):
    """Get a specific Home Assistant entity by its ID"""
    if not SERVICES["home_assistant"]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
        )

    # This would make actual API calls to Home Assistant
    # For now, return mock data
    return HAEntity(
        entity_id=entity_id,
        state="unknown",
        attributes=HAAttributes(
            unit_of_measurement="",
            friendly_name=f"Entity {entity_id}"
        )
    )


@app.get("/frigate/cameras",
         summary="Get Frigate Cameras",
         description="Get list of Frigate cameras",
         responses={
             200: {"description": "Successfully retrieved cameras"},
             503: {"description": "Frigate integration not configured"}
         },
         tags=["Frigate"])
async def get_frigate_cameras():
    """Get list of available Frigate cameras"""
    if not SERVICES["frigate"]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
        )

    # This would make actual API calls to Frigate
    # For now, return mock data
    return {
        "cameras": [
            {"name": "front_door", "enabled": True},
            {"name": "back_yard", "enabled": True},
            {"name": "garage", "enabled": False}
        ]
    }


@app.get("/immich/albums",
         summary="Get Immich Albums",
         description="Get list of Immich albums",
         responses={
             200: {"description": "Successfully retrieved albums"},
             503: {"description": "Immich integration not configured"}
         },
         tags=["Immich"])
async def get_immich_albums():
    """Get list of Immich albums"""
    if not SERVICES["immich"]["enabled"]:
        raise HTTPException(
            status_code=503,
            detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
        )

    # This would make actual API calls to Immich
    # For now, return mock data
    return {
        "albums": [
            {"id": "album_1", "name": "Family Photos", "asset_count": 150},
            {"id": "album_2", "name": "Vacation 2024", "asset_count": 75}
        ]
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)