Skip to content

Commit

Permalink
refactor: dropped table listing and GC
Browse files Browse the repository at this point in the history
- Replace `TableInfoFilter` with `Range<Option<DateTime<Utc>>>` for more flexible filtering
- Simplify `ListDroppedTableReq` struct and add builder methods
- Update schema_api_impl and tests to use new ListDroppedTableReq API
- Adjust vacuum logic in interpreter_vacuum_drop_tables to use new API
- Minor code cleanup and documentation improvements
  • Loading branch information
drmingdrmer committed Sep 27, 2024
1 parent 27157a3 commit 4cebd0f
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 108 deletions.
94 changes: 52 additions & 42 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt::Display;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -140,7 +141,6 @@ use databend_common_meta_app::schema::TableIdToName;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableIndex;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableInfoFilter;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::TruncateTableReply;
Expand Down Expand Up @@ -2644,11 +2644,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let the_limit = req.limit.unwrap_or(usize::MAX);

if let TableInfoFilter::DroppedTableOrDroppedDatabase(retention_boundary) = &req.filter {
let drop_time_range = req.drop_time_range;

if req.database_name.is_none() {
let db_infos = self
.get_tenant_history_databases(
ListDatabaseReq {
tenant: req.inner.tenant().clone(),
tenant: req.tenant.clone(),
},
true,
)
Expand All @@ -2665,24 +2667,17 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
});
}

// If boundary is None, it means choose all tables.
// Thus, we just choose a very large time.
let boundary = retention_boundary.unwrap_or(DateTime::<Utc>::MAX_UTC);

let vacuum_db = {
let drop_on = db_info.meta.drop_on;
drop_on.is_some() && drop_on <= Some(boundary)
};
let vacuum_db = drop_time_range.contains(&db_info.meta.drop_on);

// If to vacuum a db, just vacuum all tables.
// Otherwise, choose only dropped tables(before retention time).
let filter = if vacuum_db {
TableInfoFilter::All
let table_drop_time_range = if vacuum_db {
None..Some(DateTime::<Utc>::MAX_UTC)
} else {
TableInfoFilter::DroppedTables(*retention_boundary)
drop_time_range.clone()
};

let db_filter = (filter, db_info.clone());
let db_filter = (table_drop_time_range, db_info.clone());

let capacity = the_limit - vacuum_table_infos.len();
let table_infos = do_get_table_history(self, db_filter, capacity).await?;
Expand Down Expand Up @@ -2723,12 +2718,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
});
}

let tenant_dbname = &req.inner;
let database_name = req.database_name.clone().unwrap();
let tenant_dbname = DatabaseNameIdent::new(&req.tenant, database_name);

// Get db by name to ensure presence
let res = get_db_or_err(
self,
tenant_dbname,
&tenant_dbname,
format!("get_table_history: {}", tenant_dbname.display()),
)
.await;
Expand All @@ -2742,10 +2738,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let db_info = Arc::new(DatabaseInfo {
database_id: seq_db_id.data,
name_ident: req.inner.clone(),
name_ident: tenant_dbname.clone(),
meta: db_meta,
});
let db_filter = (req.filter, db_info);
let db_filter = (drop_time_range.clone(), db_info);
let table_infos = do_get_table_history(self, db_filter, the_limit).await?;
let mut drop_ids = vec![];
let mut drop_table_infos = vec![];
Expand Down Expand Up @@ -3531,11 +3527,17 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
)
}

#[allow(clippy::type_complexity)]
#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn batch_filter_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
args: &[(&TableInfoFilter, &Arc<DatabaseInfo>, u64, String)],
args: &[(
Range<Option<DateTime<Utc>>>,
&Arc<DatabaseInfo>,
u64,
String,
)],
filter_tb_infos: &mut Vec<(Arc<TableInfo>, u64)>,
) -> Result<(), KVAppError> {
let table_id_idents = args
Expand All @@ -3544,9 +3546,7 @@ async fn batch_filter_table_info(

let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;

for (seq_meta, (filter, db_info, table_id, table_name)) in
seq_metas.into_iter().zip(args.iter())
{
for (seq_meta, (rng, db_info, table_id, table_name)) in seq_metas.into_iter().zip(args.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
Expand All @@ -3555,14 +3555,8 @@ async fn batch_filter_table_info(
continue;
};

if let TableInfoFilter::DroppedTables(retention_boundary) = filter {
let Some(meta_drop_on) = seq_meta.drop_on else {
continue;
};

if meta_drop_on > retention_boundary.unwrap_or(DateTime::<Utc>::MAX_UTC) {
continue;
}
if !rng.contains(&seq_meta.data.drop_on) {
continue;
}

let tb_info = TableInfo {
Expand All @@ -3583,7 +3577,12 @@ async fn batch_filter_table_info(
Ok(())
}

type TableFilterInfoList<'a> = Vec<(&'a TableInfoFilter, &'a Arc<DatabaseInfo>, u64, String)>;
type TableFilterInfoList<'a> = Vec<(
Range<Option<DateTime<Utc>>>,
&'a Arc<DatabaseInfo>,
u64,
String,
)>;

#[logcall::logcall(input = "")]
#[fastrace::trace]
Expand All @@ -3607,19 +3606,15 @@ async fn get_gc_table_info(
#[fastrace::trace]
async fn do_get_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_filter: (TableInfoFilter, Arc<DatabaseInfo>),
db_filter: (Range<Option<DateTime<Utc>>>, Arc<DatabaseInfo>),
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];

// step 1: list db table name with db id
let mut filter_db_info_with_table_id_key_list: Vec<(
&TableInfoFilter,
&Arc<DatabaseInfo>,
TableIdHistoryIdent,
)> = vec![];
let mut filter_db_info_with_table_id_key_list: Vec<_> = vec![];

let (filter, db_info) = db_filter;
let (drop_time_range, db_info) = db_filter;
let db_id = db_info.database_id.db_id;

// List tables by tenant, db_id, table_name.
Expand All @@ -3633,7 +3628,7 @@ async fn do_get_table_history(

let keys = table_id_list_keys
.iter()
.map(|table_id_list_key| (&filter, &db_info, table_id_list_key.clone()))
.map(|table_id_list_key| (drop_time_range.clone(), &db_info, table_id_list_key.clone()))
.collect::<Vec<_>>();

filter_db_info_with_table_id_key_list.extend(keys);
Expand All @@ -3660,10 +3655,17 @@ async fn do_get_table_history(
let (filter, db_info, table_id_list_key) = table_id_list_keys_iter.next().unwrap();
let tb_id_list = seq_table_id_list.data;

let id_list: Vec<(&TableInfoFilter, &Arc<DatabaseInfo>, u64, String)> = tb_id_list
let id_list: Vec<_> = tb_id_list
.id_list
.iter()
.map(|id| (filter, db_info, *id, table_id_list_key.table_name.clone()))
.map(|id| {
(
filter.clone(),
db_info,
*id,
table_id_list_key.table_name.clone(),
)
})
.collect();

filter_db_info_with_table_id_list.extend(id_list);
Expand Down Expand Up @@ -3694,6 +3696,10 @@ async fn do_get_table_history(
Ok(filter_tb_infos)
}

/// Permanently remove a dropped database from the meta-service.
///
/// Upon calling this method, the dropped database must be already marked as `gc_in_progress`,
/// then remove all **dropped and non-dropped** tables in the database.
async fn gc_dropped_db_by_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_id: u64,
Expand Down Expand Up @@ -3892,6 +3898,10 @@ async fn update_txn_to_remove_table_history(
Ok(())
}

/// Update TxnRequest to remove a dropped table's own data.
///
/// This function returns the updated TxnRequest,
/// or Err of the reason in string if it can not proceed.
async fn remove_data_for_dropped_table(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
table_id: &TableId,
Expand Down
46 changes: 11 additions & 35 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ use databend_common_meta_app::schema::TableIdList;
use databend_common_meta_app::schema::TableIdToName;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableInfoFilter;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::TableStatistics;
Expand Down Expand Up @@ -3472,11 +3471,7 @@ impl SchemaApiTestSuite {
assert_eq!(id_list.len(), 2);

{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let req = ListDroppedTableReq::new(&tenant);
let resp = mt.get_drop_table_infos(req).await?;

let req = GcDroppedTableReq {
Expand Down Expand Up @@ -3682,11 +3677,7 @@ impl SchemaApiTestSuite {

// gc the drop tables
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let req = ListDroppedTableReq::new(&tenant);
let resp = mt.get_drop_table_infos(req).await?;

let req = GcDroppedTableReq {
Expand Down Expand Up @@ -3879,11 +3870,7 @@ impl SchemaApiTestSuite {

// gc the data
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let req = ListDroppedTableReq::new(&tenant);
let resp = mt.get_drop_table_infos(req).await?;

let req = GcDroppedTableReq {
Expand Down Expand Up @@ -4352,11 +4339,7 @@ impl SchemaApiTestSuite {
// case 1: test AllDroppedTables with filter time
{
let now = Utc::now();
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(Some(now)),
limit: None,
};
let req = ListDroppedTableReq::new(&tenant).with_retention_boundary(now);
let resp = mt.get_drop_table_infos(req).await?;

let got = resp
Expand Down Expand Up @@ -4390,11 +4373,7 @@ impl SchemaApiTestSuite {

// case 2: test AllDroppedTables without filter time
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let req = ListDroppedTableReq::new(&tenant);
let resp = mt.get_drop_table_infos(req).await?;

let got = resp
Expand Down Expand Up @@ -4609,10 +4588,11 @@ impl SchemaApiTestSuite {
),
];
for (limit, number, drop_ids) in limit_and_drop_ids {
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit,
let req = ListDroppedTableReq::new(&tenant);
let req = if let Some(limit) = limit {
req.with_limit(limit)
} else {
req
};
let resp = mt.get_drop_table_infos(req).await?;
assert_eq!(resp.drop_ids.len(), number);
Expand Down Expand Up @@ -5290,11 +5270,7 @@ impl SchemaApiTestSuite {
assert!(seqv.is_some() && seqv.unwrap().seq != 0);

// vacuum drop table
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let req = ListDroppedTableReq::new(&tenant);
let resp = mt.get_drop_table_infos(req).await?;
assert!(!resp.drop_ids.is_empty());

Expand Down
1 change: 0 additions & 1 deletion src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ pub use table::TableIdToName;
pub use table::TableIdent;
pub use table::TableIndex;
pub use table::TableInfo;
pub use table::TableInfoFilter;
pub use table::TableMeta;
pub use table::TableNameIdent;
pub use table::TableStatistics;
Expand Down
Loading

0 comments on commit 4cebd0f

Please sign in to comment.