Files
labFusion/services/service-adapters/main.py
2025-09-11 22:08:12 +02:00

172 lines
5.3 KiB
Python

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import redis
import json
from datetime import datetime
import os
from dotenv import load_dotenv
# Load settings from a .env file (via python-dotenv) before any of the
# os.getenv() lookups further down this module are evaluated.
load_dotenv()

# Application object that every route in this module is registered on.
app = FastAPI(
    title="LabFusion Service Adapters",
    description=(
        "Service integration adapters for Home Assistant, Frigate, Immich, "
        "and other homelab services"
    ),
    version="1.0.0",
)
# CORS middleware
# Browsers reject credentialed responses whose Access-Control-Allow-Origin is
# the "*" wildcard, and FastAPI's CORS documentation states that allow_origins
# must be listed explicitly when allow_credentials is True. The allowed origins
# are therefore read from CORS_ALLOW_ORIGINS (comma-separated, e.g.
# "http://localhost:3000,https://lab.example"). When the variable is unset or
# empty the API stays open to every origin, but without credentials.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Cookies / Authorization headers are only honoured for explicit origins.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Redis connection
# Module-wide client used by the event endpoints for the "events" list.
# Host and port come from REDIS_HOST / REDIS_PORT (defaults: localhost, 6379);
# decode_responses=True asks redis-py to return str values instead of bytes.
redis_client = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    decode_responses=True,
)
# Service configurations
def _adapter_settings(url_var, default_url, secret_field, secret_var):
    """Build one SERVICES entry from environment variables.

    The entry holds the service URL (``url_var`` or ``default_url``), the
    credential read from ``secret_var`` stored under ``secret_field``, and an
    ``enabled`` flag that is true only when that credential is non-empty.
    """
    base_url = os.getenv(url_var, default_url)
    secret = os.getenv(secret_var, "")
    return {
        "url": base_url,
        secret_field: secret,
        "enabled": bool(secret),
    }


# Per-integration settings, resolved once at import time.
SERVICES = {
    "home_assistant": _adapter_settings(
        "HOME_ASSISTANT_URL",
        "https://homeassistant.local:8123",
        "token",
        "HOME_ASSISTANT_TOKEN",
    ),
    "frigate": _adapter_settings(
        "FRIGATE_URL",
        "http://frigate.local:5000",
        "token",
        "FRIGATE_TOKEN",
    ),
    "immich": _adapter_settings(
        "IMMICH_URL",
        "http://immich.local:2283",
        "api_key",
        "IMMICH_API_KEY",
    ),
    "n8n": _adapter_settings(
        "N8N_URL",
        "http://n8n.local:5678",
        "webhook_url",
        "N8N_WEBHOOK_URL",
    ),
}
@app.get("/")
async def root():
    # Landing route: names the API and reports its version string.
    banner = {
        "message": "LabFusion Service Adapters API",
        "version": "1.0.0",
    }
    return banner
@app.get("/health")
async def health_check():
    # Liveness probe: always answers "healthy", stamped with the server's
    # current local time (naive datetime, ISO 8601 formatted).
    checked_at = datetime.now()
    return {
        "status": "healthy",
        "timestamp": checked_at.isoformat(),
    }
@app.get("/services")
async def get_services():
    """Get status of all configured services"""
    # One entry per key in SERVICES. Only the enabled flag and the URL are
    # copied into the response; "status" is a fixed placeholder until a real
    # reachability check is implemented.
    return {
        name: {
            "enabled": settings["enabled"],
            "url": settings["url"],
            "status": "unknown",
        }
        for name, settings in SERVICES.items()
    }
@app.get("/home-assistant/entities")
async def get_ha_entities():
    """Get Home Assistant entities"""
    integration = SERVICES["home_assistant"]
    if not integration["enabled"]:
        raise HTTPException(status_code=503, detail="Home Assistant integration not configured")

    # Placeholder data: no request is sent to Home Assistant yet.
    # Columns: entity id, state, unit of measurement, friendly name.
    sample_readings = (
        ("sensor.cpu_usage", "45.2", "%", "CPU Usage"),
        ("sensor.memory_usage", "2.1", "GB", "Memory Usage"),
    )
    return {
        "entities": [
            {
                "entity_id": entity_id,
                "state": state,
                "attributes": {"unit_of_measurement": unit, "friendly_name": label},
            }
            for entity_id, state, unit, label in sample_readings
        ]
    }
@app.get("/frigate/events")
async def get_frigate_events():
    """Get Frigate detection events"""
    integration = SERVICES["frigate"]
    if not integration["enabled"]:
        raise HTTPException(status_code=503, detail="Frigate integration not configured")

    # Placeholder data: no request is sent to Frigate yet. The single sample
    # detection is stamped with the server's current local time.
    detected_at = datetime.now().isoformat()
    sample_event = {
        "id": "event_123",
        "timestamp": detected_at,
        "camera": "front_door",
        "label": "person",
        "confidence": 0.95,
    }
    return {"events": [sample_event]}
@app.get("/immich/assets")
async def get_immich_assets():
    """Get Immich photo assets"""
    integration = SERVICES["immich"]
    if not integration["enabled"]:
        raise HTTPException(status_code=503, detail="Immich integration not configured")

    # Placeholder data: no request is sent to Immich yet. The single sample
    # asset is stamped with the server's current local time.
    created_at = datetime.now().isoformat()
    sample_asset = {
        "id": "asset_123",
        "filename": "photo_001.jpg",
        "created_at": created_at,
        "tags": ["person", "outdoor"],
        "faces": ["Alice", "Bob"],
    }
    return {"assets": [sample_asset]}
@app.post("/publish-event")
async def publish_event(event_data: dict, background_tasks: BackgroundTasks):
    """Publish an event to the message bus.

    Wraps the request body in an event envelope, stamps it with the server's
    current local time and pushes it onto the head of the Redis ``events``
    list.

    Args:
        event_data: JSON object from the request body. The optional keys
            ``service``, ``event_type`` and ``metadata`` are copied into the
            envelope; missing ones default to ``"unknown"``, ``"unknown"``
            and ``{}`` respectively.
        background_tasks: Injected by FastAPI. Not used by the handler at the
            moment; kept so the signature stays stable.

    Returns:
        ``{"status": "published", "event": <envelope>}``.

    Raises:
        HTTPException: 500 if serialising or storing the event fails.
    """
    try:
        event = {
            "timestamp": datetime.now().isoformat(),
            "service": event_data.get("service", "unknown"),
            "event_type": event_data.get("event_type", "unknown"),
            # NOTE(review): metadata is stored as a JSON string inside the
            # envelope, so readers get a string rather than an object.
            # Left unchanged in case consumers depend on it - confirm.
            "metadata": json.dumps(event_data.get("metadata", {})),
        }
        payload = json.dumps(event)

        # redis_client is the blocking redis-py client. Calling it directly
        # inside this coroutine would stall the event loop for the whole
        # network round trip, so the call runs in the default executor.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, redis_client.lpush, "events", payload)

        return {"status": "published", "event": event}
    except Exception as e:
        # Endpoint boundary: report the failure as a 500, keeping the cause
        # chained for logs and debuggers.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/events")
async def get_events(limit: int = 100):
    """Get recent events from the message bus.

    Reads up to ``limit`` entries from the head of the Redis ``events`` list
    (newest first, since events are pushed onto the head) and decodes each
    one from JSON.

    Args:
        limit: Maximum number of events to return. Must be at least 1.

    Returns:
        ``{"events": [<event>, ...]}``.

    Raises:
        HTTPException: 422 if ``limit`` is smaller than 1; 500 if Redis
            cannot be read or a stored entry is not valid JSON.
    """
    # LRANGE's stop index is inclusive and negative indexes count from the
    # tail, so limit=0 would turn into stop=-1 and return the whole list
    # (and limit=-1 into "everything but the last entry"). Reject such values
    # up front, outside the try block so the error is not rewrapped as a 500.
    if limit < 1:
        raise HTTPException(status_code=422, detail="limit must be a positive integer")

    try:
        # redis_client is the blocking redis-py client; run the read in the
        # default executor so the event loop is not stalled while waiting.
        loop = asyncio.get_running_loop()
        events = await loop.run_in_executor(
            None, redis_client.lrange, "events", 0, limit - 1
        )
        return {"events": [json.loads(event) for event in events]}
    except Exception as e:
        # Endpoint boundary: report the failure as a 500, keeping the cause
        # chained for logs and debuggers.
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Running this file directly starts a uvicorn server for the app,
    # listening on every network interface at port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)