Files
labFusion/services/service-adapters/routes/events.py
GSRN 4dc2f147ec
Some checks failed
Docker Build and Push / build-and-push (push) Failing after 37s
LabFusion CI/CD Pipeline / api-gateway (push) Failing after 1m16s
Integration Tests / integration-tests (push) Failing after 58s
Integration Tests / performance-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.1) (push) Failing after 12s
LabFusion CI/CD Pipeline / api-docs (push) Successful in 1m42s
LabFusion CI/CD Pipeline / integration-tests (push) Has been skipped
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 46s
Service Adapters (Python FastAPI) / test (3.9) (push) Failing after 47s
Service Adapters (Python FastAPI) / build (push) Has been skipped
LabFusion CI/CD Pipeline / frontend (push) Failing after 1m53s
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 45s
LabFusion CI/CD Pipeline / service-adapters (push) Failing after 28s
fix: Standardize isort command in CI workflows
### Summary of Changes
- Updated the `isort` command in both CI workflows to include the `--profile black` option for consistent code formatting.
- Refactored function definitions in service adapters to improve readability by consolidating parameters into single lines.

### Expected Results
- Enhanced consistency in code formatting checks across CI workflows, ensuring adherence to the Black style guide.
- Improved readability and maintainability of function definitions in service adapters.
2025-09-15 21:16:37 +02:00

67 lines
2.1 KiB
Python

import json
from datetime import datetime
from fastapi import APIRouter, BackgroundTasks, HTTPException, Query
from models.schemas import Event, EventData, EventResponse, EventsResponse
from services.redis_client import redis_client
router = APIRouter()
@router.post(
"/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"},
},
tags=["Events"],
)
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
"""Publish an event to the Redis message bus for consumption by other services"""
try:
event = {
"timestamp": datetime.now().isoformat(),
"service": event_data.service,
"event_type": event_data.event_type,
"metadata": json.dumps(event_data.metadata),
}
# Publish to Redis
redis_client.lpush("events", json.dumps(event))
return EventResponse(status="published", event=event)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"},
},
tags=["Events"],
)
async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum number of events to retrieve")):
"""Get recent events from the Redis message bus"""
try:
events = redis_client.lrange("events", 0, limit - 1)
parsed_events = []
for event in events:
try:
event_data = json.loads(event)
parsed_events.append(Event(**event_data))
except json.JSONDecodeError:
continue
return EventsResponse(events=parsed_events)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))