Skip to content

Commit

Permalink
Merge branch 'main' into refactor/window_exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Sep 20, 2024
2 parents ccac30e + 1fac99d commit dcf4886
Show file tree
Hide file tree
Showing 59 changed files with 629 additions and 849 deletions.
32 changes: 1 addition & 31 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,35 +180,9 @@ jobs:
features: python-udf
category: udf

build_hdfs:
runs-on: [self-hosted, "${{ matrix.runner }}", Linux, 16c32g, aws]
needs: create_release
strategy:
fail-fast: false
matrix:
include:
- { target: x86_64-unknown-linux-gnu, runner: X64 }
- { target: aarch64-unknown-linux-gnu, runner: ARM64 }
steps:
- name: Checkout
uses: actions/checkout@v4
with:
ref: ${{ needs.create_release.outputs.sha }}
fetch-depth: 0
- name: Build Release
uses: ./.github/actions/build_linux
env:
DATABEND_RELEASE_VERSION: ${{ needs.create_release.outputs.version }}
with:
sha: ${{ github.sha }}
target: ${{ matrix.target }}
artifacts: sqllogictests,sqlsmith,metactl,meta,query
features: storage-hdfs
category: hdfs

publish:
runs-on: [self-hosted, X64, Linux, 4c8g, aws]
needs: [create_release, build_default, build_musl, build_hdfs]
needs: [create_release, build_default, build_musl]
strategy:
fail-fast: false
matrix:
Expand All @@ -221,10 +195,6 @@ jobs:
target: x86_64-unknown-linux-musl
- category: default
target: aarch64-unknown-linux-musl
- category: hdfs
target: x86_64-unknown-linux-gnu
- category: hdfs
target: aarch64-unknown-linux-gnu
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-otlp-logs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ dir = "./.databend/logs_1"

[log.query]
on = true
otlp_endpoint = "http://127.0.0.1:4317/v1/logs"
otlp_endpoint = "http://127.0.0.1:4317"
[log.query.otlp_labels]
qkey1 = "qvalue1"
qkey2 = "qvalue2"

[log.profile]
on = true
otlp_endpoint = "http://127.0.0.1:4318/v1/logs"
otlp_endpoint = "http://127.0.0.1:4318"
otlp_protocol = "http"
[log.profile.otlp_labels]
pkey1 = "pvalue1"
Expand Down
4 changes: 3 additions & 1 deletion src/common/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ doctest = false
test = true

[dependencies]
anyhow = { workspace = true }
backtrace = { workspace = true }
chrono = { workspace = true }
color-backtrace = { version = "0.6" }
Expand All @@ -22,7 +23,7 @@ fastrace-opentelemetry = { workspace = true }
itertools = { workspace = true }
libc = "0.2.153"
log = { workspace = true }
logforth = { version = "0.11", git = "http://github.com/andylokandy/logforth", rev = "0ca61ca", features = [
logforth = { version = "0.12", features = [
'json',
'rolling_file',
'opentelemetry',
Expand All @@ -32,6 +33,7 @@ opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry_sdk = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
strip-ansi-escapes = "0.2"
tonic = { workspace = true }

Expand Down
56 changes: 33 additions & 23 deletions src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,19 @@ pub fn init_logging(
let labels = labels
.iter()
.chain(&cfg.otlp.endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "system".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("system"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&cfg.otlp.endpoint.endpoint,
cfg.otlp.endpoint.protocol.into(),
labels,
format!("{}/v1/logs", &cfg.otlp.endpoint.endpoint),
)
.expect("initialize opentelemetry logger");
.with_protocol(cfg.otlp.endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for(
"databend::log::query",
Expand Down Expand Up @@ -290,23 +294,26 @@ pub fn init_logging(
"databend::log::query",
LevelFilter::Off,
))
.layout(get_layout(&cfg.file.format))
.append(query_log_file);
logger = logger.dispatch(dispatch);
}
if let Some(endpoint) = &cfg.query.otlp {
let labels = labels
.iter()
.chain(&endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "query".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("query"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&endpoint.endpoint,
endpoint.protocol.into(),
labels,
format!("{}/v1/logs", &endpoint.endpoint),
)
.expect("initialize opentelemetry logger");
.with_protocol(endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for_not(
"databend::log::query",
Expand All @@ -329,23 +336,26 @@ pub fn init_logging(
"databend::log::profile",
LevelFilter::Off,
))
.layout(get_layout(&cfg.file.format))
.append(profile_log_file);
logger = logger.dispatch(dispatch);
}
if let Some(endpoint) = &cfg.profile.otlp {
let labels = labels
.iter()
.chain(&endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "profile".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("profile"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&endpoint.endpoint,
endpoint.protocol.into(),
labels,
format!("{}/v1/logs", &endpoint.endpoint),
)
.expect("initialize opentelemetry logger");
.with_protocol(endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for_not(
"databend::log::profile",
Expand Down
93 changes: 89 additions & 4 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Arguments;
use std::path::Path;

use databend_common_base::runtime::ThreadTracker;
use log::Record;
use logforth::append::rolling_file::NonBlockingBuilder;
use logforth::append::rolling_file::RollingFileWriter;
use logforth::append::rolling_file::Rotation;
use logforth::append::RollingFile;
use logforth::layout::JsonLayout;
use logforth::layout::TextLayout;
use logforth::layout::collect_kvs;
use logforth::layout::CustomLayout;
use logforth::layout::KvDisplay;
use logforth::Layout;
use serde_json::Map;

/// Create a `BufWriter<NonBlocking>` for a rolling file logger.
pub(crate) fn new_rolling_file_appender(
Expand All @@ -41,8 +48,86 @@ pub(crate) fn new_rolling_file_appender(

/// Build the file-logging [`Layout`] for the given format name.
///
/// Supported values are `"text"` and `"json"`; any other format panics via
/// `unimplemented!`, since it indicates a misconfiguration.
pub fn get_layout(format: &str) -> Layout {
    match format {
        // NOTE(review): removed the stale `TextLayout::default()` /
        // `JsonLayout::default()` arms that duplicated these patterns and
        // made the custom layouts unreachable dead code.
        "text" => text_layout(),
        "json" => json_layout(),
        _ => unimplemented!("file logging format {format} is not supported"),
    }
}

/// Build a plain-text [`Layout`].
///
/// Each record is rendered as: optional query id (when one is attached to the
/// current thread via `ThreadTracker`), a microsecond-precision local
/// timestamp, level, module path, `file:line`, the message, and any
/// structured key-values appended by [`KvDisplay`].
fn text_layout() -> Layout {
    CustomLayout::new(
        |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| {
            // Hoist the pieces shared by both output shapes.
            let timestamp =
                chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
            let source_file = Path::new(record.file().unwrap_or_default())
                .file_name()
                .and_then(|name| name.to_str())
                .unwrap_or_default();
            let source_line = record.line().unwrap_or(0);

            if let Some(query_id) = ThreadTracker::query_id() {
                f(format_args!(
                    "{} {} {:>5} {}: {}:{} {}{}",
                    query_id,
                    timestamp,
                    record.level(),
                    record.module_path().unwrap_or(""),
                    source_file,
                    source_line,
                    record.args(),
                    KvDisplay::new(record.key_values()),
                ))?;
            } else {
                f(format_args!(
                    "{} {:>5} {}: {}:{} {}{}",
                    timestamp,
                    record.level(),
                    record.module_path().unwrap_or(""),
                    source_file,
                    source_line,
                    record.args(),
                    KvDisplay::new(record.key_values()),
                ))?;
            }

            Ok(())
        },
    )
    .into()
}

/// Build a JSON [`Layout`].
///
/// Each record becomes one JSON object with `timestamp`, `level`, an optional
/// `query_id` (when one is attached to the current thread), and a `fields`
/// object holding the message plus any structured key-values.
fn json_layout() -> Layout {
    CustomLayout::new(
        |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| {
            // Gather the message and structured key-values into one object.
            let mut fields = Map::new();
            fields.insert("message".to_string(), format!("{}", record.args()).into());
            for (k, v) in collect_kvs(record.key_values()) {
                fields.insert(k, v.into());
            }

            let timestamp =
                chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
            // Serialize once; both output shapes embed the same payload.
            let fields_json = serde_json::to_string(&fields).unwrap_or_default();

            if let Some(query_id) = ThreadTracker::query_id() {
                f(format_args!(
                    r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#,
                    timestamp,
                    record.level(),
                    query_id,
                    fields_json,
                ))?;
            } else {
                f(format_args!(
                    r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#,
                    timestamp,
                    record.level(),
                    fields_json,
                ))?;
            }

            Ok(())
        },
    )
    .into()
}
1 change: 1 addition & 0 deletions src/meta/api/src/name_id_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ mod tests {
updated_on: Default::default(),
comment: "".to_string(),
drop_on: None,
gc_in_progress: false,
};

let v = db_meta(1).to_pb()?.encode_to_vec();
Expand Down
24 changes: 23 additions & 1 deletion src/meta/app/src/schema/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,29 @@ pub struct DatabaseMeta {
pub updated_on: DateTime<Utc>,
pub comment: String,

// if used in CreateDatabaseReq, this field MUST set to None.
    /// If used in CreateDatabaseReq, this field MUST be set to None.
pub drop_on: Option<DateTime<Utc>>,

/// Indicates whether garbage collection is currently in progress for this dropped database.
///
/// If it is in progress, the database should not be un-dropped, because the data may be incomplete.
///
/// ```text
/// normal <----.
/// | |
/// | drop() | undrop()
/// v |
/// dropped ----'
/// |
/// | gc()
/// v
/// gc_in_progress=True
/// |
/// | purge data from meta-service
/// v
    /// completely removed
/// ```
pub gc_in_progress: bool,
}

impl Default for DatabaseMeta {
Expand All @@ -83,6 +104,7 @@ impl Default for DatabaseMeta {
updated_on: Utc::now(),
comment: "".to_string(),
drop_on: None,
gc_in_progress: false,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/proto-conv/src/database_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl FromToProto for mt::DatabaseMeta {
Some(drop_on) => Some(DateTime::<Utc>::from_pb(drop_on)?),
None => None,
},
gc_in_progress: p.gc_in_progress,
comment: p.comment,
};
Ok(v)
Expand All @@ -62,6 +63,7 @@ impl FromToProto for mt::DatabaseMeta {
Some(drop_on) => Some(drop_on.to_pb()?),
None => None,
},
gc_in_progress: self.gc_in_progress,
comment: self.comment.clone(),
shared_by: vec![],
from_share: None,
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(107, "2024-08-09: Add: datatype.proto/DataType Geography type"),
(108, "2024-08-29: Add: procedure.proto: ProcedureMeta and ProcedureIdentity"),
(109, "2024-08-29: Refactor: ProcedureMeta add arg_names"),
(110, "2024-09-18: Add: database.proto: DatabaseMeta.gc_in_progress"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
Loading

0 comments on commit dcf4886

Please sign in to comment.