diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 404e6b7f1cf2..88304f4432a1 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -3528,56 +3528,6 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp { ) } -#[allow(clippy::type_complexity)] -#[logcall::logcall(input = "")] -#[fastrace::trace] -async fn batch_filter_table_info( - kv_api: &(impl kvapi::KVApi + ?Sized), - args: &[( - Range>>, - &Arc, - u64, - String, - )], - filter_tb_infos: &mut Vec<(Arc, u64)>, -) -> Result<(), KVAppError> { - let table_id_idents = args - .iter() - .map(|(_f, _db, table_id, _table_name)| TableId::new(*table_id)); - - let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; - - for (seq_meta, (rng, db_info, table_id, table_name)) in seq_metas.into_iter().zip(args.iter()) { - let Some(seq_meta) = seq_meta else { - error!( - "batch_filter_table_info cannot find {:?} table_meta", - table_id - ); - continue; - }; - - if !rng.contains(&seq_meta.data.drop_on) { - continue; - } - - let tb_info = TableInfo { - ident: TableIdent { - table_id: *table_id, - seq: seq_meta.seq, - }, - desc: format!("'{}'.'{}'", db_info.name_ident.database_name(), table_name,), - name: (*table_name).clone(), - meta: seq_meta.data, - db_type: DatabaseType::NormalDB, - catalog_info: Default::default(), - }; - - filter_tb_infos.push((Arc::new(tb_info), db_info.database_id.db_id)); - } - - Ok(()) -} - #[logcall::logcall(input = "")] #[fastrace::trace] async fn do_get_table_history( @@ -3595,25 +3545,48 @@ async fn do_get_table_history( let dir_name = DirName::new(dbid_tbname_idlist); let table_history_kvs = kv_api.list_pb_vec(&dir_name).await?; - let mut the_list = vec![]; + let mut args = vec![]; for (ident, table_history) in table_history_kvs { for table_id in table_history.id_list.iter() { - the_list.push(( - drop_time_range.clone(), - &db_info, - *table_id, - ident.table_name.clone(), - )); + args.push((TableId::new(*table_id), ident.table_name.clone())); } } let mut filter_tb_infos = vec![]; - for c in the_list[..std::cmp::min(limit, the_list.len())].chunks(DEFAULT_MGET_SIZE) { - let mut infos = vec![]; - batch_filter_table_info(kv_api, c, &mut infos).await?; - filter_tb_infos.extend(infos); + for chunk in args[..std::cmp::min(limit, args.len())].chunks(DEFAULT_MGET_SIZE) { + let table_id_idents = chunk.iter().map(|(table_id, _)| table_id.clone()); + + let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; + + for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) { + let Some(seq_meta) = seq_meta else { + error!( + "batch_filter_table_info cannot find {:?} table_meta", + table_id + ); + continue; + }; + + if !drop_time_range.contains(&seq_meta.data.drop_on) { + continue; + } + + let tb_info = TableInfo { + ident: TableIdent { + table_id: table_id.table_id, + seq: seq_meta.seq, + }, + desc: format!("'{}'.'{}'", db_info.name_ident.database_name(), table_name,), + name: (*table_name).clone(), + meta: seq_meta.data, + db_type: DatabaseType::NormalDB, + catalog_info: Default::default(), + }; + + filter_tb_infos.push((Arc::new(tb_info), db_info.database_id.db_id)); + } } Ok(filter_tb_infos) @@ -3681,7 +3654,7 @@ async fn gc_dropped_db_by_id( // TODO: mark table as gc_in_progress remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?; - remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?; + let _ = remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?; remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?; } @@ -3731,7 +3704,7 @@ async fn gc_dropped_table_by_id( let mut txn = TxnRequest::default(); // 1) - remove_data_for_dropped_table(kv_api, table_id_ident, &mut txn).await?; + let _ = remove_data_for_dropped_table(kv_api, table_id_ident, &mut txn).await?; // 2) let table_id_history_ident = TableIdHistoryIdent { @@ -3814,21 +3787,20 @@ async fn remove_data_for_dropped_table( kv_api: &(impl kvapi::KVApi + ?Sized), table_id: &TableId, txn: &mut TxnRequest, -) -> Result<(), KVAppError> { +) -> Result, MetaError> { let seq_meta = kv_api.get_pb(table_id).await?; let Some(seq_meta) = seq_meta else { - error!( - "gc_dropped_table_by_id cannot find {:?} table_meta", - table_id - ); - return Ok(()); + let err = format!("cannot find TableMeta by id: {:?}, ", table_id); + error!("{}", err); + return Ok(Err(err)); }; // TODO: enable this check. Currently when gc db, the table may not be dropped. // if seq_meta.data.drop_on.is_none() { - // warn!("gc_dropped_table_by_id {:?} is not dropped", table_id); - // return Ok(()); + // let err = format!("Table {:?} is not dropped, can not remove", table_id); + // warn!("{}", err); + // return Ok(Err(err)); // } txn_delete_exact(txn, table_id, seq_meta.seq); @@ -3844,7 +3816,7 @@ async fn remove_data_for_dropped_table( txn_delete_exact(txn, &id_to_name, seq_name.seq); } - Ok(()) + Ok(Ok(())) } async fn remove_index_for_dropped_table(