#!/usr/bin/env python3
"""Gate pip-audit results with an auditable allowlist policy.

Policy:
- Block unresolved HIGH/CRITICAL vulnerabilities.
- If severity is missing, treat as HIGH by default.
- Allow temporary exceptions via allowlist with expiry metadata.
"""

from __future__ import annotations

import argparse
import datetime as dt
import json
import math
import sys
from collections.abc import Iterator
from pathlib import Path

SEVERITY_ORDER = {"unknown": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}
BLOCKING_SEVERITIES = {"high", "critical"}


def _parse_date(s: str) -> dt.date:
    """Parse an ISO date string using ``datetime.date.fromisoformat``.

    Raises:
        ValueError: If *s* is not a valid ISO date.
    """
    return dt.date.fromisoformat(s)


def _severity_from_score(raw: str | int | float) -> str:
    """Map a CVSS-style numeric score to a severity label.

    Values that cannot be converted to a float, NaN, and negative values
    return ``"unknown"`` so the caller's conservative default applies
    instead of the value silently falling through to ``"low"``.
    """
    try:
        score = float(raw)
    except (TypeError, ValueError, OverflowError):
        return "unknown"
    # NaN fails every ``>=`` comparison below, so it must be rejected first.
    if math.isnan(score) or score < 0:
        return "unknown"
    if score >= 9.0:
        return "critical"
    if score >= 7.0:
        return "high"
    if score >= 4.0:
        return "medium"
    return "low"


def _normalize_severity(value: object) -> str:
    """Normalize various pip-audit/osv-style severity payloads.

    Args:
        value: A severity label, a numeric score (number or numeric string),
            a dict carrying a ``"severity"`` key, or a list of any of these.

    Returns:
        One of the keys of ``SEVERITY_ORDER``. Lists resolve to the highest
        severity found among their items; anything unrecognized is
        ``"unknown"``.
    """
    # bool is a subclass of int; a JSON true/false is not a score.
    if isinstance(value, bool):
        return "unknown"
    if isinstance(value, str):
        label = value.strip().lower()
        if label in SEVERITY_ORDER:
            return label
        # CVSS numeric string fallback.
        return _severity_from_score(label)
    if isinstance(value, (int, float)):
        return _severity_from_score(value)
    if isinstance(value, list):
        # Pick the highest known severity among the items.
        best = "unknown"
        for item in value:
            candidate = _normalize_severity(item)
            if SEVERITY_ORDER[candidate] > SEVERITY_ORDER[best]:
                best = candidate
        return best
    if isinstance(value, dict):
        return _normalize_severity(value.get("severity"))
    return "unknown"


def _load_allowlist(path: Path) -> tuple[list[dict], list[str]]:
    """Load and validate the allowlist file.

    Args:
        path: Location of the allowlist JSON (an object with an ``entries``
            list). A missing file is treated as an empty allowlist.

    Returns:
        ``(active_entries, errors)``. Every problem (unreadable file, invalid
        JSON, wrong shape, missing keys, bad or past ``expires_on``) is
        reported as a message in ``errors`` rather than raised.
    """
    if not path.exists():
        return [], []
    try:
        raw = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        return [], [f"cannot read allowlist {path}: {exc}"]
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [], [f"allowlist {path} is not valid JSON: {exc}"]
    if not isinstance(data, dict):
        return [], ["allowlist must be a JSON object with an 'entries' list"]
    entries = data.get("entries", [])
    if not isinstance(entries, list):
        return [], ["allowlist 'entries' must be a list"]

    today = dt.date.today()
    required = {"id", "reason", "approved_by", "issue", "expires_on"}
    active: list[dict] = []
    errors: list[str] = []
    for idx, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            errors.append(f"allowlist entry #{idx} must be an object")
            continue
        missing = required - set(entry)
        if missing:
            errors.append(
                f"allowlist entry #{idx} missing keys: {', '.join(sorted(missing))}"
            )
            continue
        entry_id = entry["id"]
        # IDs are matched against string vulnerability IDs with set
        # membership, so a non-string (possibly unhashable) id is an error.
        if not isinstance(entry_id, str) or not entry_id.strip():
            errors.append(f"allowlist entry #{idx} has invalid id: {entry_id!r}")
            continue
        try:
            expires = _parse_date(str(entry["expires_on"]))
        except ValueError:
            errors.append(
                f"allowlist entry #{idx} has invalid expires_on: {entry['expires_on']}"
            )
            continue
        if expires < today:
            errors.append(
                f"allowlist entry #{idx} ({entry['id']}) expired on {entry['expires_on']}"
            )
            continue
        active.append(entry)
    return active, errors


def _iter_findings(report: object) -> Iterator[dict]:
    """Yield one normalized finding dict per vulnerability in *report*.

    Args:
        report: Parsed pip-audit JSON: either a list of dependency objects or
            an object with a ``dependencies`` list.

    Yields:
        Dicts with keys ``package``, ``version``, ``id``, ``aliases``,
        ``severity`` and ``fix_versions``. A severity that normalizes to
        ``"unknown"`` is reported as ``"high"``.

    Raises:
        ValueError: If the report, a dependency, or a vulnerability does not
            have the expected JSON shape. Malformed data is rejected rather
            than skipped so that it cannot hide a finding.
    """
    if isinstance(report, list):
        deps = report
    elif isinstance(report, dict):
        deps = report.get("dependencies") or []
    else:
        raise ValueError(
            f"report must be a JSON list or object, got {type(report).__name__}"
        )

    for dep in deps:
        if not isinstance(dep, dict):
            raise ValueError(f"dependency entry must be an object, got {dep!r}")
        package = dep.get("name", "unknown")
        version = dep.get("version", "unknown")
        # ``or []`` tolerates an explicit null as well as a missing key.
        for vuln in dep.get("vulns") or []:
            if not isinstance(vuln, dict):
                raise ValueError(
                    f"vulnerability entry for {package} must be an object, got {vuln!r}"
                )
            severity = _normalize_severity(vuln.get("severity"))
            if severity == "unknown":
                severity = "high"  # conservative default for policy safety
            yield {
                "package": package,
                "version": version,
                "id": vuln.get("id", "unknown"),
                "aliases": vuln.get("aliases") or [],
                "severity": severity,
                "fix_versions": vuln.get("fix_versions") or [],
            }


def _is_allowlisted(finding: dict, allowlist: list[dict]) -> bool:
    """Return True if an allowlist entry covers *finding*.

    An entry matches when its ``id`` equals the finding's id or one of its
    aliases and, if the entry names a ``package``, that package equals the
    finding's package.
    """
    ids = {finding["id"], *finding["aliases"]}
    pkg = finding["package"]
    for entry in allowlist:
        entry_pkg = entry.get("package")
        if entry["id"] in ids and (not entry_pkg or entry_pkg == pkg):
            return True
    return False


def main(argv: list[str] | None = None) -> int:
    """Run the gate and return the process exit code.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            ``sys.argv[1:]`` when omitted.

    Returns:
        0 when no unresolved HIGH/CRITICAL vulnerability remains; 1 when one
        does, or when the report or the allowlist is missing/invalid.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--report", required=True, help="Path to pip-audit JSON report")
    parser.add_argument("--allowlist", required=True, help="Path to allowlist JSON")
    args = parser.parse_args(argv)

    report_path = Path(args.report)
    allowlist_path = Path(args.allowlist)

    if not report_path.exists():
        print(f"[pip-audit-gate] Missing report: {report_path}")
        return 1
    try:
        report = json.loads(report_path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError) as exc:
        print(f"[pip-audit-gate] Cannot read report {report_path}: {exc}")
        return 1
    except json.JSONDecodeError as exc:
        print(f"[pip-audit-gate] Report {report_path} is not valid JSON: {exc}")
        return 1

    allowlist, allowlist_errors = _load_allowlist(allowlist_path)
    if allowlist_errors:
        print("[pip-audit-gate] Allowlist validation failed:")
        for err in allowlist_errors:
            print(f"  - {err}")
        return 1

    try:
        findings = list(_iter_findings(report))
    except ValueError as exc:
        print(f"[pip-audit-gate] Invalid report: {exc}")
        return 1

    unresolved_blocking: list[dict] = []
    summary = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
    ignored = 0

    for finding in findings:
        sev = finding["severity"]
        # The summary counts every finding, allowlisted or not.
        summary[sev] = summary.get(sev, 0) + 1
        if _is_allowlisted(finding, allowlist):
            ignored += 1
            continue
        if sev in BLOCKING_SEVERITIES:
            unresolved_blocking.append(finding)

    print("[pip-audit-gate] Summary:")
    print(
        f"  CRITICAL={summary['critical']} HIGH={summary['high']} "
        f"MEDIUM={summary['medium']} LOW={summary['low']} ALLOWLISTED={ignored}"
    )

    if unresolved_blocking:
        print("[pip-audit-gate] Blocking vulnerabilities found:")
        for blocked in unresolved_blocking:
            # str() keeps the report printable even if an item is not a string.
            aliases = ", ".join(map(str, blocked["aliases"])) if blocked["aliases"] else "-"
            fixes = (
                ", ".join(map(str, blocked["fix_versions"]))
                if blocked["fix_versions"]
                else "-"
            )
            print(
                f"  - {blocked['severity'].upper()} {blocked['package']}=={blocked['version']} "
                f"id={blocked['id']} aliases=[{aliases}] fixes=[{fixes}]"
            )
        return 1

    print("[pip-audit-gate] No unresolved HIGH/CRITICAL vulnerabilities.")
    return 0


if __name__ == "__main__":
    sys.exit(main())