Files
NexaVPN/backend/internal/device/repository.go
nessi 610c5459e5 feat: add device traffic metrics with gateway telemetry reporting and admin UI display
Add rx_bytes and tx_bytes fields to Device type and API responses. Add formatDataSize helper for human-readable byte formatting with units from B to TB. Add Received and Sent columns to devices table in admin UI with formatted traffic totals. Add traffic metrics display to device action panel.

Add TelemetrySnapshot and PeerTelemetry types for gateway runtime stats. Add gateway telemetry endpoint at POST /gateway
2026-03-18 07:43:22 +01:00

382 lines
10 KiB
Go

package device
import (
"context"
"encoding/json"
"errors"
"net/netip"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
)
// Repository is the persistence contract for device enrollment, lookup,
// listing, revocation and profile rotation. PGRepository in this file is the
// PostgreSQL-backed implementation; the per-method notes below describe what
// that implementation does.
type Repository interface {
// FindNextAvailableIP returns an address inside vpnCIDR that is not yet
// recorded for the gateway. startOffset selects how far above the start of
// the range the search begins.
FindNextAvailableIP(ctx context.Context, gatewayID uuid.UUID, vpnCIDR string, startOffset int) (string, error)
// Enroll stores a new device together with its WireGuard peer and IP
// allocation, using the already chosen assignedIP.
Enroll(ctx context.Context, userID uuid.UUID, gatewayID uuid.UUID, input EnrollRequest, assignedIP string, dnsServers []string, allowedIPs []string) (EnrollmentResponse, error)
// ListByUser returns the non-deleted devices owned by userID.
ListByUser(ctx context.Context, userID uuid.UUID) ([]Device, error)
// ListAll returns every non-deleted device.
ListAll(ctx context.Context) ([]Device, error)
// GetLatestEnrollmentByUser returns the enrollment of the user's most
// recently created device that still has a live peer.
GetLatestEnrollmentByUser(ctx context.Context, userID uuid.UUID) (EnrollmentResponse, error)
// GetEnrollmentByDeviceID returns the enrollment of one specific device.
GetEnrollmentByDeviceID(ctx context.Context, deviceID uuid.UUID) (EnrollmentResponse, error)
// Revoke marks the device revoked and releases its peer and IP allocation.
Revoke(ctx context.Context, deviceID uuid.UUID) error
// Rotate bumps the profile revision of the device's live peer.
Rotate(ctx context.Context, deviceID uuid.UUID) error
}
// PGRepository provides the device persistence methods on top of a pgx
// connection pool.
type PGRepository struct {
// db is the pool every query and transaction of this type runs on.
db *pgxpool.Pool
}
// NewPGRepository wraps the given pgx pool in a PGRepository. The pool is
// stored as-is; no connection is opened or checked here.
func NewPGRepository(db *pgxpool.Pool) *PGRepository {
	repo := new(PGRepository)
	repo.db = db
	return repo
}
// FindNextAvailableIP picks the lowest address inside vpnCIDR that has no row
// in ip_allocations for the given gateway and returns it as a single-host
// prefix string ("a.b.c.d/32" for IPv4, ".../128" for IPv6).
//
// vpnCIDR is normalised to its network address first, so "10.8.0.1/24" and
// "10.8.0.0/24" behave identically. The network address itself is never
// offered: the search starts startOffset addresses above it, and values of
// startOffset below 1 are treated as 1.
//
// An error is returned when vpnCIDR cannot be parsed, when the lookup query
// fails, or when every candidate address is already recorded.
//
// NOTE(review): the lookup does not filter on ip_allocations.status, so rows
// that Revoke marked 'released' still block their address from being handed
// out again. Check the table's uniqueness constraints before adding a status
// filter here.
//
// NOTE(review): nothing in this method reserves the returned address; two
// concurrent callers can be given the same one unless the schema or the
// caller prevents it.
func (r *PGRepository) FindNextAvailableIP(ctx context.Context, gatewayID uuid.UUID, vpnCIDR string, startOffset int) (string, error) {
	prefix, err := netip.ParsePrefix(vpnCIDR)
	if err != nil {
		return "", err
	}
	// ParsePrefix keeps any host bits that were written in the CIDR; mask them
	// off so the offset is always counted from the network address.
	prefix = prefix.Masked()

	rows, err := r.db.Query(ctx, `
select host(address)
from ip_allocations
where gateway_id = $1
`, gatewayID)
	if err != nil {
		return "", err
	}
	defer rows.Close()

	used := map[string]struct{}{}
	for rows.Next() {
		var address string
		if err := rows.Scan(&address); err != nil {
			return "", err
		}
		used[address] = struct{}{}
	}
	if err := rows.Err(); err != nil {
		return "", err
	}

	candidate, ok := nextFreeAddress(prefix, used, startOffset)
	if !ok {
		return "", errors.New("no available ip addresses for gateway")
	}
	// BitLen is 32 for IPv4 and 128 for IPv6, which yields a single-host
	// prefix for either address family.
	return netip.PrefixFrom(candidate, candidate.BitLen()).String(), nil
}

// nextFreeAddress returns the first address at or after network+startOffset
// (at least network+1) that lies inside prefix and is not a key of used.
func nextFreeAddress(prefix netip.Prefix, used map[string]struct{}, startOffset int) (netip.Addr, bool) {
	candidate := prefix.Addr().Next()
	// Stop skipping as soon as the candidate leaves the prefix: Next only ever
	// moves upward, so it cannot re-enter the range.
	for i := 1; i < startOffset && prefix.Contains(candidate); i++ {
		candidate = candidate.Next()
	}
	for prefix.Contains(candidate) {
		if _, taken := used[candidate.String()]; !taken {
			return candidate, true
		}
		candidate = candidate.Next()
	}
	return netip.Addr{}, false
}
// Enroll writes a new device, its WireGuard peer and its IP allocation in one
// transaction, generating fresh UUIDs for all three rows. The device is
// inserted with status 'active' and approved_at set to now().
//
// If any insert or the commit fails, the error is returned unchanged together
// with a zero EnrollmentResponse and the deferred rollback undoes the partial
// work. On success only the Device part of the response is populated.
func (r *PGRepository) Enroll(ctx context.Context, userID uuid.UUID, gatewayID uuid.UUID, input EnrollRequest, assignedIP string, dnsServers []string, allowedIPs []string) (EnrollmentResponse, error) {
	const insertDevice = `
insert into devices (
id, user_id, gateway_id, name, platform, os_version, app_version, device_fingerprint, public_key, status, approved_at
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'active', now())
`
	const insertPeer = `
insert into wireguard_peers (
id, device_id, gateway_id, public_key, assigned_ip, allowed_ips, dns_servers, last_profile_issued_at
) values ($1, $2, $3, $4, $5::inet, $6::cidr[], $7::text[], now())
`
	const insertAllocation = `
insert into ip_allocations (id, gateway_id, device_id, address, status)
values ($1, $2, $3, $4::inet, 'allocated')
`

	var none EnrollmentResponse

	tx, err := r.db.Begin(ctx)
	if err != nil {
		return none, err
	}
	// Rollback after a successful Commit is a no-op, so it is safe to defer
	// unconditionally; its error is deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	deviceID, peerID := uuid.New(), uuid.New()

	if _, err := tx.Exec(ctx, insertDevice, deviceID, userID, gatewayID, input.Name, input.Platform, input.OSVersion, input.AppVersion, input.DeviceFingerprint, input.PublicKey); err != nil {
		return none, err
	}
	if _, err := tx.Exec(ctx, insertPeer, peerID, deviceID, gatewayID, input.PublicKey, assignedIP, allowedIPs, dnsServers); err != nil {
		return none, err
	}
	if _, err := tx.Exec(ctx, insertAllocation, uuid.New(), gatewayID, deviceID, assignedIP); err != nil {
		return none, err
	}
	if err := tx.Commit(ctx); err != nil {
		return none, err
	}

	var enrolled EnrollmentResponse
	enrolled.Device = Device{
		ID:         deviceID,
		UserID:     userID,
		GatewayID:  gatewayID,
		Name:       input.Name,
		Platform:   input.Platform,
		Status:     "active",
		AssignedIP: assignedIP,
	}
	return enrolled, nil
}
// GetLatestEnrollmentByUser loads the enrollment of the most recently created
// non-deleted device of userID that still has a non-deleted WireGuard peer,
// joined with that peer's gateway. The single row is decoded by
// scanEnrollmentRow, whose error is returned unchanged.
func (r *PGRepository) GetLatestEnrollmentByUser(ctx context.Context, userID uuid.UUID) (EnrollmentResponse, error) {
	const latestForUser = `
select
d.id,
d.user_id,
d.gateway_id,
d.name,
d.platform,
d.status,
host(wp.assigned_ip),
wp.profile_revision,
wp.last_profile_issued_at,
g.name,
g.endpoint,
g.public_key,
coalesce(wp.dns_servers, '{}')::text[],
coalesce(array(select cidr::text from unnest(wp.allowed_ips) as cidr), '{}')::text[]
from devices d
join wireguard_peers wp on wp.device_id = d.id and wp.deleted_at is null
join gateways g on g.id = wp.gateway_id
where d.user_id = $1 and d.deleted_at is null
order by d.created_at desc
limit 1
`
	return scanEnrollmentRow(r.db.QueryRow(ctx, latestForUser, userID))
}
// GetEnrollmentByDeviceID loads the enrollment of the non-deleted device with
// the given ID, provided it still has a non-deleted WireGuard peer, joined
// with that peer's gateway. The single row is decoded by scanEnrollmentRow,
// whose error is returned unchanged.
func (r *PGRepository) GetEnrollmentByDeviceID(ctx context.Context, deviceID uuid.UUID) (EnrollmentResponse, error) {
	const byDeviceID = `
select
d.id,
d.user_id,
d.gateway_id,
d.name,
d.platform,
d.status,
host(wp.assigned_ip),
wp.profile_revision,
wp.last_profile_issued_at,
g.name,
g.endpoint,
g.public_key,
coalesce(wp.dns_servers, '{}')::text[],
coalesce(array(select cidr::text from unnest(wp.allowed_ips) as cidr), '{}')::text[]
from devices d
join wireguard_peers wp on wp.device_id = d.id and wp.deleted_at is null
join gateways g on g.id = wp.gateway_id
where d.id = $1 and d.deleted_at is null
`
	return scanEnrollmentRow(r.db.QueryRow(ctx, byDeviceID, deviceID))
}
// ListByUser returns the non-deleted devices owned by userID, newest first.
// The peer table is left-joined, so a device without a live peer is still
// listed, with an empty AssignedIP. Traffic counters are filled in by
// applyRuntimeStats before returning; a user with no devices yields a nil
// slice.
func (r *PGRepository) ListByUser(ctx context.Context, userID uuid.UUID) ([]Device, error) {
	const devicesForUser = `
select d.id, d.user_id, d.gateway_id, d.name, d.platform, d.status, coalesce(host(wp.assigned_ip), '')
from devices d
left join wireguard_peers wp on wp.device_id = d.id and wp.deleted_at is null
where d.user_id = $1 and d.deleted_at is null
order by d.created_at desc
`
	rows, err := r.db.Query(ctx, devicesForUser, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var devices []Device
	for rows.Next() {
		var d Device
		scanErr := rows.Scan(&d.ID, &d.UserID, &d.GatewayID, &d.Name, &d.Platform, &d.Status, &d.AssignedIP)
		if scanErr != nil {
			return nil, scanErr
		}
		devices = append(devices, d)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return r.applyRuntimeStats(ctx, devices)
}
// ListAll returns every non-deleted device, newest first. The peer table is
// left-joined, so a device without a live peer is still listed, with an empty
// AssignedIP. Traffic counters are filled in by applyRuntimeStats before
// returning; an empty table yields a nil slice.
func (r *PGRepository) ListAll(ctx context.Context) ([]Device, error) {
	const allDevices = `
select d.id, d.user_id, d.gateway_id, d.name, d.platform, d.status, coalesce(host(wp.assigned_ip), '')
from devices d
left join wireguard_peers wp on wp.device_id = d.id and wp.deleted_at is null
where d.deleted_at is null
order by d.created_at desc
`
	rows, err := r.db.Query(ctx, allDevices)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var devices []Device
	for rows.Next() {
		var d Device
		scanErr := rows.Scan(&d.ID, &d.UserID, &d.GatewayID, &d.Name, &d.Platform, &d.Status, &d.AssignedIP)
		if scanErr != nil {
			return nil, scanErr
		}
		devices = append(devices, d)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return r.applyRuntimeStats(ctx, devices)
}
// Revoke runs three updates for deviceID in one transaction, in this order:
// mark the device 'revoked', soft-delete its live WireGuard peer, and flip its
// 'allocated' IP allocation to 'released'. The first failing statement aborts
// the transaction and its error is returned. The number of affected rows is
// not inspected, so an unknown deviceID is not reported as an error.
func (r *PGRepository) Revoke(ctx context.Context, deviceID uuid.UUID) error {
	tx, err := r.db.Begin(ctx)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is a no-op; its error is ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	statements := [...]string{
		`update devices set status = 'revoked', revoked_at = now(), updated_at = now() where id = $1`,
		`update wireguard_peers set deleted_at = now(), updated_at = now() where device_id = $1 and deleted_at is null`,
		`update ip_allocations set status = 'released', released_at = now(), updated_at = now() where device_id = $1 and status = 'allocated'`,
	}
	for _, stmt := range statements {
		if _, execErr := tx.Exec(ctx, stmt, deviceID); execErr != nil {
			return execErr
		}
	}
	return tx.Commit(ctx)
}
// Rotate increments profile_revision on the device's non-deleted WireGuard
// peer and stamps last_profile_issued_at and updated_at with now(). The number
// of affected rows is not inspected, so a device without a live peer is not
// reported as an error.
func (r *PGRepository) Rotate(ctx context.Context, deviceID uuid.UUID) error {
	const bumpRevision = `
update wireguard_peers
set profile_revision = profile_revision + 1, last_profile_issued_at = now(), updated_at = now()
where device_id = $1 and deleted_at is null
`
	if _, err := r.db.Exec(ctx, bumpRevision, deviceID); err != nil {
		return err
	}
	return nil
}
// enrollmentRowScanner is the single-method view of a query result row that
// scanEnrollmentRow needs; the value returned by QueryRow is passed in as one.
type enrollmentRowScanner interface {
// Scan copies the current row's columns into dest, in order.
Scan(dest ...any) error
}
// runtimeSnapshot is the JSON shape decoded from one settings row of category
// 'gateway_runtime' in applyRuntimeStats. Only the peers list is read here.
type runtimeSnapshot struct {
Peers []runtimePeer `json:"peers"`
}
// runtimePeer carries the traffic counters of one peer inside a
// runtimeSnapshot. applyRuntimeStats also uses it as the per-device
// accumulator when summing counters across snapshots.
type runtimePeer struct {
// DeviceID is the device UUID in string form; entries that fail uuid.Parse
// are skipped by applyRuntimeStats.
DeviceID string `json:"device_id"`
RXBytes uint64 `json:"rx_bytes"`
TXBytes uint64 `json:"tx_bytes"`
}
// applyRuntimeStats fills RXBytes and TXBytes on items from the stored
// gateway runtime snapshots and returns the same slice.
//
// Every settings row with category 'gateway_runtime' is decoded as a
// runtimeSnapshot and the counters of each peer are summed per device ID
// across all rows. Rows whose JSON does not decode and peers whose device_id
// is not a valid UUID are skipped silently, so a malformed snapshot does not
// fail the listing. Devices without any matching peer keep their existing
// counter values.
//
// An empty items slice is returned untouched without querying. Query, scan
// and iteration errors are returned unchanged with a nil slice.
func (r *PGRepository) applyRuntimeStats(ctx context.Context, items []Device) ([]Device, error) {
	if len(items) == 0 {
		return items, nil
	}

	const runtimeSettings = `
select value
from settings
where category = 'gateway_runtime'
`
	rows, err := r.db.Query(ctx, runtimeSettings)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	totals := make(map[uuid.UUID]runtimePeer)
	for rows.Next() {
		var payload []byte
		if scanErr := rows.Scan(&payload); scanErr != nil {
			return nil, scanErr
		}

		var snap runtimeSnapshot
		if json.Unmarshal(payload, &snap) != nil {
			// Best effort: an undecodable snapshot contributes nothing.
			continue
		}

		for _, peer := range snap.Peers {
			id, parseErr := uuid.Parse(peer.DeviceID)
			if parseErr != nil {
				continue
			}
			sum := totals[id]
			totals[id] = runtimePeer{
				DeviceID: peer.DeviceID,
				RXBytes:  sum.RXBytes + peer.RXBytes,
				TXBytes:  sum.TXBytes + peer.TXBytes,
			}
		}
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	for i := range items {
		stats, found := totals[items[i].ID]
		if !found {
			continue
		}
		items[i].RXBytes = stats.RXBytes
		items[i].TXBytes = stats.TXBytes
	}
	return items, nil
}
// scanEnrollmentRow decodes one row produced by the enrollment queries in this
// file into an EnrollmentResponse. The expected column order is: device id,
// user id, gateway id, name, platform, status, assigned IP, profile revision,
// last profile issue time, gateway name, gateway endpoint, gateway public key,
// DNS servers, allowed IPs.
//
// The peer view reuses the device's assigned IP and gateway ID, and one
// "cidr" resource is emitted per allowed IP with the CIDR as both value and
// label. A Scan error is returned unchanged with a zero response.
func scanEnrollmentRow(row enrollmentRowScanner) (EnrollmentResponse, error) {
	var (
		out        EnrollmentResponse
		revision   int
		issuedAt   *time.Time // must be scanned to match the column list; not used afterwards
		gwName     string
		gwEndpoint string
		gwKey      string
		dns        []string
		routes     []string
	)

	err := row.Scan(
		&out.Device.ID,
		&out.Device.UserID,
		&out.Device.GatewayID,
		&out.Device.Name,
		&out.Device.Platform,
		&out.Device.Status,
		&out.Device.AssignedIP,
		&revision,
		&issuedAt,
		&gwName,
		&gwEndpoint,
		&gwKey,
		&dns,
		&routes,
	)
	if err != nil {
		return EnrollmentResponse{}, err
	}

	gateway := GatewayView{
		ID:        out.Device.GatewayID,
		Name:      gwName,
		Endpoint:  gwEndpoint,
		PublicKey: gwKey,
	}
	out.Peer = PeerView{
		AssignedIP:      out.Device.AssignedIP,
		DNSServers:      dns,
		AllowedIPs:      routes,
		Gateway:         gateway,
		ProfileRevision: revision,
	}

	for i := range routes {
		out.Resources = append(out.Resources, Resource{
			Type:  "cidr",
			Value: routes[i],
			Label: routes[i],
		})
	}
	return out, nil
}