Files
GSRN e5ae5e3a0c
Some checks failed
Integration Tests / integration-tests (push) Failing after 28s
Integration Tests / performance-tests (push) Has been skipped
Docker Build and Push / build-and-push (push) Failing after 32s
Service Adapters (Python FastAPI) / test (3.11) (push) Failing after 39s
Service Adapters (Python FastAPI) / test (3.14) (push) Failing after 10s
Service Adapters (Python FastAPI) / test (3.12) (push) Failing after 42s
Service Adapters (Python FastAPI) / test (3.13) (push) Failing after 40s
Service Adapters (Python FastAPI) / build (push) Has been skipped
fix: Correct type hinting for events retrieval in service-adapters
### Summary of Changes
- Updated the type hinting in `events.py` to use `cast` for the list of events retrieved from Redis, ensuring type safety and clarity in the code.

### Expected Results
- Improved type checking and maintainability of the service-adapters module, enhancing overall code quality.
2025-09-16 23:50:07 +02:00

68 lines
2.2 KiB
Python

import json
from datetime import datetime
from typing import List, cast
from fastapi import APIRouter, BackgroundTasks, HTTPException, Query
from models.schemas import Event, EventData, EventResponse, EventsResponse
from services.redis_client import redis_client
router = APIRouter()
@router.post(
"/publish-event",
response_model=EventResponse,
summary="Publish Event",
description="Publish an event to the Redis message bus",
responses={
200: {"description": "Event published successfully"},
500: {"description": "Failed to publish event"},
},
tags=["Events"],
)
async def publish_event(event_data: EventData, background_tasks: BackgroundTasks):
"""Publish an event to the Redis message bus for consumption by other services"""
try:
event = {
"timestamp": datetime.now().isoformat(),
"service": event_data.service,
"event_type": event_data.event_type,
"metadata": json.dumps(event_data.metadata),
}
# Publish to Redis
redis_client.lpush("events", json.dumps(event))
return EventResponse(status="published", event=event)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/events",
response_model=EventsResponse,
summary="Get Events",
description="Retrieve recent events from the message bus",
responses={
200: {"description": "Successfully retrieved events"},
500: {"description": "Failed to retrieve events"},
},
tags=["Events"],
)
async def get_events(limit: int = Query(100, ge=1, le=1000, description="Maximum number of events to retrieve")):
"""Get recent events from the Redis message bus"""
try:
events: List[str] = cast(List[str], redis_client.lrange("events", 0, limit - 1))
parsed_events = []
for event in events:
try:
event_data = json.loads(event)
parsed_events.append(Event(**event_data))
except json.JSONDecodeError:
continue
return EventsResponse(events=parsed_events)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))