Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: SchemaAPI::do_get_table_history #16540

Merged
merged 1 commit into from
Sep 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 44 additions & 72 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3528,56 +3528,6 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
)
}

#[allow(clippy::type_complexity)]
#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn batch_filter_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
args: &[(
Range<Option<DateTime<Utc>>>,
&Arc<DatabaseInfo>,
u64,
String,
)],
filter_tb_infos: &mut Vec<(Arc<TableInfo>, u64)>,
) -> Result<(), KVAppError> {
let table_id_idents = args
.iter()
.map(|(_f, _db, table_id, _table_name)| TableId::new(*table_id));

let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;

for (seq_meta, (rng, db_info, table_id, table_name)) in seq_metas.into_iter().zip(args.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

if !rng.contains(&seq_meta.data.drop_on) {
continue;
}

let tb_info = TableInfo {
ident: TableIdent {
table_id: *table_id,
seq: seq_meta.seq,
},
desc: format!("'{}'.'{}'", db_info.name_ident.database_name(), table_name,),
name: (*table_name).clone(),
meta: seq_meta.data,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
};

filter_tb_infos.push((Arc::new(tb_info), db_info.database_id.db_id));
}

Ok(())
}

#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn do_get_table_history(
Expand All @@ -3595,25 +3545,48 @@ async fn do_get_table_history(
let dir_name = DirName::new(dbid_tbname_idlist);
let table_history_kvs = kv_api.list_pb_vec(&dir_name).await?;

let mut the_list = vec![];
let mut args = vec![];

for (ident, table_history) in table_history_kvs {
for table_id in table_history.id_list.iter() {
the_list.push((
drop_time_range.clone(),
&db_info,
*table_id,
ident.table_name.clone(),
));
args.push((TableId::new(*table_id), ident.table_name.clone()));
}
}

let mut filter_tb_infos = vec![];

for c in the_list[..std::cmp::min(limit, the_list.len())].chunks(DEFAULT_MGET_SIZE) {
let mut infos = vec![];
batch_filter_table_info(kv_api, c, &mut infos).await?;
filter_tb_infos.extend(infos);
for chunk in args[..std::cmp::min(limit, args.len())].chunks(DEFAULT_MGET_SIZE) {
let table_id_idents = chunk.iter().map(|(table_id, _)| table_id.clone());

let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;

for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

if !drop_time_range.contains(&seq_meta.data.drop_on) {
continue;
}

let tb_info = TableInfo {
ident: TableIdent {
table_id: table_id.table_id,
seq: seq_meta.seq,
},
desc: format!("'{}'.'{}'", db_info.name_ident.database_name(), table_name,),
name: (*table_name).clone(),
meta: seq_meta.data,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
};

filter_tb_infos.push((Arc::new(tb_info), db_info.database_id.db_id));
}
}

Ok(filter_tb_infos)
Expand Down Expand Up @@ -3681,7 +3654,7 @@ async fn gc_dropped_db_by_id(
// TODO: mark table as gc_in_progress

remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?;
let _ = remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?;
remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?;
}

Expand Down Expand Up @@ -3731,7 +3704,7 @@ async fn gc_dropped_table_by_id(
let mut txn = TxnRequest::default();

// 1)
remove_data_for_dropped_table(kv_api, table_id_ident, &mut txn).await?;
let _ = remove_data_for_dropped_table(kv_api, table_id_ident, &mut txn).await?;

// 2)
let table_id_history_ident = TableIdHistoryIdent {
Expand Down Expand Up @@ -3814,21 +3787,20 @@ async fn remove_data_for_dropped_table(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
table_id: &TableId,
txn: &mut TxnRequest,
) -> Result<(), KVAppError> {
) -> Result<Result<(), String>, MetaError> {
let seq_meta = kv_api.get_pb(table_id).await?;

let Some(seq_meta) = seq_meta else {
error!(
"gc_dropped_table_by_id cannot find {:?} table_meta",
table_id
);
return Ok(());
let err = format!("cannot find TableMeta by id: {:?}, ", table_id);
error!("{}", err);
return Ok(Err(err));
};

// TODO: enable this check. Currently when gc db, the table may not be dropped.
// if seq_meta.data.drop_on.is_none() {
// warn!("gc_dropped_table_by_id {:?} is not dropped", table_id);
// return Ok(());
// let err = format!("Table {:?} is not dropped, can not remove", table_id);
// warn!("{}", err);
// return Ok(Err(err));
// }

txn_delete_exact(txn, table_id, seq_meta.seq);
Expand All @@ -3844,7 +3816,7 @@ async fn remove_data_for_dropped_table(
txn_delete_exact(txn, &id_to_name, seq_name.seq);
}

Ok(())
Ok(Ok(()))
}

async fn remove_index_for_dropped_table(
Expand Down
Loading