feat: add device traffic metrics with gateway telemetry reporting and admin UI display
Add rx_bytes and tx_bytes fields to Device type and API responses. Add formatDataSize helper for human-readable byte formatting with units from B to TB. Add Received and Sent columns to devices table in admin UI with formatted traffic totals. Add traffic metrics display to device action panel. Add TelemetrySnapshot and PeerTelemetry types for gateway runtime stats. Add gateway telemetry endpoint at POST /gateway
This commit is contained in:
@@ -2,6 +2,7 @@ package device
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/netip"
|
||||
"time"
|
||||
@@ -198,7 +199,10 @@ func (r *PGRepository) ListByUser(ctx context.Context, userID uuid.UUID) ([]Devi
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items, rows.Err()
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.applyRuntimeStats(ctx, items)
|
||||
}
|
||||
|
||||
func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) {
|
||||
@@ -222,7 +226,10 @@ func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) {
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items, rows.Err()
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.applyRuntimeStats(ctx, items)
|
||||
}
|
||||
|
||||
func (r *PGRepository) Revoke(ctx context.Context, deviceID uuid.UUID) error {
|
||||
@@ -258,6 +265,69 @@ type enrollmentRowScanner interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
type runtimeSnapshot struct {
|
||||
Peers []runtimePeer `json:"peers"`
|
||||
}
|
||||
|
||||
type runtimePeer struct {
|
||||
DeviceID string `json:"device_id"`
|
||||
RXBytes uint64 `json:"rx_bytes"`
|
||||
TXBytes uint64 `json:"tx_bytes"`
|
||||
}
|
||||
|
||||
func (r *PGRepository) applyRuntimeStats(ctx context.Context, items []Device) ([]Device, error) {
|
||||
if len(items) == 0 {
|
||||
return items, nil
|
||||
}
|
||||
|
||||
rows, err := r.db.Query(ctx, `
|
||||
select value
|
||||
from settings
|
||||
where category = 'gateway_runtime'
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
statsByDevice := make(map[uuid.UUID]runtimePeer)
|
||||
for rows.Next() {
|
||||
var raw []byte
|
||||
if err := rows.Scan(&raw); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var snapshot runtimeSnapshot
|
||||
if err := json.Unmarshal(raw, &snapshot); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, peer := range snapshot.Peers {
|
||||
deviceID, err := uuid.Parse(peer.DeviceID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
existing := statsByDevice[deviceID]
|
||||
existing.DeviceID = peer.DeviceID
|
||||
existing.RXBytes += peer.RXBytes
|
||||
existing.TXBytes += peer.TXBytes
|
||||
statsByDevice[deviceID] = existing
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for index := range items {
|
||||
if stats, ok := statsByDevice[items[index].ID]; ok {
|
||||
items[index].RXBytes = stats.RXBytes
|
||||
items[index].TXBytes = stats.TXBytes
|
||||
}
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func scanEnrollmentRow(row enrollmentRowScanner) (EnrollmentResponse, error) {
|
||||
var response EnrollmentResponse
|
||||
var profileRevision int
|
||||
|
||||
@@ -19,6 +19,8 @@ type Device struct {
|
||||
Platform string `json:"platform"`
|
||||
Status string `json:"status"`
|
||||
AssignedIP string `json:"assigned_ip,omitempty"`
|
||||
RXBytes uint64 `json:"rx_bytes"`
|
||||
TXBytes uint64 `json:"tx_bytes"`
|
||||
}
|
||||
|
||||
type ConnectionStatus struct {
|
||||
|
||||
@@ -89,3 +89,23 @@ func (h *Handler) AgentSyncBundle(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
apiutil.JSON(w, http.StatusOK, bundle)
|
||||
}
|
||||
|
||||
func (h *Handler) Telemetry(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("X-Gateway-Bootstrap-Token") != h.bootstrapToken {
|
||||
apiutil.Error(w, http.StatusUnauthorized, "unauthorized", "invalid gateway bootstrap token")
|
||||
return
|
||||
}
|
||||
|
||||
var input TelemetrySnapshot
|
||||
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
|
||||
apiutil.Error(w, http.StatusBadRequest, "invalid_json", "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.service.StoreTelemetry(r.Context(), chi.URLParam(r, "id"), input); err != nil {
|
||||
apiutil.Error(w, http.StatusBadRequest, "gateway_telemetry_failed", "unable to store gateway telemetry")
|
||||
return
|
||||
}
|
||||
|
||||
apiutil.JSON(w, http.StatusOK, map[string]any{"ok": true})
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/netip"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -14,6 +15,7 @@ type Repository interface {
|
||||
List(ctx context.Context) ([]Gateway, error)
|
||||
FirstActive(ctx context.Context) (Gateway, error)
|
||||
BuildSyncBundle(ctx context.Context, gatewayID uuid.UUID) (wireguard.GatewayBundle, error)
|
||||
StoreTelemetry(ctx context.Context, gatewayID uuid.UUID, snapshot TelemetrySnapshot) error
|
||||
Update(ctx context.Context, gatewayID uuid.UUID, input UpdateRequest) (Gateway, error)
|
||||
UpsertByName(ctx context.Context, input BootstrapRequest) (Gateway, error)
|
||||
}
|
||||
@@ -139,6 +141,21 @@ func (r *PGRepository) Update(ctx context.Context, gatewayID uuid.UUID, input Up
|
||||
return item, err
|
||||
}
|
||||
|
||||
func (r *PGRepository) StoreTelemetry(ctx context.Context, gatewayID uuid.UUID, snapshot TelemetrySnapshot) error {
|
||||
payload, err := json.Marshal(snapshot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = r.db.Exec(ctx, `
|
||||
insert into settings (category, key, value, updated_at)
|
||||
values ('gateway_runtime', $1, $2::jsonb, now())
|
||||
on conflict (category, key)
|
||||
do update set value = excluded.value, updated_at = now()
|
||||
`, gatewayID.String(), string(payload))
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *PGRepository) UpsertByName(ctx context.Context, input BootstrapRequest) (Gateway, error) {
|
||||
row := r.db.QueryRow(ctx, `
|
||||
insert into gateways (id, name, endpoint, public_key, listen_port, vpn_cidr, dns_servers, is_active)
|
||||
|
||||
@@ -40,6 +40,14 @@ func (s *Service) Update(ctx context.Context, gatewayID string, input UpdateRequ
|
||||
return s.repo.Update(ctx, id, input)
|
||||
}
|
||||
|
||||
func (s *Service) StoreTelemetry(ctx context.Context, gatewayID string, snapshot TelemetrySnapshot) error {
|
||||
id, err := uuid.Parse(gatewayID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.repo.StoreTelemetry(ctx, id, snapshot)
|
||||
}
|
||||
|
||||
func (s *Service) Bootstrap(ctx context.Context, input BootstrapRequest) (Gateway, error) {
|
||||
if input.Name == "" {
|
||||
input.Name = "primary-gateway"
|
||||
|
||||
@@ -30,3 +30,16 @@ type BootstrapRequest struct {
|
||||
VPNCIDR string `json:"vpn_cidr"`
|
||||
DNSServers []string `json:"dns_servers"`
|
||||
}
|
||||
|
||||
type TelemetrySnapshot struct {
|
||||
CollectedAt string `json:"collected_at"`
|
||||
Peers []PeerTelemetry `json:"peers"`
|
||||
}
|
||||
|
||||
type PeerTelemetry struct {
|
||||
DeviceID string `json:"device_id"`
|
||||
PublicKey string `json:"public_key"`
|
||||
RXBytes uint64 `json:"rx_bytes"`
|
||||
TXBytes uint64 `json:"tx_bytes"`
|
||||
LatestHandshakeAt *int64 `json:"latest_handshake_at,omitempty"`
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ func NewRouter(jwtSecret string, handlers Handlers) http.Handler {
|
||||
r.Post("/auth/logout", handlers.Auth.Logout)
|
||||
r.Post("/gateway-agent/bootstrap", handlers.Gateway.Bootstrap)
|
||||
r.Get("/gateway-agent/{id}/sync", handlers.Gateway.AgentSyncBundle)
|
||||
r.Post("/gateway-agent/{id}/telemetry", handlers.Gateway.Telemetry)
|
||||
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(AuthMiddleware(jwtSecret))
|
||||
|
||||
Reference in New Issue
Block a user