diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fc36f7d --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.env +__pycache__/ +*.pyc +.venv/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..083d8dc --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# S01E05 — Railway Route Activator + +Aplikacja aktywująca trasę kolejową **X-01** przez API `hub.ag3nts.org`. +Zadanie z kursu AI Devs 4 — zarządzanie jawnymi oraz niejawnymi limitami modeli. + +## Architektura (Clean Architecture) + +``` +domain/ # Encje i porty (interfejsy) +├── entities.py # Route, RouteStatus, RouteMode, ApiResponse +└── ports.py # RailwayApiPort, EventBusPort + +application/ # Use case'y (logika biznesowa) +└── activate_route.py + +infrastructure/ # Implementacje +├── api_client.py # Klient API z retry i obsługą limitów +├── config.py # Konfiguracja z .env +├── event_bus.py # Szyna zdarzeń +└── logger_setup.py # Logowanie i monitoring zdarzeń + +main.py # Punkt wejścia +``` + +### Koncepcje z lekcji + +- **Event-driven architecture** — `EventBus` emituje zdarzenia (`api:request`, `api:response`, `api:retry`, `workflow:step`) umożliwiając monitoring i reakcję na zmiany stanu +- **Obsługa limitów API** — automatyczny retry z respektowaniem `retry_after` przy 429 +- **Obsługa błędów 503** — retry z backoff (symulacja przeciążenia serwera) +- **Heartbeat** — logi informujące o postępie każdego kroku workflow +- **Logowanie interakcji** — każde wywołanie i odpowiedź API jest rejestrowane +- **Dependency Injection** — porty (abstrakcje) w `domain/`, implementacje w `infrastructure/` + +## Uruchomienie + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +Utwórz plik `.env`: + +```env +RAILWAY_API_KEY=twoj-klucz-api +``` + +Uruchom: + +```bash +python main.py # domyślnie trasa x-01 +python main.py x-01 # lub jawnie podaj nazwę trasy +``` + +## Sekwencja API + +1. `help` — pobranie dokumentacji API +2. `getstatus` — sprawdzenie bieżącego statusu trasy +3. `reconfigure` — włączenie trybu rekonfiguracji +4. `setstatus` (RTOPEN) — otwarcie trasy +5. `save` — zapisanie zmian, zwraca flagę diff --git a/application/__init__.py b/application/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/application/activate_route.py b/application/activate_route.py new file mode 100644 index 0000000..cb34f1f --- /dev/null +++ b/application/activate_route.py @@ -0,0 +1,96 @@ +import logging +from dataclasses import dataclass + +from domain.entities import Route, RouteMode, RouteStatus +from domain.ports import RailwayApiPort, EventBusPort + +logger = logging.getLogger(__name__) + + +@dataclass +class ActivateRouteResult: + success: bool + route: Route + flag: str | None = None + error: str | None = None + + +class ActivateRouteUseCase: + """Use case: aktywacja trasy kolejowej. + + Realizuje sekwencję kroków zgodnie z dokumentacją API: + 1. help — pobranie dokumentacji (weryfikacja dostępności) + 2. getstatus — sprawdzenie bieżącego statusu trasy + 3. reconfigure — włączenie trybu rekonfiguracji + 4. setstatus — ustawienie statusu na RTOPEN + 5. save — zapisanie zmian i wyjście z trybu rekonfiguracji + + Zgodnie z lekcją — logika agenta opiera się na: + - pętli z jasno zdefiniowanymi krokami + - zdarzeniach informujących o postępie (heartbeat) + - obsłudze błędów z możliwością recovery + """ + + def __init__(self, api: RailwayApiPort, event_bus: EventBusPort) -> None: + self._api = api + self._event_bus = event_bus + + def execute(self, route_name: str) -> ActivateRouteResult: + route = Route(name=route_name) + + try: + self._step("Pobieranie dokumentacji API (help)") + help_resp = self._api.send_action("help") + if not help_resp.ok: + return self._fail(route, "Nie udało się pobrać dokumentacji API") + logger.info("Dokumentacja API pobrana, dostępne akcje: %s", + [a["action"] for a in help_resp.data.get("help", {}).get("actions", [])]) + + self._step(f"Sprawdzanie statusu trasy {route_name}") + status_resp = self._api.send_action("getstatus", route=route_name) + if not status_resp.ok: + return self._fail(route, f"Nie udało się pobrać statusu: {status_resp.message}") + route.status = status_resp.data.get("status", "unknown") + route.mode = RouteMode(status_resp.data.get("mode", "normal")) + logger.info("Trasa %s: status=%s, mode=%s", route_name, route.status, route.mode.value) + + if route.is_open: + logger.info("Trasa %s jest już otwarta!", route_name) + return ActivateRouteResult(success=True, route=route) + + self._step(f"Włączanie trybu rekonfiguracji dla {route_name}") + reconf_resp = self._api.send_action("reconfigure", route=route_name) + if not reconf_resp.ok: + return self._fail(route, f"Reconfigure failed: {reconf_resp.message}") + route.mode = RouteMode.RECONFIGURE + logger.info("Tryb rekonfiguracji włączony") + + self._step(f"Ustawianie statusu RTOPEN dla {route_name}") + set_resp = self._api.send_action("setstatus", route=route_name, value=RouteStatus.OPEN.value) + if not set_resp.ok: + return self._fail(route, f"Setstatus failed: {set_resp.message}") + route.status = "open" + logger.info("Status zmieniony na OPEN") + + self._step(f"Zapisywanie konfiguracji trasy {route_name}") + save_resp = self._api.send_action("save", route=route_name) + route.mode = RouteMode.NORMAL + + flag = save_resp.flag + if flag: + logger.info("Flaga znaleziona: %s", flag) + self._event_bus.emit("workflow:complete", {"flag": flag}) + return ActivateRouteResult(success=True, route=route, flag=flag) + + return ActivateRouteResult(success=True, route=route) + + except RuntimeError as exc: + return self._fail(route, str(exc)) + + def _step(self, description: str) -> None: + self._event_bus.emit("workflow:step", {"description": description}) + + def _fail(self, route: Route, error: str) -> ActivateRouteResult: + logger.error("Błąd: %s", error) + self._event_bus.emit("workflow:error", {"error": error}) + return ActivateRouteResult(success=False, route=route, error=error) diff --git a/domain/__init__.py b/domain/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/domain/entities.py b/domain/entities.py new file mode 100644 index 0000000..6c938b4 --- /dev/null +++ b/domain/entities.py @@ -0,0 +1,59 @@ +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + + +class RouteStatus(Enum): + OPEN = "RTOPEN" + CLOSE = "RTCLOSE" + + +class RouteMode(Enum): + NORMAL = "normal" + RECONFIGURE = "reconfigure" + + +@dataclass +class Route: + name: str + mode: RouteMode = RouteMode.NORMAL + status: str = "close" + + @property + def is_open(self) -> bool: + return self.status == "open" + + @property + def is_reconfigurable(self) -> bool: + return self.mode == RouteMode.RECONFIGURE + + +@dataclass +class ApiResponse: + ok: bool + data: dict[str, Any] + http_code: int + headers: dict[str, str] = field(default_factory=dict) + + @property + def is_rate_limited(self) -> bool: + return self.http_code == 429 + + @property + def is_server_error(self) -> bool: + return self.http_code == 503 + + @property + def retry_after(self) -> int: + return self.data.get("retry_after", 5) + + @property + def message(self) -> str: + return self.data.get("message", "") + + @property + def flag(self) -> str | None: + msg = self.message + if msg and "{FLG:" in msg: + return msg + return None diff --git a/domain/ports.py b/domain/ports.py new file mode 100644 index 0000000..9e84f8a --- /dev/null +++ b/domain/ports.py @@ -0,0 +1,24 @@ +from abc import ABC, abstractmethod +from typing import Any + +from domain.entities import ApiResponse + + +class RailwayApiPort(ABC): + """Port (interfejs) do komunikacji z API kolejowym.""" + + @abstractmethod + def send_action(self, action: str, **params: Any) -> ApiResponse: + ... + + +class EventBusPort(ABC): + """Port do architektury opartej o zdarzenia (event-driven).""" + + @abstractmethod + def emit(self, event: str, data: dict[str, Any]) -> None: + ... + + @abstractmethod + def on(self, event: str, callback: Any) -> None: + ... diff --git a/infrastructure/__init__.py b/infrastructure/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/infrastructure/api_client.py b/infrastructure/api_client.py new file mode 100644 index 0000000..6c63c85 --- /dev/null +++ b/infrastructure/api_client.py @@ -0,0 +1,85 @@ +import time +import logging +from typing import Any + +import requests + +from domain.entities import ApiResponse +from domain.ports import RailwayApiPort, EventBusPort +from infrastructure.config import Config + +logger = logging.getLogger(__name__) + + +class RailwayApiClient(RailwayApiPort): + """Implementacja klienta API kolejowego z obsługą: + + - Retry z backoff przy błędach 503 (symulacja przeciążenia) + - Respektowanie limitów zapytań (429) z retry_after + - Logowanie każdego wywołania i odpowiedzi + - Emitowanie zdarzeń przez EventBus + """ + + def __init__(self, config: Config, event_bus: EventBusPort) -> None: + self._config = config + self._event_bus = event_bus + self._session = requests.Session() + self._session.headers.update({"Content-Type": "application/json"}) + + def send_action(self, action: str, **params: Any) -> ApiResponse: + payload = { + "apikey": self._config.api_key, + "task": self._config.task_name, + "answer": {"action": action, **params}, + } + + for attempt in range(1, self._config.max_retries + 1): + self._event_bus.emit("api:request", { + "action": action, + "params": params, + "attempt": attempt, + }) + + try: + resp = self._session.post(self._config.api_url, json=payload, timeout=30) + except requests.RequestException as exc: + logger.error("Request failed: %s", exc) + self._wait(self._config.retry_base_delay * attempt) + continue + + data = resp.json() + api_resp = ApiResponse( + ok=data.get("ok", False), + data=data, + http_code=resp.status_code, + headers=dict(resp.headers), + ) + + self._event_bus.emit("api:response", { + "action": action, + "http_code": resp.status_code, + "data": data, + }) + + if api_resp.is_server_error: + delay = self._config.retry_base_delay * attempt + logger.warning("503 Server outage (attempt %d/%d), retrying in %.1fs", + attempt, self._config.max_retries, delay) + self._event_bus.emit("api:retry", {"reason": "503", "delay": delay}) + self._wait(delay) + continue + + if api_resp.is_rate_limited: + delay = api_resp.retry_after + 1 + logger.warning("429 Rate limited, waiting %ds", delay) + self._event_bus.emit("api:retry", {"reason": "429", "delay": delay}) + self._wait(delay) + continue + + return api_resp + + raise RuntimeError(f"Action '{action}' failed after {self._config.max_retries} retries") + + def _wait(self, seconds: float) -> None: + logger.info("Waiting %.1fs...", seconds) + time.sleep(seconds) diff --git a/infrastructure/config.py b/infrastructure/config.py new file mode 100644 index 0000000..78d0ea6 --- /dev/null +++ b/infrastructure/config.py @@ -0,0 +1,21 @@ +import os +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Config: + api_url: str + api_key: str + task_name: str + max_retries: int + retry_base_delay: float + + @classmethod + def from_env(cls) -> "Config": + return cls( + api_url=os.getenv("RAILWAY_API_URL", "https://hub.ag3nts.org/verify"), + api_key=os.environ["RAILWAY_API_KEY"], + task_name=os.getenv("RAILWAY_TASK_NAME", "railway"), + max_retries=int(os.getenv("RAILWAY_MAX_RETRIES", "10")), + retry_base_delay=float(os.getenv("RAILWAY_RETRY_BASE_DELAY", "2.0")), + ) diff --git a/infrastructure/event_bus.py b/infrastructure/event_bus.py new file mode 100644 index 0000000..82545dc --- /dev/null +++ b/infrastructure/event_bus.py @@ -0,0 +1,24 @@ +from collections import defaultdict +from typing import Any, Callable + +from domain.ports import EventBusPort + + +class EventBus(EventBusPort): + """Prosta implementacja szyny zdarzeń (event bus). + + Zgodnie z lekcją — architektura oparta o zdarzenia umożliwia: + - monitorowanie działań agenta + - podejmowanie akcji (np. kompresja kontekstu, logowanie) + - subskrypcję zdarzeń z różnych komponentów + """ + + def __init__(self) -> None: + self._listeners: dict[str, list[Callable]] = defaultdict(list) + + def emit(self, event: str, data: dict[str, Any]) -> None: + for callback in self._listeners[event]: + callback(data) + + def on(self, event: str, callback: Callable) -> None: + self._listeners[event].append(callback) diff --git a/infrastructure/logger_setup.py b/infrastructure/logger_setup.py new file mode 100644 index 0000000..5eb29c6 --- /dev/null +++ b/infrastructure/logger_setup.py @@ -0,0 +1,40 @@ +import logging +import sys +from typing import Any + +from domain.ports import EventBusPort + + +def setup_logging() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(levelname)-7s | %(name)s | %(message)s", + datefmt="%H:%M:%S", + stream=sys.stdout, + ) + + +def attach_event_logger(event_bus: EventBusPort) -> None: + """Podłącza monitorowanie zdarzeń API — zgodnie z lekcją: + 'warto zapisywać i monitorować wszystkie zdarzenia'.""" + + log = logging.getLogger("events") + + def on_request(data: dict[str, Any]) -> None: + log.info(">>> [%s] attempt=%d params=%s", + data["action"], data["attempt"], data.get("params", {})) + + def on_response(data: dict[str, Any]) -> None: + log.info("<<< [%s] http=%d data=%s", + data["action"], data["http_code"], data["data"]) + + def on_retry(data: dict[str, Any]) -> None: + log.warning("... retry reason=%s delay=%.1fs", data["reason"], data["delay"]) + + def on_step(data: dict[str, Any]) -> None: + log.info("=== STEP: %s", data.get("description", "")) + + event_bus.on("api:request", on_request) + event_bus.on("api:response", on_response) + event_bus.on("api:retry", on_retry) + event_bus.on("workflow:step", on_step) diff --git a/main.py b/main.py new file mode 100644 index 0000000..0d98521 --- /dev/null +++ b/main.py @@ -0,0 +1,49 @@ +"""Railway Route Activator — AI Devs 4, S01E05 + +Aplikacja aktywująca trasę kolejową X-01 przez API hub.ag3nts.org. +Zbudowana w clean architecture z uwzględnieniem koncepcji z lekcji: +- Architektura oparta o zdarzenia (EventBus) +- Obsługa limitów API (rate limiting, retry z backoff) +- Obsługa błędów 503 (symulacja przeciążenia serwera) +- Logowanie i monitorowanie wszystkich interakcji +- Konfiguracja przez zmienne środowiskowe +""" + +import sys + +from dotenv import load_dotenv + +from infrastructure.config import Config +from infrastructure.event_bus import EventBus +from infrastructure.api_client import RailwayApiClient +from infrastructure.logger_setup import setup_logging, attach_event_logger +from application.activate_route import ActivateRouteUseCase + + +def main() -> None: + load_dotenv() + setup_logging() + + config = Config.from_env() + + event_bus = EventBus() + attach_event_logger(event_bus) + + api_client = RailwayApiClient(config, event_bus) + + use_case = ActivateRouteUseCase(api_client, event_bus) + + route_name = sys.argv[1] if len(sys.argv) > 1 else "x-01" + result = use_case.execute(route_name) + + if result.success: + print(f"\nTrasa {result.route.name} aktywowana pomyślnie!") + if result.flag: + print(f"Flaga: {result.flag}") + else: + print(f"\nBłąd: {result.error}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..df7458c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests +python-dotenv