Skip to content

Commit

Permalink
[PROMETHEUS] supports clean api and cli
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya authored and SongZhen0704 committed Jan 17, 2024
1 parent d8952b4 commit 4ada00b
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cli/ctl/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func Execute(version string) {
root.AddCommand(RegisterServerCommand())
root.AddCommand(RegisterRepoCommand())
root.AddCommand(RegisterPluginCommand())
root.AddCommand(RegisterPrometheusCacheCommand())
root.AddCommand(RegisterPrometheusCommand())
root.AddCommand(RegisterPromQLCommand())

cmd.RegisterIngesterCommand(root)
Expand Down
2 changes: 1 addition & 1 deletion cli/ctl/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func RegisterCloudCommand() *cobra.Command {
Use: "cloud",
Short: "debug cloud data commands",
Run: func(cmd *cobra.Command, args []string) {
fmt.Printf("please run with 'info'.\n")
fmt.Printf("please run with 'info | task'.\n")
},
}

Expand Down
48 changes: 42 additions & 6 deletions cli/ctl/prometheus_cache.go → cli/ctl/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,51 @@ package ctl
import (
"context"
"fmt"
"net/http"
"os"

"github.com/deepflowio/deepflow/cli/ctl/common"
"github.com/deepflowio/deepflow/message/controller"
"github.com/spf13/cobra"
)

func RegisterPrometheusCacheCommand() *cobra.Command {
func RegisterPrometheusCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "prometheus",
Short: "prometheus operation commands",
Run: func(cmd *cobra.Command, args []string) {
fmt.Printf("please run with 'cache | cleaner'.\n")
},
}

var t string
prometheusCmd := &cobra.Command{
Use: "prometheus-cache",
cacheCmd := &cobra.Command{
Use: "cache",
Short: "pull prometheus cache data from deepflow-server",
Example: "deepflow-ctl prometheus-cache -t metric-name",
Example: "deepflow-ctl prometheus cache -t metric-name",
Run: func(cmd *cobra.Command, args []string) {
if err := prometheusCache(cmd, t); err != nil {
fmt.Println(err)
}
},
}
prometheusCmd.Flags().StringVarP(&t, "type", "t", "all", "cache type, options: all, metric-name, label-name, "+
cacheCmd.Flags().StringVarP(&t, "type", "t", "all", "cache type, options: all, metric-name, label-name, "+
"label-value, metric-and-app-layout, target, label, metric-label, metric-target")
cmd.AddCommand(cacheCmd)

var expiredAt string
clearCmd := &cobra.Command{
Use: "clear",
Short: "clear prometheus data in MySQL by deepflow-server, use with caution and not frequently!",
Example: "deepflow-ctl prometheus clear \"2006-01-02 15:04:05\"",
Run: func(cmd *cobra.Command, args []string) {
prometheusClear(cmd, expiredAt)
},
}
clearCmd.Flags().StringVarP(&expiredAt, "expired-time", "e", "", "expired-time format: 2006-01-02 15:04:05")
cmd.AddCommand(clearCmd)

return prometheusCmd
return cmd
}

func prometheusCache(cmd *cobra.Command, t string) error {
Expand Down Expand Up @@ -87,3 +111,15 @@ func prometheusCache(cmd *cobra.Command, t string) error {
fmt.Printf("%s\n", resp.GetContent())
return nil
}

// prometheusClear POSTs a prometheus cleaner task to deepflow-server, passing
// expiredAt through as the "EXPIRED_AT" field of the request body. The
// response is printed to stdout; a request error is printed to stderr.
func prometheusClear(cmd *cobra.Command, expiredAt string) {
	srv := common.GetServerInfo(cmd)
	endpoint := fmt.Sprintf("http://%s:%d/v1/prometheus-cleaner-tasks/", srv.IP, srv.Port)
	payload := map[string]interface{}{"EXPIRED_AT": expiredAt}
	timeoutOpt := common.WithTimeout(common.GetTimeout(cmd))

	result, err := common.CURLPerform(http.MethodPost, endpoint, payload, "", timeoutOpt)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Println(result)
}
10 changes: 8 additions & 2 deletions server/controller/common/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"github.com/deepflowio/deepflow/server/controller/db/mysql"
)

func GetCurrentController() (*mysql.Controller, error) {
func GetSelfController() (*mysql.Controller, error) {
var controller *mysql.Controller
err := mysql.Db.Where("ip = ?", GetNodeIP()).Find(&controller).Error
return controller, err
}

func GetMasterControllerHostPort() (masterIP string, httpPort, grpcPort int, err error) {
var host string
curController, err := GetCurrentController()
curController, err := GetSelfController()
if err != nil {
return
}
Expand Down Expand Up @@ -84,3 +84,9 @@ func GetMasterControllerHostPort() (masterIP string, httpPort, grpcPort int, err
}
return
}

// CheckSelfAndGetMasterControllerHostPort looks up the master controller via
// GetMasterControllerHostPort and reports in ok whether its IP equals this
// process's GetPodIP() value. The master IP, ports and lookup error are
// returned unchanged; ok is still computed when err is non-nil, so callers
// should check err first.
func CheckSelfAndGetMasterControllerHostPort() (ok bool, masterCtrlIP string, httpPort, grpcPort int, err error) {
	selfIP := GetPodIP()
	masterCtrlIP, httpPort, grpcPort, err = GetMasterControllerHostPort()
	ok = selfIP == masterCtrlIP
	return ok, masterCtrlIP, httpPort, grpcPort, err
}
2 changes: 1 addition & 1 deletion server/controller/http/router/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
servicecommon "github.com/deepflowio/deepflow/server/controller/http/service/common"
)

var log = logging.MustGetLogger("router.health")
var log = logging.MustGetLogger("router")

const OK = "ok"

Expand Down
75 changes: 75 additions & 0 deletions server/controller/http/router/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package router

import (
"fmt"
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"

"github.com/deepflowio/deepflow/server/controller/common"
routercommon "github.com/deepflowio/deepflow/server/controller/http/router/common"
"github.com/deepflowio/deepflow/server/controller/prometheus"
)

// Prometheus is a stateless registrant for the prometheus HTTP routes; see
// RegisterTo for the routes it installs.
type Prometheus struct{}

// NewPrometheus returns a new, empty Prometheus route registrant.
func NewPrometheus() *Prometheus {
	return new(Prometheus)
}

// RegisterTo installs the prometheus cleaner-task creation endpoint (POST) on
// the given gin engine.
func (p *Prometheus) RegisterTo(e *gin.Engine) {
	const cleanerTasksPath = "/v1/prometheus-cleaner-tasks/"
	e.POST(cleanerTasksPath, createPrometheusCleanTask)
}

// createPrometheusCleanTask handles POST /v1/prometheus-cleaner-tasks/.
//
// The JSON body may carry an optional "EXPIRED_AT" string formatted as
// common.GO_BIRTHDAY. If this controller is the master, the cleaner is run
// locally with that time; otherwise the request body is forwarded unchanged to
// the master controller. In every case the decoded body and the resulting
// error (if any) are written back through routercommon.JsonResponse.
func createPrometheusCleanTask(c *gin.Context) {
	body := make(map[string]interface{})
	if err := c.ShouldBindBodyWith(&body, binding.JSON); err != nil {
		log.Errorf("bind prometheus clean task body failed: %v", err)
		routercommon.JsonResponse(c, body, err)
		return
	}

	isMaster, masterCtrlIP, httpPort, _, err := common.CheckSelfAndGetMasterControllerHostPort()
	if err != nil {
		log.Errorf("get master controller failed: %v", err)
		routercommon.JsonResponse(c, body, err)
		return
	}

	if !isMaster {
		// Only the master runs the cleaner; relay the request as received.
		masterURL := fmt.Sprintf("http://%s:%d/v1/prometheus-cleaner-tasks/", masterCtrlIP, httpPort)
		if _, err = common.CURLPerform(http.MethodPost, masterURL, body); err != nil {
			log.Errorf("forward prometheus clean task to master (%s) failed: %v", masterURL, err)
		}
		routercommon.JsonResponse(c, body, err)
		return
	}

	// A zero expiredAt is passed through as-is when EXPIRED_AT is absent,
	// null or empty, leaving the choice of expiry time to the cleaner.
	var expiredAt time.Time
	switch v := body["EXPIRED_AT"].(type) {
	case nil:
		// Key missing or JSON null: keep the zero value.
	case string:
		// The CLI sends an empty string when no expiry was given; treat it
		// the same as a missing key instead of failing to parse it.
		if v != "" {
			if expiredAt, err = time.Parse(common.GO_BIRTHDAY, v); err != nil {
				log.Errorf("parse EXPIRED_AT %q failed: %v", v, err)
				routercommon.JsonResponse(c, body, err)
				return
			}
		}
	default:
		// Reject non-string values explicitly rather than panicking on a
		// failed type assertion.
		err = fmt.Errorf("EXPIRED_AT must be a string formatted as %q, got %T", common.GO_BIRTHDAY, v)
		log.Errorf("invalid prometheus clean task body: %v", err)
		routercommon.JsonResponse(c, body, err)
		return
	}

	if err = prometheus.GetCleaner().Clear(expiredAt); err != nil {
		log.Errorf("prometheus clean task failed: %v", err)
	}
	routercommon.JsonResponse(c, body, err)
}
1 change: 1 addition & 0 deletions server/controller/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *Server) appendRegistrant() []registrant.Registrant {
router.NewVtapRepo(),
router.NewPlugin(),
router.NewMail(),
router.NewPrometheus(),

// resource
resource.NewDomain(s.controllerConfig),
Expand Down
74 changes: 39 additions & 35 deletions server/controller/prometheus/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package prometheus

import (
"context"
"errors"
"sync"
"time"

Expand All @@ -43,7 +44,7 @@ type Cleaner struct {
ctx context.Context
cancel context.CancelFunc

mux sync.Mutex
canClean chan struct{}
interval time.Duration

encoder *encoder.Encoder
Expand All @@ -52,7 +53,8 @@ type Cleaner struct {
func GetCleaner() *Cleaner {
cleanerOnce.Do(func() {
cleaner = &Cleaner{
encoder: encoder.GetSingleton(),
encoder: encoder.GetSingleton(),
canClean: make(chan struct{}, 1),
}
})
return cleaner
Expand All @@ -61,6 +63,7 @@ func GetCleaner() *Cleaner {
func (c *Cleaner) Init(ctx context.Context, cfg *prometheuscfg.Config) {
c.ctx, c.cancel = context.WithCancel(ctx)
c.interval = time.Duration(cfg.DataCleanInterval) * time.Hour
c.canClean <- struct{}{}
}

func (c *Cleaner) Start() error {
Expand All @@ -73,7 +76,7 @@ func (c *Cleaner) Start() error {
case <-c.ctx.Done():
return
case <-ticker.C:
c.clean()
c.clear(time.Time{})
}
}
}()
Expand All @@ -87,18 +90,25 @@ func (c *Cleaner) Stop() {
log.Info("prometheus data cleaner stopped")
}

func (c *Cleaner) Clean() {
c.clean()
func (c *Cleaner) Clear(expiredAt time.Time) error {
log.Infof("prometheus data cleaner clear by hand started")
return c.clear(expiredAt)
}

func (c *Cleaner) clean() {
c.mux.Lock()
defer c.mux.Unlock()
log.Info("prometheus data cleaner clean started")
if err := c.deleteExpired(); err == nil {
c.encoder.Refresh()
func (c *Cleaner) clear(expiredAt time.Time) error {
select {
case <-c.canClean:
log.Info("prometheus data cleaner clear started")
if err := c.deleteExpired(expiredAt); err == nil {
c.encoder.Refresh()
}
log.Info("prometheus data cleaner clear completed")
c.canClean <- struct{}{}
return nil
default:
log.Info("prometheus data cleaner clear skipped")
return errors.New("cleaner is busy, try again later")
}
log.Info("prometheus data cleaner clean stopped")
}

// total retention time is data_source retention hours (get from db) + 24 hours
Expand All @@ -122,9 +132,11 @@ func (c *Cleaner) getTargetInstanceJobValues() (values []string) {
return
}

func (c *Cleaner) deleteExpired() error {
expiredAt := c.getExpiredTime()
log.Infof("clean expired data (synced_at < %s) started", expiredAt.Format(common.GO_BIRTHDAY))
func (c *Cleaner) deleteExpired(expiredAt time.Time) error {
if expiredAt.IsZero() {
expiredAt = c.getExpiredTime()
}
log.Infof("clear expired data (synced_at < %s) started", expiredAt.Format(common.GO_BIRTHDAY))

err := mysql.Db.Transaction(func(tx *gorm.DB) error {
if err := c.deleteExpiredMetricLabel(tx, expiredAt); err != nil {
Expand All @@ -148,10 +160,10 @@ func (c *Cleaner) deleteExpired() error {
return nil
})
if err != nil {
log.Errorf("clean expired data failed: %v, delete operation will rollback", err)
log.Errorf("clear expired data failed: %v, delete operation will rollback", err)
return err
}
log.Info("clean expired data completed")
log.Info("clear expired data completed")
return nil
}

Expand All @@ -177,12 +189,6 @@ func (c *Cleaner) deleteExpiredMetricName(tx *gorm.DB, expiredAt time.Time) erro
if err := mysql.Db.Where("metric_name IN (?)", mns).Delete(&metricTargets).Error; err != nil {
return err
}
log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricLabel), len(metricLabels))
log.Infof("clean data detail: %v", metricLabels) // TODO change to debug log
log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricAPPLabelLayout), len(appLabels))
log.Infof("clean data detail: %v", appLabels) // TODO change to debug log
log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricTarget), len(metricTargets))
log.Infof("clean data detail: %v", metricTargets) // TODO change to debug log
}
return nil
}
Expand All @@ -201,8 +207,6 @@ func (c *Cleaner) deleteExpiredLabel(tx *gorm.DB, expiredAt time.Time) error {
if err := mysql.Db.Where("label_id IN (?)", lis).Delete(&metricLabels).Error; err != nil {
return err
}
log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricLabel), len(metricLabels))
log.Infof("clean data detail: %v", metricLabels) // TODO change to debug log
}
return nil
}
Expand All @@ -225,10 +229,6 @@ func (c *Cleaner) deleteExpiredLabelName(tx *gorm.DB, expiredAt time.Time) error
if err := mysql.Db.Where("app_label_name IN (?)", lns).Delete(&appLabels).Error; err != nil {
return err
}
log.Infof("clean data %v count: %d", new(mysql.PrometheusLabel), len(labels))
log.Infof("clean data detail: %v", labels) // TODO change to debug log
log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricAPPLabelLayout), len(appLabels))
log.Infof("clean data detail: %v", appLabels) // TODO change to debug log
}
return nil
}
Expand All @@ -248,8 +248,6 @@ func (c *Cleaner) deleteExpiredLabelValue(tx *gorm.DB, expiredAt time.Time) erro
if err := mysql.Db.Where("value IN (?)", lvs).Delete(&labels).Error; err != nil {
return err
}
log.Infof("clean data %v count: %d", new(mysql.PrometheusLabel), len(labels))
log.Infof("clean data detail: %v", labels) // TODO change to debug log
}
return nil
}
Expand All @@ -272,12 +270,18 @@ func (c *Cleaner) deleteExpiredMetricAPPLabelLayout(tx *gorm.DB, expiredAt time.

func DeleteExpired[MT any](tx *gorm.DB, expiredAt time.Time) ([]MT, error) {
var items []MT
err := tx.Where("synced_at < ?", expiredAt).Delete(&items).Error
if err != nil {
if err := tx.Where("synced_at < ?", expiredAt).Find(&items).Error; err != nil {
log.Errorf("mysql delete resource failed: %v", err)
return items, err
}
if len(items) == 0 {
return items, nil
}
if err := tx.Delete(&items).Error; err != nil {
log.Errorf("mysql delete resource failed: %v", err)
return items, err
}
log.Infof("clean data %v count: %d", new(MT), len(items))
log.Infof("clean data detail: %v", items) // TODO change to debug log
log.Infof("clear data count: %d", len(items))
log.Infof("clear data detail: %v", items)
return items, nil
}

0 comments on commit 4ada00b

Please sign in to comment.