Skip to content

Commit

Permalink
[ingester] fix v6.4.9 update v6.5.9 ckissu failed
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Jun 13, 2024
1 parent 98f0678 commit e091e09
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 47 deletions.
142 changes: 95 additions & 47 deletions server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"regexp"
"strings"

logging "github.com/op/go-logging"

"database/sql"

logging "github.com/op/go-logging"

"github.com/deepflowio/deepflow/server/ingester/common"
"github.com/deepflowio/deepflow/server/ingester/config"
"github.com/deepflowio/deepflow/server/ingester/datasource"
Expand Down Expand Up @@ -105,6 +105,8 @@ type ColumnAdd struct {
ColumnName string
ColumnType ckdb.ColumnType
DefaultValue string
IsMetrics bool
AggrFunc string
}

type ColumnAdds struct {
Expand Down Expand Up @@ -142,22 +144,26 @@ type IndexAdd struct {
}

type ColumnDatasourceAdds struct {
ColumnNames []string
OldColumnNames []string
ColumnTypes []ckdb.ColumnType
OnlyMapTable, OnlyAppTable bool
DefaultValue string
ColumnNames []string
OldColumnNames []string
ColumnTypes []ckdb.ColumnType
OnlyMapTable, OnlyAppTable, OnlyNetworkTable bool
DefaultValue string
IsMetrics bool
IsSummable bool
}

type ColumnDatasourceAdd struct {
ColumnName string
OldColumnName string
ColumnType ckdb.ColumnType
OnlyMapTable, OnlyAppTable bool
DefaultValue string
ColumnName string
OldColumnName string
ColumnType ckdb.ColumnType
OnlyMapTable, OnlyAppTable, OnlyNetworkTable bool
DefaultValue string
IsMetrics bool
IsSummable bool
}

func getTables(connect *sql.DB, db, tableName string) ([]string, error) {
func getTables(connect *sql.DB, db, tablePrefix string) ([]string, error) {
sql := fmt.Sprintf("SHOW TABLES IN %s", db)
log.Infof("exec sql: %s", sql)
rows, err := connect.Query(sql)
Expand All @@ -171,16 +177,16 @@ func getTables(connect *sql.DB, db, tableName string) ([]string, error) {
if err != nil {
return nil, err
}
if strings.HasPrefix(table, tableName) ||
len(tableName) == 0 {
if strings.HasPrefix(table, tablePrefix) ||
len(tablePrefix) == 0 {
tables = append(tables, table)
}
}
return tables, nil
}

func getMvTables(connect *sql.DB, db, tableName string) ([]string, error) {
tables, err := getTables(connect, db, tableName)
func getMvTables(connect *sql.DB, db, tablePrefix string) ([]string, error) {
tables, err := getTables(connect, db, tablePrefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -271,11 +277,12 @@ func (i *Issu) getDatasourceInfo(connect *sql.DB, db, mvTableName string) (*Data

// 找出自定义数据源和参数
func (i *Issu) getUserDefinedDatasourceInfos(connect *sql.DB, db, tableName string) ([]*DatasourceInfo, error) {
tables, err := getTables(connect, db, tableName)
tables, err := getTables(connect, db, tableName+".")
if err != nil {
log.Info(err)
return nil, nil
}
log.Infof("get db %s prefix %s tables: %v", db, tableName, tables)

aggTables := []string{}
aggSuffix := "_agg"
Expand All @@ -285,6 +292,7 @@ func (i *Issu) getUserDefinedDatasourceInfos(connect *sql.DB, db, tableName stri
}
}

log.Infof("get agg tables: %v", aggTables)
dSInfos := []*DatasourceInfo{}
for _, name := range aggTables {
ds, err := i.getDatasourceInfo(connect, db, name+"_mv")
Expand All @@ -297,7 +305,7 @@ func (i *Issu) getUserDefinedDatasourceInfos(connect *sql.DB, db, tableName stri
return dSInfos, nil
}

func (i *Issu) addColumnDatasource(connect *sql.DB, d *DatasourceInfo, isMapTable bool, isAppTable bool) ([]*ColumnAdd, error) {
func (i *Issu) addColumnDatasource(connect *sql.DB, d *DatasourceInfo, isMapTable, isAppTable, isNetworkTable bool) ([]*ColumnAdd, error) {
// mod table agg, global
dones := []*ColumnAdd{}

Expand All @@ -308,23 +316,31 @@ func (i *Issu) addColumnDatasource(connect *sql.DB, d *DatasourceInfo, isMapTabl
}

for _, add := range columnDatasourceAdds {
version, err := i.getTableVersion(connect, d.db, d.name)
aggTable := d.name + "_agg"
version, err := i.getTableRawVersion(connect, d.db, aggTable)
if err != nil {
return dones, err
}
if version == common.CK_VERSION {
continue
}
if (add.OnlyMapTable && !isMapTable) || (add.OnlyAppTable && !isAppTable) {
if (add.OnlyMapTable && !isMapTable) || (add.OnlyAppTable && !isAppTable) || (add.OnlyNetworkTable && !isNetworkTable) {
continue
}
aggTable := d.name + "_agg"
aggrFunc := ""
if add.IsMetrics && add.IsSummable {
aggrFunc = d.summable
} else if add.IsMetrics {
aggrFunc = d.unsummable
}
addColumn := &ColumnAdd{
Db: d.db,
Table: aggTable,
ColumnName: add.ColumnName,
ColumnType: add.ColumnType,
DefaultValue: add.DefaultValue,
IsMetrics: add.IsMetrics,
AggrFunc: aggrFunc,
}
if err := i.addColumn(connect, addColumn); err != nil {
return dones, err
Expand Down Expand Up @@ -447,6 +463,9 @@ func (i *Issu) RunRenameTable(ds *datasource.DatasourceManager) error {
if err != nil {
log.Warningf("renameTablesV65 err: %s", err)
}
if len(AllTableRenames) == 0 {
return nil
}
i.tableRenames = AllTableRenames
for _, connection := range i.Connections {
oldVersion, err := i.getTableVersion(connection, "flow_log", "l4_flow_log_local")
Expand All @@ -461,7 +480,7 @@ func (i *Issu) RunRenameTable(ds *datasource.DatasourceManager) error {
return err
}
}
if err := i.renameUserDefineDatasource(connection, ds); err != nil {
if err := i.renameUserDefineDatasource(connection, ckdb.DEFAULT_ORG_ID, ds); err != nil {
log.Warning(err)
}
}
Expand Down Expand Up @@ -542,8 +561,16 @@ func (i *Issu) addColumn(connect *sql.DB, c *ColumnAdd) error {
if len(c.DefaultValue) > 0 {
defaultValue = fmt.Sprintf("default %s", c.DefaultValue)
}

columnName := c.ColumnName
columnType := c.ColumnType.String()
if c.IsMetrics && c.AggrFunc != "" {
columnName = c.ColumnName + "__agg"
columnType = fmt.Sprintf("AggregateFunction(%s, %s)", c.AggrFunc, c.ColumnType)
}
sql := fmt.Sprintf("ALTER TABLE %s.`%s` ADD COLUMN %s %s %s",
c.Db, c.Table, c.ColumnName, c.ColumnType, defaultValue)
c.Db, c.Table, columnName, columnType, defaultValue)

log.Info(sql)
_, err := connect.Exec(sql)
if err != nil {
Expand Down Expand Up @@ -664,7 +691,7 @@ func (i *Issu) renameColumn(connect *sql.DB, cr *ColumnRename) error {
log.Infof("rename column failed, will retry rename column later. err: %s", err)

if cr.DropMvTable {
mvTables, err := getMvTables(connect, cr.Db, strings.Split(cr.Table, ".")[0])
mvTables, err := getMvTables(connect, cr.Db, strings.Split(cr.Table, ".")[0]+".")
if err != nil {
log.Error(err)
return err
Expand Down Expand Up @@ -812,6 +839,14 @@ func getColumnDrops(columnDrops []*ColumnDrops) []*ColumnDrop {
}

func (i *Issu) getTableVersion(connect *sql.DB, db, table string) (string, error) {
version, err := i.getTableRawVersion(connect, db, table)
if version == "" {
version = common.CK_VERSION
}
return version, err
}

func (i *Issu) getTableRawVersion(connect *sql.DB, db, table string) (string, error) {
sql := fmt.Sprintf("SELECT comment FROM system.columns WHERE database='%s' AND table='%s' AND name='time'",
db, table)
rows, err := connect.Query(sql)
Expand All @@ -826,9 +861,6 @@ func (i *Issu) getTableVersion(connect *sql.DB, db, table string) (string, error
}
return version, nil
}
if version == "" {
version = common.CK_VERSION
}
return version, nil
}

Expand Down Expand Up @@ -1012,12 +1044,15 @@ func getColumnDatasourceAdds(columnDatasourceAddss []*ColumnDatasourceAdds) []*C
OldColumnName = columnAdds.OldColumnNames[i]
}
adds = append(adds, &ColumnDatasourceAdd{
ColumnName: name,
OldColumnName: OldColumnName,
ColumnType: columnAdds.ColumnTypes[i],
OnlyMapTable: columnAdds.OnlyMapTable,
OnlyAppTable: columnAdds.OnlyAppTable,
DefaultValue: columnAdds.DefaultValue,
ColumnName: name,
OldColumnName: OldColumnName,
ColumnType: columnAdds.ColumnTypes[i],
OnlyMapTable: columnAdds.OnlyMapTable,
OnlyAppTable: columnAdds.OnlyAppTable,
OnlyNetworkTable: columnAdds.OnlyNetworkTable,
DefaultValue: columnAdds.DefaultValue,
IsMetrics: columnAdds.IsMetrics,
IsSummable: columnAdds.IsSummable,
})
}
}
Expand Down Expand Up @@ -1051,8 +1086,9 @@ func (i *Issu) addColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnAdd, er
continue
}
for _, dsInfo := range datasourceInfos {
adds, err := i.addColumnDatasource(connect, dsInfo, strings.Contains(tableName, "_map"), strings.Contains(tableName, "application"))
adds, err := i.addColumnDatasource(connect, dsInfo, strings.Contains(tableName, "_map"), strings.Contains(tableName, "application"), strings.Contains(tableName, "network"))
if err != nil {
log.Warningf("add column datasource failed: %s", err)
return nil, nil
}
dones = append(dones, adds...)
Expand Down Expand Up @@ -1188,6 +1224,7 @@ func (i *Issu) Start() error {
return fmt.Errorf("connections is nil")
}

errCount := 0
for _, connect := range connects {
orgIDPrefixs, err := i.getOrgIDPrefixs(connect)
if err != nil {
Expand All @@ -1196,7 +1233,13 @@ func (i *Issu) Start() error {
for _, orgIDPrefix := range orgIDPrefixs {
err := i.startOrg(connect, orgIDPrefix)
if err != nil {
log.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err)
errCount++
err = fmt.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err)
log.Error(err)
// if more than 1 Org upgrade fails, or the Default Org upgrade fails, an error will be returned and restarted.
if errCount > 1 || orgIDPrefix == "" {
return err
}
}
}
}
Expand All @@ -1211,28 +1254,33 @@ func (i *Issu) Close() error {
return i.Connections.Close()
}

func (i *Issu) renameUserDefineDatasource(connect *sql.DB, ds *datasource.DatasourceManager) error {
func (i *Issu) renameUserDefineDatasource(connect *sql.DB, orgId uint16, ds *datasource.DatasourceManager) error {
for _, tableGroup := range []string{"application", "network"} {
datasourceInfos, err := i.getUserDefinedDatasourceInfos(connect, "flow_metrics", tableGroup)
if err != nil {
return err
}
for _, dsInfo := range datasourceInfos {
if err := i.renameTable(connect,
&TableRename{
OldDb: dsInfo.db,
OldTables: []string{dsInfo.name + "_agg"},
NewDb: ckdb.METRICS_DB,
NewTables: []string{fmt.Sprintf("%s.%s", dsInfo.db, dsInfo.name+"_agg")},
}); err != nil {
return err
}
log.Infof("get datasource info: %+v", dsInfo)
interval := INTERVAL_HOUR
if dsInfo.interval == ckdb.TimeFuncDay {
interval = INTERVAL_DAY
}
// drop table mv
sql := fmt.Sprintf("DROP TABLE IF EXISTS %s.`%s`", ckdb.OrgDatabasePrefix(orgId)+dsInfo.db, dsInfo.name+"_mv")
log.Info(sql)
_, err := connect.Exec(sql)
if err != nil {
return err
}
// dsInfo.name like 'application.1d' should convert to '1d'
name := dsInfo.name
names := strings.Split(dsInfo.name, ".")
if len(names) == 2 {
name = names[1]
}
//readd mvTable,localTable,gobalTable
if err := ds.Handle(ckdb.DEFAULT_ORG_ID, datasource.ADD, tableGroup, dsInfo.baseTable, dsInfo.name, dsInfo.summable, dsInfo.unsummable, interval, DEFAULT_TTL); err != nil {
if err := ds.Handle(int(orgId), datasource.ADD, tableGroup, dsInfo.baseTable, name, dsInfo.summable, dsInfo.unsummable, interval, DEFAULT_TTL); err != nil {
return err
}
}
Expand Down
10 changes: 10 additions & 0 deletions server/ingester/ckissu/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,14 @@ var ColumnDatasourceAdd65 = []*ColumnDatasourceAdds{
OnlyAppTable: false,
DefaultValue: "1",
},
{
ColumnNames: []string{"server_syn_miss", "client_ack_miss"},
OldColumnNames: []string{"", ""},
ColumnTypes: []ckdb.ColumnType{ckdb.UInt64, ckdb.UInt64},
OnlyMapTable: false,
OnlyAppTable: false,
OnlyNetworkTable: true,
IsMetrics: true,
IsSummable: true,
},
}
4 changes: 4 additions & 0 deletions server/ingester/datasource/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ func (m *DatasourceManager) makeAggTableCreateSQL(t *ckdb.Table, db, dstTable, a
codec = fmt.Sprintf("codec(%s)", p.Codec.String())
}

if p.Name == t.TimeKey {
p.Comment = t.Version
}

if p.GroupBy {
if !stringSliceHas(orderKeys, p.Name) {
orderKeys = append(orderKeys, p.Name)
Expand Down

0 comments on commit e091e09

Please sign in to comment.