Files
NexaPG/backend/scripts/pip_audit_gate.py
nessi 3932aa56f7
All checks were successful
Container CVE Scan (development) / Scan backend/frontend images for CVEs (push) Successful in 2m41s
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 7s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 7s
Python Dependency Security / pip-audit (block high/critical) (push) Successful in 50s
[NX-202 Issue] Add pip-audit CI enforcement for Python dependency security
This commit integrates pip-audit to enforce vulnerability checks in CI. Dependencies with unresolved HIGH/CRITICAL vulnerabilities will block builds unless explicitly allowlisted. The process is documented, with a strict policy to ensure exceptions are trackable and time-limited.
2026-02-15 10:44:33 +01:00

193 lines
6.1 KiB
Python

#!/usr/bin/env python3
"""Gate pip-audit results with an auditable allowlist policy.
Policy:
- Block unresolved HIGH/CRITICAL vulnerabilities.
- If severity is missing, treat as HIGH by default.
- Allow temporary exceptions via allowlist with expiry metadata.
"""
from __future__ import annotations

import argparse
import datetime as dt
import json
import math
import re
import sys
from pathlib import Path
# Severity labels ranked from least to most severe; the position in the tuple
# is the rank used when several severities compete for the same finding.
SEVERITY_ORDER = {
    label: rank
    for rank, label in enumerate(("unknown", "low", "medium", "high", "critical"))
}
# Severities that fail the gate unless the finding is allowlisted.
BLOCKING_SEVERITIES = {"critical", "high"}
def _parse_date(s: str) -> dt.date:
    """Parse an ISO 8601 calendar date string into a ``datetime.date``.

    Raises:
        ValueError: If ``s`` is not a date that ``date.fromisoformat`` accepts.
    """
    parsed = dt.date.fromisoformat(s)
    return parsed
def _normalize_severity(value: object) -> str:
    """Normalize a pip-audit/OSV-style severity payload to a severity label.

    Accepted shapes:

    - ``str``: a known label (case- and whitespace-insensitive) or a numeric
      CVSS score such as ``"7.5"``.
    - ``int`` / ``float``: a numeric CVSS score.
    - ``list``: the highest severity among the ``"severity"`` values of its
      dict items; non-dict items are ignored.
    - ``dict``: its ``"severity"`` value, normalized recursively.

    Returns:
        One of the keys of ``SEVERITY_ORDER``. ``"unknown"`` is returned for
        anything that cannot be interpreted, including NaN scores and
        booleans, so the caller can apply its own default.
    """
    if isinstance(value, str):
        label = value.strip().lower()
        if label in SEVERITY_ORDER:
            return label
        try:
            # CVSS numeric string fallback.
            score = float(label)
        except ValueError:
            return "unknown"
        return _severity_from_score(score)
    if isinstance(value, bool):
        # bool is a subclass of int, but True/False is not a CVSS score.
        return "unknown"
    if isinstance(value, (int, float)):
        return _severity_from_score(float(value))
    if isinstance(value, list):
        # OSV sometimes returns a list of dicts. Pick the max-known severity.
        candidates = (
            _normalize_severity(item.get("severity"))
            for item in value
            if isinstance(item, dict)
        )
        return max(candidates, key=SEVERITY_ORDER.__getitem__, default="unknown")
    if isinstance(value, dict):
        return _normalize_severity(value.get("severity"))
    return "unknown"


def _severity_from_score(score: float) -> str:
    """Map a numeric CVSS score to a severity label."""
    # NaN fails every ``>=`` comparison and would otherwise fall through to
    # "low"; report it as unknown instead of silently under-rating it.
    if math.isnan(score):
        return "unknown"
    if score >= 9.0:
        return "critical"
    if score >= 7.0:
        return "high"
    if score >= 4.0:
        return "medium"
    return "low"
def _load_allowlist(path: Path) -> tuple[list[dict], list[str]]:
    """Load the allowlist file and split it into active entries and errors.

    The file is expected to hold a JSON object with an ``"entries"`` list.
    Each entry must be an object carrying ``id``, ``reason``, ``approved_by``,
    ``issue`` and ``expires_on`` (an ISO date). An entry whose ``expires_on``
    is today is still active; one dated before today is reported as expired.

    Args:
        path: Location of the allowlist JSON file.

    Returns:
        ``(active, errors)``: the entries that passed validation, and one
        human-readable message per problem found. A missing file yields
        ``([], [])``. Malformed content is reported through ``errors``
        instead of raising.
    """
    if not path.exists():
        return [], []
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return [], [f"allowlist {path} could not be read as JSON: {exc}"]
    if not isinstance(data, dict):
        return [], ["allowlist root must be a JSON object with an 'entries' list"]
    entries = data.get("entries", [])
    if not isinstance(entries, list):
        return [], ["allowlist 'entries' must be a list"]

    today = dt.date.today()
    active: list[dict] = []
    errors: list[str] = []
    required = {"id", "reason", "approved_by", "issue", "expires_on"}
    for idx, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            errors.append(f"allowlist entry #{idx} is not an object")
            continue
        missing = required.difference(entry)
        if missing:
            errors.append(f"allowlist entry #{idx} missing keys: {', '.join(sorted(missing))}")
            continue
        entry_id = entry["id"]
        if not isinstance(entry_id, str) or not entry_id.strip():
            # A non-string id can never equal a vulnerability id, and an
            # unhashable one would raise during the later set lookup.
            errors.append(f"allowlist entry #{idx} has an empty or non-string id")
            continue
        try:
            expires = _parse_date(str(entry["expires_on"]))
        except ValueError:
            errors.append(f"allowlist entry #{idx} has invalid expires_on: {entry['expires_on']}")
            continue
        if expires < today:
            errors.append(
                f"allowlist entry #{idx} ({entry['id']}) expired on {entry['expires_on']}"
            )
            continue
        active.append(entry)
    return active, errors
def _iter_findings(report: object):
    """Yield one normalized finding per vulnerability in a pip-audit report.

    Args:
        report: Parsed pip-audit JSON, either a list of dependency objects or
            a dict holding that list under ``"dependencies"``.

    Yields:
        Dicts with the keys ``package``, ``version``, ``id``, ``aliases``,
        ``severity`` and ``fix_versions``. ``aliases`` and ``fix_versions``
        are always lists. A severity that normalizes to ``"unknown"`` is
        reported as ``"high"``.

    Raises:
        TypeError: If ``report`` is neither a list nor a dict.
    """
    # pip-audit JSON can be list[dep] or dict with dependencies.
    if isinstance(report, list):
        deps = report
    elif isinstance(report, dict):
        # A JSON null here means "no dependencies", not a malformed report.
        deps = report.get("dependencies") or []
    else:
        raise TypeError(
            f"unsupported pip-audit report root: {type(report).__name__}"
        )
    for dep in deps:
        package = dep.get("name", "unknown")
        version = dep.get("version", "unknown")
        for vuln in dep.get("vulns") or []:
            severity = _normalize_severity(vuln.get("severity"))
            if severity == "unknown":
                severity = "high"  # conservative default for policy safety
            yield {
                "package": package,
                "version": version,
                "id": vuln.get("id", "unknown"),
                "aliases": vuln.get("aliases") or [],
                "severity": severity,
                "fix_versions": vuln.get("fix_versions") or [],
            }
def _is_allowlisted(finding: dict, allowlist: list[dict]) -> bool:
    """Return whether an allowlist entry covers ``finding``.

    An entry matches when its ``id`` equals the finding's id or one of its
    aliases. If the entry also names a ``package``, that package must match
    the finding's package. Package names are compared in normalized form, so
    an entry written as ``PyYAML`` matches a finding for ``pyyaml``.

    Args:
        finding: A finding dict with ``id``, ``package`` and ``aliases``.
        allowlist: Allowlist entries, each with an ``id`` and an optional
            ``package``.
    """
    ids = {finding["id"], *(finding.get("aliases") or [])}
    pkg = _canonical_package_name(finding["package"])
    for entry in allowlist:
        if entry["id"] not in ids:
            continue
        entry_pkg = entry.get("package")
        # An entry without a package applies to the id in any package.
        if not entry_pkg or _canonical_package_name(entry_pkg) == pkg:
            return True
    return False


def _canonical_package_name(name: object) -> str:
    """Return ``name`` lowercased with runs of ``-``, ``_``, ``.`` as one ``-``."""
    return re.sub(r"[-_.]+", "-", str(name).strip()).lower()
def main() -> int:
    """Evaluate a pip-audit JSON report against the allowlist policy.

    Reads ``--report`` and ``--allowlist`` from the command line, prints a
    severity summary, and lists every HIGH/CRITICAL finding that is not
    allowlisted.

    Returns:
        ``0`` when no unresolved blocking finding remains. ``1`` when the
        report is missing or unreadable, the allowlist fails validation, or
        at least one blocking finding is not allowlisted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--report", required=True, help="Path to pip-audit JSON report")
    parser.add_argument("--allowlist", required=True, help="Path to allowlist JSON")
    args = parser.parse_args()

    report_path = Path(args.report)
    allowlist_path = Path(args.allowlist)
    if not report_path.exists():
        print(f"[pip-audit-gate] Missing report: {report_path}")
        return 1
    try:
        report = json.loads(report_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # An empty or truncated report must fail the gate with a readable
        # message rather than a traceback. ValueError covers JSONDecodeError.
        print(f"[pip-audit-gate] Unreadable report {report_path}: {exc}")
        return 1

    allowlist, allowlist_errors = _load_allowlist(allowlist_path)
    if allowlist_errors:
        print("[pip-audit-gate] Allowlist validation failed:")
        for err in allowlist_errors:
            print(f" - {err}")
        return 1

    unresolved_blocking: list[dict] = []
    summary = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
    ignored = 0
    for finding in _iter_findings(report):
        sev = finding["severity"]
        # The summary counts every finding, allowlisted or not.
        summary[sev] = summary.get(sev, 0) + 1
        if _is_allowlisted(finding, allowlist):
            ignored += 1
            continue
        if sev in BLOCKING_SEVERITIES:
            unresolved_blocking.append(finding)

    print("[pip-audit-gate] Summary:")
    print(
        f" CRITICAL={summary['critical']} HIGH={summary['high']} "
        f"MEDIUM={summary['medium']} LOW={summary['low']} ALLOWLISTED={ignored}"
    )
    if unresolved_blocking:
        print("[pip-audit-gate] Blocking vulnerabilities found:")
        for item in unresolved_blocking:
            aliases = ", ".join(item["aliases"]) if item["aliases"] else "-"
            fixes = ", ".join(item["fix_versions"]) if item["fix_versions"] else "-"
            print(
                f" - {item['severity'].upper()} {item['package']}=={item['version']} "
                f"id={item['id']} aliases=[{aliases}] fixes=[{fixes}]"
            )
        return 1
    print("[pip-audit-gate] No unresolved HIGH/CRITICAL vulnerabilities.")
    return 0
if __name__ == "__main__":
    # Use the gate's return value as the process exit status.
    raise SystemExit(main())