Skip to content

Commit

Permalink
refactor: get_history_tables_for_gc() should not return TableInfo…
Browse files Browse the repository at this point in the history
…, but just table name, id and values

`SchemaApi` cannot provide enough information to build a valid
`TableInfo`; for example, it does not know the catalog type or the
database type. Therefore `get_history_tables_for_gc()` should return just
the table name, table id and table meta to its caller, letting the
caller build a `TableInfo` if needed.
  • Loading branch information
drmingdrmer committed Sep 29, 2024
1 parent dd45c19 commit 31ac744
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 46 deletions.
96 changes: 50 additions & 46 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent;
use databend_common_meta_app::schema::index_name_ident::IndexName;
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::table_niv::TableNIV;
use databend_common_meta_app::schema::CatalogIdToNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogMeta;
Expand Down Expand Up @@ -2679,32 +2680,34 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};

let capacity = the_limit - vacuum_table_infos.len();
let table_infos =
do_get_table_history(self, table_drop_time_range, db_info.clone(), capacity)
.await?;

for (table_info, db_id) in table_infos.iter() {
vacuum_ids.push(DroppedId::new_table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
let table_nivs = get_history_tables_for_gc(
self,
table_drop_time_range,
db_info.database_id.db_id,
capacity,
)
.await?;

for table_niv in table_nivs.iter() {
vacuum_ids.push(DroppedId::from(table_niv.clone()));
}

// A DB can be removed only when all its tables are removed.
if vacuum_db && capacity > table_infos.len() {
if vacuum_db && capacity > table_nivs.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
});
}

vacuum_table_infos.extend(
table_infos
.iter()
.take(capacity)
.map(|(table_info, _)| table_info.clone()),
);
vacuum_table_infos.extend(table_nivs.iter().take(capacity).map(|niv| {
Arc::new(TableInfo::new(
db_info.name_ident.database_name(),
&niv.name().table_name,
TableIdent::new(niv.id().table_id, niv.value().seq),
niv.value().data.clone(),
))
}));
}

return Ok(ListDroppedTableResp {
Expand Down Expand Up @@ -2736,18 +2739,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
name_ident: tenant_dbname.clone(),
meta: db_meta,
});
let table_infos =
do_get_table_history(self, drop_time_range.clone(), db_info, the_limit).await?;
let table_nivs = get_history_tables_for_gc(
self,
drop_time_range.clone(),
db_info.database_id.db_id,
the_limit,
)
.await?;

let mut drop_ids = vec![];
let mut drop_table_infos = vec![];

for (table_info, db_id) in table_infos.iter().take(the_limit) {
drop_ids.push(DroppedId::new_table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
drop_table_infos.push(table_info.clone());
for niv in table_nivs.iter() {
drop_ids.push(DroppedId::from(niv.clone()));

drop_table_infos.push(Arc::new(TableInfo::new(
db_info.name_ident.database_name(),
&niv.name().table_name,
TableIdent::new(niv.id().table_id, niv.value().seq),
niv.value().data.clone(),
)));
}

Ok(ListDroppedTableResp {
Expand Down Expand Up @@ -3520,21 +3531,22 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
)
}

/// Lists all dropped and non-dropped tables belonging to a database and
/// returns those that are eligible for garbage collection,
/// i.e., those whose drop time falls within the specified range.
#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn do_get_table_history(
async fn get_history_tables_for_gc(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
drop_time_range: Range<Option<DateTime<Utc>>>,
db_info: Arc<DatabaseInfo>,
db_id: u64,
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let db_id = db_info.database_id.db_id;

let dbid_tbname_idlist = TableIdHistoryIdent {
) -> Result<Vec<TableNIV>, KVAppError> {
let ident = TableIdHistoryIdent {
database_id: db_id,
table_name: "dummy".to_string(),
};
let dir_name = DirName::new(dbid_tbname_idlist);
let dir_name = DirName::new(ident);
let table_history_kvs = kv_api.list_pb_vec(&dir_name).await?;

let mut args = vec![];
Expand Down Expand Up @@ -3565,19 +3577,11 @@ async fn do_get_table_history(
continue;
}

let tb_info = TableInfo {
ident: TableIdent {
table_id: table_id.table_id,
seq: seq_meta.seq,
},
desc: format!("'{}'.'{}'", db_info.name_ident.database_name(), table_name,),
name: (*table_name).clone(),
meta: seq_meta.data,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
};

filter_tb_infos.push((Arc::new(tb_info), db_info.database_id.db_id));
filter_tb_infos.push(TableNIV::new(
DBIdTableName::new(db_id, table_name.clone()),
table_id.clone(),
seq_meta,
));
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod index_id_to_name_ident;
pub mod index_name_ident;
pub mod least_visible_time_ident;
pub mod table_lock_ident;
pub mod table_niv;
pub mod virtual_column_ident;

mod create_option;
Expand Down
12 changes: 12 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use super::CatalogInfo;
use super::CreateOption;
use super::DatabaseId;
use crate::schema::database_name_ident::DatabaseNameIdent;
use crate::schema::table_niv::TableNIV;
use crate::storage::StorageParams;
use crate::tenant::Tenant;
use crate::tenant::ToTenant;
Expand Down Expand Up @@ -988,7 +989,18 @@ pub enum DroppedId {
Table { name: DBIdTableName, id: TableId },
}

impl From<TableNIV> for DroppedId {
    /// Convert a table's (name, id, value) triple into a dropped-table marker,
    /// keeping only the name and id; the table meta is discarded.
    fn from(niv: TableNIV) -> Self {
        let (name, id, _meta) = niv.unpack();
        Self::Table { name, id }
    }
}

impl DroppedId {
pub fn new_table_name_id(name: DBIdTableName, id: TableId) -> DroppedId {
DroppedId::Table { name, id }
}

pub fn new_table(db_id: u64, table_id: u64, table_name: impl ToString) -> DroppedId {
DroppedId::Table {
name: DBIdTableName::new(db_id, table_name),
Expand Down
49 changes: 49 additions & 0 deletions src/meta/app/src/schema/table_niv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_types::SeqV;

use crate::schema::DBIdTableName;
use crate::schema::TableId;
use crate::schema::TableMeta;

/// The **Name, ID, Value** for a table metadata stored in meta-service.
///
/// Unlike `TableInfo`, this carries no catalog/database type information —
/// only what the meta-service itself can authoritatively provide.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableNIV {
    /// The table name, scoped by the id of the database that contains it.
    name: DBIdTableName,
    /// The globally unique table id.
    id: TableId,
    /// The table metadata together with its sequence number in meta-service.
    value: SeqV<TableMeta>,
}

impl TableNIV {
pub fn new(name: DBIdTableName, id: TableId, value: SeqV<TableMeta>) -> Self {
TableNIV { name, id, value }
}

pub fn name(&self) -> &DBIdTableName {
&self.name
}

pub fn id(&self) -> &TableId {
&self.id
}

pub fn value(&self) -> &SeqV<TableMeta> {
&self.value
}

pub fn unpack(self) -> (DBIdTableName, TableId, SeqV<TableMeta>) {
(self.name, self.id, self.value)
}
}

0 comments on commit 31ac744

Please sign in to comment.