Skip to content

Commit

Permalink
refactor: replace absolute expiration time with relative TTL in schema API (#16268)
Browse files Browse the repository at this point in the history

* refactor: replace absolute expiration time with relative TTL in schema API

An absolute expiration time is computed from the caller's local clock, so
calling `SchemaApi` with one is sensitive to clock skew. If the local time
lags behind the meta-service leader's time, records may expire prematurely.

By switching to a relative expiration time (`ttl`), the expiration is
calculated by the meta-service leader, which synchronizes the time
through the Raft log. This ensures that records expire correctly.

* chore: enable INFO log for meta to address the TableLock issue

https://github.com/datafuselabs/databend/actions/runs/10389326033/job/28775319145?pr=16207#step:4:1201

* chore: SchemaApi::update_multi_table_meta() should use ttl to replace absolute expiration time
  • Loading branch information
drmingdrmer committed Aug 18, 2024
1 parent 9197028 commit 77248da
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 42 deletions.
2 changes: 1 addition & 1 deletion scripts/ci/deploy/databend-query-management-mode.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ done
sleep 1

echo 'Start databend-meta...'
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=ERROR &
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=INFO &
echo "Waiting on databend-meta 10 seconds..."
python3 scripts/ci/wait_tcp.py --timeout 30 --port 9191

Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/databend-query-standalone-hive.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ done
sleep 1

echo 'Start databend-meta...'
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=ERROR &
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=INFO &
echo "Waiting on databend-meta 10 seconds..."
python3 scripts/ci/wait_tcp.py --timeout 30 --port 9191

Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/databend-query-standalone-logging.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ done
sleep 1

echo 'Start databend-meta...'
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=ERROR &
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=INFO &
echo "Waiting on databend-meta 10 seconds..."
python3 scripts/ci/wait_tcp.py --timeout 30 --port 9191

Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/databend-query-standalone-native.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ done
sleep 1

echo 'Start databend-meta...'
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=ERROR &
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=INFO &
echo "Waiting on databend-meta 10 seconds..."
python3 scripts/ci/wait_tcp.py --timeout 30 --port 9191

Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/deploy/databend-query-standalone.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ done
sleep 1

echo 'Start databend-meta...'
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=ERROR &
nohup target/${BUILD_PROFILE}/databend-meta --single --log-level=INFO &
echo "Waiting on databend-meta 10 seconds..."
python3 scripts/ci/wait_tcp.py --timeout 30 --port 9191

Expand Down
1 change: 0 additions & 1 deletion src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,4 @@ pub use util::txn_cond_seq;
pub use util::txn_op_del;
pub use util::txn_op_get;
pub use util::txn_op_put;
pub use util::txn_op_put_with_expire;
pub use util::DEFAULT_MGET_SIZE;
34 changes: 18 additions & 16 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ use crate::txn_cond_seq;
use crate::txn_op_del;
use crate::txn_op_get;
use crate::txn_op_put;
use crate::txn_op_put_with_expire;
use crate::util::db_id_has_to_exist;
use crate::util::deserialize_id_get_response;
use crate::util::deserialize_struct_get_response;
Expand Down Expand Up @@ -3794,10 +3793,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&key, Eq, 0),
];

let if_then = vec![txn_op_put_with_expire(
&key,
let if_then = vec![TxnOp::put_with_ttl(
key.to_string_key(),
serialize_struct(&lock_meta)?,
SeqV::<()>::now_ms() / 1000 + req.expire_secs,
Some(req.expire_secs * 1000),
)];

let txn_req = TxnRequest {
Expand Down Expand Up @@ -3859,10 +3858,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&key, Eq, lock_seq),
];

let if_then = vec![txn_op_put_with_expire(
&key,
let if_then = vec![TxnOp::put_with_ttl(
key.to_string_key(),
serialize_struct(&lock_meta)?,
SeqV::<()>::now_ms() / 1000 + req.expire_secs,
Some(req.expire_secs * 1000),
)];

let txn_req = TxnRequest {
Expand Down Expand Up @@ -5226,28 +5225,31 @@ fn build_upsert_table_copied_file_info_conditions(
// "fail_if_duplicated" mode, assumes files are absent
condition.push(txn_cond_seq(&key, Eq, 0));
}
set_update_expire_operation(&key, &file_info, &req.expire_at, &mut if_then)?;
set_update_expire_operation(&key, &file_info, req.ttl, &mut if_then)?;
}
Ok((condition, if_then))
}

fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
let expire_at = Some(SeqV::<()>::now_ms() / 1000 + 24 * 60 * 60);
TxnOp::put_with_expire(deduplicated_label, 1_i8.to_le_bytes().to_vec(), expire_at)
TxnOp::put_with_ttl(
deduplicated_label,
1_i8.to_le_bytes().to_vec(),
Some(86400 * 1000),
)
}

fn set_update_expire_operation(
key: &TableCopiedFileNameIdent,
file_info: &TableCopiedFileInfo,
expire_at_opt: &Option<u64>,
ttl: Option<std::time::Duration>,
then_branch: &mut Vec<TxnOp>,
) -> Result<(), KVAppError> {
match expire_at_opt {
Some(expire_at) => {
then_branch.push(txn_op_put_with_expire(
key,
match ttl {
Some(ttl) => {
then_branch.push(TxnOp::put_with_ttl(
key.to_string_key(),
serialize_struct(file_info)?,
*expire_at,
Some(ttl.as_millis() as u64),
));
}
None => {
Expand Down
23 changes: 12 additions & 11 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ impl SchemaApiTestSuite {

let upsert_source_table = UpsertTableCopiedFileReq {
file_info,
expire_at: None,
ttl: None,
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -2713,7 +2713,7 @@ impl SchemaApiTestSuite {

let upsert_source_table = UpsertTableCopiedFileReq {
file_info,
expire_at: None,
ttl: None,
fail_if_duplicated: true,
};
let req = UpdateTableMetaReq {
Expand Down Expand Up @@ -2762,7 +2762,7 @@ impl SchemaApiTestSuite {

let upsert_source_table = UpsertTableCopiedFileReq {
file_info,
expire_at: None,
ttl: None,
fail_if_duplicated: true,
};
let req = UpdateTableMetaReq {
Expand Down Expand Up @@ -3614,7 +3614,7 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -3778,7 +3778,7 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -5733,7 +5733,7 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -5782,7 +5782,8 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() - 86400) as u64),
// Make it expire at once.
ttl: Some(std::time::Duration::from_secs(0)),
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -7208,7 +7209,7 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -7266,7 +7267,7 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: true,
};

Expand Down Expand Up @@ -7321,7 +7322,7 @@ impl SchemaApiTestSuite {

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_info.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: false,
};

Expand Down Expand Up @@ -7680,7 +7681,7 @@ where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError>

let copied_file_req = UpsertTableCopiedFileReq {
file_info: file_infos.clone(),
expire_at: Some((Utc::now().timestamp() + 86400) as u64),
ttl: Some(std::time::Duration::from_secs(86400)),
fail_if_duplicated: true,
};

Expand Down
5 changes: 0 additions & 5 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,6 @@ pub fn txn_op_get(key: &impl kvapi::Key) -> TxnOp {
TxnOp::get(key.to_string_key())
}

// TODO: replace it with common_meta_types::with::With
pub fn txn_op_put_with_expire(key: &impl kvapi::Key, value: Vec<u8>, expire_at: u64) -> TxnOp {
TxnOp::put_with_expire(key.to_string_key(), value, Some(expire_at))
}

/// Build a txn operation that deletes a record.
pub fn txn_op_del(key: &impl kvapi::Key) -> TxnOp {
TxnOp::delete(key.to_string_key())
Expand Down
4 changes: 3 additions & 1 deletion src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use anyerror::func_name;
use chrono::DateTime;
Expand Down Expand Up @@ -983,7 +984,8 @@ pub struct GetTableCopiedFileReply {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpsertTableCopiedFileReq {
pub file_info: BTreeMap<String, TableCopiedFileInfo>,
pub expire_at: Option<u64>,
/// If not None, specifies the time-to-live for the keys.
pub ttl: Option<Duration>,
pub fail_if_duplicated: bool,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use databend_common_catalog::table::AppendMode;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
Expand Down Expand Up @@ -231,10 +231,9 @@ impl PipelineBuilder {
None
} else {
debug!("upsert_copied_files_info: {:?}", copied_file_tree);
let expire_at = expire_hours * 60 * 60 + Utc::now().timestamp() as u64;
let req = UpsertTableCopiedFileReq {
file_info: copied_file_tree,
expire_at: Some(expire_at),
ttl: Some(Duration::from_hours(expire_hours)),
fail_if_duplicated: !force,
};
Some(req)
Expand Down

0 comments on commit 77248da

Please sign in to comment.