424 lines
15 KiB
Python
424 lines
15 KiB
Python
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query, Path
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel, Field
|
|
from typing import List, Optional, Dict, Any
|
|
import asyncio
|
|
import redis
|
|
import json
|
|
from datetime import datetime
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Pydantic models for request/response schemas
|
|
class ServiceStatus(BaseModel):
|
|
enabled: bool = Field(..., description="Whether the service is enabled")
|
|
url: str = Field(..., description="Service URL")
|
|
status: str = Field(..., description="Service status")
|
|
|
|
class HAAttributes(BaseModel):
|
|
unit_of_measurement: Optional[str] = Field(None, description="Unit of measurement")
|
|
friendly_name: Optional[str] = Field(None, description="Friendly name")
|
|
|
|
class HAEntity(BaseModel):
|
|
entity_id: str = Field(..., description="Entity ID")
|
|
state: str = Field(..., description="Current state")
|
|
attributes: HAAttributes = Field(..., description="Entity attributes")
|
|
|
|
class HAEntitiesResponse(BaseModel):
|
|
entities: List[HAEntity] = Field(..., description="List of Home Assistant entities")
|
|
|
|
class FrigateEvent(BaseModel):
|
|
id: str = Field(..., description="Event ID")
|
|
timestamp: str = Field(..., description="Event timestamp")
|
|
camera: str = Field(..., description="Camera name")
|
|
label: str = Field(..., description="Detection label")
|
|
confidence: float = Field(..., ge=0, le=1, description="Detection confidence")
|
|
|
|
class FrigateEventsResponse(BaseModel):
|
|
events: List[FrigateEvent] = Field(..., description="List of Frigate events")
|
|
|
|
class ImmichAsset(BaseModel):
|
|
id: str = Field(..., description="Asset ID")
|
|
filename: str = Field(..., description="Filename")
|
|
created_at: str = Field(..., description="Creation timestamp")
|
|
tags: List[str] = Field(..., description="Asset tags")
|
|
faces: List[str] = Field(..., description="Detected faces")
|
|
|
|
class ImmichAssetsResponse(BaseModel):
|
|
assets: List[ImmichAsset] = Field(..., description="List of Immich assets")
|
|
|
|
class EventData(BaseModel):
|
|
service: str = Field(..., description="Service name")
|
|
event_type: str = Field(..., description="Event type")
|
|
metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")
|
|
|
|
class EventResponse(BaseModel):
|
|
status: str = Field(..., description="Publication status")
|
|
event: Dict[str, Any] = Field(..., description="Published event")
|
|
|
|
class Event(BaseModel):
|
|
timestamp: str = Field(..., description="Event timestamp")
|
|
service: str = Field(..., description="Service name")
|
|
event_type: str = Field(..., description="Event type")
|
|
metadata: str = Field(..., description="Event metadata as JSON string")
|
|
|
|
class EventsResponse(BaseModel):
|
|
events: List[Event] = Field(..., description="List of events")
|
|
|
|
class HealthResponse(BaseModel):
|
|
status: str = Field(..., description="Service health status")
|
|
timestamp: str = Field(..., description="Health check timestamp")
|
|
|
|
class RootResponse(BaseModel):
|
|
message: str = Field(..., description="API message")
|
|
version: str = Field(..., description="API version")
|
|
|
|
app = FastAPI(
|
|
title="LabFusion Service Adapters",
|
|
description="Service integration adapters for Home Assistant, Frigate, Immich, and other homelab services",
|
|
version="1.0.0",
|
|
contact={
|
|
"name": "LabFusion Team",
|
|
"url": "https://github.com/labfusion/labfusion",
|
|
"email": "team@labfusion.dev"
|
|
},
|
|
license_info={
|
|
"name": "MIT License",
|
|
"url": "https://opensource.org/licenses/MIT"
|
|
},
|
|
servers=[
|
|
{
|
|
"url": "http://localhost:8000",
|
|
"description": "Development Server"
|
|
},
|
|
{
|
|
"url": "https://adapters.labfusion.dev",
|
|
"description": "Production Server"
|
|
}
|
|
]
|
|
)
|
|
|
|
# CORS middleware
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Redis connection
|
|
redis_client = redis.Redis(
|
|
host=os.getenv("REDIS_HOST", "localhost"),
|
|
port=int(os.getenv("REDIS_PORT", 6379)),
|
|
decode_responses=True
|
|
)
|
|
|
|
# Service configurations
|
|
SERVICES = {
|
|
"home_assistant": {
|
|
"url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
|
|
"token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
|
|
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN"))
|
|
},
|
|
"frigate": {
|
|
"url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
|
|
"token": os.getenv("FRIGATE_TOKEN", ""),
|
|
"enabled": bool(os.getenv("FRIGATE_TOKEN"))
|
|
},
|
|
"immich": {
|
|
"url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
|
|
"api_key": os.getenv("IMMICH_API_KEY", ""),
|
|
"enabled": bool(os.getenv("IMMICH_API_KEY"))
|
|
},
|
|
"n8n": {
|
|
"url": os.getenv("N8N_URL", "http://n8n.local:5678"),
|
|
"webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
|
|
"enabled": bool(os.getenv("N8N_WEBHOOK_URL"))
|
|
}
|
|
}
|
|
|
|
@app.get("/",
|
|
response_model=RootResponse,
|
|
summary="API Root",
|
|
description="Get basic API information",
|
|
tags=["General"])
|
|
async def root():
|
|
"""Get basic API information and version"""
|
|
return RootResponse(
|
|
message="LabFusion Service Adapters API",
|
|
version="1.0.0"
|
|
)
|
|
|
|
@app.get("/health",
|
|
response_model=HealthResponse,
|
|
summary="Health Check",
|
|
description="Check service health status",
|
|
tags=["General"])
|
|
async def health_check():
|
|
"""Check the health status of the service adapters"""
|
|
return HealthResponse(
|
|
status="healthy",
|
|
timestamp=datetime.now().isoformat()
|
|
)
|
|
|
|
@app.get("/services",
|
|
response_model=Dict[str, ServiceStatus],
|
|
summary="Get Service Status",
|
|
description="Get status of all configured external services",
|
|
tags=["Services"])
|
|
async def get_services():
|
|
"""Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
|
|
service_status = {}
|
|
for service_name, config in SERVICES.items():
|
|
service_status[service_name] = ServiceStatus(
|
|
enabled=config["enabled"],
|
|
url=config["url"],
|
|
status="unknown" # Would check actual service status
|
|
)
|
|
return service_status
|
|
|
|
@app.get("/home-assistant/entities",
|
|
response_model=HAEntitiesResponse,
|
|
summary="Get Home Assistant Entities",
|
|
description="Retrieve all entities from Home Assistant",
|
|
responses={
|
|
200: {"description": "Successfully retrieved entities"},
|
|
503: {"description": "Home Assistant integration not configured"}
|
|
},
|
|
tags=["Home Assistant"])
|
|
async def get_ha_entities():
|
|
"""Get Home Assistant entities including sensors, switches, and other devices"""
|
|
if not SERVICES["home_assistant"]["enabled"]:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
|
|
)
|
|
|
|
# This would make actual API calls to Home Assistant
|
|
# For now, return mock data
|
|
return HAEntitiesResponse(
|
|
entities=[
|
|
HAEntity(
|
|
entity_id="sensor.cpu_usage",
|
|
state="45.2",
|
|
attributes=HAAttributes(
|
|
unit_of_measurement="%",
|
|
friendly_name="CPU Usage"
|
|
)
|
|
),
|
|
HAEntity(
|
|
entity_id="sensor.memory_usage",
|
|
state="2.1",
|
|
attributes=HAAttributes(
|
|
unit_of_measurement="GB",
|
|
friendly_name="Memory Usage"
|
|
)
|
|
)
|
|
]
|
|
)
|
|
|
|
@app.get("/frigate/events",
|
|
response_model=FrigateEventsResponse,
|
|
summary="Get Frigate Events",
|
|
description="Retrieve detection events from Frigate NVR",
|
|
responses={
|
|
200: {"description": "Successfully retrieved events"},
|
|
503: {"description": "Frigate integration not configured"}
|
|
},
|
|
tags=["Frigate"])
|
|
async def get_frigate_events():
|
|
"""Get Frigate detection events including person, vehicle, and object detections"""
|
|
if not SERVICES["frigate"]["enabled"]:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
|
|
)
|
|
|
|
# This would make actual API calls to Frigate
|
|
# For now, return mock data
|
|
return FrigateEventsResponse(
|
|
events=[
|
|
FrigateEvent(
|
|
id="event_123",
|
|
timestamp=datetime.now().isoformat(),
|
|
camera="front_door",
|
|
label="person",
|
|
confidence=0.95
|
|
)
|
|
]
|
|
)
|
|
|
|
@app.get("/immich/assets",
|
|
response_model=ImmichAssetsResponse,
|
|
summary="Get Immich Assets",
|
|
description="Retrieve photo assets from Immich",
|
|
responses={
|
|
200: {"description": "Successfully retrieved assets"},
|
|
503: {"description": "Immich integration not configured"}
|
|
},
|
|
tags=["Immich"])
|
|
async def get_immich_assets():
|
|
"""Get Immich photo assets including metadata, tags, and face detection results"""
|
|
if not SERVICES["immich"]["enabled"]:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
|
|
)
|
|
|
|
# This would make actual API calls to Immich
|
|
# For now, return mock data
|
|
return ImmichAssetsResponse(
|
|
assets=[
|
|
ImmichAsset(
|
|
id="asset_123",
|
|
filename="photo_001.jpg",
|
|
created_at=datetime.now().isoformat(),
|
|
tags=["person", "outdoor"],
|
|
faces=["Alice", "Bob"]
|
|
)
|
|
]
|
|
)
|
|
|
|
@app.post("/publish-event",
|
|
response_model=EventResponse,
|
|
summary="Publish Event",
|
|
description="Publish an event to the Redis message bus",
|
|
responses={
|
|
200: {"description": "Event published successfully"},
|
|
500: {"description": "Failed to publish event"}
|
|
},
|
|
tags=["Events"])
|
|
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
|
|
"""Publish an event to the Redis message bus for consumption by other services"""
|
|
try:
|
|
event = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"service": event_data.service,
|
|
"event_type": event_data.event_type,
|
|
"metadata": json.dumps(event_data.metadata)
|
|
}
|
|
|
|
# Publish to Redis
|
|
redis_client.lpush("events", json.dumps(event))
|
|
|
|
return EventResponse(
|
|
status="published",
|
|
event=event
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.get("/events",
|
|
response_model=EventsResponse,
|
|
summary="Get Events",
|
|
description="Retrieve recent events from the message bus",
|
|
responses={
|
|
200: {"description": "Successfully retrieved events"},
|
|
500: {"description": "Failed to retrieve events"}
|
|
},
|
|
tags=["Events"])
|
|
async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum number of events to retrieve")):
|
|
"""Get recent events from the Redis message bus"""
|
|
try:
|
|
events = redis_client.lrange("events", 0, limit - 1)
|
|
parsed_events = []
|
|
for event in events:
|
|
try:
|
|
event_data = json.loads(event)
|
|
parsed_events.append(Event(**event_data))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
return EventsResponse(events=parsed_events)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.get("/home-assistant/entity/{entity_id}",
|
|
response_model=HAEntity,
|
|
summary="Get Specific HA Entity",
|
|
description="Get a specific Home Assistant entity by ID",
|
|
responses={
|
|
200: {"description": "Successfully retrieved entity"},
|
|
404: {"description": "Entity not found"},
|
|
503: {"description": "Home Assistant integration not configured"}
|
|
},
|
|
tags=["Home Assistant"])
|
|
async def get_ha_entity(entity_id: str = Path(..., description="Entity ID")):
|
|
"""Get a specific Home Assistant entity by its ID"""
|
|
if not SERVICES["home_assistant"]["enabled"]:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
|
|
)
|
|
|
|
# This would make actual API calls to Home Assistant
|
|
# For now, return mock data
|
|
return HAEntity(
|
|
entity_id=entity_id,
|
|
state="unknown",
|
|
attributes=HAAttributes(
|
|
unit_of_measurement="",
|
|
friendly_name=f"Entity {entity_id}"
|
|
)
|
|
)
|
|
|
|
@app.get("/frigate/cameras",
|
|
summary="Get Frigate Cameras",
|
|
description="Get list of Frigate cameras",
|
|
responses={
|
|
200: {"description": "Successfully retrieved cameras"},
|
|
503: {"description": "Frigate integration not configured"}
|
|
},
|
|
tags=["Frigate"])
|
|
async def get_frigate_cameras():
|
|
"""Get list of available Frigate cameras"""
|
|
if not SERVICES["frigate"]["enabled"]:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
|
|
)
|
|
|
|
# This would make actual API calls to Frigate
|
|
# For now, return mock data
|
|
return {
|
|
"cameras": [
|
|
{"name": "front_door", "enabled": True},
|
|
{"name": "back_yard", "enabled": True},
|
|
{"name": "garage", "enabled": False}
|
|
]
|
|
}
|
|
|
|
@app.get("/immich/albums",
|
|
summary="Get Immich Albums",
|
|
description="Get list of Immich albums",
|
|
responses={
|
|
200: {"description": "Successfully retrieved albums"},
|
|
503: {"description": "Immich integration not configured"}
|
|
},
|
|
tags=["Immich"])
|
|
async def get_immich_albums():
|
|
"""Get list of Immich albums"""
|
|
if not SERVICES["immich"]["enabled"]:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
|
|
)
|
|
|
|
# This would make actual API calls to Immich
|
|
# For now, return mock data
|
|
return {
|
|
"albums": [
|
|
{"id": "album_1", "name": "Family Photos", "asset_count": 150},
|
|
{"id": "album_2", "name": "Vacation 2024", "asset_count": 75}
|
|
]
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|