Skip to content

Commit

Permalink
PMM-12848 Refactoring currentop_collector.
Browse files Browse the repository at this point in the history
  • Loading branch information
BupycHuk committed Aug 8, 2024
1 parent 6d70eba commit 771d0fa
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
22 changes: 10 additions & 12 deletions exporter/currentop_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (d *currentopCollector) collect(ch chan<- prometheus.Metric) {
client := d.base.client
slowtime, err := time.ParseDuration(d.currentopslowtime)
if err != nil {
logger.Errorf("Failed to parse slowtime: %s", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewInvalidDesc(err), err)
return
}
Expand All @@ -89,6 +90,7 @@ func (d *currentopCollector) collect(ch chan<- prometheus.Metric) {

var r primitive.M
if err := res.Decode(&r); err != nil {
logger.Errorf("Failed to decode currentOp response: %s", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewInvalidDesc(err), err)
return
}
Expand All @@ -99,10 +101,16 @@ func (d *currentopCollector) collect(ch chan<- prometheus.Metric) {
inprog, ok := r["inprog"].(primitive.A)

if !ok {
logger.Errorf("Invalid type primitive.A assertion for 'inprog': %T", r["inprog"])
ch <- prometheus.NewInvalidMetric(prometheus.NewInvalidDesc(ErrInvalidOrMissingInprogEntry),
ErrInvalidOrMissingInprogEntry)
}

labels := d.topologyInfo.baseLabels()
ln := []string{"opid", "op", "desc", "database", "collection", "ns"}
const name = "mongodb_currentop_query_uptime"
pd := prometheus.NewDesc(name, " mongodb_currentop_query_uptime currentop_query", ln, labels)

for _, bsonMap := range inprog {

bsonMapElement, ok := bsonMap.(primitive.M)
Expand Down Expand Up @@ -137,18 +145,8 @@ func (d *currentopCollector) collect(ch chan<- prometheus.Metric) {
continue
}

labels := d.topologyInfo.baseLabels()
labels["opid"] = strconv.Itoa(int(opid))
labels["op"] = op
labels["desc"] = desc
labels["database"] = db
labels["collection"] = collection
labels["ns"] = namespace

m := primitive.M{"uptime": microsecs_running}
lv := []string{strconv.Itoa(int(opid)), op, desc, db, collection, namespace}

for _, metric := range makeMetrics("currentop_query", m, labels, d.compatibleMode) {
ch <- metric
}
ch <- prometheus.MustNewConstMetric(pd, prometheus.GaugeValue, float64(microsecs_running), lv...)
}
}
24 changes: 15 additions & 9 deletions exporter/currentop_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,47 @@ package exporter
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"sync"
"testing"
"time"

"github.com/percona/mongodb_exporter/internal/tu"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"

"github.com/percona/mongodb_exporter/internal/tu"
)

func TestCurrentopCollector(t *testing.T) {
// It seems like this test needs the queries to continue running so that current oplog is not empty.
// TODO: figure out how to restore this test.
t.Skip()
//t.Skip()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

var wg sync.WaitGroup

client := tu.DefaultTestClient(ctx, t)

database := client.Database("testdb")
database.Drop(ctx)
_ = database.Drop(ctx)

defer func() {
err := database.Drop(ctx)
assert.NoError(t, err)
}()
ch := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 300; i++ {
coll := fmt.Sprintf("testcol_%02d", i)
_, err := database.Collection(coll).InsertOne(ctx, bson.M{"f1": 1, "f2": "2"})
coll := fmt.Sprintf("testcol_01")
for j := 0; j < 100; j++ {
_, err := database.Collection(coll).InsertOne(ctx, bson.M{"f1": j, "f2": "2"})
assert.NoError(t, err)
}
ch <- struct{}{}
_, _ = database.Collection(coll).Find(ctx, bson.M{"$where": "function() {return sleep(100)}"})
}()

ti := labelsGetterMock{}
Expand All @@ -76,6 +78,10 @@ func TestCurrentopCollector(t *testing.T) {
"mongodb_currentop_query_uptime",
}

<-ch

time.Sleep(1 * time.Second)

count := testutil.CollectAndCount(c, filter...)
assert.True(t, count > 0)
wg.Wait()
Expand Down

0 comments on commit 771d0fa

Please sign in to comment.