diff --git a/cli/ctl/cli.go b/cli/ctl/cli.go index be42812828c..974713a557d 100644 --- a/cli/ctl/cli.go +++ b/cli/ctl/cli.go @@ -65,7 +65,7 @@ func Execute(version string) { root.AddCommand(RegisterServerCommand()) root.AddCommand(RegisterRepoCommand()) root.AddCommand(RegisterPluginCommand()) - root.AddCommand(RegisterPrometheusCacheCommand()) + root.AddCommand(RegisterPrometheusCommand()) root.AddCommand(RegisterPromQLCommand()) cmd.RegisterIngesterCommand(root) diff --git a/cli/ctl/cloud.go b/cli/ctl/cloud.go index 966bf69e215..383864aa253 100644 --- a/cli/ctl/cloud.go +++ b/cli/ctl/cloud.go @@ -32,7 +32,7 @@ func RegisterCloudCommand() *cobra.Command { Use: "cloud", Short: "debug cloud data commands", Run: func(cmd *cobra.Command, args []string) { - fmt.Printf("please run with 'info'.\n") + fmt.Printf("please run with 'info | task'.\n") }, } diff --git a/cli/ctl/prometheus_cache.go b/cli/ctl/prometheus.go similarity index 62% rename from cli/ctl/prometheus_cache.go rename to cli/ctl/prometheus.go index f926545bb89..abab164f7cb 100644 --- a/cli/ctl/prometheus_cache.go +++ b/cli/ctl/prometheus.go @@ -19,27 +19,51 @@ package ctl import ( "context" "fmt" + "net/http" + "os" + "github.com/deepflowio/deepflow/cli/ctl/common" "github.com/deepflowio/deepflow/message/controller" "github.com/spf13/cobra" ) -func RegisterPrometheusCacheCommand() *cobra.Command { +func RegisterPrometheusCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "prometheus", + Short: "prometheus operation commands", + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("please run with 'cache | cleaner'.\n") + }, + } + var t string - prometheusCmd := &cobra.Command{ - Use: "prometheus-cache", + cacheCmd := &cobra.Command{ + Use: "cache", Short: "pull prometheus cache data from deepflow-server", - Example: "deepflow-ctl prometheus-cache -t metric-name", + Example: "deepflow-ctl prometheus cache -t metric-name", Run: func(cmd *cobra.Command, args []string) { if err := prometheusCache(cmd, t); err != nil { fmt.Println(err) } }, } - prometheusCmd.Flags().StringVarP(&t, "type", "t", "all", "cache type, options: all, metric-name, label-name, "+ + cacheCmd.Flags().StringVarP(&t, "type", "t", "all", "cache type, options: all, metric-name, label-name, "+ "label-value, metric-and-app-layout, target, label, metric-label, metric-target") + cmd.AddCommand(cacheCmd) + + var expiredAt string + clearCmd := &cobra.Command{ + Use: "clear", + Short: "clear prometheus data in MySQL by deepflow-server, use with caution and not frequently!", + Example: "deepflow-ctl prometheus clear \"2006-01-02 15:04:05\"", + Run: func(cmd *cobra.Command, args []string) { + prometheusClear(cmd, expiredAt) + }, + } + clearCmd.Flags().StringVarP(&expiredAt, "expired-time", "e", "", "expired-time format: 2006-01-02 15:04:05") + cmd.AddCommand(clearCmd) - return prometheusCmd + return cmd } func prometheusCache(cmd *cobra.Command, t string) error { @@ -87,3 +111,15 @@ func prometheusCache(cmd *cobra.Command, t string) error { fmt.Printf("%s\n", resp.GetContent()) return nil } + +func prometheusClear(cmd *cobra.Command, expiredAt string) { + server := common.GetServerInfo(cmd) + url := fmt.Sprintf("http://%s:%d/v1/prometheus-cleaner-tasks/", server.IP, server.Port) + resp, err := common.CURLPerform(http.MethodPost, url, map[string]interface{}{"EXPIRED_AT": expiredAt}, "", []common.HTTPOption{common.WithTimeout(common.GetTimeout(cmd))}...) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return + } + fmt.Println(resp) + return +} diff --git a/server/controller/common/controller.go b/server/controller/common/controller.go index 7cb9ab46520..7296028defe 100644 --- a/server/controller/common/controller.go +++ b/server/controller/common/controller.go @@ -26,7 +26,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/db/mysql" ) -func GetCurrentController() (*mysql.Controller, error) { +func GetSelfController() (*mysql.Controller, error) { var controller *mysql.Controller err := mysql.Db.Where("ip = ?", GetNodeIP()).Find(&controller).Error return controller, err @@ -34,7 +34,7 @@ func GetCurrentController() (*mysql.Controller, error) { func GetMasterControllerHostPort() (masterIP string, httpPort, grpcPort int, err error) { var host string - curController, err := GetCurrentController() + curController, err := GetSelfController() if err != nil { return } @@ -84,3 +84,9 @@ func GetMasterControllerHostPort() (masterIP string, httpPort, grpcPort int, err } return } + +func CheckSelfAndGetMasterControllerHostPort() (ok bool, masterCtrlIP string, httpPort, grpcPort int, err error) { + curCtrlIP := GetPodIP() + masterCtrlIP, httpPort, grpcPort, err = GetMasterControllerHostPort() + return curCtrlIP == masterCtrlIP, masterCtrlIP, httpPort, grpcPort, err +} diff --git a/server/controller/http/router/health.go b/server/controller/http/router/health.go index b3aecf00103..1531b2bbdf0 100644 --- a/server/controller/http/router/health.go +++ b/server/controller/http/router/health.go @@ -28,7 +28,7 @@ import ( servicecommon "github.com/deepflowio/deepflow/server/controller/http/service/common" ) -var log = logging.MustGetLogger("router.health") +var log = logging.MustGetLogger("router") const OK = "ok" diff --git a/server/controller/http/router/prometheus.go b/server/controller/http/router/prometheus.go new file mode 100644 index 00000000000..1cdb545b00b --- /dev/null +++ b/server/controller/http/router/prometheus.go @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package router + +import ( + "fmt" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + + "github.com/deepflowio/deepflow/server/controller/common" + routercommon "github.com/deepflowio/deepflow/server/controller/http/router/common" + "github.com/deepflowio/deepflow/server/controller/prometheus" +) + +type Prometheus struct{} + +func NewPrometheus() *Prometheus { + return &Prometheus{} +} + +func (p *Prometheus) RegisterTo(e *gin.Engine) { + e.POST("/v1/prometheus-cleaner-tasks/", createPrometheusCleanTask) +} + +func createPrometheusCleanTask(c *gin.Context) { + body := make(map[string]interface{}) + err := c.ShouldBindBodyWith(&body, binding.JSON) + log.Errorf("body: %v", body) + if err != nil { + log.Errorf("body: %v", err) + routercommon.JsonResponse(c, body, err) + return + } + + isMaster, masterCtrlIP, httpPort, _, err := common.CheckSelfAndGetMasterControllerHostPort() + if err != nil { + log.Errorf("body: %v", err) + routercommon.JsonResponse(c, body, err) + return + } + if isMaster { + expiredAt := time.Time{} + if e, ok := body["EXPIRED_AT"]; ok { + expiredAt, err = time.Parse(common.GO_BIRTHDAY, e.(string)) + if err != nil { + log.Errorf("body: %v", err) + routercommon.JsonResponse(c, body, err) + return + } + } + log.Errorf("body: %v", body) + err = prometheus.GetCleaner().Clear(expiredAt) + } else { + _, err = common.CURLPerform(http.MethodPost, fmt.Sprintf("http://%s:%d/v1/prometheus-cleaner-tasks/", masterCtrlIP, httpPort), body) + log.Errorf("body: %v", err) + } + routercommon.JsonResponse(c, body, err) +} diff --git a/server/controller/http/server.go b/server/controller/http/server.go index b10f8d85a3b..eed84b8fc0f 100644 --- a/server/controller/http/server.go +++ b/server/controller/http/server.go @@ -111,6 +111,7 @@ func (s *Server) appendRegistrant() []registrant.Registrant { router.NewVtapRepo(), router.NewPlugin(), router.NewMail(), + router.NewPrometheus(), // resource resource.NewDomain(s.controllerConfig), diff --git a/server/controller/prometheus/cleaner.go b/server/controller/prometheus/cleaner.go index 8e6bbf05a4e..6c3ea7e7d36 100644 --- a/server/controller/prometheus/cleaner.go +++ b/server/controller/prometheus/cleaner.go @@ -18,6 +18,7 @@ package prometheus import ( "context" + "errors" "sync" "time" @@ -43,7 +44,7 @@ type Cleaner struct { ctx context.Context cancel context.CancelFunc - mux sync.Mutex + canClean chan struct{} interval time.Duration encoder *encoder.Encoder @@ -52,7 +53,8 @@ type Cleaner struct { func GetCleaner() *Cleaner { cleanerOnce.Do(func() { cleaner = &Cleaner{ - encoder: encoder.GetSingleton(), + encoder: encoder.GetSingleton(), + canClean: make(chan struct{}, 1), } }) return cleaner @@ -61,6 +63,7 @@ func GetCleaner() *Cleaner { func (c *Cleaner) Init(ctx context.Context, cfg *prometheuscfg.Config) { c.ctx, c.cancel = context.WithCancel(ctx) c.interval = time.Duration(cfg.DataCleanInterval) * time.Hour + c.canClean <- struct{}{} } func (c *Cleaner) Start() error { @@ -73,7 +76,7 @@ func (c *Cleaner) Start() error { case <-c.ctx.Done(): return case <-ticker.C: - c.clean() + c.clear(time.Time{}) } } }() @@ -87,18 +90,25 @@ func (c *Cleaner) Stop() { log.Info("prometheus data cleaner stopped") } -func (c *Cleaner) Clean() { - c.clean() +func (c *Cleaner) Clear(expiredAt time.Time) error { + log.Infof("prometheus data cleaner clear by hand started") + return c.clear(expiredAt) } -func (c *Cleaner) clean() { - c.mux.Lock() - defer c.mux.Unlock() - log.Info("prometheus data cleaner clean started") - if err := c.deleteExpired(); err == nil { - c.encoder.Refresh() +func (c *Cleaner) clear(expiredAt time.Time) error { + select { + case <-c.canClean: + log.Info("prometheus data cleaner clear started") + if err := c.deleteExpired(expiredAt); err == nil { + c.encoder.Refresh() + } + log.Info("prometheus data cleaner clear completed") + c.canClean <- struct{}{} + return nil + default: + log.Info("prometheus data cleaner clear skipped") + return errors.New("cleaner is busy, try again later") } - log.Info("prometheus data cleaner clean stopped") } // total retention time is data_source retention hours (get from db) + 24 hours @@ -122,9 +132,11 @@ func (c *Cleaner) getTargetInstanceJobValues() (values []string) { return } -func (c *Cleaner) deleteExpired() error { - expiredAt := c.getExpiredTime() - log.Infof("clean expired data (synced_at < %s) started", expiredAt.Format(common.GO_BIRTHDAY)) +func (c *Cleaner) deleteExpired(expiredAt time.Time) error { + if expiredAt.IsZero() { + expiredAt = c.getExpiredTime() + } + log.Infof("clear expired data (synced_at < %s) started", expiredAt.Format(common.GO_BIRTHDAY)) err := mysql.Db.Transaction(func(tx *gorm.DB) error { if err := c.deleteExpiredMetricLabel(tx, expiredAt); err != nil { @@ -148,10 +160,10 @@ func (c *Cleaner) deleteExpired() error { return nil }) if err != nil { - log.Errorf("clean expired data failed: %v, delete operation will rollback", err) + log.Errorf("clear expired data failed: %v, delete operation will rollback", err) return err } - log.Info("clean expired data completed") + log.Info("clear expired data completed") return nil } @@ -177,12 +189,6 @@ func (c *Cleaner) deleteExpiredMetricName(tx *gorm.DB, expiredAt time.Time) erro if err := mysql.Db.Where("metric_name IN (?)", mns).Delete(&metricTargets).Error; err != nil { return err } - log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricLabel), len(metricLabels)) - log.Infof("clean data detail: %v", metricLabels) // TODO change to debug log - log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricAPPLabelLayout), len(appLabels)) - log.Infof("clean data detail: %v", appLabels) // TODO change to debug log - log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricTarget), len(metricTargets)) - log.Infof("clean data detail: %v", metricTargets) // TODO change to debug log } return nil } @@ -201,8 +207,6 @@ func (c *Cleaner) deleteExpiredLabel(tx *gorm.DB, expiredAt time.Time) error { if err := mysql.Db.Where("label_id IN (?)", lis).Delete(&metricLabels).Error; err != nil { return err } - log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricLabel), len(metricLabels)) - log.Infof("clean data detail: %v", metricLabels) // TODO change to debug log } return nil } @@ -225,10 +229,6 @@ func (c *Cleaner) deleteExpiredLabelName(tx *gorm.DB, expiredAt time.Time) error if err := mysql.Db.Where("app_label_name IN (?)", lns).Delete(&appLabels).Error; err != nil { return err } - log.Infof("clean data %v count: %d", new(mysql.PrometheusLabel), len(labels)) - log.Infof("clean data detail: %v", labels) // TODO change to debug log - log.Infof("clean data %v count: %d", new(mysql.PrometheusMetricAPPLabelLayout), len(appLabels)) - log.Infof("clean data detail: %v", appLabels) // TODO change to debug log } return nil } @@ -248,8 +248,6 @@ func (c *Cleaner) deleteExpiredLabelValue(tx *gorm.DB, expiredAt time.Time) erro if err := mysql.Db.Where("value IN (?)", lvs).Delete(&labels).Error; err != nil { return err } - log.Infof("clean data %v count: %d", new(mysql.PrometheusLabel), len(labels)) - log.Infof("clean data detail: %v", labels) // TODO change to debug log } return nil } @@ -272,12 +270,18 @@ func (c *Cleaner) deleteExpiredMetricAPPLabelLayout(tx *gorm.DB, expiredAt time. func DeleteExpired[MT any](tx *gorm.DB, expiredAt time.Time) ([]MT, error) { var items []MT - err := tx.Where("synced_at < ?", expiredAt).Delete(&items).Error - if err != nil { + if err := tx.Where("synced_at < ?", expiredAt).Find(&items).Error; err != nil { + log.Errorf("mysql delete resource failed: %v", err) + return items, err + } + if len(items) == 0 { + return items, nil + } + if err := tx.Delete(&items).Error; err != nil { log.Errorf("mysql delete resource failed: %v", err) return items, err } - log.Infof("clean data %v count: %d", new(MT), len(items)) - log.Infof("clean data detail: %v", items) // TODO change to debug log + log.Infof("clear data count: %d", len(items)) + log.Infof("clear data detail: %v", items) return items, nil }