From 610c5459e53ad3d6ff9e09ac09f9f38a896baf69 Mon Sep 17 00:00:00 2001 From: nessi Date: Wed, 18 Mar 2026 07:43:22 +0100 Subject: [PATCH] feat: add device traffic metrics with gateway telemetry reporting and admin UI display Add rx_bytes and tx_bytes fields to Device type and API responses. Add formatDataSize helper for human-readable byte formatting with units from B to TB. Add Received and Sent columns to devices table in admin UI with formatted traffic totals. Add traffic metrics display to device action panel. Add TelemetrySnapshot and PeerTelemetry types for gateway runtime stats. Add gateway telemetry endpoint at POST /gateway --- admin-web/src/api/client.ts | 2 + .../src/features/devices/DevicesPage.tsx | 23 +++ backend/internal/device/repository.go | 74 +++++++- backend/internal/device/types.go | 2 + backend/internal/gateway/handler.go | 20 +++ backend/internal/gateway/repository.go | 17 ++ backend/internal/gateway/service.go | 8 + backend/internal/gateway/types.go | 13 ++ backend/internal/httpserver/router.go | 1 + deploy/scripts/gateway-entrypoint.sh | 48 +++++ desktop-client/src-tauri/src/lib.rs | 22 ++- .../src-tauri/src/tunnel_manager.rs | 21 ++- desktop-client/src/App.tsx | 87 +++++++-- desktop-client/tunnel-helper/src/main.rs | 168 ++++++++++++++++-- 14 files changed, 472 insertions(+), 34 deletions(-) diff --git a/admin-web/src/api/client.ts b/admin-web/src/api/client.ts index cbb0272..fe2d523 100644 --- a/admin-web/src/api/client.ts +++ b/admin-web/src/api/client.ts @@ -17,6 +17,8 @@ export type Device = { platform: string; status: string; assigned_ip?: string; + rx_bytes: number; + tx_bytes: number; }; export type DeviceProfile = { diff --git a/admin-web/src/features/devices/DevicesPage.tsx b/admin-web/src/features/devices/DevicesPage.tsx index 54044a8..ea621ba 100644 --- a/admin-web/src/features/devices/DevicesPage.tsx +++ b/admin-web/src/features/devices/DevicesPage.tsx @@ -9,9 +9,28 @@ const columns = [ { key: "owner", label: "Owner" }, { key: "platform", label: "Platform" }, { key: "ip", label: "VPN IP" }, + { key: "received", label: "Received" }, + { key: "sent", label: "Sent" }, { key: "status", label: "Status" } ]; +function formatDataSize(bytes: number) { + if (!bytes) { + return "0 MB"; + } + + const units = ["B", "KB", "MB", "GB", "TB"]; + let value = bytes; + let unitIndex = 0; + + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex += 1; + } + + return `${value >= 100 || unitIndex === 0 ? value.toFixed(0) : value.toFixed(1)} ${units[unitIndex]}`; +} + export function DevicesPage() { const queryClient = useQueryClient(); const query = useQuery({ @@ -38,6 +57,8 @@ export function DevicesPage() { owner: device.user_id ?? "assigned user", platform: device.platform, ip: device.assigned_ip ?? "-", + received: formatDataSize(device.rx_bytes ?? 0), + sent: formatDataSize(device.tx_bytes ?? 0), status: device.status })) ?? []; @@ -59,6 +80,8 @@ export function DevicesPage() {

Device actions

Target: {rows[0].name}

+

Total received: {rows[0].received}

+

Total sent: {rows[0].sent}

diff --git a/backend/internal/device/repository.go b/backend/internal/device/repository.go index c9141bc..3c8d3c5 100644 --- a/backend/internal/device/repository.go +++ b/backend/internal/device/repository.go @@ -2,6 +2,7 @@ package device import ( "context" + "encoding/json" "errors" "net/netip" "time" @@ -198,7 +199,10 @@ func (r *PGRepository) ListByUser(ctx context.Context, userID uuid.UUID) ([]Devi } items = append(items, item) } - return items, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + return r.applyRuntimeStats(ctx, items) } func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) { @@ -222,7 +226,10 @@ func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) { } items = append(items, item) } - return items, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + return r.applyRuntimeStats(ctx, items) } func (r *PGRepository) Revoke(ctx context.Context, deviceID uuid.UUID) error { @@ -258,6 +265,69 @@ type enrollmentRowScanner interface { Scan(dest ...any) error } +type runtimeSnapshot struct { + Peers []runtimePeer `json:"peers"` +} + +type runtimePeer struct { + DeviceID string `json:"device_id"` + RXBytes uint64 `json:"rx_bytes"` + TXBytes uint64 `json:"tx_bytes"` +} + +func (r *PGRepository) applyRuntimeStats(ctx context.Context, items []Device) ([]Device, error) { + if len(items) == 0 { + return items, nil + } + + rows, err := r.db.Query(ctx, ` + select value + from settings + where category = 'gateway_runtime' + `) + if err != nil { + return nil, err + } + defer rows.Close() + + statsByDevice := make(map[uuid.UUID]runtimePeer) + for rows.Next() { + var raw []byte + if err := rows.Scan(&raw); err != nil { + return nil, err + } + + var snapshot runtimeSnapshot + if err := json.Unmarshal(raw, &snapshot); err != nil { + continue + } + + for _, peer := range snapshot.Peers { + deviceID, err := uuid.Parse(peer.DeviceID) + if err != nil { + continue + } + existing := statsByDevice[deviceID] + existing.DeviceID = peer.DeviceID + existing.RXBytes += peer.RXBytes + existing.TXBytes += peer.TXBytes + statsByDevice[deviceID] = existing + } + } + if err := rows.Err(); err != nil { + return nil, err + } + + for index := range items { + if stats, ok := statsByDevice[items[index].ID]; ok { + items[index].RXBytes = stats.RXBytes + items[index].TXBytes = stats.TXBytes + } + } + + return items, nil +} + func scanEnrollmentRow(row enrollmentRowScanner) (EnrollmentResponse, error) { var response EnrollmentResponse var profileRevision int diff --git a/backend/internal/device/types.go b/backend/internal/device/types.go index 7928602..0745876 100644 --- a/backend/internal/device/types.go +++ b/backend/internal/device/types.go @@ -19,6 +19,8 @@ type Device struct { Platform string `json:"platform"` Status string `json:"status"` AssignedIP string `json:"assigned_ip,omitempty"` + RXBytes uint64 `json:"rx_bytes"` + TXBytes uint64 `json:"tx_bytes"` } type ConnectionStatus struct { diff --git a/backend/internal/gateway/handler.go b/backend/internal/gateway/handler.go index 231259e..4c98072 100644 --- a/backend/internal/gateway/handler.go +++ b/backend/internal/gateway/handler.go @@ -89,3 +89,23 @@ func (h *Handler) AgentSyncBundle(w http.ResponseWriter, r *http.Request) { apiutil.JSON(w, http.StatusOK, bundle) } + +func (h *Handler) Telemetry(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("X-Gateway-Bootstrap-Token") != h.bootstrapToken { + apiutil.Error(w, http.StatusUnauthorized, "unauthorized", "invalid gateway bootstrap token") + return + } + + var input TelemetrySnapshot + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + apiutil.Error(w, http.StatusBadRequest, "invalid_json", "invalid request body") + return + } + + if err := h.service.StoreTelemetry(r.Context(), chi.URLParam(r, "id"), input); err != nil { + apiutil.Error(w, http.StatusBadRequest, "gateway_telemetry_failed", "unable to store gateway telemetry") + return + } + + apiutil.JSON(w, http.StatusOK, map[string]any{"ok": true}) +} diff --git a/backend/internal/gateway/repository.go b/backend/internal/gateway/repository.go index 0296fff..e1b8e97 100644 --- a/backend/internal/gateway/repository.go +++ b/backend/internal/gateway/repository.go @@ -2,6 +2,7 @@ package gateway import ( "context" + "encoding/json" "net/netip" "github.com/google/uuid" @@ -14,6 +15,7 @@ type Repository interface { List(ctx context.Context) ([]Gateway, error) FirstActive(ctx context.Context) (Gateway, error) BuildSyncBundle(ctx context.Context, gatewayID uuid.UUID) (wireguard.GatewayBundle, error) + StoreTelemetry(ctx context.Context, gatewayID uuid.UUID, snapshot TelemetrySnapshot) error Update(ctx context.Context, gatewayID uuid.UUID, input UpdateRequest) (Gateway, error) UpsertByName(ctx context.Context, input BootstrapRequest) (Gateway, error) } @@ -139,6 +141,21 @@ func (r *PGRepository) Update(ctx context.Context, gatewayID uuid.UUID, input Up return item, err } +func (r *PGRepository) StoreTelemetry(ctx context.Context, gatewayID uuid.UUID, snapshot TelemetrySnapshot) error { + payload, err := json.Marshal(snapshot) + if err != nil { + return err + } + + _, err = r.db.Exec(ctx, ` + insert into settings (category, key, value, updated_at) + values ('gateway_runtime', $1, $2::jsonb, now()) + on conflict (category, key) + do update set value = excluded.value, updated_at = now() + `, gatewayID.String(), string(payload)) + return err +} + func (r *PGRepository) UpsertByName(ctx context.Context, input BootstrapRequest) (Gateway, error) { row := r.db.QueryRow(ctx, ` insert into gateways (id, name, endpoint, public_key, listen_port, vpn_cidr, dns_servers, is_active) diff --git a/backend/internal/gateway/service.go b/backend/internal/gateway/service.go index c4714ec..1fe1fa5 100644 --- a/backend/internal/gateway/service.go +++ b/backend/internal/gateway/service.go @@ -40,6 +40,14 @@ func (s *Service) Update(ctx context.Context, gatewayID string, input UpdateRequ return s.repo.Update(ctx, id, input) } +func (s *Service) StoreTelemetry(ctx context.Context, gatewayID string, snapshot TelemetrySnapshot) error { + id, err := uuid.Parse(gatewayID) + if err != nil { + return err + } + return s.repo.StoreTelemetry(ctx, id, snapshot) +} + func (s *Service) Bootstrap(ctx context.Context, input BootstrapRequest) (Gateway, error) { if input.Name == "" { input.Name = "primary-gateway" diff --git a/backend/internal/gateway/types.go b/backend/internal/gateway/types.go index 3834a51..d2aba06 100644 --- a/backend/internal/gateway/types.go +++ b/backend/internal/gateway/types.go @@ -30,3 +30,16 @@ type BootstrapRequest struct { VPNCIDR string `json:"vpn_cidr"` DNSServers []string `json:"dns_servers"` } + +type TelemetrySnapshot struct { + CollectedAt string `json:"collected_at"` + Peers []PeerTelemetry `json:"peers"` +} + +type PeerTelemetry struct { + DeviceID string `json:"device_id"` + PublicKey string `json:"public_key"` + RXBytes uint64 `json:"rx_bytes"` + TXBytes uint64 `json:"tx_bytes"` + LatestHandshakeAt *int64 `json:"latest_handshake_at,omitempty"` +} diff --git a/backend/internal/httpserver/router.go b/backend/internal/httpserver/router.go index cd3bd6a..a5adb1c 100644 --- a/backend/internal/httpserver/router.go +++ b/backend/internal/httpserver/router.go @@ -41,6 +41,7 @@ func NewRouter(jwtSecret string, handlers Handlers) http.Handler { r.Post("/auth/logout", handlers.Auth.Logout) r.Post("/gateway-agent/bootstrap", handlers.Gateway.Bootstrap) r.Get("/gateway-agent/{id}/sync", handlers.Gateway.AgentSyncBundle) + r.Post("/gateway-agent/{id}/telemetry", handlers.Gateway.Telemetry) r.Group(func(r chi.Router) { r.Use(AuthMiddleware(jwtSecret)) diff --git a/deploy/scripts/gateway-entrypoint.sh b/deploy/scripts/gateway-entrypoint.sh index f2d0228..1a5cf00 100644 --- a/deploy/scripts/gateway-entrypoint.sh +++ b/deploy/scripts/gateway-entrypoint.sh @@ -144,6 +144,54 @@ EOF echo "Applied WireGuard config from ${WG_CONF}" echo "Applied nftables config from ${NFT_CONF}" wg show "${IFACE}" latest-handshakes transfer 2>/dev/null || true + post_telemetry || true +} + +post_telemetry() { + if [ ! -f "${STATE_JSON}" ]; then + return 0 + fi + + TELEMETRY_URL="${SYNC_BASE_URL}/${NEXAVPN_GATEWAY_ID}/telemetry" + TMP_TELEMETRY_JSON="/tmp/nexavpn-gateway-telemetry.json" + + { + printf '{\"collected_at\":\"%s\",\"peers\":[' "$(date -u +"%Y-%m-%dT%H:%M:%SZ")" + FIRST=1 + + while IFS="$(printf '\t')" read -r PUBLIC_KEY _PRESHARED _ENDPOINT _ALLOWED_IPS LATEST_HANDSHAKE RX_BYTES TX_BYTES _KEEPALIVE; do + if [ -z "${PUBLIC_KEY:-}" ] || [ "${PUBLIC_KEY}" = "private_key" ]; then + continue + fi + + DEVICE_ID="$(jq -r --arg public_key "${PUBLIC_KEY}" '.peers[]? | select(.public_key == $public_key) | .device_id' "${STATE_JSON}" | head -n1)" + if [ -z "${DEVICE_ID:-}" ] || [ "${DEVICE_ID}" = "null" ]; then + continue + fi + + if [ "${FIRST}" -eq 0 ]; then + printf ',' + fi + FIRST=0 + + if [ "${LATEST_HANDSHAKE:-0}" -gt 0 ] 2>/dev/null; then + printf '{\"device_id\":\"%s\",\"public_key\":\"%s\",\"rx_bytes\":%s,\"tx_bytes\":%s,\"latest_handshake_at\":%s}' \ + "${DEVICE_ID}" "${PUBLIC_KEY}" "${RX_BYTES:-0}" "${TX_BYTES:-0}" "${LATEST_HANDSHAKE}" + else + printf '{\"device_id\":\"%s\",\"public_key\":\"%s\",\"rx_bytes\":%s,\"tx_bytes\":%s}' \ + "${DEVICE_ID}" "${PUBLIC_KEY}" "${RX_BYTES:-0}" "${TX_BYTES:-0}" + fi + done < <(wg show "${IFACE}" dump 2>/dev/null | tail -n +2) + + printf ']}' + } > "${TMP_TELEMETRY_JSON}" + + curl -fsSL \ + -H "Content-Type: application/json" \ + -H "X-Gateway-Bootstrap-Token: ${GATEWAY_BOOTSTRAP_TOKEN}" \ + -X POST \ + --data @"${TMP_TELEMETRY_JSON}" \ + "${TELEMETRY_URL}" >/dev/null } while true; do diff --git a/desktop-client/src-tauri/src/lib.rs b/desktop-client/src-tauri/src/lib.rs index 3662a47..14c9cb5 100644 --- a/desktop-client/src-tauri/src/lib.rs +++ b/desktop-client/src-tauri/src/lib.rs @@ -53,6 +53,14 @@ struct EnrollmentResult { tunnel_strategy: String, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TunnelMetrics { + active: bool, + rx_bytes: u64, + tx_bytes: u64, +} + #[derive(Debug, Serialize)] struct LoginRequest<'a> { username: &'a str, @@ -318,6 +326,18 @@ fn tunnel_status(app: AppHandle, state: State<'_, AppState>) -> Result) -> Result { + let session = state.session.lock().map_err(|_| "Unable to read client state".to_string())?; + let session = session.as_ref().ok_or_else(|| "No active session is available".to_string())?; + let metrics = tunnel_manager::metrics(&app, std::path::Path::new(&session.profile_path))?; + Ok(TunnelMetrics { + active: metrics.active, + rx_bytes: metrics.rx_bytes, + tx_bytes: metrics.tx_bytes, + }) +} + fn generate_keypair() -> (String, String) { let private = StaticSecret::random_from_rng(OsRng); let public = PublicKey::from(&private); @@ -444,7 +464,7 @@ pub fn run() { } _ => {} }) - .invoke_handler(tauri::generate_handler![load_state, clear_session, enroll_device, sync_profile, connect_tunnel, disconnect_tunnel, tunnel_status]) + .invoke_handler(tauri::generate_handler![load_state, clear_session, enroll_device, sync_profile, connect_tunnel, disconnect_tunnel, tunnel_status, tunnel_metrics]) .run(tauri::generate_context!()) .expect("error while running tauri application"); } diff --git a/desktop-client/src-tauri/src/tunnel_manager.rs b/desktop-client/src-tauri/src/tunnel_manager.rs index ec452a0..2fe798b 100644 --- a/desktop-client/src-tauri/src/tunnel_manager.rs +++ b/desktop-client/src-tauri/src/tunnel_manager.rs @@ -3,6 +3,7 @@ use std::{ process::Command, }; +use serde::{Deserialize, Serialize}; use tauri::{AppHandle, Manager}; pub fn current_tunnel_strategy() -> &'static str { @@ -48,20 +49,32 @@ pub fn disconnect(app: &AppHandle, profile_path: &Path) -> Result<(), String> { } pub fn is_active(app: &AppHandle, profile_path: &Path) -> Result { + Ok(metrics(app, profile_path)?.active) +} + +pub fn metrics(app: &AppHandle, profile_path: &Path) -> Result { let backend = bundled_backend(app)?; let output = Command::new(backend) - .arg("status") + .arg("metrics") .arg("--profile") .arg(profile_path) .output() .map_err(|err| format!("Unable to query embedded tunnel backend: {}", err))?; if !output.status.success() { - return Err(format_helper_error("status", &output)); + return Err(format_helper_error("metrics", &output)); } - let stdout = String::from_utf8_lossy(&output.stdout); - Ok(stdout.trim().eq_ignore_ascii_case("active")) + serde_json::from_slice::(&output.stdout) + .map_err(|err| format!("Unable to decode tunnel metrics: {}", err)) +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TunnelMetrics { + pub active: bool, + pub rx_bytes: u64, + pub tx_bytes: u64, } fn bundled_backend(app: &AppHandle) -> Result { diff --git a/desktop-client/src/App.tsx b/desktop-client/src/App.tsx index 33c3922..62284e8 100644 --- a/desktop-client/src/App.tsx +++ b/desktop-client/src/App.tsx @@ -11,6 +11,12 @@ type EnrollmentState = { tunnelStrategy: string; }; +type TunnelMetrics = { + active: boolean; + rxBytes: number; + txBytes: number; +}; + function formatInvokeError(err: unknown, fallback: string) { if (typeof err === "string" && err.trim().length > 0) { return err; @@ -39,6 +45,23 @@ function currentProfileLabel(state: EnrollmentState | null) { return `Split tunnel (${state.resources.length} resources)`; } +function formatDataSize(bytes: number) { + if (!bytes) { + return "0 MB"; + } + + const units = ["B", "KB", "MB", "GB", "TB"]; + let value = bytes; + let unitIndex = 0; + + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex += 1; + } + + return `${value >= 100 || unitIndex === 0 ? value.toFixed(0) : value.toFixed(1)} ${units[unitIndex]}`; +} + export function App() { const [serverUrl, setServerUrl] = useState("http://localhost"); const [username, setUsername] = useState(""); @@ -48,34 +71,61 @@ export function App() { const [error, setError] = useState(null); const [connected, setConnected] = useState(false); const [state, setState] = useState(null); + const [metrics, setMetrics] = useState({ + active: false, + rxBytes: 0, + txBytes: 0 + }); + + async function refreshTunnelMetrics() { + try { + const value = await invoke("tunnel_metrics"); + setMetrics(value); + setConnected(value.active); + } catch { + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); + } + } useEffect(() => { void invoke("load_state") .then(async (value) => { if (value) { setState(value); - try { - const active = await invoke("tunnel_status"); - setConnected(active); - } catch { - setConnected(false); - } + await refreshTunnelMetrics(); } }) .catch(() => undefined); }, []); + useEffect(() => { + if (!state) { + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); + return undefined; + } + + void refreshTunnelMetrics(); + const timer = window.setInterval(() => { + void refreshTunnelMetrics(); + }, 5000); + + return () => window.clearInterval(timer); + }, [state]); + const profileLabel = useMemo(() => currentProfileLabel(state), [state]); async function waitForTunnelStatus(expected: boolean) { for (let attempt = 0; attempt < 8; attempt += 1) { try { - const active = await invoke("tunnel_status"); - if (active === expected) { - return active; + const value = await invoke("tunnel_metrics"); + setMetrics(value); + setConnected(value.active); + if (value.active === expected) { + return value.active; } } catch { if (!expected) { + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); return false; } } @@ -83,7 +133,13 @@ export function App() { await new Promise((resolve) => window.setTimeout(resolve, 500)); } - return invoke("tunnel_status").catch(() => false); + return invoke("tunnel_metrics") + .then((value) => { + setMetrics(value); + setConnected(value.active); + return value.active; + }) + .catch(() => false); } async function onSubmit(event: FormEvent) { @@ -96,6 +152,7 @@ export function App() { payload: { serverUrl, username, password } }); setState(result); + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); } catch (err) { setError(formatInvokeError(err, "Enrollment failed")); } finally { @@ -110,6 +167,7 @@ export function App() { try { const result = await invoke("sync_profile"); setState(result); + await refreshTunnelMetrics(); } catch (err) { setError(formatInvokeError(err, "Profile sync failed")); } finally { @@ -140,6 +198,7 @@ export function App() { await invoke("clear_session"); setConnected(false); setState(null); + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); setError(null); } catch (err) { setError(formatInvokeError(err, "Unable to clear local profile")); @@ -242,6 +301,14 @@ export function App() { Last sync {state.lastSyncTime}
+
+ Received + {formatDataSize(metrics.rxBytes)} +
+
+ Sent + {formatDataSize(metrics.txBytes)} +
diff --git a/desktop-client/tunnel-helper/src/main.rs b/desktop-client/tunnel-helper/src/main.rs index 7e4687e..06c046b 100644 --- a/desktop-client/tunnel-helper/src/main.rs +++ b/desktop-client/tunnel-helper/src/main.rs @@ -55,6 +55,15 @@ struct TunnelResponse { ok: bool, error: Option, active: Option, + rx_bytes: Option, + tx_bytes: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct TunnelMetrics { + active: bool, + rx_bytes: u64, + tx_bytes: u64, } fn main() -> ExitCode { @@ -84,6 +93,15 @@ fn run() -> Result<(), String> { println!("{}", if active { "active" } else { "inactive" }); Ok(()) } + "metrics" => { + let profile = parse_profile_arg(args)?; + println!( + "{}", + serde_json::to_string(&windows_client_metrics(&profile)?) + .map_err(|err| format!("Unable to encode tunnel metrics: {err}"))? + ); + Ok(()) + } "connect" | "disconnect" => { let profile = parse_profile_arg(args)?; windows_client_request(command.as_str(), &profile) @@ -101,6 +119,14 @@ fn run() -> Result<(), String> { println!("{}", if active { "active" } else { "inactive" }); Ok(()) } + "metrics" => { + println!( + "{}", + serde_json::to_string(&tunnel_metrics(&profile)?) + .map_err(|err| format!("Unable to encode tunnel metrics: {err}"))? + ); + Ok(()) + } "connect" => connect_direct(&profile), "disconnect" => disconnect_direct(&profile), _ => Err("unsupported command".into()), @@ -214,19 +240,23 @@ fn handle_service_client(mut stream: TcpStream) -> Result<(), String> { .map_err(|err| format!("Unable to decode IPC request: {err}"))?; let response = match request.action.as_str() { "connect" => match connect_direct(Path::new(&request.profile)) { - Ok(()) => TunnelResponse { ok: true, error: None, active: None }, + Ok(()) => TunnelResponse { ok: true, error: None, active: None, rx_bytes: None, tx_bytes: None }, Err(err) => TunnelResponse { ok: false, error: Some(err), active: None, + rx_bytes: None, + tx_bytes: None, }, }, "disconnect" => match disconnect_direct(Path::new(&request.profile)) { - Ok(()) => TunnelResponse { ok: true, error: None, active: None }, + Ok(()) => TunnelResponse { ok: true, error: None, active: None, rx_bytes: None, tx_bytes: None }, Err(err) => TunnelResponse { ok: false, error: Some(err), active: None, + rx_bytes: None, + tx_bytes: None, }, }, "status" => match tunnel_is_active(Path::new(&request.profile)) { @@ -234,17 +264,39 @@ fn handle_service_client(mut stream: TcpStream) -> Result<(), String> { ok: true, error: None, active: Some(active), + rx_bytes: None, + tx_bytes: None, }, Err(err) => TunnelResponse { ok: false, error: Some(err), active: None, + rx_bytes: None, + tx_bytes: None, + }, + }, + "metrics" => match tunnel_metrics(Path::new(&request.profile)) { + Ok(metrics) => TunnelResponse { + ok: true, + error: None, + active: Some(metrics.active), + rx_bytes: Some(metrics.rx_bytes), + tx_bytes: Some(metrics.tx_bytes), + }, + Err(err) => TunnelResponse { + ok: false, + error: Some(err), + active: None, + rx_bytes: None, + tx_bytes: None, }, }, _ => TunnelResponse { ok: false, error: Some("unsupported tunnel action".into()), active: None, + rx_bytes: None, + tx_bytes: None, }, }; @@ -367,34 +419,43 @@ fn windows_client_request(action: &str, profile: &Path) -> Result<(), String> { #[cfg(target_os = "windows")] fn windows_client_status(profile: &Path) -> Result { + Ok(windows_client_metrics(profile)?.active) +} + +#[cfg(target_os = "windows")] +fn windows_client_metrics(profile: &Path) -> Result { let mut stream = connect_to_service()?; let payload = serde_json::to_string(&TunnelRequest { - action: "status".to_string(), + action: "metrics".to_string(), profile: profile.display().to_string(), }) - .map_err(|err| format!("Unable to encode tunnel status request: {err}"))?; + .map_err(|err| format!("Unable to encode tunnel metrics request: {err}"))?; stream .write_all(payload.as_bytes()) .and_then(|_| stream.write_all(b"\n")) - .map_err(|err| format!("Unable to send tunnel status request: {err}"))?; + .map_err(|err| format!("Unable to send tunnel metrics request: {err}"))?; let mut reader = BufReader::new(stream); let mut line = String::new(); reader .read_line(&mut line) - .map_err(|err| format!("Unable to read tunnel status response: {err}"))?; + .map_err(|err| format!("Unable to read tunnel metrics response: {err}"))?; let response = serde_json::from_str::(&line) - .map_err(|err| format!("Unable to decode tunnel status response: {err}"))?; + .map_err(|err| format!("Unable to decode tunnel metrics response: {err}"))?; if !response.ok { return Err(response .error - .unwrap_or_else(|| "NexaVPN tunnel service status check failed".into())); + .unwrap_or_else(|| "NexaVPN tunnel service metrics check failed".into())); } - Ok(response.active.unwrap_or(false)) + Ok(TunnelMetrics { + active: response.active.unwrap_or(false), + rx_bytes: response.rx_bytes.unwrap_or(0), + tx_bytes: response.tx_bytes.unwrap_or(0), + }) } #[cfg(target_os = "windows")] @@ -517,12 +578,31 @@ fn disconnect_direct(profile: &Path) -> Result<(), String> { } fn tunnel_is_active(profile: &Path) -> Result { + Ok(tunnel_metrics(profile)?.active) +} + +fn tunnel_metrics(profile: &Path) -> Result { + let active = tunnel_service_is_active(profile)?; + if !active { + return Ok(TunnelMetrics { + active: false, + rx_bytes: 0, + tx_bytes: 0, + }); + } + + let (rx_bytes, tx_bytes) = read_transfer_totals(profile).unwrap_or((0, 0)); + Ok(TunnelMetrics { + active: true, + rx_bytes, + tx_bytes, + }) +} + +fn tunnel_service_is_active(profile: &Path) -> Result { #[cfg(target_os = "windows")] { - let tunnel_name = profile - .file_stem() - .and_then(|value| value.to_str()) - .ok_or_else(|| "invalid profile filename".to_string())?; + let tunnel_name = tunnel_name(profile)?; let service_name = format!("WireGuardTunnel${}", tunnel_name); let output = Command::new("sc") .arg("query") @@ -541,10 +621,7 @@ fn tunnel_is_active(profile: &Path) -> Result { #[cfg(target_os = "macos")] { - let tunnel_name = profile - .file_stem() - .and_then(|value| value.to_str()) - .ok_or_else(|| "invalid profile filename".to_string())?; + let tunnel_name = tunnel_name(profile)?; let status = Command::new("wg") .arg("show") .arg(tunnel_name) @@ -557,6 +634,63 @@ fn tunnel_is_active(profile: &Path) -> Result { Err("unsupported platform".into()) } +fn tunnel_name(profile: &Path) -> Result<&str, String> { + profile + .file_stem() + .and_then(|value| value.to_str()) + .ok_or_else(|| "invalid profile filename".to_string()) +} + +fn read_transfer_totals(profile: &Path) -> Result<(u64, u64), String> { + let tunnel_name = tunnel_name(profile)?; + let mut command = Command::new(find_wg_cli()?); + command.arg("show").arg(tunnel_name).arg("dump"); + #[cfg(target_os = "windows")] + command.creation_flags(CREATE_NO_WINDOW); + + let output = command + .output() + .map_err(|err| format!("Unable to query tunnel transfer counters: {err}"))?; + + if !output.status.success() { + return Err("Unable to read WireGuard transfer counters.".into()); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let mut rx_bytes = 0_u64; + let mut tx_bytes = 0_u64; + + for line in stdout.lines().skip(1) { + let fields: Vec<&str> = line.split('\t').collect(); + if fields.len() < 7 { + continue; + } + + rx_bytes = rx_bytes.saturating_add(fields[5].parse::().unwrap_or(0)); + tx_bytes = tx_bytes.saturating_add(fields[6].parse::().unwrap_or(0)); + } + + Ok((rx_bytes, tx_bytes)) +} + +#[cfg(target_os = "windows")] +fn find_wg_cli() -> Result { + let candidates = [ + PathBuf::from(r"C:\Program Files\WireGuard\wg.exe"), + PathBuf::from(r"C:\Program Files (x86)\WireGuard\wg.exe"), + ]; + + candidates + .into_iter() + .find(|path| path.exists()) + .ok_or_else(|| "required Windows WireGuard CLI is not available".to_string()) +} + +#[cfg(target_os = "macos")] +fn find_wg_cli() -> Result { + Ok(PathBuf::from("wg")) +} + #[cfg(target_os = "windows")] fn find_windows_wireguard() -> Result { let candidates = [