From d9dfde1c8777c027df88e433205bfae3cff59c6b Mon Sep 17 00:00:00 2001 From: nessi Date: Sat, 14 Feb 2026 11:44:49 +0100 Subject: [PATCH] [NX-102 Issue] Add exponential backoff with jitter for retry logic Introduced an exponential backoff mechanism with a configurable base, max delay, and jitter factor to handle retries for target failures. This improves resilience by reducing the load during repeated failures and avoids synchronized retry storms. Additionally, stale target cleanup logic has been implemented to prevent unnecessary state retention. --- backend/app/services/collector.py | 41 ++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/backend/app/services/collector.py b/backend/app/services/collector.py index 6befd5c..77f47bc 100644 --- a/backend/app/services/collector.py +++ b/backend/app/services/collector.py @@ -1,6 +1,7 @@ import asyncio import logging from datetime import datetime, timezone +from random import uniform from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.exc import SQLAlchemyError @@ -15,6 +16,9 @@ logger = logging.getLogger(__name__) settings = get_settings() _failure_state: dict[int, dict[str, object]] = {} _failure_log_interval_seconds = 300 +_backoff_base_seconds = max(3, int(settings.poll_interval_seconds)) +_backoff_max_seconds = 300 +_backoff_jitter_factor = 0.15 def build_target_dsn(target: Target) -> str: @@ -181,38 +185,66 @@ async def collect_once() -> None: async with SessionLocal() as db: targets = (await db.scalars(select(Target))).all() + active_target_ids = {target.id for target in targets} + stale_target_ids = [target_id for target_id in _failure_state.keys() if target_id not in active_target_ids] + for stale_target_id in stale_target_ids: + _failure_state.pop(stale_target_id, None) + for target in targets: + now = datetime.now(timezone.utc) + state = _failure_state.get(target.id) + if state: + next_attempt_at = state.get("next_attempt_at") + if isinstance(next_attempt_at, datetime) and now < next_attempt_at: + continue + try: await collect_target(target) prev = _failure_state.pop(target.id, None) if prev: + first_failure_at = prev.get("first_failure_at") + downtime_seconds = None + if isinstance(first_failure_at, datetime): + downtime_seconds = max(0, int((now - first_failure_at).total_seconds())) logger.info( - "collector_target_recovered target=%s after_failures=%s last_error=%s", + "collector_target_recovered target=%s after_failures=%s downtime_seconds=%s last_error=%s", target.id, prev.get("count", 0), + downtime_seconds, prev.get("error"), ) except (OSError, SQLAlchemyError, asyncpg.PostgresError, Exception) as exc: - now = datetime.now(timezone.utc) current_error = str(exc) error_class = exc.__class__.__name__ state = _failure_state.get(target.id) if state is None: + next_delay = min(_backoff_max_seconds, _backoff_base_seconds) + jitter = next_delay * _backoff_jitter_factor + next_delay = max(1, int(next_delay + uniform(-jitter, jitter))) + next_attempt_at = now.timestamp() + next_delay _failure_state[target.id] = { "count": 1, + "first_failure_at": now, "last_log_at": now, "error": current_error, + "next_attempt_at": datetime.fromtimestamp(next_attempt_at, tz=timezone.utc), } logger.warning( - "collector_target_unreachable target=%s error_class=%s err=%s consecutive_failures=%s", + "collector_target_unreachable target=%s error_class=%s err=%s consecutive_failures=%s retry_in_seconds=%s", target.id, error_class, current_error, 1, + next_delay, ) continue count = int(state.get("count", 0)) + 1 + raw_backoff = min(_backoff_max_seconds, _backoff_base_seconds * (2 ** min(count - 1, 10))) + jitter = raw_backoff * _backoff_jitter_factor + next_delay = max(1, int(raw_backoff + uniform(-jitter, jitter))) + state["next_attempt_at"] = datetime.fromtimestamp(now.timestamp() + next_delay, tz=timezone.utc) + last_log_at = state.get("last_log_at") last_logged_error = str(state.get("error", "")) should_log = False @@ -228,11 +260,12 @@ async def collect_once() -> None: state["last_log_at"] = now state["error"] = current_error logger.warning( - "collector_target_unreachable target=%s error_class=%s err=%s consecutive_failures=%s", + "collector_target_unreachable target=%s error_class=%s err=%s consecutive_failures=%s retry_in_seconds=%s", target.id, error_class, current_error, count, + next_delay, )