From ec12a7bb414b66f5c102dfd11a4ec79601cfe2f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 26 Sep 2024 15:45:44 +0800 Subject: [PATCH] refactor: dropped table listing and GC - Replace `TableInfoFilter` with `Range<Option<DateTime<Utc>>>` for more flexible filtering - Simplify `ListDroppedTableReq` struct and add builder methods - Update schema_api_impl and tests to use new ListDroppedTableReq API - Adjust vacuum logic in interpreter_vacuum_drop_tables to use new API - Minor code cleanup and documentation improvements --- src/meta/api/src/schema_api_impl.rs | 94 ++++++++++--------- src/meta/api/src/schema_api_test_suite.rs | 46 +++------ src/meta/app/src/schema/mod.rs | 1 - src/meta/app/src/schema/table.rs | 91 ++++++++++++++---- .../interpreter_vacuum_drop_tables.rs | 18 ++-- 5 files changed, 143 insertions(+), 107 deletions(-) diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 8c0991aade57..e3b482986866 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::convert::Infallible; use std::fmt::Display; +use std::ops::Range; use std::sync::Arc; use std::time::Duration; @@ -140,7 +141,6 @@ use databend_common_meta_app::schema::TableIdToName; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableIndex; use databend_common_meta_app::schema::TableInfo; -use databend_common_meta_app::schema::TableInfoFilter; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TableNameIdent; use databend_common_meta_app::schema::TruncateTableReply; @@ -2644,11 +2644,13 @@ impl + ?Sized> SchemaApi for KV { let the_limit = req.limit.unwrap_or(usize::MAX); - if let TableInfoFilter::DroppedTableOrDroppedDatabase(retention_boundary) = &req.filter { + let drop_time_range = req.drop_time_range; + + if req.database_name.is_none() { let db_infos 
= self .get_tenant_history_databases( ListDatabaseReq { - tenant: req.inner.tenant().clone(), + tenant: req.tenant.clone(), }, true, ) @@ -2665,24 +2667,17 @@ impl + ?Sized> SchemaApi for KV { }); } - // If boundary is None, it means choose all tables. - // Thus, we just choose a very large time. - let boundary = retention_boundary.unwrap_or(DateTime::::MAX_UTC); - - let vacuum_db = { - let drop_on = db_info.meta.drop_on; - drop_on.is_some() && drop_on <= Some(boundary) - }; + let vacuum_db = drop_time_range.contains(&db_info.meta.drop_on); // If to vacuum a db, just vacuum all tables. // Otherwise, choose only dropped tables(before retention time). - let filter = if vacuum_db { - TableInfoFilter::All + let table_drop_time_range = if vacuum_db { + None..Some(DateTime::::MAX_UTC) } else { - TableInfoFilter::DroppedTables(*retention_boundary) + drop_time_range.clone() }; - let db_filter = (filter, db_info.clone()); + let db_filter = (table_drop_time_range, db_info.clone()); let capacity = the_limit - vacuum_table_infos.len(); let table_infos = do_get_table_history(self, db_filter, capacity).await?; @@ -2723,12 +2718,13 @@ impl + ?Sized> SchemaApi for KV { }); } - let tenant_dbname = &req.inner; + let database_name = req.database_name.clone().unwrap(); + let tenant_dbname = DatabaseNameIdent::new(&req.tenant, database_name); // Get db by name to ensure presence let res = get_db_or_err( self, - tenant_dbname, + &tenant_dbname, format!("get_table_history: {}", tenant_dbname.display()), ) .await; @@ -2742,10 +2738,10 @@ impl + ?Sized> SchemaApi for KV { let db_info = Arc::new(DatabaseInfo { database_id: seq_db_id.data, - name_ident: req.inner.clone(), + name_ident: tenant_dbname.clone(), meta: db_meta, }); - let db_filter = (req.filter, db_info); + let db_filter = (drop_time_range.clone(), db_info); let table_infos = do_get_table_history(self, db_filter, the_limit).await?; let mut drop_ids = vec![]; let mut drop_table_infos = vec![]; @@ -3531,11 +3527,17 @@ fn 
build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp { ) } +#[allow(clippy::type_complexity)] #[logcall::logcall(input = "")] #[fastrace::trace] async fn batch_filter_table_info( kv_api: &(impl kvapi::KVApi + ?Sized), - args: &[(&TableInfoFilter, &Arc, u64, String)], + args: &[( + Range>>, + &Arc, + u64, + String, + )], filter_tb_infos: &mut Vec<(Arc, u64)>, ) -> Result<(), KVAppError> { let table_id_idents = args @@ -3544,9 +3546,7 @@ async fn batch_filter_table_info( let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; - for (seq_meta, (filter, db_info, table_id, table_name)) in - seq_metas.into_iter().zip(args.iter()) - { + for (seq_meta, (rng, db_info, table_id, table_name)) in seq_metas.into_iter().zip(args.iter()) { let Some(seq_meta) = seq_meta else { error!( "batch_filter_table_info cannot find {:?} table_meta", @@ -3555,14 +3555,8 @@ async fn batch_filter_table_info( continue; }; - if let TableInfoFilter::DroppedTables(retention_boundary) = filter { - let Some(meta_drop_on) = seq_meta.drop_on else { - continue; - }; - - if meta_drop_on > retention_boundary.unwrap_or(DateTime::::MAX_UTC) { - continue; - } + if !rng.contains(&seq_meta.data.drop_on) { + continue; } let tb_info = TableInfo { @@ -3583,7 +3577,12 @@ async fn batch_filter_table_info( Ok(()) } -type TableFilterInfoList<'a> = Vec<(&'a TableInfoFilter, &'a Arc, u64, String)>; +type TableFilterInfoList<'a> = Vec<( + Range>>, + &'a Arc, + u64, + String, +)>; #[logcall::logcall(input = "")] #[fastrace::trace] @@ -3607,19 +3606,15 @@ async fn get_gc_table_info( #[fastrace::trace] async fn do_get_table_history( kv_api: &(impl kvapi::KVApi + ?Sized), - db_filter: (TableInfoFilter, Arc), + db_filter: (Range>>, Arc), limit: usize, ) -> Result, u64)>, KVAppError> { let mut filter_tb_infos = vec![]; // step 1: list db table name with db id - let mut filter_db_info_with_table_id_key_list: Vec<( - &TableInfoFilter, - &Arc, - TableIdHistoryIdent, - )> = vec![]; + let mut 
filter_db_info_with_table_id_key_list: Vec<_> = vec![]; - let (filter, db_info) = db_filter; + let (drop_time_range, db_info) = db_filter; let db_id = db_info.database_id.db_id; // List tables by tenant, db_id, table_name. @@ -3633,7 +3628,7 @@ async fn do_get_table_history( let keys = table_id_list_keys .iter() - .map(|table_id_list_key| (&filter, &db_info, table_id_list_key.clone())) + .map(|table_id_list_key| (drop_time_range.clone(), &db_info, table_id_list_key.clone())) .collect::>(); filter_db_info_with_table_id_key_list.extend(keys); @@ -3660,10 +3655,17 @@ async fn do_get_table_history( let (filter, db_info, table_id_list_key) = table_id_list_keys_iter.next().unwrap(); let tb_id_list = seq_table_id_list.data; - let id_list: Vec<(&TableInfoFilter, &Arc, u64, String)> = tb_id_list + let id_list: Vec<_> = tb_id_list .id_list .iter() - .map(|id| (filter, db_info, *id, table_id_list_key.table_name.clone())) + .map(|id| { + ( + filter.clone(), + db_info, + *id, + table_id_list_key.table_name.clone(), + ) + }) .collect(); filter_db_info_with_table_id_list.extend(id_list); @@ -3694,6 +3696,10 @@ async fn do_get_table_history( Ok(filter_tb_infos) } +/// Permanently remove a dropped database from the meta-service. +/// +/// Upon calling this method, the dropped database must be already marked as `gc_in_progress`, +/// then remove all **dropped and non-dropped** tables in the database. async fn gc_dropped_db_by_id( kv_api: &(impl kvapi::KVApi + ?Sized), db_id: u64, @@ -3892,6 +3898,10 @@ async fn update_txn_to_remove_table_history( Ok(()) } +/// Update TxnRequest to remove a dropped table's own data. +/// +/// This function returns the updated TxnRequest, +/// or Err of the reason in string if it can not proceed. 
async fn remove_data_for_dropped_table( kv_api: &(impl kvapi::KVApi + ?Sized), table_id: &TableId, diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index cc87b3e3a707..e17b6b752288 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -110,7 +110,6 @@ use databend_common_meta_app::schema::TableIdList; use databend_common_meta_app::schema::TableIdToName; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; -use databend_common_meta_app::schema::TableInfoFilter; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TableNameIdent; use databend_common_meta_app::schema::TableStatistics; @@ -3472,11 +3471,7 @@ impl SchemaApiTestSuite { assert_eq!(id_list.len(), 2); { - let req = ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None), - limit: None, - }; + let req = ListDroppedTableReq::new(&tenant); let resp = mt.get_drop_table_infos(req).await?; let req = GcDroppedTableReq { @@ -3682,11 +3677,7 @@ impl SchemaApiTestSuite { // gc the drop tables { - let req = ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None), - limit: None, - }; + let req = ListDroppedTableReq::new(&tenant); let resp = mt.get_drop_table_infos(req).await?; let req = GcDroppedTableReq { @@ -3879,11 +3870,7 @@ impl SchemaApiTestSuite { // gc the data { - let req = ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None), - limit: None, - }; + let req = ListDroppedTableReq::new(&tenant); let resp = mt.get_drop_table_infos(req).await?; let req = GcDroppedTableReq { @@ -4352,11 +4339,7 @@ impl SchemaApiTestSuite { // case 1: test AllDroppedTables with filter time { let now = Utc::now(); - let req = 
ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(Some(now)), - limit: None, - }; + let req = ListDroppedTableReq::new(&tenant).with_retention_boundary(now); let resp = mt.get_drop_table_infos(req).await?; let got = resp @@ -4390,11 +4373,7 @@ impl SchemaApiTestSuite { // case 2: test AllDroppedTables without filter time { - let req = ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None), - limit: None, - }; + let req = ListDroppedTableReq::new(&tenant); let resp = mt.get_drop_table_infos(req).await?; let got = resp @@ -4609,10 +4588,11 @@ impl SchemaApiTestSuite { ), ]; for (limit, number, drop_ids) in limit_and_drop_ids { - let req = ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None), - limit, + let req = ListDroppedTableReq::new(&tenant); + let req = if let Some(limit) = limit { + req.with_limit(limit) + } else { + req }; let resp = mt.get_drop_table_infos(req).await?; assert_eq!(resp.drop_ids.len(), number); @@ -5290,11 +5270,7 @@ impl SchemaApiTestSuite { assert!(seqv.is_some() && seqv.unwrap().seq != 0); // vacuum drop table - let req = ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, ""), - filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None), - limit: None, - }; + let req = ListDroppedTableReq::new(&tenant); let resp = mt.get_drop_table_infos(req).await?; assert!(!resp.drop_ids.is_empty()); diff --git a/src/meta/app/src/schema/mod.rs b/src/meta/app/src/schema/mod.rs index 095f8e67f43b..e7e12d9e7bce 100644 --- a/src/meta/app/src/schema/mod.rs +++ b/src/meta/app/src/schema/mod.rs @@ -115,7 +115,6 @@ pub use table::TableIdToName; pub use table::TableIdent; pub use table::TableIndex; pub use table::TableInfo; -pub use table::TableInfoFilter; pub use table::TableMeta; pub use table::TableNameIdent; pub use 
table::TableStatistics; diff --git a/src/meta/app/src/schema/table.rs b/src/meta/app/src/schema/table.rs index 9a0384a9f5ec..7542b8177ddd 100644 --- a/src/meta/app/src/schema/table.rs +++ b/src/meta/app/src/schema/table.rs @@ -19,6 +19,7 @@ use std::fmt; use std::fmt::Display; use std::fmt::Formatter; use std::ops::Deref; +use std::ops::Range; use std::sync::Arc; use std::time::Duration; @@ -906,31 +907,81 @@ impl ListTableReq { } #[derive(Clone, Debug, PartialEq, Eq)] -pub enum TableInfoFilter { - /// Choose only dropped tables. - /// - /// If the arg `retention_boundary` time is Some, choose only tables dropped before this boundary time. - DroppedTables(Option>), - /// Choose dropped table or all table in dropped databases. - /// - /// In this case, `ListTableReq`.db_name will be ignored. - /// - /// If the `retention_boundary` time is Some, - /// choose the table dropped before this time - /// or choose the database before this time. - DroppedTableOrDroppedDatabase(Option>), +pub struct ListDroppedTableReq { + pub tenant: Tenant, - /// return all tables, ignore drop on time. - All, -} + /// If `database_name` is None, choose all tables in all databases. + /// Otherwise, choose only tables in this database. + pub database_name: Option, + + /// The time range in which the database/table will be returned. + /// choose only tables/databases dropped before this boundary time. 
+ /// It can include non-dropped tables/databases with `None..Some()` + pub drop_time_range: Range>>, -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct ListDroppedTableReq { - pub inner: DatabaseNameIdent, - pub filter: TableInfoFilter, pub limit: Option, } +impl ListDroppedTableReq { + pub fn new(tenant: &Tenant) -> ListDroppedTableReq { + let rng_start = Some(DateTime::::MIN_UTC); + let rng_end = Some(DateTime::::MAX_UTC); + ListDroppedTableReq { + tenant: tenant.clone(), + database_name: None, + drop_time_range: rng_start..rng_end, + limit: None, + } + } + + pub fn with_db(self, db_name: impl ToString) -> Self { + Self { + database_name: Some(db_name.to_string()), + ..self + } + } + + pub fn with_retention_boundary(self, d: DateTime) -> Self { + let rng_start = Some(DateTime::::MIN_UTC); + let rng_end = Some(d); + Self { + drop_time_range: rng_start..rng_end, + ..self + } + } + + pub fn with_limit(self, limit: usize) -> Self { + Self { + limit: Some(limit), + ..self + } + } + + pub fn new4( + tenant: &Tenant, + database_name: Option, + retention_boundary: Option>, + limit: Option, + ) -> ListDroppedTableReq { + let rng_start = Some(DateTime::::MIN_UTC); + let rng_end = if let Some(b) = retention_boundary { + Some(b) + } else { + Some(DateTime::::MAX_UTC) + }; + ListDroppedTableReq { + tenant: tenant.clone(), + database_name: database_name.map(|s| s.to_string()), + drop_time_range: rng_start..rng_end, + limit, + } + } + + pub fn database_name(&self) -> Option<&str> { + self.database_name.as_deref() + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub enum DroppedId { Db { diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs index cb7c4d768662..1287cfda5fa7 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs @@ -29,7 +29,6 @@ use 
databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::GcDroppedTableReq; use databend_common_meta_app::schema::ListDroppedTableReq; -use databend_common_meta_app::schema::TableInfoFilter; use databend_common_sql::plans::VacuumDropTablePlan; use databend_enterprise_vacuum_handler::get_vacuum_handler; use log::info; @@ -125,19 +124,20 @@ impl Interpreter for VacuumDropTablesInterpreter { self.plan.database, retention_time ); // if database if empty, vacuum all tables - let filter = if self.plan.database.is_empty() { - TableInfoFilter::DroppedTableOrDroppedDatabase(Some(retention_time)) + let database_name = if self.plan.database.is_empty() { + None } else { - TableInfoFilter::DroppedTables(Some(retention_time)) + Some(self.plan.database.clone()) }; let tenant = self.ctx.get_tenant(); let (tables, drop_ids) = catalog - .get_drop_table_infos(ListDroppedTableReq { - inner: DatabaseNameIdent::new(&tenant, &self.plan.database), - filter, - limit: self.plan.option.limit, - }) + .get_drop_table_infos(ListDroppedTableReq::new4( + &tenant, + database_name, + Some(retention_time), + self.plan.option.limit, + )) .await?; info!(