Skip to content

Commit

Permalink
feat: supports non-real-time deletion of ck of deleted orgs
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengYa-0110 committed Aug 7, 2024
1 parent 1213e01 commit eddbb02
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 10 deletions.
2 changes: 0 additions & 2 deletions server/controller/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,4 @@ type GlobalConfig struct {
HTTPNodePort int
GRPCPort int
GRPCNodePort int

MySQLResultSetMax int
}
4 changes: 4 additions & 0 deletions server/controller/controller/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/db/mysql/migrator"
"github.com/deepflowio/deepflow/server/controller/election"
"github.com/deepflowio/deepflow/server/controller/http"
"github.com/deepflowio/deepflow/server/controller/http/service"
resoureservice "github.com/deepflowio/deepflow/server/controller/http/service/resource"
"github.com/deepflowio/deepflow/server/controller/monitor"
"github.com/deepflowio/deepflow/server/controller/monitor/license"
Expand Down Expand Up @@ -102,6 +103,7 @@ func checkAndStartMasterFunctions(
tagRecorder := tagrecorder.GetSingleton()
tagrecordercheck.GetSingleton().Init(ctx, *cfg)
tr := tagrecordercheck.GetSingleton()
deletedORGChecker := service.GetDeletedORGChecker(ctx)

httpService := http.GetSingleton()

Expand Down Expand Up @@ -165,6 +167,7 @@ func checkAndStartMasterFunctions(

if cfg.DFWebService.Enabled {
httpService.TaskManager.Start(sCtx, cfg.FPermit, cfg.RedisCfg)
deletedORGChecker.Start(sCtx)
}
} else if thisIsMasterController {
thisIsMasterController = false
Expand All @@ -179,6 +182,7 @@ func checkAndStartMasterFunctions(
// stop prometheus related
// stop http task mananger
// stop resource cleaner
// stop delete org checker
if sCancel != nil {
sCancel()
}
Expand Down
20 changes: 16 additions & 4 deletions server/controller/db/mysql/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,22 @@ func (AlarmPolicy) TableName() string {
return "alarm_policy"
}

type Org struct {
ID int `gorm:"primaryKey;column:id;type:int;not null" json:"ID"`
Name string `gorm:"column:name;type:char(128);default:''" json:"NAME"`
ORGID int `gorm:"column:org_id;type:int;default:0" json:"ORG_ID"`
type ORG struct {
ID int `gorm:"primaryKey;column:id;type:int;not null" json:"ID"`
Name string `gorm:"column:name;type:char(128);default:''" json:"NAME"`
ORGID int `gorm:"column:org_id;type:int;default:0" json:"ORG_ID"`
Lcuuid string `gorm:"column:lcuuid;type:char(64);not null" json:"LCUUID"`
OwnerUserID int `gorm:"column:owner_user_id;type:int;default:0" json:"OWNER_USER_ID"`
}

type DeletedORG struct {
ID int `gorm:"primaryKey;column:id;type:int;not null" json:"ID"`
Name string `gorm:"column:name;type:char(128);default:''" json:"NAME"`
ORGID int `gorm:"column:org_id;type:int;default:0" json:"ORG_ID"`
Lcuuid string `gorm:"column:lcuuid;type:char(64);not null" json:"LCUUID"`
OwnerUserID int `gorm:"column:owner_user_id;type:int;default:0" json:"OWNER_USER_ID"`
CreatedAt time.Time `gorm:"autoCreateTime;column:created_at;type:datetime" json:"CREATED_AT" mapstructure:"CREATED_AT"`
UpdatedAt time.Time `gorm:"autoUpdateTime;column:updated_at;type:datetime" json:"UPDATED_AT" mapstructure:"UPDATED_AT"`
}

type Team struct {
Expand Down
2 changes: 1 addition & 1 deletion server/controller/db/mysql/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetNonDefaultORGIDs() ([]int, error) {
return ids, nil
}

var orgs []*Org
var orgs []*ORG
if err := DefaultDB.Where("org_id != ?", common.DEFAULT_ORG_ID).Find(&orgs).Error; err != nil {
log.Errorf("failed to get org ids: %v", err.Error())
return ids, err
Expand Down
24 changes: 22 additions & 2 deletions server/controller/http/router/org_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func NewDatabase(cfg *config.ControllerConfig) *ORGData {
}

func (d *ORGData) RegisterTo(e *gin.Engine) {
e.POST("/v1/org/", d.Create)
e.DELETE("/v1/org/:id/", d.Delete)
e.GET("/v1/orgs/", d.Get)
e.POST("/v1/org/", d.Create)
e.DELETE("/v1/org/:id/", d.Delete) // provide for real-time call when deleting an organization
e.DELETE("/v1/org/", d.DeleteNonRealTime) // provide for non-real-time call from master controller after deleting an organization
}

func (d *ORGData) Create(c *gin.Context) {
Expand All @@ -71,6 +72,25 @@ func (d *ORGData) Delete(c *gin.Context) {
common.JsonResponse(c, nil, err)
}

func (d *ORGData) DeleteNonRealTime(c *gin.Context) {
orgIDs, ok := c.GetQueryArray("org_id")
if !ok {
common.BadRequestResponse(c, httpcommon.INVALID_POST_DATA, "org_id is required")
return
}
ints := make([]int, 0, len(orgIDs))
for _, id := range orgIDs {
i, err := strconv.Atoi(id)
if err != nil {
common.BadRequestResponse(c, httpcommon.INVALID_POST_DATA, err.Error())
return
}
ints = append(ints, i)
}
err := service.DeleteORGDataNonRealTime(ints)
common.JsonResponse(c, nil, err)
}

func (d *ORGData) Get(c *gin.Context) {
data, err := service.GetORGData(d.cfg)
common.JsonResponse(c, data, err)
Expand Down
120 changes: 119 additions & 1 deletion server/controller/http/service/org_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package service

import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"time"

"github.com/bitly/go-simplejson"
servercommon "github.com/deepflowio/deepflow/server/common"
Expand Down Expand Up @@ -78,7 +81,31 @@ func DeleteORGData(orgID int, mysqlCfg mysqlcfg.MySqlConfig) (err error) {
if err = migrator.DropDatabase(cfg); err != nil {
return err
}
return servercommon.DropOrg(uint16(orgID))
// copy deleted org info to deleted_org table which is used for deleting clickhouse org data asynchronously
var org *mysql.ORG
if err = mysql.DefaultDB.Where("org_id = ?", orgID).First(&org).Error; err != nil {
return err
}
deletedORG := &mysql.DeletedORG{
ORGID: org.ORGID,
Name: org.Name,
Lcuuid: org.Lcuuid,
OwnerUserID: org.OwnerUserID,
}
if err = mysql.DefaultDB.Create(deletedORG).Error; err != nil {
return err
}
return nil
}

func DeleteORGDataNonRealTime(orgIDs []int) error {
var res error
for _, id := range orgIDs {
if err := servercommon.DropOrg(uint16(id)); err != nil {
res = err
}
}
return res
}

func GetORGData(cfg *config.ControllerConfig) (*simplejson.Json, error) {
Expand Down Expand Up @@ -126,3 +153,94 @@ func GetORGData(cfg *config.ControllerConfig) (*simplejson.Json, error) {
response := orgResponse.Get("DATA")
return response, err
}

type DeletedORGChecker struct {
ctx context.Context
cancel context.CancelFunc
}

func GetDeletedORGChecker(ctx context.Context) *DeletedORGChecker {
cCtx, cCancel := context.WithCancel(ctx)
return &DeletedORGChecker{ctx: cCtx, cancel: cCancel}
}

func (c *DeletedORGChecker) Start(sCtx context.Context) {
log.Info("deleted org check started")
c.checkRegularly(sCtx)
}

func (c *DeletedORGChecker) Stop() {
if c.cancel != nil {
c.cancel()
}
log.Info("deleted org check stopped")
}

func (c *DeletedORGChecker) checkRegularly(sCtx context.Context) {
go func() {
ticker := time.NewTicker(time.Duration(5) * time.Minute)
defer ticker.Stop()
LOOP:
for {
select {
case <-ticker.C:
c.check()
case <-sCtx.Done():
break LOOP
case <-c.ctx.Done():
break LOOP
}
}
}()
}

func (c *DeletedORGChecker) check() {
log.Infof("check deleted orgs start")
defer log.Infof("check deleted orgs end")

var deletedORGs []*mysql.DeletedORG
if err := mysql.DefaultDB.Find(&deletedORGs).Error; err != nil {
log.Errorf("failed to get deleted orgs: %s", err.Error(), mysql.DefaultDB.LogPrefixORGID)
return
}
if len(deletedORGs) == 0 {
return
}
if err := c.triggerAllServersToDelete(deletedORGs); err != nil {
log.Errorf("failed to trigger all servers to delete orgs: %s", err.Error())
return
}
if err := mysql.DefaultDB.Delete(&deletedORGs).Error; err != nil {
log.Errorf("failed to delete deleted orgs: %s", err.Error(), mysql.DefaultDB.LogPrefixORGID)
}
return
}

func (c *DeletedORGChecker) triggerAllServersToDelete(deletedORGs []*mysql.DeletedORG) error {
query := ""
for i, org := range deletedORGs {
if i == 0 {
query += fmt.Sprintf("org_id=%d", org.ORGID)
} else {
query += fmt.Sprintf("&org_id=%d", org.ORGID)
}
}
var controllers []*mysql.Controller
if err := mysql.DefaultDB.Find(&controllers).Error; err != nil {
log.Errorf("failed to get controllers: %s", err.Error(), mysql.DefaultDB.LogPrefixORGID)
return err
}
var res error
for _, controller := range controllers {
_, err := controllerCommon.CURLPerform(
"DELETE",
fmt.Sprintf("http://%s/v1/org/?%s", net.JoinHostPort(controller.IP, fmt.Sprintf("%d", controllerCommon.GConfig.HTTPNodePort)), query),
nil,
)
if err != nil {
log.Errorf("failed to call controller %s: %s", controller.IP, err.Error(), mysql.DefaultDB.LogPrefixORGID)
res = err
}
}
return res
}

0 comments on commit eddbb02

Please sign in to comment.