Refactor API Docs CI workflow to correct ESLint command syntax and enhance Python formatting in CI pipelines; update progress tracking documentation
Some checks failed
API Docs (Node.js Express) / test (16) (push) Failing after 5m29s
API Docs (Node.js Express) / test (18) (push) Failing after 5m25s
API Docs (Node.js Express) / test (20) (push) Failing after 1m4s
API Docs (Node.js Express) / build (push) Has been skipped
API Docs (Node.js Express) / security (push) Has been skipped
LabFusion CI/CD Pipeline / api-gateway (push) Failing after 4m52s
LabFusion CI/CD Pipeline / service-adapters (push) Failing after 5m1s
LabFusion CI/CD Pipeline / api-docs (push) Failing after 5m12s
LabFusion CI/CD Pipeline / frontend (push) Failing after 6m39s
LabFusion CI/CD Pipeline / integration-tests (push) Has been skipped
LabFusion CI/CD Pipeline / security-scan (push) Has been skipped
Docker Build and Push / build-and-push (push) Failing after 34s
Docker Build and Push / security-scan (push) Has been skipped
Integration Tests / integration-tests (push) Failing after 1m33s
Integration Tests / performance-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.1) (push) Failing after 35s
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 5m20s
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 5m27s
Service Adapters (Python FastAPI) / test (3.9) (push) Failing after 5m50s
Docker Build and Push / deploy-staging (push) Has been skipped
Service Adapters (Python FastAPI) / build (push) Has been skipped
Service Adapters (Python FastAPI) / security (push) Has been skipped
Docker Build and Push / deploy-production (push) Has been skipped

This commit is contained in:
glenn schrooyen
2025-09-12 17:44:23 +02:00
parent 60da5ea115
commit 8d1755fd52
13 changed files with 642 additions and 336 deletions

View File

@@ -0,0 +1,230 @@
{
"errors": [],
"generated_at": "2025-09-12T15:43:08Z",
"metrics": {
".\\main.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 1,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 1,
"SEVERITY.UNDEFINED": 0,
"loc": 28,
"nosec": 0,
"skipped_tests": 0
},
".\\main_old.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 1,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 1,
"SEVERITY.UNDEFINED": 0,
"loc": 368,
"nosec": 0,
"skipped_tests": 0
},
".\\models\\__init__.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 0,
"nosec": 0,
"skipped_tests": 0
},
".\\models\\schemas.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 51,
"nosec": 0,
"skipped_tests": 0
},
".\\routes\\__init__.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 0,
"nosec": 0,
"skipped_tests": 0
},
".\\routes\\events.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 59,
"nosec": 0,
"skipped_tests": 0
},
".\\routes\\frigate.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 58,
"nosec": 0,
"skipped_tests": 0
},
".\\routes\\general.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 42,
"nosec": 0,
"skipped_tests": 0
},
".\\routes\\home_assistant.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 66,
"nosec": 0,
"skipped_tests": 0
},
".\\routes\\immich.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 57,
"nosec": 0,
"skipped_tests": 0
},
".\\services\\__init__.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 0,
"nosec": 0,
"skipped_tests": 0
},
".\\services\\config.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 25,
"nosec": 0,
"skipped_tests": 0
},
".\\services\\redis_client.py": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 0,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 0,
"SEVERITY.UNDEFINED": 0,
"loc": 7,
"nosec": 0,
"skipped_tests": 0
},
"_totals": {
"CONFIDENCE.HIGH": 0,
"CONFIDENCE.LOW": 0,
"CONFIDENCE.MEDIUM": 2,
"CONFIDENCE.UNDEFINED": 0,
"SEVERITY.HIGH": 0,
"SEVERITY.LOW": 0,
"SEVERITY.MEDIUM": 2,
"SEVERITY.UNDEFINED": 0,
"loc": 761,
"nosec": 0,
"skipped_tests": 0
}
},
"results": [
{
"code": "37 \n38 uvicorn.run(app, host=\"0.0.0.0\", port=8000)\n",
"col_offset": 26,
"end_col_offset": 35,
"filename": ".\\main.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 605,
"link": "https://cwe.mitre.org/data/definitions/605.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Possible binding to all interfaces.",
"line_number": 38,
"line_range": [
38
],
"more_info": "https://bandit.readthedocs.io/en/1.8.6/plugins/b104_hardcoded_bind_all_interfaces.html",
"test_id": "B104",
"test_name": "hardcoded_bind_all_interfaces"
},
{
"code": "454 \n455 uvicorn.run(app, host=\"0.0.0.0\", port=8000)\n",
"col_offset": 26,
"end_col_offset": 35,
"filename": ".\\main_old.py",
"issue_confidence": "MEDIUM",
"issue_cwe": {
"id": 605,
"link": "https://cwe.mitre.org/data/definitions/605.html"
},
"issue_severity": "MEDIUM",
"issue_text": "Possible binding to all interfaces.",
"line_number": 455,
"line_range": [
455
],
"more_info": "https://bandit.readthedocs.io/en/1.8.6/plugins/b104_hardcoded_bind_all_interfaces.html",
"test_id": "B104",
"test_name": "hardcoded_bind_all_interfaces"
}
]
}

View File

@@ -2,27 +2,18 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Import route modules
from routes import general, home_assistant, frigate, immich, events
from routes import events, frigate, general, home_assistant, immich
# Create FastAPI app
app = FastAPI(
title="LabFusion Service Adapters",
description="Service integration adapters for Home Assistant, Frigate, Immich, and other homelab services",
version="1.0.0",
license_info={
"name": "MIT License",
"url": "https://opensource.org/licenses/MIT"
},
license_info={"name": "MIT License", "url": "https://opensource.org/licenses/MIT"},
servers=[
{
"url": "http://localhost:8000",
"description": "Development Server"
},
{
"url": "https://adapters.labfusion.dev",
"description": "Production Server"
}
]
{"url": "http://localhost:8000", "description": "Development Server"},
{"url": "https://adapters.labfusion.dev", "description": "Production Server"},
],
)
# CORS middleware
@@ -43,4 +34,5 @@ app.include_router(events.router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -1,36 +1,40 @@
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query, Path
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import asyncio
import redis
import json
from datetime import datetime
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import redis
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Path, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# Load environment variables
load_dotenv()
# Pydantic models for request/response schemas
class ServiceStatus(BaseModel):
enabled: bool = Field(..., description="Whether the service is enabled")
url: str = Field(..., description="Service URL")
status: str = Field(..., description="Service status")
class HAAttributes(BaseModel):
unit_of_measurement: Optional[str] = Field(None, description="Unit of measurement")
friendly_name: Optional[str] = Field(None, description="Friendly name")
class HAEntity(BaseModel):
entity_id: str = Field(..., description="Entity ID")
state: str = Field(..., description="Current state")
attributes: HAAttributes = Field(..., description="Entity attributes")
class HAEntitiesResponse(BaseModel):
entities: List[HAEntity] = Field(..., description="List of Home Assistant entities")
class FrigateEvent(BaseModel):
id: str = Field(..., description="Event ID")
timestamp: str = Field(..., description="Event timestamp")
@@ -38,9 +42,11 @@ class FrigateEvent(BaseModel):
label: str = Field(..., description="Detection label")
confidence: float = Field(..., ge=0, le=1, description="Detection confidence")
class FrigateEventsResponse(BaseModel):
events: List[FrigateEvent] = Field(..., description="List of Frigate events")
class ImmichAsset(BaseModel):
id: str = Field(..., description="Asset ID")
filename: str = Field(..., description="Filename")
@@ -48,35 +54,43 @@ class ImmichAsset(BaseModel):
tags: List[str] = Field(..., description="Asset tags")
faces: List[str] = Field(..., description="Detected faces")
class ImmichAssetsResponse(BaseModel):
assets: List[ImmichAsset] = Field(..., description="List of Immich assets")
class EventData(BaseModel):
service: str = Field(..., description="Service name")
event_type: str = Field(..., description="Event type")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")
class EventResponse(BaseModel):
status: str = Field(..., description="Publication status")
event: Dict[str, Any] = Field(..., description="Published event")
class Event(BaseModel):
timestamp: str = Field(..., description="Event timestamp")
service: str = Field(..., description="Service name")
event_type: str = Field(..., description="Event type")
metadata: str = Field(..., description="Event metadata as JSON string")
class EventsResponse(BaseModel):
events: List[Event] = Field(..., description="List of events")
class HealthResponse(BaseModel):
status: str = Field(..., description="Service health status")
timestamp: str = Field(..., description="Health check timestamp")
class RootResponse(BaseModel):
message: str = Field(..., description="API message")
version: str = Field(..., description="API version")
app = FastAPI(
title="LabFusion Service Adapters",
description="Service integration adapters for Home Assistant, Frigate, Immich, and other homelab services",
@@ -84,22 +98,13 @@ app = FastAPI(
contact={
"name": "LabFusion Team",
"url": "https://github.com/labfusion/labfusion",
"email": "team@labfusion.dev"
},
license_info={
"name": "MIT License",
"url": "https://opensource.org/licenses/MIT"
"email": "team@labfusion.dev",
},
license_info={"name": "MIT License", "url": "https://opensource.org/licenses/MIT"},
servers=[
{
"url": "http://localhost:8000",
"description": "Development Server"
},
{
"url": "https://adapters.labfusion.dev",
"description": "Production Server"
}
]
{"url": "http://localhost:8000", "description": "Development Server"},
{"url": "https://adapters.labfusion.dev", "description": "Production Server"},
],
)
# CORS middleware
@@ -115,7 +120,7 @@ app.add_middleware(
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
decode_responses=True
decode_responses=True,
)
# Service configurations
@@ -123,54 +128,57 @@ SERVICES = {
"home_assistant": {
"url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
"token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN"))
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN")),
},
"frigate": {
"url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
"token": os.getenv("FRIGATE_TOKEN", ""),
"enabled": bool(os.getenv("FRIGATE_TOKEN"))
"enabled": bool(os.getenv("FRIGATE_TOKEN")),
},
"immich": {
"url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
"api_key": os.getenv("IMMICH_API_KEY", ""),
"enabled": bool(os.getenv("IMMICH_API_KEY"))
"enabled": bool(os.getenv("IMMICH_API_KEY")),
},
"n8n": {
"url": os.getenv("N8N_URL", "http://n8n.local:5678"),
"webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
"enabled": bool(os.getenv("N8N_WEBHOOK_URL"))
}
"enabled": bool(os.getenv("N8N_WEBHOOK_URL")),
},
}
@app.get("/",
response_model=RootResponse,
summary="API Root",
description="Get basic API information",
tags=["General"])
@app.get(
"/",
response_model=RootResponse,
summary="API Root",
description="Get basic API information",
tags=["General"],
)
async def root():
"""Get basic API information and version"""
return RootResponse(
message="LabFusion Service Adapters API",
version="1.0.0"
)
return RootResponse(message="LabFusion Service Adapters API", version="1.0.0")
@app.get("/health",
response_model=HealthResponse,
summary="Health Check",
description="Check service health status",
tags=["General"])
@app.get(
"/health",
response_model=HealthResponse,
summary="Health Check",
description="Check service health status",
tags=["General"],
)
async def health_check():
"""Check the health status of the service adapters"""
return HealthResponse(
status="healthy",
timestamp=datetime.now().isoformat()
)
return HealthResponse(status="healthy", timestamp=datetime.now().isoformat())
@app.get("/services",
response_model=Dict[str, ServiceStatus],
summary="Get Service Status",
description="Get status of all configured external services",
tags=["Services"])
@app.get(
"/services",
response_model=Dict[str, ServiceStatus],
summary="Get Service Status",
description="Get status of all configured external services",
tags=["Services"],
)
async def get_services():
"""Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
service_status = {}
@@ -178,27 +186,30 @@ async def get_services():
service_status[service_name] = ServiceStatus(
enabled=config["enabled"],
url=config["url"],
status="unknown" # Would check actual service status
status="unknown", # Would check actual service status
)
return service_status
@app.get("/home-assistant/entities",
response_model=HAEntitiesResponse,
summary="Get Home Assistant Entities",
description="Retrieve all entities from Home Assistant",
responses={
200: {"description": "Successfully retrieved entities"},
503: {"description": "Home Assistant integration not configured"}
},
tags=["Home Assistant"])
@app.get(
"/home-assistant/entities",
response_model=HAEntitiesResponse,
summary="Get Home Assistant Entities",
description="Retrieve all entities from Home Assistant",
responses={
200: {"description": "Successfully retrieved entities"},
503: {"description": "Home Assistant integration not configured"},
},
tags=["Home Assistant"],
)
async def get_ha_entities():
"""Get Home Assistant entities including sensors, switches, and other devices"""
if not SERVICES["home_assistant"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable.",
)
# This would make actual API calls to Home Assistant
# For now, return mock data
return HAEntitiesResponse(
@@ -207,38 +218,39 @@ async def get_ha_entities():
entity_id="sensor.cpu_usage",
state="45.2",
attributes=HAAttributes(
unit_of_measurement="%",
friendly_name="CPU Usage"
)
unit_of_measurement="%", friendly_name="CPU Usage"
),
),
HAEntity(
entity_id="sensor.memory_usage",
state="2.1",
attributes=HAAttributes(
unit_of_measurement="GB",
friendly_name="Memory Usage"
)
)
unit_of_measurement="GB", friendly_name="Memory Usage"
),
),
]
)
@app.get("/frigate/events",
response_model=FrigateEventsResponse,
summary="Get Frigate Events",
description="Retrieve detection events from Frigate NVR",
responses={
200: {"description": "Successfully retrieved events"},
503: {"description": "Frigate integration not configured"}
},
tags=["Frigate"])
@app.get(
"/frigate/events",
response_model=FrigateEventsResponse,
summary="Get Frigate Events",
description="Retrieve detection events from Frigate NVR",
responses={
200: {"description": "Successfully retrieved events"},
503: {"description": "Frigate integration not configured"},
},
tags=["Frigate"],
)
async def get_frigate_events():
"""Get Frigate detection events including person, vehicle, and object detections"""
if not SERVICES["frigate"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable.",
)
# This would make actual API calls to Frigate
# For now, return mock data
return FrigateEventsResponse(
@@ -248,28 +260,31 @@ async def get_frigate_events():
timestamp=datetime.now().isoformat(),
camera="front_door",
label="person",
confidence=0.95
confidence=0.95,
)
]
)
@app.get("/immich/assets",
response_model=ImmichAssetsResponse,
summary="Get Immich Assets",
description="Retrieve photo assets from Immich",
responses={
200: {"description": "Successfully retrieved assets"},
503: {"description": "Immich integration not configured"}
},
tags=["Immich"])
@app.get(
"/immich/assets",
response_model=ImmichAssetsResponse,
summary="Get Immich Assets",
description="Retrieve photo assets from Immich",
responses={
200: {"description": "Successfully retrieved assets"},
503: {"description": "Immich integration not configured"},
},
tags=["Immich"],
)
async def get_immich_assets():
"""Get Immich photo assets including metadata, tags, and face detection results"""
if not SERVICES["immich"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable.",
)
# This would make actual API calls to Immich
# For now, return mock data
return ImmichAssetsResponse(
@@ -279,20 +294,23 @@ async def get_immich_assets():
filename="photo_001.jpg",
created_at=datetime.now().isoformat(),
tags=["person", "outdoor"],
faces=["Alice", "Bob"]
faces=["Alice", "Bob"],
)
]
)
@app.post("/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"}
},
tags=["Events"])
@app.post(
"/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"},
},
tags=["Events"],
)
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
"""Publish an event to the Redis message bus for consumption by other services"""
try:
@@ -300,29 +318,33 @@ async def publish_event(event_data: EventData, background_tasks: BackgroundTasks
"timestamp": datetime.now().isoformat(),
"service": event_data.service,
"event_type": event_data.event_type,
"metadata": json.dumps(event_data.metadata)
"metadata": json.dumps(event_data.metadata),
}
# Publish to Redis
redis_client.lpush("events", json.dumps(event))
return EventResponse(
status="published",
event=event
)
return EventResponse(status="published", event=event)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"}
},
tags=["Events"])
async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum number of events to retrieve")):
@app.get(
"/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"},
},
tags=["Events"],
)
async def get_events(
limit: int = Query(
100, ge=1, le=1000, description="Maximum number of events to retrieve"
)
):
"""Get recent events from the Redis message bus"""
try:
events = redis_client.lrange("events", 0, limit - 1)
@@ -333,91 +355,101 @@ async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum
parsed_events.append(Event(**event_data))
except json.JSONDecodeError:
continue
return EventsResponse(events=parsed_events)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/home-assistant/entity/{entity_id}",
response_model=HAEntity,
summary="Get Specific HA Entity",
description="Get a specific Home Assistant entity by ID",
responses={
200: {"description": "Successfully retrieved entity"},
404: {"description": "Entity not found"},
503: {"description": "Home Assistant integration not configured"}
},
tags=["Home Assistant"])
@app.get(
"/home-assistant/entity/{entity_id}",
response_model=HAEntity,
summary="Get Specific HA Entity",
description="Get a specific Home Assistant entity by ID",
responses={
200: {"description": "Successfully retrieved entity"},
404: {"description": "Entity not found"},
503: {"description": "Home Assistant integration not configured"},
},
tags=["Home Assistant"],
)
async def get_ha_entity(entity_id: str = Path(..., description="Entity ID")):
"""Get a specific Home Assistant entity by its ID"""
if not SERVICES["home_assistant"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable.",
)
# This would make actual API calls to Home Assistant
# For now, return mock data
return HAEntity(
entity_id=entity_id,
state="unknown",
attributes=HAAttributes(
unit_of_measurement="",
friendly_name=f"Entity {entity_id}"
)
unit_of_measurement="", friendly_name=f"Entity {entity_id}"
),
)
@app.get("/frigate/cameras",
summary="Get Frigate Cameras",
description="Get list of Frigate cameras",
responses={
200: {"description": "Successfully retrieved cameras"},
503: {"description": "Frigate integration not configured"}
},
tags=["Frigate"])
@app.get(
"/frigate/cameras",
summary="Get Frigate Cameras",
description="Get list of Frigate cameras",
responses={
200: {"description": "Successfully retrieved cameras"},
503: {"description": "Frigate integration not configured"},
},
tags=["Frigate"],
)
async def get_frigate_cameras():
"""Get list of available Frigate cameras"""
if not SERVICES["frigate"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable.",
)
# This would make actual API calls to Frigate
# For now, return mock data
return {
"cameras": [
{"name": "front_door", "enabled": True},
{"name": "back_yard", "enabled": True},
{"name": "garage", "enabled": False}
{"name": "garage", "enabled": False},
]
}
@app.get("/immich/albums",
summary="Get Immich Albums",
description="Get list of Immich albums",
responses={
200: {"description": "Successfully retrieved albums"},
503: {"description": "Immich integration not configured"}
},
tags=["Immich"])
@app.get(
"/immich/albums",
summary="Get Immich Albums",
description="Get list of Immich albums",
responses={
200: {"description": "Successfully retrieved albums"},
503: {"description": "Immich integration not configured"},
},
tags=["Immich"],
)
async def get_immich_albums():
"""Get list of Immich albums"""
if not SERVICES["immich"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable.",
)
# This would make actual API calls to Immich
# For now, return mock data
return {
"albums": [
{"id": "album_1", "name": "Family Photos", "asset_count": 150},
{"id": "album_2", "name": "Vacation 2024", "asset_count": 75}
{"id": "album_2", "name": "Vacation 2024", "asset_count": 75},
]
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -1,23 +1,29 @@
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
class ServiceStatus(BaseModel):
enabled: bool = Field(..., description="Whether the service is enabled")
url: str = Field(..., description="Service URL")
status: str = Field(..., description="Service status")
class HAAttributes(BaseModel):
unit_of_measurement: Optional[str] = Field(None, description="Unit of measurement")
friendly_name: Optional[str] = Field(None, description="Friendly name")
class HAEntity(BaseModel):
entity_id: str = Field(..., description="Entity ID")
state: str = Field(..., description="Current state")
attributes: HAAttributes = Field(..., description="Entity attributes")
class HAEntitiesResponse(BaseModel):
entities: List[HAEntity] = Field(..., description="List of Home Assistant entities")
class FrigateEvent(BaseModel):
id: str = Field(..., description="Event ID")
timestamp: str = Field(..., description="Event timestamp")
@@ -25,9 +31,11 @@ class FrigateEvent(BaseModel):
label: str = Field(..., description="Detection label")
confidence: float = Field(..., ge=0, le=1, description="Detection confidence")
class FrigateEventsResponse(BaseModel):
events: List[FrigateEvent] = Field(..., description="List of Frigate events")
class ImmichAsset(BaseModel):
id: str = Field(..., description="Asset ID")
filename: str = Field(..., description="Filename")
@@ -35,31 +43,38 @@ class ImmichAsset(BaseModel):
tags: List[str] = Field(..., description="Asset tags")
faces: List[str] = Field(..., description="Detected faces")
class ImmichAssetsResponse(BaseModel):
assets: List[ImmichAsset] = Field(..., description="List of Immich assets")
class EventData(BaseModel):
service: str = Field(..., description="Service name")
event_type: str = Field(..., description="Event type")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")
class EventResponse(BaseModel):
status: str = Field(..., description="Publication status")
event: Dict[str, Any] = Field(..., description="Published event")
class Event(BaseModel):
timestamp: str = Field(..., description="Event timestamp")
service: str = Field(..., description="Service name")
event_type: str = Field(..., description="Event type")
metadata: str = Field(..., description="Event metadata as JSON string")
class EventsResponse(BaseModel):
events: List[Event] = Field(..., description="List of events")
class HealthResponse(BaseModel):
status: str = Field(..., description="Service health status")
timestamp: str = Field(..., description="Health check timestamp")
class RootResponse(BaseModel):
message: str = Field(..., description="API message")
version: str = Field(..., description="API version")

View File

@@ -1,20 +1,25 @@
from fastapi import APIRouter, HTTPException, Query, BackgroundTasks
from models.schemas import EventData, EventResponse, EventsResponse, Event
from services.redis_client import redis_client
from datetime import datetime
import json
from datetime import datetime
from fastapi import APIRouter, BackgroundTasks, HTTPException, Query
from models.schemas import Event, EventData, EventResponse, EventsResponse
from services.redis_client import redis_client
router = APIRouter()
@router.post("/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"}
},
tags=["Events"])
@router.post(
"/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"},
},
tags=["Events"],
)
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
"""Publish an event to the Redis message bus for consumption by other services"""
try:
@@ -22,29 +27,33 @@ async def publish_event(event_data: EventData, background_tasks: BackgroundTasks
"timestamp": datetime.now().isoformat(),
"service": event_data.service,
"event_type": event_data.event_type,
"metadata": json.dumps(event_data.metadata)
"metadata": json.dumps(event_data.metadata),
}
# Publish to Redis
redis_client.lpush("events", json.dumps(event))
return EventResponse(
status="published",
event=event
)
return EventResponse(status="published", event=event)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"}
},
tags=["Events"])
async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum number of events to retrieve")):
@router.get(
"/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"},
},
tags=["Events"],
)
async def get_events(
limit: int = Query(
100, ge=1, le=1000, description="Maximum number of events to retrieve"
)
):
"""Get recent events from the Redis message bus"""
try:
events = redis_client.lrange("events", 0, limit - 1)
@@ -55,7 +64,7 @@ async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum
parsed_events.append(Event(**event_data))
except json.JSONDecodeError:
continue
return EventsResponse(events=parsed_events)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -1,27 +1,32 @@
from fastapi import APIRouter, HTTPException
from models.schemas import FrigateEventsResponse, FrigateEvent
from services.config import SERVICES
from datetime import datetime
from fastapi import APIRouter, HTTPException
from models.schemas import FrigateEvent, FrigateEventsResponse
from services.config import SERVICES
router = APIRouter()
@router.get("/frigate/events",
response_model=FrigateEventsResponse,
summary="Get Frigate Events",
description="Retrieve detection events from Frigate NVR",
responses={
200: {"description": "Successfully retrieved events"},
503: {"description": "Frigate integration not configured"}
},
tags=["Frigate"])
@router.get(
"/frigate/events",
response_model=FrigateEventsResponse,
summary="Get Frigate Events",
description="Retrieve detection events from Frigate NVR",
responses={
200: {"description": "Successfully retrieved events"},
503: {"description": "Frigate integration not configured"},
},
tags=["Frigate"],
)
async def get_frigate_events():
"""Get Frigate detection events including person, vehicle, and object detections"""
if not SERVICES["frigate"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable.",
)
# This would make actual API calls to Frigate
# For now, return mock data
return FrigateEventsResponse(
@@ -31,33 +36,36 @@ async def get_frigate_events():
timestamp=datetime.now().isoformat(),
camera="front_door",
label="person",
confidence=0.95
confidence=0.95,
)
]
)
@router.get("/frigate/cameras",
summary="Get Frigate Cameras",
description="Get list of Frigate cameras",
responses={
200: {"description": "Successfully retrieved cameras"},
503: {"description": "Frigate integration not configured"}
},
tags=["Frigate"])
@router.get(
"/frigate/cameras",
summary="Get Frigate Cameras",
description="Get list of Frigate cameras",
responses={
200: {"description": "Successfully retrieved cameras"},
503: {"description": "Frigate integration not configured"},
},
tags=["Frigate"],
)
async def get_frigate_cameras():
"""Get list of available Frigate cameras"""
if not SERVICES["frigate"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable."
status_code=503,
detail="Frigate integration not configured. Please set FRIGATE_TOKEN environment variable.",
)
# This would make actual API calls to Frigate
# For now, return mock data
return {
"cameras": [
{"name": "front_door", "enabled": True},
{"name": "back_yard", "enabled": True},
{"name": "garage", "enabled": False}
{"name": "garage", "enabled": False},
]
}

View File

@@ -1,39 +1,44 @@
from fastapi import APIRouter
from datetime import datetime
from models.schemas import RootResponse, HealthResponse, ServiceStatus
from fastapi import APIRouter
from models.schemas import HealthResponse, RootResponse, ServiceStatus
from services.config import SERVICES
router = APIRouter()
@router.get("/",
response_model=RootResponse,
summary="API Root",
description="Get basic API information",
tags=["General"])
@router.get(
"/",
response_model=RootResponse,
summary="API Root",
description="Get basic API information",
tags=["General"],
)
async def root():
"""Get basic API information and version"""
return RootResponse(
message="LabFusion Service Adapters API",
version="1.0.0"
)
return RootResponse(message="LabFusion Service Adapters API", version="1.0.0")
@router.get("/health",
response_model=HealthResponse,
summary="Health Check",
description="Check service health status",
tags=["General"])
@router.get(
"/health",
response_model=HealthResponse,
summary="Health Check",
description="Check service health status",
tags=["General"],
)
async def health_check():
"""Check the health status of the service adapters"""
return HealthResponse(
status="healthy",
timestamp=datetime.now().isoformat()
)
return HealthResponse(status="healthy", timestamp=datetime.now().isoformat())
@router.get("/services",
response_model=dict,
summary="Get Service Status",
description="Get status of all configured external services",
tags=["Services"])
@router.get(
"/services",
response_model=dict,
summary="Get Service Status",
description="Get status of all configured external services",
tags=["Services"],
)
async def get_services():
"""Get status of all configured external services (Home Assistant, Frigate, Immich, n8n)"""
service_status = {}
@@ -41,6 +46,6 @@ async def get_services():
service_status[service_name] = ServiceStatus(
enabled=config["enabled"],
url=config["url"],
status="unknown" # Would check actual service status
status="unknown", # Would check actual service status
)
return service_status

View File

@@ -1,26 +1,30 @@
from fastapi import APIRouter, HTTPException, Path
from models.schemas import HAEntitiesResponse, HAEntity, HAAttributes
from models.schemas import HAAttributes, HAEntitiesResponse, HAEntity
from services.config import SERVICES
router = APIRouter()
@router.get("/home-assistant/entities",
response_model=HAEntitiesResponse,
summary="Get Home Assistant Entities",
description="Retrieve all entities from Home Assistant",
responses={
200: {"description": "Successfully retrieved entities"},
503: {"description": "Home Assistant integration not configured"}
},
tags=["Home Assistant"])
@router.get(
"/home-assistant/entities",
response_model=HAEntitiesResponse,
summary="Get Home Assistant Entities",
description="Retrieve all entities from Home Assistant",
responses={
200: {"description": "Successfully retrieved entities"},
503: {"description": "Home Assistant integration not configured"},
},
tags=["Home Assistant"],
)
async def get_ha_entities():
"""Get Home Assistant entities including sensors, switches, and other devices"""
if not SERVICES["home_assistant"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable.",
)
# This would make actual API calls to Home Assistant
# For now, return mock data
return HAEntitiesResponse(
@@ -29,46 +33,46 @@ async def get_ha_entities():
entity_id="sensor.cpu_usage",
state="45.2",
attributes=HAAttributes(
unit_of_measurement="%",
friendly_name="CPU Usage"
)
unit_of_measurement="%", friendly_name="CPU Usage"
),
),
HAEntity(
entity_id="sensor.memory_usage",
state="2.1",
attributes=HAAttributes(
unit_of_measurement="GB",
friendly_name="Memory Usage"
)
)
unit_of_measurement="GB", friendly_name="Memory Usage"
),
),
]
)
@router.get("/home-assistant/entity/{entity_id}",
response_model=HAEntity,
summary="Get Specific HA Entity",
description="Get a specific Home Assistant entity by ID",
responses={
200: {"description": "Successfully retrieved entity"},
404: {"description": "Entity not found"},
503: {"description": "Home Assistant integration not configured"}
},
tags=["Home Assistant"])
@router.get(
"/home-assistant/entity/{entity_id}",
response_model=HAEntity,
summary="Get Specific HA Entity",
description="Get a specific Home Assistant entity by ID",
responses={
200: {"description": "Successfully retrieved entity"},
404: {"description": "Entity not found"},
503: {"description": "Home Assistant integration not configured"},
},
tags=["Home Assistant"],
)
async def get_ha_entity(entity_id: str = Path(..., description="Entity ID")):
"""Get a specific Home Assistant entity by its ID"""
if not SERVICES["home_assistant"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable."
status_code=503,
detail="Home Assistant integration not configured. Please set HOME_ASSISTANT_TOKEN environment variable.",
)
# This would make actual API calls to Home Assistant
# For now, return mock data
return HAEntity(
entity_id=entity_id,
state="unknown",
attributes=HAAttributes(
unit_of_measurement="",
friendly_name=f"Entity {entity_id}"
)
unit_of_measurement="", friendly_name=f"Entity {entity_id}"
),
)

View File

@@ -1,27 +1,32 @@
from fastapi import APIRouter, HTTPException
from models.schemas import ImmichAssetsResponse, ImmichAsset
from services.config import SERVICES
from datetime import datetime
from fastapi import APIRouter, HTTPException
from models.schemas import ImmichAsset, ImmichAssetsResponse
from services.config import SERVICES
router = APIRouter()
@router.get("/immich/assets",
response_model=ImmichAssetsResponse,
summary="Get Immich Assets",
description="Retrieve photo assets from Immich",
responses={
200: {"description": "Successfully retrieved assets"},
503: {"description": "Immich integration not configured"}
},
tags=["Immich"])
@router.get(
"/immich/assets",
response_model=ImmichAssetsResponse,
summary="Get Immich Assets",
description="Retrieve photo assets from Immich",
responses={
200: {"description": "Successfully retrieved assets"},
503: {"description": "Immich integration not configured"},
},
tags=["Immich"],
)
async def get_immich_assets():
"""Get Immich photo assets including metadata, tags, and face detection results"""
if not SERVICES["immich"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable.",
)
# This would make actual API calls to Immich
# For now, return mock data
return ImmichAssetsResponse(
@@ -31,32 +36,35 @@ async def get_immich_assets():
filename="photo_001.jpg",
created_at=datetime.now().isoformat(),
tags=["person", "outdoor"],
faces=["Alice", "Bob"]
faces=["Alice", "Bob"],
)
]
)
@router.get("/immich/albums",
summary="Get Immich Albums",
description="Get list of Immich albums",
responses={
200: {"description": "Successfully retrieved albums"},
503: {"description": "Immich integration not configured"}
},
tags=["Immich"])
@router.get(
"/immich/albums",
summary="Get Immich Albums",
description="Get list of Immich albums",
responses={
200: {"description": "Successfully retrieved albums"},
503: {"description": "Immich integration not configured"},
},
tags=["Immich"],
)
async def get_immich_albums():
"""Get list of Immich albums"""
if not SERVICES["immich"]["enabled"]:
raise HTTPException(
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable."
status_code=503,
detail="Immich integration not configured. Please set IMMICH_API_KEY environment variable.",
)
# This would make actual API calls to Immich
# For now, return mock data
return {
"albums": [
{"id": "album_1", "name": "Family Photos", "asset_count": 150},
{"id": "album_2", "name": "Vacation 2024", "asset_count": 75}
{"id": "album_2", "name": "Vacation 2024", "asset_count": 75},
]
}

View File

@@ -1,4 +1,5 @@
import os
from dotenv import load_dotenv
# Load environment variables
@@ -9,21 +10,21 @@ SERVICES = {
"home_assistant": {
"url": os.getenv("HOME_ASSISTANT_URL", "https://homeassistant.local:8123"),
"token": os.getenv("HOME_ASSISTANT_TOKEN", ""),
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN"))
"enabled": bool(os.getenv("HOME_ASSISTANT_TOKEN")),
},
"frigate": {
"url": os.getenv("FRIGATE_URL", "http://frigate.local:5000"),
"token": os.getenv("FRIGATE_TOKEN", ""),
"enabled": bool(os.getenv("FRIGATE_TOKEN"))
"enabled": bool(os.getenv("FRIGATE_TOKEN")),
},
"immich": {
"url": os.getenv("IMMICH_URL", "http://immich.local:2283"),
"api_key": os.getenv("IMMICH_API_KEY", ""),
"enabled": bool(os.getenv("IMMICH_API_KEY"))
"enabled": bool(os.getenv("IMMICH_API_KEY")),
},
"n8n": {
"url": os.getenv("N8N_URL", "http://n8n.local:5678"),
"webhook_url": os.getenv("N8N_WEBHOOK_URL", ""),
"enabled": bool(os.getenv("N8N_WEBHOOK_URL"))
}
"enabled": bool(os.getenv("N8N_WEBHOOK_URL")),
},
}

View File

@@ -1,9 +1,10 @@
import redis
import os
import redis
# Redis connection
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
decode_responses=True
decode_responses=True,
)