Add logging for failure recovery and throttled error reporting.
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 7s
All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 7s
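Introduced `_failure_state` to track consecutive failures per target and `_failure_log_interval_seconds` (300 seconds) to control how often a repeated error is logged. Added an info log when a previously failing target recovers, and throttled error logs so that a recurring identical error is reported at most once per interval, while a changed error message is logged immediately.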
This commit is contained in:
@@ -13,6 +13,8 @@ import asyncpg
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
|
_failure_state: dict[int, dict[str, object]] = {}
|
||||||
|
_failure_log_interval_seconds = 300
|
||||||
|
|
||||||
|
|
||||||
def build_target_dsn(target: Target) -> str:
|
def build_target_dsn(target: Target) -> str:
|
||||||
@@ -182,8 +184,48 @@ async def collect_once() -> None:
|
|||||||
for target in targets:
|
for target in targets:
|
||||||
try:
|
try:
|
||||||
await collect_target(target)
|
await collect_target(target)
|
||||||
|
prev = _failure_state.pop(target.id, None)
|
||||||
|
if prev:
|
||||||
|
logger.info(
|
||||||
|
"collector_target_recovered target=%s after_failures=%s last_error=%s",
|
||||||
|
target.id,
|
||||||
|
prev.get("count", 0),
|
||||||
|
prev.get("error"),
|
||||||
|
)
|
||||||
except (OSError, SQLAlchemyError, asyncpg.PostgresError, Exception) as exc:
|
except (OSError, SQLAlchemyError, asyncpg.PostgresError, Exception) as exc:
|
||||||
logger.exception("collector_error target=%s err=%s", target.id, exc)
|
now = datetime.now(timezone.utc)
|
||||||
|
current_error = str(exc)
|
||||||
|
state = _failure_state.get(target.id)
|
||||||
|
if state is None:
|
||||||
|
_failure_state[target.id] = {
|
||||||
|
"count": 1,
|
||||||
|
"last_log_at": now,
|
||||||
|
"error": current_error,
|
||||||
|
}
|
||||||
|
logger.exception("collector_error target=%s err=%s", target.id, exc)
|
||||||
|
continue
|
||||||
|
|
||||||
|
count = int(state.get("count", 0)) + 1
|
||||||
|
last_log_at = state.get("last_log_at")
|
||||||
|
last_logged_error = str(state.get("error", ""))
|
||||||
|
should_log = False
|
||||||
|
if current_error != last_logged_error:
|
||||||
|
should_log = True
|
||||||
|
elif isinstance(last_log_at, datetime):
|
||||||
|
should_log = (now - last_log_at).total_seconds() >= _failure_log_interval_seconds
|
||||||
|
else:
|
||||||
|
should_log = True
|
||||||
|
|
||||||
|
state["count"] = count
|
||||||
|
if should_log:
|
||||||
|
state["last_log_at"] = now
|
||||||
|
state["error"] = current_error
|
||||||
|
logger.error(
|
||||||
|
"collector_error_throttled target=%s err=%s consecutive_failures=%s",
|
||||||
|
target.id,
|
||||||
|
current_error,
|
||||||
|
count,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def collector_loop(stop_event: asyncio.Event) -> None:
|
async def collector_loop(stop_event: asyncio.Event) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user