From e091e09fde13e39a45776f8212eecd306ec44016 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Wed, 12 Jun 2024 19:49:08 +0800 Subject: [PATCH] [ingester] fix v6.4.9 update v6.5.9 ckissu failed --- server/ingester/ckissu/ckissu.go | 142 ++++++++++++++++++--------- server/ingester/ckissu/updates.go | 10 ++ server/ingester/datasource/handle.go | 4 + 3 files changed, 109 insertions(+), 47 deletions(-) diff --git a/server/ingester/ckissu/ckissu.go b/server/ingester/ckissu/ckissu.go index 6bc802b2a64..d15927e7cec 100644 --- a/server/ingester/ckissu/ckissu.go +++ b/server/ingester/ckissu/ckissu.go @@ -21,10 +21,10 @@ import ( "regexp" "strings" - logging "github.com/op/go-logging" - "database/sql" + logging "github.com/op/go-logging" + "github.com/deepflowio/deepflow/server/ingester/common" "github.com/deepflowio/deepflow/server/ingester/config" "github.com/deepflowio/deepflow/server/ingester/datasource" @@ -105,6 +105,8 @@ type ColumnAdd struct { ColumnName string ColumnType ckdb.ColumnType DefaultValue string + IsMetrics bool + AggrFunc string } type ColumnAdds struct { @@ -142,22 +144,26 @@ type IndexAdd struct { } type ColumnDatasourceAdds struct { - ColumnNames []string - OldColumnNames []string - ColumnTypes []ckdb.ColumnType - OnlyMapTable, OnlyAppTable bool - DefaultValue string + ColumnNames []string + OldColumnNames []string + ColumnTypes []ckdb.ColumnType + OnlyMapTable, OnlyAppTable, OnlyNetworkTable bool + DefaultValue string + IsMetrics bool + IsSummable bool } type ColumnDatasourceAdd struct { - ColumnName string - OldColumnName string - ColumnType ckdb.ColumnType - OnlyMapTable, OnlyAppTable bool - DefaultValue string + ColumnName string + OldColumnName string + ColumnType ckdb.ColumnType + OnlyMapTable, OnlyAppTable, OnlyNetworkTable bool + DefaultValue string + IsMetrics bool + IsSummable bool } -func getTables(connect *sql.DB, db, tableName string) ([]string, error) { +func getTables(connect *sql.DB, db, tablePrefix string) ([]string, error) { sql := fmt.Sprintf("SHOW TABLES IN %s", db) log.Infof("exec sql: %s", sql) rows, err := connect.Query(sql) @@ -171,16 +177,16 @@ func getTables(connect *sql.DB, db, tableName string) ([]string, error) { if err != nil { return nil, err } - if strings.HasPrefix(table, tableName) || - len(tableName) == 0 { + if strings.HasPrefix(table, tablePrefix) || + len(tablePrefix) == 0 { tables = append(tables, table) } } return tables, nil } -func getMvTables(connect *sql.DB, db, tableName string) ([]string, error) { - tables, err := getTables(connect, db, tableName) +func getMvTables(connect *sql.DB, db, tablePrefix string) ([]string, error) { + tables, err := getTables(connect, db, tablePrefix) if err != nil { return nil, err } @@ -271,11 +277,12 @@ func (i *Issu) getDatasourceInfo(connect *sql.DB, db, mvTableName string) (*Data // 找出自定义数据源和参数 func (i *Issu) getUserDefinedDatasourceInfos(connect *sql.DB, db, tableName string) ([]*DatasourceInfo, error) { - tables, err := getTables(connect, db, tableName) + tables, err := getTables(connect, db, tableName+".") if err != nil { log.Info(err) return nil, nil } + log.Infof("get db %s prefix %s tables: %v", db, tableName, tables) aggTables := []string{} aggSuffix := "_agg" @@ -285,6 +292,7 @@ func (i *Issu) getUserDefinedDatasourceInfos(connect *sql.DB, db, tableName stri } } + log.Infof("get agg tables: %v", aggTables) dSInfos := []*DatasourceInfo{} for _, name := range aggTables { ds, err := i.getDatasourceInfo(connect, db, name+"_mv") @@ -297,7 +305,7 @@ func (i *Issu) getUserDefinedDatasourceInfos(connect *sql.DB, db, tableName stri return dSInfos, nil } -func (i *Issu) addColumnDatasource(connect *sql.DB, d *DatasourceInfo, isMapTable bool, isAppTable bool) ([]*ColumnAdd, error) { +func (i *Issu) addColumnDatasource(connect *sql.DB, d *DatasourceInfo, isMapTable, isAppTable, isNetworkTable bool) ([]*ColumnAdd, error) { // mod table agg, global dones := []*ColumnAdd{} @@ -308,23 +316,31 @@ func (i *Issu) addColumnDatasource(connect *sql.DB, d *DatasourceInfo, isMapTabl } for _, add := range columnDatasourceAdds { - version, err := i.getTableVersion(connect, d.db, d.name) + aggTable := d.name + "_agg" + version, err := i.getTableRawVersion(connect, d.db, aggTable) if err != nil { return dones, err } if version == common.CK_VERSION { continue } - if (add.OnlyMapTable && !isMapTable) || (add.OnlyAppTable && !isAppTable) { + if (add.OnlyMapTable && !isMapTable) || (add.OnlyAppTable && !isAppTable) || (add.OnlyNetworkTable && !isNetworkTable) { continue } - aggTable := d.name + "_agg" + aggrFunc := "" + if add.IsMetrics && add.IsSummable { + aggrFunc = d.summable + } else if add.IsMetrics { + aggrFunc = d.unsummable + } addColumn := &ColumnAdd{ Db: d.db, Table: aggTable, ColumnName: add.ColumnName, ColumnType: add.ColumnType, DefaultValue: add.DefaultValue, + IsMetrics: add.IsMetrics, + AggrFunc: aggrFunc, } if err := i.addColumn(connect, addColumn); err != nil { return dones, err @@ -447,6 +463,9 @@ func (i *Issu) RunRenameTable(ds *datasource.DatasourceManager) error { if err != nil { log.Warningf("renameTablesV65 err: %s", err) } + if len(AllTableRenames) == 0 { + return nil + } i.tableRenames = AllTableRenames for _, connection := range i.Connections { oldVersion, err := i.getTableVersion(connection, "flow_log", "l4_flow_log_local") @@ -461,7 +480,7 @@ func (i *Issu) RunRenameTable(ds *datasource.DatasourceManager) error { return err } } - if err := i.renameUserDefineDatasource(connection, ds); err != nil { + if err := i.renameUserDefineDatasource(connection, ckdb.DEFAULT_ORG_ID, ds); err != nil { log.Warning(err) } } @@ -542,8 +561,16 @@ func (i *Issu) addColumn(connect *sql.DB, c *ColumnAdd) error { if len(c.DefaultValue) > 0 { defaultValue = fmt.Sprintf("default %s", c.DefaultValue) } + + columnName := c.ColumnName + columnType := c.ColumnType.String() + if c.IsMetrics && c.AggrFunc != "" { + columnName = c.ColumnName + "__agg" + columnType = fmt.Sprintf("AggregateFunction(%s, %s)", c.AggrFunc, c.ColumnType) + } sql := fmt.Sprintf("ALTER TABLE %s.`%s` ADD COLUMN %s %s %s", - c.Db, c.Table, c.ColumnName, c.ColumnType, defaultValue) + c.Db, c.Table, columnName, columnType, defaultValue) + log.Info(sql) _, err := connect.Exec(sql) if err != nil { @@ -664,7 +691,7 @@ func (i *Issu) renameColumn(connect *sql.DB, cr *ColumnRename) error { log.Infof("rename column failed, will retry rename column later. err: %s", err) if cr.DropMvTable { - mvTables, err := getMvTables(connect, cr.Db, strings.Split(cr.Table, ".")[0]) + mvTables, err := getMvTables(connect, cr.Db, strings.Split(cr.Table, ".")[0]+".") if err != nil { log.Error(err) return err @@ -812,6 +839,14 @@ func getColumnDrops(columnDrops []*ColumnDrops) []*ColumnDrop { } func (i *Issu) getTableVersion(connect *sql.DB, db, table string) (string, error) { + version, err := i.getTableRawVersion(connect, db, table) + if version == "" { + version = common.CK_VERSION + } + return version, err +} + +func (i *Issu) getTableRawVersion(connect *sql.DB, db, table string) (string, error) { sql := fmt.Sprintf("SELECT comment FROM system.columns WHERE database='%s' AND table='%s' AND name='time'", db, table) rows, err := connect.Query(sql) @@ -826,9 +861,6 @@ func (i *Issu) getTableVersion(connect *sql.DB, db, table string) (string, error } return version, nil } - if version == "" { - version = common.CK_VERSION - } return version, nil } @@ -1012,12 +1044,15 @@ func getColumnDatasourceAdds(columnDatasourceAddss []*ColumnDatasourceAdds) []*C OldColumnName = columnAdds.OldColumnNames[i] } adds = append(adds, &ColumnDatasourceAdd{ - ColumnName: name, - OldColumnName: OldColumnName, - ColumnType: columnAdds.ColumnTypes[i], - OnlyMapTable: columnAdds.OnlyMapTable, - OnlyAppTable: columnAdds.OnlyAppTable, - DefaultValue: columnAdds.DefaultValue, + ColumnName: name, + OldColumnName: OldColumnName, + ColumnType: columnAdds.ColumnTypes[i], + OnlyMapTable: columnAdds.OnlyMapTable, + OnlyAppTable: columnAdds.OnlyAppTable, + OnlyNetworkTable: columnAdds.OnlyNetworkTable, + DefaultValue: columnAdds.DefaultValue, + IsMetrics: columnAdds.IsMetrics, + IsSummable: columnAdds.IsSummable, }) } } @@ -1051,8 +1086,9 @@ func (i *Issu) addColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnAdd, er continue } for _, dsInfo := range datasourceInfos { - adds, err := i.addColumnDatasource(connect, dsInfo, strings.Contains(tableName, "_map"), strings.Contains(tableName, "application")) + adds, err := i.addColumnDatasource(connect, dsInfo, strings.Contains(tableName, "_map"), strings.Contains(tableName, "application"), strings.Contains(tableName, "network")) if err != nil { + log.Warningf("add column datasource failed: %s", err) return nil, nil } dones = append(dones, adds...) @@ -1188,6 +1224,7 @@ func (i *Issu) Start() error { return fmt.Errorf("connections is nil") } + errCount := 0 for _, connect := range connects { orgIDPrefixs, err := i.getOrgIDPrefixs(connect) if err != nil { @@ -1196,7 +1233,13 @@ func (i *Issu) Start() error { for _, orgIDPrefix := range orgIDPrefixs { err := i.startOrg(connect, orgIDPrefix) if err != nil { - log.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err) + errCount++ + err = fmt.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err) + log.Error(err) + // if more than 1 Org upgrade fails, or the Default Org upgrade fails, an error will be returned and restarted. + if errCount > 1 || orgIDPrefix == "" { + return err + } } } } @@ -1211,28 +1254,33 @@ func (i *Issu) Close() error { return i.Connections.Close() } -func (i *Issu) renameUserDefineDatasource(connect *sql.DB, ds *datasource.DatasourceManager) error { +func (i *Issu) renameUserDefineDatasource(connect *sql.DB, orgId uint16, ds *datasource.DatasourceManager) error { for _, tableGroup := range []string{"application", "network"} { datasourceInfos, err := i.getUserDefinedDatasourceInfos(connect, "flow_metrics", tableGroup) if err != nil { return err } for _, dsInfo := range datasourceInfos { - if err := i.renameTable(connect, - &TableRename{ - OldDb: dsInfo.db, - OldTables: []string{dsInfo.name + "_agg"}, - NewDb: ckdb.METRICS_DB, - NewTables: []string{fmt.Sprintf("%s.%s", dsInfo.db, dsInfo.name+"_agg")}, - }); err != nil { - return err - } + log.Infof("get datasource info: %+v", dsInfo) interval := INTERVAL_HOUR if dsInfo.interval == ckdb.TimeFuncDay { interval = INTERVAL_DAY } + // drop table mv + sql := fmt.Sprintf("DROP TABLE IF EXISTS %s.`%s`", ckdb.OrgDatabasePrefix(orgId)+dsInfo.db, dsInfo.name+"_mv") + log.Info(sql) + _, err := connect.Exec(sql) + if err != nil { + return err + } + // dsInfo.name like 'application.1d' should convert to '1d' + name := dsInfo.name + names := strings.Split(dsInfo.name, ".") + if len(names) == 2 { + name = names[1] + } //readd mvTable,localTable,gobalTable - if err := ds.Handle(ckdb.DEFAULT_ORG_ID, datasource.ADD, tableGroup, dsInfo.baseTable, dsInfo.name, dsInfo.summable, dsInfo.unsummable, interval, DEFAULT_TTL); err != nil { + if err := ds.Handle(int(orgId), datasource.ADD, tableGroup, dsInfo.baseTable, name, dsInfo.summable, dsInfo.unsummable, interval, DEFAULT_TTL); err != nil { return err } } diff --git a/server/ingester/ckissu/updates.go b/server/ingester/ckissu/updates.go index c33ed9f9f20..e4743829781 100644 --- a/server/ingester/ckissu/updates.go +++ b/server/ingester/ckissu/updates.go @@ -359,4 +359,14 @@ var ColumnDatasourceAdd65 = []*ColumnDatasourceAdds{ OnlyAppTable: false, DefaultValue: "1", }, + { + ColumnNames: []string{"server_syn_miss", "client_ack_miss"}, + OldColumnNames: []string{"", ""}, + ColumnTypes: []ckdb.ColumnType{ckdb.UInt64, ckdb.UInt64}, + OnlyMapTable: false, + OnlyAppTable: false, + OnlyNetworkTable: true, + IsMetrics: true, + IsSummable: true, + }, } diff --git a/server/ingester/datasource/handle.go b/server/ingester/datasource/handle.go index 5983cdf8f18..6f30cb2c80c 100644 --- a/server/ingester/datasource/handle.go +++ b/server/ingester/datasource/handle.go @@ -307,6 +307,10 @@ func (m *DatasourceManager) makeAggTableCreateSQL(t *ckdb.Table, db, dstTable, a codec = fmt.Sprintf("codec(%s)", p.Codec.String()) } + if p.Name == t.TimeKey { + p.Comment = t.Version + } + if p.GroupBy { if !stringSliceHas(orderKeys, p.Name) { orderKeys = append(orderKeys, p.Name)