Files
s01e05/application/activate_route.py
2026-03-16 06:52:12 +01:00

97 lines
4.0 KiB
Python

import logging
from dataclasses import dataclass
from domain.entities import Route, RouteMode, RouteStatus
from domain.ports import RailwayApiPort, EventBusPort
logger = logging.getLogger(__name__)
@dataclass
class ActivateRouteResult:
success: bool
route: Route
flag: str | None = None
error: str | None = None
class ActivateRouteUseCase:
"""Use case: aktywacja trasy kolejowej.
Realizuje sekwencję kroków zgodnie z dokumentacją API:
1. help — pobranie dokumentacji (weryfikacja dostępności)
2. getstatus — sprawdzenie bieżącego statusu trasy
3. reconfigure — włączenie trybu rekonfiguracji
4. setstatus — ustawienie statusu na RTOPEN
5. save — zapisanie zmian i wyjście z trybu rekonfiguracji
Zgodnie z lekcją — logika agenta opiera się na:
- pętli z jasno zdefiniowanymi krokami
- zdarzeniach informujących o postępie (heartbeat)
- obsłudze błędów z możliwością recovery
"""
def __init__(self, api: RailwayApiPort, event_bus: EventBusPort) -> None:
self._api = api
self._event_bus = event_bus
def execute(self, route_name: str) -> ActivateRouteResult:
route = Route(name=route_name)
try:
self._step("Pobieranie dokumentacji API (help)")
help_resp = self._api.send_action("help")
if not help_resp.ok:
return self._fail(route, "Nie udało się pobrać dokumentacji API")
logger.info("Dokumentacja API pobrana, dostępne akcje: %s",
[a["action"] for a in help_resp.data.get("help", {}).get("actions", [])])
self._step(f"Sprawdzanie statusu trasy {route_name}")
status_resp = self._api.send_action("getstatus", route=route_name)
if not status_resp.ok:
return self._fail(route, f"Nie udało się pobrać statusu: {status_resp.message}")
route.status = status_resp.data.get("status", "unknown")
route.mode = RouteMode(status_resp.data.get("mode", "normal"))
logger.info("Trasa %s: status=%s, mode=%s", route_name, route.status, route.mode.value)
if route.is_open:
logger.info("Trasa %s jest już otwarta!", route_name)
return ActivateRouteResult(success=True, route=route)
self._step(f"Włączanie trybu rekonfiguracji dla {route_name}")
reconf_resp = self._api.send_action("reconfigure", route=route_name)
if not reconf_resp.ok:
return self._fail(route, f"Reconfigure failed: {reconf_resp.message}")
route.mode = RouteMode.RECONFIGURE
logger.info("Tryb rekonfiguracji włączony")
self._step(f"Ustawianie statusu RTOPEN dla {route_name}")
set_resp = self._api.send_action("setstatus", route=route_name, value=RouteStatus.OPEN.value)
if not set_resp.ok:
return self._fail(route, f"Setstatus failed: {set_resp.message}")
route.status = "open"
logger.info("Status zmieniony na OPEN")
self._step(f"Zapisywanie konfiguracji trasy {route_name}")
save_resp = self._api.send_action("save", route=route_name)
route.mode = RouteMode.NORMAL
flag = save_resp.flag
if flag:
logger.info("Flaga znaleziona: %s", flag)
self._event_bus.emit("workflow:complete", {"flag": flag})
return ActivateRouteResult(success=True, route=route, flag=flag)
return ActivateRouteResult(success=True, route=route)
except RuntimeError as exc:
return self._fail(route, str(exc))
def _step(self, description: str) -> None:
self._event_bus.emit("workflow:step", {"description": description})
def _fail(self, route: Route, error: str) -> ActivateRouteResult:
logger.error("Błąd: %s", error)
self._event_bus.emit("workflow:error", {"error": error})
return ActivateRouteResult(success=False, route=route, error=error)