All checks were successful
PostgreSQL Compatibility Matrix / PG14 smoke (push) Successful in 10s
PostgreSQL Compatibility Matrix / PG15 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG16 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG17 smoke (push) Successful in 8s
PostgreSQL Compatibility Matrix / PG18 smoke (push) Successful in 8s
Updated the "Check for Updates" functionality to rely solely on the latest published release in the upstream repository. Removed redundant code for fetching tags and commits, enhancing maintainability and reducing complexity.
92 lines
3.1 KiB
Python
92 lines
3.1 KiB
Python
import asyncio
|
|
import json
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from urllib.error import URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
UPSTREAM_REPO_WEB = "https://git.nesterovic.cc/nessi/NexaPG"
|
|
UPSTREAM_REPO_API = "https://git.nesterovic.cc/api/v1/repos/nessi/NexaPG"
|
|
|
|
|
|
def _extract_version(payload: str) -> str | None:
|
|
txt = payload.strip()
|
|
if not txt:
|
|
return None
|
|
|
|
try:
|
|
data = json.loads(txt)
|
|
if isinstance(data, dict):
|
|
for key in ("latest_version", "version", "tag_name", "name"):
|
|
value = data.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
if isinstance(data, list) and data and isinstance(data[0], dict):
|
|
for key in ("latest_version", "version", "tag_name", "name"):
|
|
value = data[0].get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
first_line = txt.splitlines()[0].strip()
|
|
if first_line:
|
|
return first_line[:64]
|
|
return None
|
|
|
|
|
|
def _parse_semver(value: str) -> tuple[int, ...] | None:
|
|
normalized = value.strip().lower()
|
|
if normalized.startswith("v"):
|
|
normalized = normalized[1:]
|
|
parts = re.findall(r"\d+", normalized)
|
|
if not parts:
|
|
return None
|
|
return tuple(int(p) for p in parts[:4])
|
|
|
|
|
|
def is_update_available(current_version: str, latest_version: str) -> bool:
|
|
current = _parse_semver(current_version)
|
|
latest = _parse_semver(latest_version)
|
|
if current and latest:
|
|
max_len = max(len(current), len(latest))
|
|
current = current + (0,) * (max_len - len(current))
|
|
latest = latest + (0,) * (max_len - len(latest))
|
|
return latest > current
|
|
return latest_version.strip() != current_version.strip()
|
|
|
|
|
|
def _get_json(url: str):
|
|
req = Request(url, headers={"User-Agent": "NexaPG/1.0"})
|
|
with urlopen(req, timeout=8) as response:
|
|
raw = response.read(64_000).decode("utf-8", errors="replace")
|
|
return json.loads(raw)
|
|
|
|
|
|
def _fetch_latest_from_upstream_sync() -> tuple[str, str]:
|
|
latest_release_url = f"{UPSTREAM_REPO_API}/releases/latest"
|
|
|
|
try:
|
|
release = _get_json(latest_release_url)
|
|
if isinstance(release, dict):
|
|
tag = (release.get("tag_name") or release.get("name") or "").strip()
|
|
if tag:
|
|
return tag[:64], "release"
|
|
except Exception as exc:
|
|
raise ValueError(f"Could not fetch latest release from upstream repository: {exc}") from exc
|
|
raise ValueError("No published release found in upstream repository")
|
|
|
|
|
|
async def fetch_latest_from_upstream() -> tuple[str | None, str | None, str | None]:
|
|
try:
|
|
latest, ref = await asyncio.to_thread(_fetch_latest_from_upstream_sync)
|
|
return latest, ref, None
|
|
except URLError as exc:
|
|
return None, None, f"Version check failed: {exc.reason}"
|
|
except Exception as exc: # noqa: BLE001
|
|
return None, None, f"Version check failed: {exc}"
|
|
|
|
|
|
def utcnow() -> datetime:
|
|
return datetime.now(timezone.utc)
|