From 6315088449518cce4732ca0f2e8c13dc930d3837 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 6 Sep 2022 17:01:47 +0000 Subject: [PATCH] failed metrics debug --- package.json | 3 +- .../app_janus/filter_app_janus_tracer.js | 77 ++++++++++------- plugins/filters/app_janus/histogram.js | 83 +++++++++++++++++++ plugins/filters/app_janus/prometheus.js | 27 ++++++ 4 files changed, 160 insertions(+), 30 deletions(-) create mode 100644 plugins/filters/app_janus/histogram.js create mode 100644 plugins/filters/app_janus/prometheus.js diff --git a/package.json b/package.json index b1b0c61c..d88ab4a1 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,8 @@ "lru-cache": "4.1.x", "mkdirp": "0.5.1", "optimist": "0.6.1", - "requireg": "^0.2.1" + "requireg": "^0.2.1", + "prom-client": "^14.1.0" }, "optionalDependencies": { "amqplib": "0.5.1", diff --git a/plugins/filters/app_janus/filter_app_janus_tracer.js b/plugins/filters/app_janus/filter_app_janus_tracer.js index e3701529..cfc91d23 100644 --- a/plugins/filters/app_janus/filter_app_janus_tracer.js +++ b/plugins/filters/app_janus/filter_app_janus_tracer.js @@ -77,6 +77,21 @@ FilterAppJanusTracer.prototype.start = async function (callback) { this.ctx.init() logger.info('Initialized App Janus Span + Metrics Tracer'); sender.init(this) + + this.histogram = new (require('./prometheus').client.Histogram)({ + name: 'real_rtt', + help: 'metric_help', + buckets: [10, 200, 400, 700, 1500], + labelNames: ['emitter', 'server', 'client'] + }); //new histogram([10, 200, 400, 700, 1500], 'real_rtt'); + require('./prometheus').registry.registerMetric(this.histogram); + require('./prometheus').emitter.on('data', data => { + if (!data.streams.length) { + return + } + sender.sendMetrics(data); + }); + callback(); }; @@ -152,7 +167,7 @@ function ContextManager (self, tracerName, lru) { line.event line.event.name -> created line.event.transport - line.event.transport.id + line.event.transport?.id || 'undef' */ event = { eventName: line.event.name, @@ -180,7 +195,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } this.sessionMap.set(session.session_id, { ...session }) // logger.info('PJU -- Session event:', sessionSpan, session) @@ -197,7 +212,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const destroySpan = this.startSpan( @@ -255,7 +270,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const handleSpan = this.startSpan( @@ -299,7 +314,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const detachedSpan = this.startSpan( @@ -352,7 +367,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const extSpan = this.startSpan( @@ -406,7 +421,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const sdpSpan = this.startSpan( @@ -435,7 +450,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const sdpSpan = this.startSpan( @@ -492,7 +507,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const iceSpan = this.startSpan( @@ -519,7 +534,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const conIceSpan = this.startSpan( @@ -545,7 +560,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const conIceSpan = this.startSpan( @@ -571,7 +586,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const readySpan = this.startSpan( @@ -610,7 +625,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const candidateSpan = this.startSpan( @@ -646,7 +661,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const candidateSpan = this.startSpan( @@ -682,7 +697,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const candidateSpan = this.startSpan( @@ -722,7 +737,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const trySpan = this.startSpan( @@ -751,7 +766,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const conSpan = this.startSpan( @@ -788,7 +803,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } const conSpan = this.startSpan( @@ -886,7 +901,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let mediaSpan = this.startSpan( @@ -931,7 +946,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undefined' } } let mediaSpan = this.startSpan( @@ -1077,7 +1092,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let joinSpan = this.startSpan( @@ -1123,7 +1138,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let confSpan = this.startSpan( @@ -1156,7 +1171,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let pubSpan = this.startSpan( @@ -1196,7 +1211,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let subSpan = this.startSpan( @@ -1229,7 +1244,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } try { @@ -1258,7 +1273,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let upSpan = this.startSpan( @@ -1291,7 +1306,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } let unpubSpan = this.startSpan( @@ -1333,7 +1348,7 @@ function ContextManager (self, tracerName, lru) { sessionSpanId: sessionSpan.id, sessionSpan: sessionSpan, status: 'Open', - transportId: line.event.transport.id + transportId: line.event.transport?.id || 'undef' } } try { @@ -1528,7 +1543,11 @@ function ContextManager (self, tracerName, lru) { below with <= upperBound */ - const rtt = event.metrics["rtt"] || 0 + const rtt = parseInt(event.metrics["rtt"] || 0) + + if (!isNaN(rtt)) { + self.histogram.labels(event.emitter, event.emitter, 'rtt').observe(rtt) + } if (rtt <= 10) { mediaMetrics.streams.push({ diff --git a/plugins/filters/app_janus/histogram.js b/plugins/filters/app_janus/histogram.js new file mode 100644 index 00000000..b4e7d7f4 --- /dev/null +++ b/plugins/filters/app_janus/histogram.js @@ -0,0 +1,83 @@ +const {EventEmitter} = require('events') + +module.exports = class extends EventEmitter { + /** + * + * @param classes {number[]} + * @param metric {string} + */ + constructor(classes, metric) { + super(); + classes.sort((a,b) => a-b); + this.metric = metric; + this.classes = classes; + this.streams = {}; + const self = this; + setInterval(() => { + Object.values(self.streams).forEach(stream => { + const now = `${Date.now() * 1e6}` + stream.buckets.forEach(b => b.values[0][0] = now) + stream.buckets_strict.forEach(b => b.values[0][0] = now) + stream.sum.values[0][0] = now + stream.count.values[0][0] = now + const res = {streams: [ + ...stream.buckets, + ...stream.buckets_strict, + stream.count, + stream.sum + ]} + self.emit('data', res); + }); + self.streams = {}; + }, 15000); + } + + /** + * + * @param tags {Object} + */ + get(tags) { + const strTags = JSON.stringify(tags); + if (this.streams[strTags]) { + return this.streams[strTags]; + } + const _classes = [...this.classes, '+Inf'] + this.streams[strTags] = { + buckets: _classes.map(cls => ({ + stream: {...tags, __name__: this.metric + '_bucket', le: cls.toString()}, + values: [['0', '', 0]]})), + buckets_strict: _classes.map((cls, i) => ({ + stream: { + ...tags, + __name__: this.metric + '_bucket_strict', + le: cls.toString(), + gt: i === 0 ? '0' : _classes[i-1].toString() + }, + values: [['0', '', 0]]})), + count: {stream: {...tags, __name__: this.metric + '_count'}, + values: [['0', '', 0]]}, + sum: {stream: {...tags, __name__: this.metric + '_sum'}, + values: [['0', '', 0]]} + } + return this.streams[strTags]; + } + + /** + * @param tags {Object} + * @param val {number} + */ + put(tags, val) { + const stream = this.get(tags) + stream.sum.values[0][2] += val; + stream.count.values[0][2] ++; + let i = 0; + while (i < this.classes.length && val > this.classes[i]) { + i++; + } + stream.buckets_strict[i].values[0][2]++; + while (i < stream.buckets.length) { + stream.buckets[i].values[0][2]++; + i++ + } + } +} diff --git a/plugins/filters/app_janus/prometheus.js b/plugins/filters/app_janus/prometheus.js new file mode 100644 index 00000000..978877cd --- /dev/null +++ b/plugins/filters/app_janus/prometheus.js @@ -0,0 +1,27 @@ +const client = require('prom-client'); +const Registry = client.Registry; +const registry = new Registry(); +const {EventEmitter} = require('events') + +const emitter = new EventEmitter() + +setInterval(async () => { + const metrics = await Promise.all(registry.getMetricsAsArray().map(descr => descr.get())); + metrics.forEach(metric => { + const streams = metric.values.map(value => ({ + stream: { + ...Object.fromEntries(Object.entries(value.labels).map(e => [e[0], e[1].toString()])), + __name__: value.metricName || metric.name + }, + values: [[`${Date.now()}000000`, '', value.value]] + })); + emitter.emit('data', {streams: streams}) + }); + registry.resetMetrics(); +}, 15000); + +module.exports = { + client, + registry, + emitter +} \ No newline at end of file