Skip to content

Commit

Permalink
Refactor to implement a custom Collector to track TunnelTime.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Mar 22, 2024
1 parent 4112238 commit 7cb66b3
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 142 deletions.
200 changes: 96 additions & 104 deletions cmd/outline-ss-server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// How often to report the active IP key TunnelTime.
const tunnelTimeTrackerReportingInterval = 5 * time.Second
const namespace = "shadowsocks"

// `now` is stubbable for testing.
var now = time.Now

type outlineMetrics struct {
ipinfo.IPInfoMap
tunnelTimeTracker
tunnelTimeCollector

buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
Expand All @@ -46,9 +45,6 @@ type outlineMetrics struct {
timeToCipherMs *prometheus.HistogramVec
// TODO: Add time to first byte.

TunnelTimePerKey *prometheus.CounterVec
TunnelTimePerLocation *prometheus.CounterVec

tcpProbes *prometheus.HistogramVec
tcpOpenConnections *prometheus.CounterVec
tcpClosedConnections *prometheus.CounterVec
Expand All @@ -72,97 +68,110 @@ func toIPKey(addr net.Addr, accessKey string) (*IPKey, error) {
return &IPKey{ip, accessKey}, nil
}

type ReportTunnelTimeFunc func(IPKey, ipinfo.IPInfo, time.Duration)

// Represents the clients that are or have been active recently. They stick
// around until they are inactive, or get reported to Prometheus, whichever
// comes last.
type activeClient struct {
clientInfo ipinfo.IPInfo
connectionCount int
startTime time.Time
info ipinfo.IPInfo
connCount int // The active connection count.
startTime time.Time
connDuration time.Duration // If the client has become inactive, this holds the connection duration.
}

// IPKey identifies a client by the pair (IP address, access key). It is used
// as the key of the active-clients map, so connections from the same IP using
// the same access key are tracked as a single client.
type IPKey struct {
// ip is the client's IP address.
ip netip.Addr
// accessKey is the access key the client connected with.
accessKey string
}

type tunnelTimeTracker struct {
type tunnelTimeCollector struct {
ipinfo.IPInfoMap
mu sync.Mutex // Protects the activeClients map.
activeClients map[IPKey]*activeClient
reportTunnelTime ReportTunnelTimeFunc
mu sync.Mutex // Protects the activeClients map.
activeClients map[IPKey]*activeClient

tunnelTimePerKey *prometheus.Desc
tunnelTimePerLocation *prometheus.Desc
}

// Reports time connected for all active clients, called at a regular interval.
func (t *tunnelTimeTracker) reportAll(now time.Time) {
if len(t.activeClients) == 0 {
logger.Debugf("No active clients. No TunnelTime activity to report.")
return
}
t.mu.Lock()
defer t.mu.Unlock()
for ipKey, c := range t.activeClients {
t.reportDuration(ipKey, c, now)
}
// Describe sends the descriptors of the two TunnelTime metrics (per access key
// and per location) to ch.
func (c *tunnelTimeCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range []*prometheus.Desc{c.tunnelTimePerKey, c.tunnelTimePerLocation} {
		ch <- desc
	}
}

// Reports time connected for a given active client.
func (t *tunnelTimeTracker) reportDuration(ipKey IPKey, c *activeClient, tNow time.Time) {
connDuration := tNow.Sub(c.startTime)
logger.Debugf("Reporting activity for key `%v`, duration: %v", ipKey.accessKey, connDuration)
t.reportTunnelTime(ipKey, c.clientInfo, connDuration)
// Reset the start time now that it's been reported.
c.startTime = tNow
// Collect reports the time connected for all tracked clients.
//
// For every client the reported duration is the pending duration of an already
// finished session plus, if the client still has open connections, the time
// elapsed since its start time. Durations are summed per label set before being
// sent: several (IP, access key) entries can share an access key or a
// (location, ASN) pair, and emitting one metric per entry would produce
// duplicate label sets for the same metric name.
//
// After reporting, inactive clients are removed and active clients have their
// start time and pending duration reset.
//
// NOTE(review): the values are emitted as CounterValue but only cover the time
// since the previous Collect call, because the per-client state is reset here.
// Confirm that consumers expect this rather than a cumulative total.
func (c *tunnelTimeCollector) Collect(ch chan<- prometheus.Metric) {
	type locationKey struct {
		country string
		asn     string
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	tNow := now()
	perKey := make(map[string]time.Duration)
	perLocation := make(map[locationKey]time.Duration)
	for ipKey, client := range c.activeClients {
		connDuration := client.connDuration
		if client.connCount > 0 {
			connDuration += tNow.Sub(client.startTime)
		}
		logger.Debugf("Reporting activity for key `%v`, duration: %v", ipKey.accessKey, connDuration)
		perKey[ipKey.accessKey] += connDuration
		perLocation[locationKey{
			country: client.info.CountryCode.String(),
			asn:     asnLabel(client.info.ASN),
		}] += connDuration

		if client.connCount == 0 {
			// The client is inactive and has now been reported; drop it.
			delete(c.activeClients, ipKey)
			continue
		}
		// Reset the start time now that it's been reported.
		client.startTime = tNow
		client.connDuration = 0
	}

	// Emit exactly one sample per distinct label set.
	for accessKey, d := range perKey {
		ch <- prometheus.MustNewConstMetric(c.tunnelTimePerKey, prometheus.CounterValue, d.Seconds(), accessKey)
	}
	for loc, d := range perLocation {
		ch <- prometheus.MustNewConstMetric(c.tunnelTimePerLocation, prometheus.CounterValue, d.Seconds(), loc.country, loc.asn)
	}
}

// Registers a new active connection for a client [net.Addr] and access key.
func (t *tunnelTimeTracker) startConnection(ipKey IPKey) {
t.mu.Lock()
defer t.mu.Unlock()
c, exists := t.activeClients[ipKey]
func (c *tunnelTimeCollector) startConnection(ipKey IPKey) {
c.mu.Lock()
defer c.mu.Unlock()
client, exists := c.activeClients[ipKey]
if !exists {
clientInfo, _ := ipinfo.GetIPInfoFromIP(t.IPInfoMap, net.IP(ipKey.ip.AsSlice()))
// Initialize the TunnelTime for this IPKey with default value of 0:
// https://prometheus.io/docs/practices/instrumentation/#avoid-missing-metrics
t.reportTunnelTime(ipKey, clientInfo, 0)
c = &activeClient{
clientInfo: clientInfo,
startTime: now(),
}
clientInfo, _ := ipinfo.GetIPInfoFromIP(c.IPInfoMap, net.IP(ipKey.ip.AsSlice()))
client = &activeClient{info: clientInfo}
}
c.connectionCount++
t.activeClients[ipKey] = c
if client.connCount == 0 {
// This client is new or was recently stopped (before it was measured and
// deleted by the metrics reporter). Reset the start time so we are
// accurately capturing this new session.
client.startTime = now()
}
client.connCount++
c.activeClients[ipKey] = client
}

// Removes an active connection for a client [net.Addr] and access key.
func (t *tunnelTimeTracker) stopConnection(ipKey IPKey) {
t.mu.Lock()
defer t.mu.Unlock()
c, exists := t.activeClients[ipKey]
func (c *tunnelTimeCollector) stopConnection(ipKey IPKey) {
c.mu.Lock()
defer c.mu.Unlock()
client, exists := c.activeClients[ipKey]
if !exists {
logger.Warningf("Failed to find active client")
return
}
c.connectionCount--
if c.connectionCount <= 0 {
t.reportDuration(ipKey, c, now())
delete(t.activeClients, ipKey)
return
client.connCount--
if client.connCount == 0 {
client.connDuration = now().Sub(client.startTime)
}
}

func newTunnelTimeTracker(ip2info ipinfo.IPInfoMap, report ReportTunnelTimeFunc) *tunnelTimeTracker {
tracker := &tunnelTimeTracker{
IPInfoMap: ip2info,
activeClients: make(map[IPKey]*activeClient),
reportTunnelTime: report,
func newTunnelTimeTracker(ip2info ipinfo.IPInfoMap, registerer prometheus.Registerer) *tunnelTimeCollector {
c := &tunnelTimeCollector{
IPInfoMap: ip2info,
activeClients: make(map[IPKey]*activeClient),

tunnelTimePerKey: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "tunnel_time_seconds"),
"Time at least 1 connection was open for a (IP, access key) pair, per key.",
[]string{"access_key"}, nil,
),
tunnelTimePerLocation: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "tunnel_time_seconds_per_location"),
"Time at least 1 connection was open for a (IP, access key) pair, per location.",
[]string{"location", "asn"}, nil,
),
}
ticker := time.NewTicker(tunnelTimeTrackerReportingInterval)
go func() {
for t := range ticker.C {
tracker.reportAll(t)
}
}()
return tracker
registerer.MustRegister(c)
return c
}

// newPrometheusOutlineMetrics constructs a metrics object that uses
Expand All @@ -173,41 +182,41 @@ func newPrometheusOutlineMetrics(ip2info ipinfo.IPInfoMap, registerer prometheus
m := &outlineMetrics{
IPInfoMap: ip2info,
buildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "build_info",
Help: "Information on the outline-ss-server build",
}, []string{"version"}),
accessKeys: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "keys",
Help: "Count of access keys",
}),
ports: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "ports",
Help: "Count of open Shadowsocks ports",
}),
tcpProbes: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "tcp_probes",
Buckets: []float64{0, 49, 50, 51, 73, 91},
Help: "Histogram of number of bytes from client to proxy, for detecting possible probes",
}, []string{"port", "status", "error"}),
tcpOpenConnections: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Subsystem: "tcp",
Name: "connections_opened",
Help: "Count of open TCP connections",
}, []string{"location", "asn"}),
tcpClosedConnections: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Subsystem: "tcp",
Name: "connections_closed",
Help: "Count of closed TCP connections",
}, []string{"location", "asn", "status", "access_key"}),
tcpConnectionDurationMs: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Subsystem: "tcp",
Name: "connection_duration_ms",
Help: "TCP connection duration distributions.",
Expand All @@ -220,63 +229,52 @@ func newPrometheusOutlineMetrics(ip2info ipinfo.IPInfoMap, registerer prometheus
float64(7 * 24 * time.Hour.Milliseconds()), // Week
},
}, []string{"status"}),
TunnelTimePerKey: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "shadowsocks",
Name: "tunnel_time_seconds",
Help: "Time at least 1 connection was open for a (IP, access key) pair, per key",
}, []string{"access_key"}),
TunnelTimePerLocation: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "shadowsocks",
Name: "tunnel_time_seconds_per_location",
Help: "Time at least 1 connection was open for a (IP, access key) pair, per location",
}, []string{"location", "asn"}),
dataBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "data_bytes",
Help: "Bytes transferred by the proxy, per access key",
}, []string{"dir", "proto", "access_key"}),
dataBytesPerLocation: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "data_bytes_per_location",
Help: "Bytes transferred by the proxy, per location",
}, []string{"dir", "proto", "location", "asn"}),
timeToCipherMs: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Name: "time_to_cipher_ms",
Help: "Time needed to find the cipher",
Buckets: []float64{0.1, 1, 10, 100, 1000},
}, []string{"proto", "found_key"}),
udpPacketsFromClientPerLocation: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Subsystem: "udp",
Name: "packets_from_client_per_location",
Help: "Packets received from the client, per location and status",
}, []string{"location", "asn", "status"}),
udpAddedNatEntries: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Subsystem: "udp",
Name: "nat_entries_added",
Help: "Entries added to the UDP NAT table",
}),
udpRemovedNatEntries: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "shadowsocks",
Namespace: namespace,
Subsystem: "udp",
Name: "nat_entries_removed",
Help: "Entries removed from the UDP NAT table",
}),
}
m.tunnelTimeTracker = *newTunnelTimeTracker(ip2info, m.addTunnelTime)
m.tunnelTimeCollector = *newTunnelTimeTracker(ip2info, registerer)

// TODO: Is it possible to pass where to register the collectors?
registerer.MustRegister(m.buildInfo, m.accessKeys, m.ports, m.tcpProbes, m.tcpOpenConnections, m.tcpClosedConnections, m.tcpConnectionDurationMs,
m.dataBytes, m.dataBytesPerLocation, m.timeToCipherMs, m.udpPacketsFromClientPerLocation, m.udpAddedNatEntries, m.udpRemovedNatEntries,
m.TunnelTimePerKey, m.TunnelTimePerLocation)
m.dataBytes, m.dataBytesPerLocation, m.timeToCipherMs, m.udpPacketsFromClientPerLocation, m.udpAddedNatEntries, m.udpRemovedNatEntries)
return m
}

Expand All @@ -293,16 +291,10 @@ func (m *outlineMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) {
m.tcpOpenConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Inc()
}

// Reports total time connected (i.e. TunnelTime), by access key and by country.
func (m *outlineMetrics) addTunnelTime(ipKey IPKey, clientInfo ipinfo.IPInfo, duration time.Duration) {
m.TunnelTimePerKey.WithLabelValues(ipKey.accessKey).Add(duration.Seconds())
m.TunnelTimePerLocation.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Add(duration.Seconds())
}

// AddAuthenticatedTCPConnection starts TunnelTime tracking for an
// authenticated TCP connection from clientAddr using accessKey. If the address
// cannot be converted to an IPKey the connection is not tracked.
func (m *outlineMetrics) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) {
	ipKey, err := toIPKey(clientAddr, accessKey)
	if err != nil {
		return
	}
	m.tunnelTimeCollector.startConnection(*ipKey)
}

Expand Down Expand Up @@ -334,7 +326,7 @@ func (m *outlineMetrics) AddClosedTCPConnection(clientInfo ipinfo.IPInfo, client

ipKey, err := toIPKey(clientAddr, accessKey)
if err == nil {
m.tunnelTimeTracker.stopConnection(*ipKey)
m.tunnelTimeCollector.stopConnection(*ipKey)
}
}

Expand All @@ -358,7 +350,7 @@ func (m *outlineMetrics) AddUDPNatEntry(clientAddr net.Addr, accessKey string) {

ipKey, err := toIPKey(clientAddr, accessKey)
if err == nil {
m.tunnelTimeTracker.startConnection(*ipKey)
m.tunnelTimeCollector.startConnection(*ipKey)
}
}

Expand All @@ -367,7 +359,7 @@ func (m *outlineMetrics) RemoveUDPNatEntry(clientAddr net.Addr, accessKey string

ipKey, err := toIPKey(clientAddr, accessKey)
if err == nil {
m.tunnelTimeTracker.stopConnection(*ipKey)
m.tunnelTimeCollector.stopConnection(*ipKey)
}
}

Expand Down
Loading

0 comments on commit 7cb66b3

Please sign in to comment.