diff --git a/admin-web/src/api/client.ts b/admin-web/src/api/client.ts index cbb0272..fe2d523 100644 --- a/admin-web/src/api/client.ts +++ b/admin-web/src/api/client.ts @@ -17,6 +17,8 @@ export type Device = { platform: string; status: string; assigned_ip?: string; + rx_bytes: number; + tx_bytes: number; }; export type DeviceProfile = { diff --git a/admin-web/src/features/devices/DevicesPage.tsx b/admin-web/src/features/devices/DevicesPage.tsx index 54044a8..ea621ba 100644 --- a/admin-web/src/features/devices/DevicesPage.tsx +++ b/admin-web/src/features/devices/DevicesPage.tsx @@ -9,9 +9,28 @@ const columns = [ { key: "owner", label: "Owner" }, { key: "platform", label: "Platform" }, { key: "ip", label: "VPN IP" }, + { key: "received", label: "Received" }, + { key: "sent", label: "Sent" }, { key: "status", label: "Status" } ]; +function formatDataSize(bytes: number) { + if (!bytes) { + return "0 MB"; + } + + const units = ["B", "KB", "MB", "GB", "TB"]; + let value = bytes; + let unitIndex = 0; + + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex += 1; + } + + return `${value >= 100 || unitIndex === 0 ? value.toFixed(0) : value.toFixed(1)} ${units[unitIndex]}`; +} + export function DevicesPage() { const queryClient = useQueryClient(); const query = useQuery({ @@ -38,6 +57,8 @@ export function DevicesPage() { owner: device.user_id ?? "assigned user", platform: device.platform, ip: device.assigned_ip ?? "-", + received: formatDataSize(device.rx_bytes ?? 0), + sent: formatDataSize(device.tx_bytes ?? 0), status: device.status })) ?? []; @@ -59,6 +80,8 @@ export function DevicesPage() {

Device actions

Target: {rows[0].name}

+

Total received: {rows[0].received}

+

Total sent: {rows[0].sent}

diff --git a/backend/internal/device/repository.go b/backend/internal/device/repository.go index c9141bc..3c8d3c5 100644 --- a/backend/internal/device/repository.go +++ b/backend/internal/device/repository.go @@ -2,6 +2,7 @@ package device import ( "context" + "encoding/json" "errors" "net/netip" "time" @@ -198,7 +199,10 @@ func (r *PGRepository) ListByUser(ctx context.Context, userID uuid.UUID) ([]Devi } items = append(items, item) } - return items, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + return r.applyRuntimeStats(ctx, items) } func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) { @@ -222,7 +226,10 @@ func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) { } items = append(items, item) } - return items, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + return r.applyRuntimeStats(ctx, items) } func (r *PGRepository) Revoke(ctx context.Context, deviceID uuid.UUID) error { @@ -258,6 +265,69 @@ type enrollmentRowScanner interface { Scan(dest ...any) error } +type runtimeSnapshot struct { + Peers []runtimePeer `json:"peers"` +} + +type runtimePeer struct { + DeviceID string `json:"device_id"` + RXBytes uint64 `json:"rx_bytes"` + TXBytes uint64 `json:"tx_bytes"` +} + +func (r *PGRepository) applyRuntimeStats(ctx context.Context, items []Device) ([]Device, error) { + if len(items) == 0 { + return items, nil + } + + rows, err := r.db.Query(ctx, ` + select value + from settings + where category = 'gateway_runtime' + `) + if err != nil { + return nil, err + } + defer rows.Close() + + statsByDevice := make(map[uuid.UUID]runtimePeer) + for rows.Next() { + var raw []byte + if err := rows.Scan(&raw); err != nil { + return nil, err + } + + var snapshot runtimeSnapshot + if err := json.Unmarshal(raw, &snapshot); err != nil { + continue + } + + for _, peer := range snapshot.Peers { + deviceID, err := uuid.Parse(peer.DeviceID) + if err != nil { + continue + } + existing := statsByDevice[deviceID] + existing.DeviceID = peer.DeviceID + existing.RXBytes += peer.RXBytes + existing.TXBytes += peer.TXBytes + statsByDevice[deviceID] = existing + } + } + if err := rows.Err(); err != nil { + return nil, err + } + + for index := range items { + if stats, ok := statsByDevice[items[index].ID]; ok { + items[index].RXBytes = stats.RXBytes + items[index].TXBytes = stats.TXBytes + } + } + + return items, nil +} + func scanEnrollmentRow(row enrollmentRowScanner) (EnrollmentResponse, error) { var response EnrollmentResponse var profileRevision int diff --git a/backend/internal/device/types.go b/backend/internal/device/types.go index 7928602..0745876 100644 --- a/backend/internal/device/types.go +++ b/backend/internal/device/types.go @@ -19,6 +19,8 @@ type Device struct { Platform string `json:"platform"` Status string `json:"status"` AssignedIP string `json:"assigned_ip,omitempty"` + RXBytes uint64 `json:"rx_bytes"` + TXBytes uint64 `json:"tx_bytes"` } type ConnectionStatus struct { diff --git a/backend/internal/gateway/handler.go b/backend/internal/gateway/handler.go index 231259e..4c98072 100644 --- a/backend/internal/gateway/handler.go +++ b/backend/internal/gateway/handler.go @@ -89,3 +89,23 @@ func (h *Handler) AgentSyncBundle(w http.ResponseWriter, r *http.Request) { apiutil.JSON(w, http.StatusOK, bundle) } + +func (h *Handler) Telemetry(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("X-Gateway-Bootstrap-Token") != h.bootstrapToken { + apiutil.Error(w, http.StatusUnauthorized, "unauthorized", "invalid gateway bootstrap token") + return + } + + var input TelemetrySnapshot + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + apiutil.Error(w, http.StatusBadRequest, "invalid_json", "invalid request body") + return + } + + if err := h.service.StoreTelemetry(r.Context(), chi.URLParam(r, "id"), input); err != nil { + apiutil.Error(w, http.StatusBadRequest, "gateway_telemetry_failed", "unable to store gateway telemetry") + return + } + + apiutil.JSON(w, http.StatusOK, map[string]any{"ok": true}) +} diff --git a/backend/internal/gateway/repository.go b/backend/internal/gateway/repository.go index 0296fff..e1b8e97 100644 --- a/backend/internal/gateway/repository.go +++ b/backend/internal/gateway/repository.go @@ -2,6 +2,7 @@ package gateway import ( "context" + "encoding/json" "net/netip" "github.com/google/uuid" @@ -14,6 +15,7 @@ type Repository interface { List(ctx context.Context) ([]Gateway, error) FirstActive(ctx context.Context) (Gateway, error) BuildSyncBundle(ctx context.Context, gatewayID uuid.UUID) (wireguard.GatewayBundle, error) + StoreTelemetry(ctx context.Context, gatewayID uuid.UUID, snapshot TelemetrySnapshot) error Update(ctx context.Context, gatewayID uuid.UUID, input UpdateRequest) (Gateway, error) UpsertByName(ctx context.Context, input BootstrapRequest) (Gateway, error) } @@ -139,6 +141,21 @@ func (r *PGRepository) Update(ctx context.Context, gatewayID uuid.UUID, input Up return item, err } +func (r *PGRepository) StoreTelemetry(ctx context.Context, gatewayID uuid.UUID, snapshot TelemetrySnapshot) error { + payload, err := json.Marshal(snapshot) + if err != nil { + return err + } + + _, err = r.db.Exec(ctx, ` + insert into settings (category, key, value, updated_at) + values ('gateway_runtime', $1, $2::jsonb, now()) + on conflict (category, key) + do update set value = excluded.value, updated_at = now() + `, gatewayID.String(), string(payload)) + return err +} + func (r *PGRepository) UpsertByName(ctx context.Context, input BootstrapRequest) (Gateway, error) { row := r.db.QueryRow(ctx, ` insert into gateways (id, name, endpoint, public_key, listen_port, vpn_cidr, dns_servers, is_active) diff --git a/backend/internal/gateway/service.go b/backend/internal/gateway/service.go index c4714ec..1fe1fa5 100644 --- a/backend/internal/gateway/service.go +++ b/backend/internal/gateway/service.go @@ -40,6 +40,14 @@ func (s *Service) Update(ctx context.Context, gatewayID string, input UpdateRequ return s.repo.Update(ctx, id, input) } +func (s *Service) StoreTelemetry(ctx context.Context, gatewayID string, snapshot TelemetrySnapshot) error { + id, err := uuid.Parse(gatewayID) + if err != nil { + return err + } + return s.repo.StoreTelemetry(ctx, id, snapshot) +} + func (s *Service) Bootstrap(ctx context.Context, input BootstrapRequest) (Gateway, error) { if input.Name == "" { input.Name = "primary-gateway" diff --git a/backend/internal/gateway/types.go b/backend/internal/gateway/types.go index 3834a51..d2aba06 100644 --- a/backend/internal/gateway/types.go +++ b/backend/internal/gateway/types.go @@ -30,3 +30,16 @@ type BootstrapRequest struct { VPNCIDR string `json:"vpn_cidr"` DNSServers []string `json:"dns_servers"` } + +type TelemetrySnapshot struct { + CollectedAt string `json:"collected_at"` + Peers []PeerTelemetry `json:"peers"` +} + +type PeerTelemetry struct { + DeviceID string `json:"device_id"` + PublicKey string `json:"public_key"` + RXBytes uint64 `json:"rx_bytes"` + TXBytes uint64 `json:"tx_bytes"` + LatestHandshakeAt *int64 `json:"latest_handshake_at,omitempty"` +} diff --git a/backend/internal/httpserver/router.go b/backend/internal/httpserver/router.go index cd3bd6a..a5adb1c 100644 --- a/backend/internal/httpserver/router.go +++ b/backend/internal/httpserver/router.go @@ -41,6 +41,7 @@ func NewRouter(jwtSecret string, handlers Handlers) http.Handler { r.Post("/auth/logout", handlers.Auth.Logout) r.Post("/gateway-agent/bootstrap", handlers.Gateway.Bootstrap) r.Get("/gateway-agent/{id}/sync", handlers.Gateway.AgentSyncBundle) + r.Post("/gateway-agent/{id}/telemetry", handlers.Gateway.Telemetry) r.Group(func(r chi.Router) { r.Use(AuthMiddleware(jwtSecret)) diff --git a/deploy/scripts/gateway-entrypoint.sh b/deploy/scripts/gateway-entrypoint.sh index f2d0228..1a5cf00 100644 --- a/deploy/scripts/gateway-entrypoint.sh +++ b/deploy/scripts/gateway-entrypoint.sh @@ -144,6 +144,54 @@ EOF echo "Applied WireGuard config from ${WG_CONF}" echo "Applied nftables config from ${NFT_CONF}" wg show "${IFACE}" latest-handshakes transfer 2>/dev/null || true + post_telemetry || true +} + +post_telemetry() { + if [ ! -f "${STATE_JSON}" ]; then + return 0 + fi + + TELEMETRY_URL="${SYNC_BASE_URL}/${NEXAVPN_GATEWAY_ID}/telemetry" + TMP_TELEMETRY_JSON="/tmp/nexavpn-gateway-telemetry.json" + + { + printf '{\"collected_at\":\"%s\",\"peers\":[' "$(date -u +"%Y-%m-%dT%H:%M:%SZ")" + FIRST=1 + + while IFS="$(printf '\t')" read -r PUBLIC_KEY _PRESHARED _ENDPOINT _ALLOWED_IPS LATEST_HANDSHAKE RX_BYTES TX_BYTES _KEEPALIVE; do + if [ -z "${PUBLIC_KEY:-}" ] || [ "${PUBLIC_KEY}" = "private_key" ]; then + continue + fi + + DEVICE_ID="$(jq -r --arg public_key "${PUBLIC_KEY}" '.peers[]? | select(.public_key == $public_key) | .device_id' "${STATE_JSON}" | head -n1)" + if [ -z "${DEVICE_ID:-}" ] || [ "${DEVICE_ID}" = "null" ]; then + continue + fi + + if [ "${FIRST}" -eq 0 ]; then + printf ',' + fi + FIRST=0 + + if [ "${LATEST_HANDSHAKE:-0}" -gt 0 ] 2>/dev/null; then + printf '{\"device_id\":\"%s\",\"public_key\":\"%s\",\"rx_bytes\":%s,\"tx_bytes\":%s,\"latest_handshake_at\":%s}' \ + "${DEVICE_ID}" "${PUBLIC_KEY}" "${RX_BYTES:-0}" "${TX_BYTES:-0}" "${LATEST_HANDSHAKE}" + else + printf '{\"device_id\":\"%s\",\"public_key\":\"%s\",\"rx_bytes\":%s,\"tx_bytes\":%s}' \ + "${DEVICE_ID}" "${PUBLIC_KEY}" "${RX_BYTES:-0}" "${TX_BYTES:-0}" + fi + done < <(wg show "${IFACE}" dump 2>/dev/null | tail -n +2) + + printf ']}' + } > "${TMP_TELEMETRY_JSON}" + + curl -fsSL \ + -H "Content-Type: application/json" \ + -H "X-Gateway-Bootstrap-Token: ${GATEWAY_BOOTSTRAP_TOKEN}" \ + -X POST \ + --data @"${TMP_TELEMETRY_JSON}" \ + "${TELEMETRY_URL}" >/dev/null } while true; do diff --git a/desktop-client/src-tauri/src/lib.rs b/desktop-client/src-tauri/src/lib.rs index 3662a47..14c9cb5 100644 --- a/desktop-client/src-tauri/src/lib.rs +++ b/desktop-client/src-tauri/src/lib.rs @@ -53,6 +53,14 @@ struct EnrollmentResult { tunnel_strategy: String, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TunnelMetrics { + active: bool, + rx_bytes: u64, + tx_bytes: u64, +} + #[derive(Debug, Serialize)] struct LoginRequest<'a> { username: &'a str, @@ -318,6 +326,18 @@ fn tunnel_status(app: AppHandle, state: State<'_, AppState>) -> Result) -> Result { + let session = state.session.lock().map_err(|_| "Unable to read client state".to_string())?; + let session = session.as_ref().ok_or_else(|| "No active session is available".to_string())?; + let metrics = tunnel_manager::metrics(&app, std::path::Path::new(&session.profile_path))?; + Ok(TunnelMetrics { + active: metrics.active, + rx_bytes: metrics.rx_bytes, + tx_bytes: metrics.tx_bytes, + }) +} + fn generate_keypair() -> (String, String) { let private = StaticSecret::random_from_rng(OsRng); let public = PublicKey::from(&private); @@ -444,7 +464,7 @@ pub fn run() { } _ => {} }) - .invoke_handler(tauri::generate_handler![load_state, clear_session, enroll_device, sync_profile, connect_tunnel, disconnect_tunnel, tunnel_status]) + .invoke_handler(tauri::generate_handler![load_state, clear_session, enroll_device, sync_profile, connect_tunnel, disconnect_tunnel, tunnel_status, tunnel_metrics]) .run(tauri::generate_context!()) .expect("error while running tauri application"); } diff --git a/desktop-client/src-tauri/src/tunnel_manager.rs b/desktop-client/src-tauri/src/tunnel_manager.rs index ec452a0..2fe798b 100644 --- a/desktop-client/src-tauri/src/tunnel_manager.rs +++ b/desktop-client/src-tauri/src/tunnel_manager.rs @@ -3,6 +3,7 @@ use std::{ process::Command, }; +use serde::{Deserialize, Serialize}; use tauri::{AppHandle, Manager}; pub fn current_tunnel_strategy() -> &'static str { @@ -48,20 +49,32 @@ pub fn disconnect(app: &AppHandle, profile_path: &Path) -> Result<(), String> { } pub fn is_active(app: &AppHandle, profile_path: &Path) -> Result { + Ok(metrics(app, profile_path)?.active) +} + +pub fn metrics(app: &AppHandle, profile_path: &Path) -> Result { let backend = bundled_backend(app)?; let output = Command::new(backend) - .arg("status") + .arg("metrics") .arg("--profile") .arg(profile_path) .output() .map_err(|err| format!("Unable to query embedded tunnel backend: {}", err))?; if !output.status.success() { - return Err(format_helper_error("status", &output)); + return Err(format_helper_error("metrics", &output)); } - let stdout = String::from_utf8_lossy(&output.stdout); - Ok(stdout.trim().eq_ignore_ascii_case("active")) + serde_json::from_slice::(&output.stdout) + .map_err(|err| format!("Unable to decode tunnel metrics: {}", err)) +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TunnelMetrics { + pub active: bool, + pub rx_bytes: u64, + pub tx_bytes: u64, } fn bundled_backend(app: &AppHandle) -> Result { diff --git a/desktop-client/src/App.tsx b/desktop-client/src/App.tsx index 33c3922..62284e8 100644 --- a/desktop-client/src/App.tsx +++ b/desktop-client/src/App.tsx @@ -11,6 +11,12 @@ type EnrollmentState = { tunnelStrategy: string; }; +type TunnelMetrics = { + active: boolean; + rxBytes: number; + txBytes: number; +}; + function formatInvokeError(err: unknown, fallback: string) { if (typeof err === "string" && err.trim().length > 0) { return err; @@ -39,6 +45,23 @@ function currentProfileLabel(state: EnrollmentState | null) { return `Split tunnel (${state.resources.length} resources)`; } +function formatDataSize(bytes: number) { + if (!bytes) { + return "0 MB"; + } + + const units = ["B", "KB", "MB", "GB", "TB"]; + let value = bytes; + let unitIndex = 0; + + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex += 1; + } + + return `${value >= 100 || unitIndex === 0 ? value.toFixed(0) : value.toFixed(1)} ${units[unitIndex]}`; +} + export function App() { const [serverUrl, setServerUrl] = useState("http://localhost"); const [username, setUsername] = useState(""); @@ -48,34 +71,61 @@ export function App() { const [error, setError] = useState(null); const [connected, setConnected] = useState(false); const [state, setState] = useState(null); + const [metrics, setMetrics] = useState({ + active: false, + rxBytes: 0, + txBytes: 0 + }); + + async function refreshTunnelMetrics() { + try { + const value = await invoke("tunnel_metrics"); + setMetrics(value); + setConnected(value.active); + } catch { + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); + } + } useEffect(() => { void invoke("load_state") .then(async (value) => { if (value) { setState(value); - try { - const active = await invoke("tunnel_status"); - setConnected(active); - } catch { - setConnected(false); - } + await refreshTunnelMetrics(); } }) .catch(() => undefined); }, []); + useEffect(() => { + if (!state) { + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); + return undefined; + } + + void refreshTunnelMetrics(); + const timer = window.setInterval(() => { + void refreshTunnelMetrics(); + }, 5000); + + return () => window.clearInterval(timer); + }, [state]); + const profileLabel = useMemo(() => currentProfileLabel(state), [state]); async function waitForTunnelStatus(expected: boolean) { for (let attempt = 0; attempt < 8; attempt += 1) { try { - const active = await invoke("tunnel_status"); - if (active === expected) { - return active; + const value = await invoke("tunnel_metrics"); + setMetrics(value); + setConnected(value.active); + if (value.active === expected) { + return value.active; } } catch { if (!expected) { + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); return false; } } @@ -83,7 +133,13 @@ export function App() { await new Promise((resolve) => window.setTimeout(resolve, 500)); } - return invoke("tunnel_status").catch(() => false); + return invoke("tunnel_metrics") + .then((value) => { + setMetrics(value); + setConnected(value.active); + return value.active; + }) + .catch(() => false); } async function onSubmit(event: FormEvent) { @@ -96,6 +152,7 @@ export function App() { payload: { serverUrl, username, password } }); setState(result); + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); } catch (err) { setError(formatInvokeError(err, "Enrollment failed")); } finally { @@ -110,6 +167,7 @@ export function App() { try { const result = await invoke("sync_profile"); setState(result); + await refreshTunnelMetrics(); } catch (err) { setError(formatInvokeError(err, "Profile sync failed")); } finally { @@ -140,6 +198,7 @@ export function App() { await invoke("clear_session"); setConnected(false); setState(null); + setMetrics({ active: false, rxBytes: 0, txBytes: 0 }); setError(null); } catch (err) { setError(formatInvokeError(err, "Unable to clear local profile")); @@ -242,6 +301,14 @@ export function App() { Last sync {state.lastSyncTime}
+
+ Received + {formatDataSize(metrics.rxBytes)} +
+
+ Sent + {formatDataSize(metrics.txBytes)} +
diff --git a/desktop-client/tunnel-helper/src/main.rs b/desktop-client/tunnel-helper/src/main.rs index 7e4687e..06c046b 100644 --- a/desktop-client/tunnel-helper/src/main.rs +++ b/desktop-client/tunnel-helper/src/main.rs @@ -55,6 +55,15 @@ struct TunnelResponse { ok: bool, error: Option, active: Option, + rx_bytes: Option, + tx_bytes: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct TunnelMetrics { + active: bool, + rx_bytes: u64, + tx_bytes: u64, } fn main() -> ExitCode { @@ -84,6 +93,15 @@ fn run() -> Result<(), String> { println!("{}", if active { "active" } else { "inactive" }); Ok(()) } + "metrics" => { + let profile = parse_profile_arg(args)?; + println!( + "{}", + serde_json::to_string(&windows_client_metrics(&profile)?) + .map_err(|err| format!("Unable to encode tunnel metrics: {err}"))? + ); + Ok(()) + } "connect" | "disconnect" => { let profile = parse_profile_arg(args)?; windows_client_request(command.as_str(), &profile) @@ -101,6 +119,14 @@ fn run() -> Result<(), String> { println!("{}", if active { "active" } else { "inactive" }); Ok(()) } + "metrics" => { + println!( + "{}", + serde_json::to_string(&tunnel_metrics(&profile)?) + .map_err(|err| format!("Unable to encode tunnel metrics: {err}"))? + ); + Ok(()) + } "connect" => connect_direct(&profile), "disconnect" => disconnect_direct(&profile), _ => Err("unsupported command".into()), @@ -214,19 +240,23 @@ fn handle_service_client(mut stream: TcpStream) -> Result<(), String> { .map_err(|err| format!("Unable to decode IPC request: {err}"))?; let response = match request.action.as_str() { "connect" => match connect_direct(Path::new(&request.profile)) { - Ok(()) => TunnelResponse { ok: true, error: None, active: None }, + Ok(()) => TunnelResponse { ok: true, error: None, active: None, rx_bytes: None, tx_bytes: None }, Err(err) => TunnelResponse { ok: false, error: Some(err), active: None, + rx_bytes: None, + tx_bytes: None, }, }, "disconnect" => match disconnect_direct(Path::new(&request.profile)) { - Ok(()) => TunnelResponse { ok: true, error: None, active: None }, + Ok(()) => TunnelResponse { ok: true, error: None, active: None, rx_bytes: None, tx_bytes: None }, Err(err) => TunnelResponse { ok: false, error: Some(err), active: None, + rx_bytes: None, + tx_bytes: None, }, }, "status" => match tunnel_is_active(Path::new(&request.profile)) { @@ -234,17 +264,39 @@ fn handle_service_client(mut stream: TcpStream) -> Result<(), String> { ok: true, error: None, active: Some(active), + rx_bytes: None, + tx_bytes: None, }, Err(err) => TunnelResponse { ok: false, error: Some(err), active: None, + rx_bytes: None, + tx_bytes: None, + }, + }, + "metrics" => match tunnel_metrics(Path::new(&request.profile)) { + Ok(metrics) => TunnelResponse { + ok: true, + error: None, + active: Some(metrics.active), + rx_bytes: Some(metrics.rx_bytes), + tx_bytes: Some(metrics.tx_bytes), + }, + Err(err) => TunnelResponse { + ok: false, + error: Some(err), + active: None, + rx_bytes: None, + tx_bytes: None, }, }, _ => TunnelResponse { ok: false, error: Some("unsupported tunnel action".into()), active: None, + rx_bytes: None, + tx_bytes: None, }, }; @@ -367,34 +419,43 @@ fn windows_client_request(action: &str, profile: &Path) -> Result<(), String> { #[cfg(target_os = "windows")] fn windows_client_status(profile: &Path) -> Result { + Ok(windows_client_metrics(profile)?.active) +} + +#[cfg(target_os = "windows")] +fn windows_client_metrics(profile: &Path) -> Result { let mut stream = connect_to_service()?; let payload = serde_json::to_string(&TunnelRequest { - action: "status".to_string(), + action: "metrics".to_string(), profile: profile.display().to_string(), }) - .map_err(|err| format!("Unable to encode tunnel status request: {err}"))?; + .map_err(|err| format!("Unable to encode tunnel metrics request: {err}"))?; stream .write_all(payload.as_bytes()) .and_then(|_| stream.write_all(b"\n")) - .map_err(|err| format!("Unable to send tunnel status request: {err}"))?; + .map_err(|err| format!("Unable to send tunnel metrics request: {err}"))?; let mut reader = BufReader::new(stream); let mut line = String::new(); reader .read_line(&mut line) - .map_err(|err| format!("Unable to read tunnel status response: {err}"))?; + .map_err(|err| format!("Unable to read tunnel metrics response: {err}"))?; let response = serde_json::from_str::(&line) - .map_err(|err| format!("Unable to decode tunnel status response: {err}"))?; + .map_err(|err| format!("Unable to decode tunnel metrics response: {err}"))?; if !response.ok { return Err(response .error - .unwrap_or_else(|| "NexaVPN tunnel service status check failed".into())); + .unwrap_or_else(|| "NexaVPN tunnel service metrics check failed".into())); } - Ok(response.active.unwrap_or(false)) + Ok(TunnelMetrics { + active: response.active.unwrap_or(false), + rx_bytes: response.rx_bytes.unwrap_or(0), + tx_bytes: response.tx_bytes.unwrap_or(0), + }) } #[cfg(target_os = "windows")] @@ -517,12 +578,31 @@ fn disconnect_direct(profile: &Path) -> Result<(), String> { } fn tunnel_is_active(profile: &Path) -> Result { + Ok(tunnel_metrics(profile)?.active) +} + +fn tunnel_metrics(profile: &Path) -> Result { + let active = tunnel_service_is_active(profile)?; + if !active { + return Ok(TunnelMetrics { + active: false, + rx_bytes: 0, + tx_bytes: 0, + }); + } + + let (rx_bytes, tx_bytes) = read_transfer_totals(profile).unwrap_or((0, 0)); + Ok(TunnelMetrics { + active: true, + rx_bytes, + tx_bytes, + }) +} + +fn tunnel_service_is_active(profile: &Path) -> Result { #[cfg(target_os = "windows")] { - let tunnel_name = profile - .file_stem() - .and_then(|value| value.to_str()) - .ok_or_else(|| "invalid profile filename".to_string())?; + let tunnel_name = tunnel_name(profile)?; let service_name = format!("WireGuardTunnel${}", tunnel_name); let output = Command::new("sc") .arg("query") @@ -541,10 +621,7 @@ fn tunnel_is_active(profile: &Path) -> Result { #[cfg(target_os = "macos")] { - let tunnel_name = profile - .file_stem() - .and_then(|value| value.to_str()) - .ok_or_else(|| "invalid profile filename".to_string())?; + let tunnel_name = tunnel_name(profile)?; let status = Command::new("wg") .arg("show") .arg(tunnel_name) @@ -557,6 +634,63 @@ fn tunnel_is_active(profile: &Path) -> Result { Err("unsupported platform".into()) } +fn tunnel_name(profile: &Path) -> Result<&str, String> { + profile + .file_stem() + .and_then(|value| value.to_str()) + .ok_or_else(|| "invalid profile filename".to_string()) +} + +fn read_transfer_totals(profile: &Path) -> Result<(u64, u64), String> { + let tunnel_name = tunnel_name(profile)?; + let mut command = Command::new(find_wg_cli()?); + command.arg("show").arg(tunnel_name).arg("dump"); + #[cfg(target_os = "windows")] + command.creation_flags(CREATE_NO_WINDOW); + + let output = command + .output() + .map_err(|err| format!("Unable to query tunnel transfer counters: {err}"))?; + + if !output.status.success() { + return Err("Unable to read WireGuard transfer counters.".into()); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let mut rx_bytes = 0_u64; + let mut tx_bytes = 0_u64; + + for line in stdout.lines().skip(1) { + let fields: Vec<&str> = line.split('\t').collect(); + if fields.len() < 7 { + continue; + } + + rx_bytes = rx_bytes.saturating_add(fields[5].parse::().unwrap_or(0)); + tx_bytes = tx_bytes.saturating_add(fields[6].parse::().unwrap_or(0)); + } + + Ok((rx_bytes, tx_bytes)) +} + +#[cfg(target_os = "windows")] +fn find_wg_cli() -> Result { + let candidates = [ + PathBuf::from(r"C:\Program Files\WireGuard\wg.exe"), + PathBuf::from(r"C:\Program Files (x86)\WireGuard\wg.exe"), + ]; + + candidates + .into_iter() + .find(|path| path.exists()) + .ok_or_else(|| "required Windows WireGuard CLI is not available".to_string()) +} + +#[cfg(target_os = "macos")] +fn find_wg_cli() -> Result { + Ok(PathBuf::from("wg")) +} + #[cfg(target_os = "windows")] fn find_windows_wireguard() -> Result { let candidates = [