Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(cluster): add cluster quota into license #15865

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
46b5561
refactor(cluster): add cluster quota into license
zhang2014 Jun 23, 2024
ec39aac
refactor(cluster): add cluster quota into license
zhang2014 Jun 23, 2024
6a0f59c
refactor(cluster): add cluster quota into license
zhang2014 Jun 23, 2024
fe9db63
Merge branch 'main' into refactor/cluster
zhang2014 Jun 23, 2024
74a34e6
refactor(cluster): add cluster quota into license
zhang2014 Jun 23, 2024
c695958
refactor(cluster): add cluster quota into license
zhang2014 Jun 23, 2024
45f4691
refactor(cluster): add cluster quota into license
zhang2014 Jun 23, 2024
3c1313f
refactor(cluster): add cluster quota into license
zhang2014 Jun 25, 2024
f3d001c
Merge branch 'main' into refactor/cluster
zhang2014 Jun 27, 2024
0d9ea68
Merge branch 'main' into refactor/cluster
zhang2014 Jun 28, 2024
24383ea
Merge branch 'main' into refactor/cluster
zhang2014 Jun 29, 2024
6ef71bd
Merge branch 'main' into refactor/cluster
zhang2014 Jul 3, 2024
cec1032
Merge branch 'main' into refactor/cluster
zhang2014 Jul 3, 2024
af17ecf
Merge branch 'main' into refactor/cluster
zhang2014 Jul 5, 2024
2144852
Merge branch 'main' into refactor/cluster
zhang2014 Jul 8, 2024
aca9776
refactor(cluster): add cluster quota into license
zhang2014 Jul 8, 2024
fb93421
refactor(cluster): add cluster quota into license
zhang2014 Jul 8, 2024
5ccfb7f
Merge branch 'main' into refactor/cluster
zhang2014 Jul 9, 2024
5228557
refactor(cluster): add cluster quota into license
zhang2014 Jul 11, 2024
d7e003b
Merge branch 'main' into refactor/cluster
zhang2014 Jul 11, 2024
fcd4de0
Merge branch 'main' into refactor/cluster
zhang2014 Jul 13, 2024
6a4059e
Merge branch 'main' into refactor/cluster
zhang2014 Jul 14, 2024
d80ff4d
Merge branch 'main' into refactor/cluster
zhang2014 Jul 15, 2024
d3d54a2
Merge branch 'main' into refactor/cluster
zhang2014 Jul 15, 2024
38bf9ba
Merge branch 'main' into refactor/cluster
zhang2014 Sep 22, 2024
ab657d3
Merge branch 'refactor/cluster' of github.com:zhang2014/datafuse into…
zhang2014 Sep 22, 2024
b8cf5a6
Merge branch 'main' into refactor/cluster
zhang2014 Sep 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

info!("Databend Query start with config: {:?}", conf);

// Cluster register.
{
ClusterDiscovery::instance()
.register_to_metastore()
.await
.with_context(make_error)?;
info!(
"Databend query has been registered:{:?} to metasrv:{:?}.",
conf.query.cluster_id, conf.meta.endpoints
);
}

// RPC API service.
{
let address = conf.query.flight_api_address.clone();
let mut srv = FlightService::create(conf.clone()).with_context(make_error)?;
let listening = srv
.start(address.parse().with_context(make_error)?)
.await
.with_context(make_error)?;
shutdown_handle.add_service("RPCService", srv);
info!("Listening for RPC API (interserver): {}", listening);
}

// MySQL handler.
{
let hostname = conf.query.mysql_handler_host.clone();
Expand Down Expand Up @@ -229,30 +253,6 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
info!("Listening for FlightSQL API: {}", listening);
}

// RPC API service.
{
let address = conf.query.flight_api_address.clone();
let mut srv = FlightService::create(conf.clone()).with_context(make_error)?;
let listening = srv
.start(address.parse().with_context(make_error)?)
.await
.with_context(make_error)?;
shutdown_handle.add_service("RPCService", srv);
info!("Listening for RPC API (interserver): {}", listening);
}

// Cluster register.
{
ClusterDiscovery::instance()
.register_to_metastore(conf)
.await
.with_context(make_error)?;
info!(
"Databend query has been registered:{:?} to metasrv:{:?}.",
conf.query.cluster_id, conf.meta.endpoints
);
}

// Print information to users.
println!("Databend Query");

Expand Down
1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ build_exceptions! {
/// For example: license key is expired
LicenseKeyInvalid(1402),
EnterpriseFeatureNotEnable(1403),
LicenseKeyExpired(1404),

BackgroundJobAlreadyExists(1501),
UnknownBackgroundJob(1502),
Expand Down
223 changes: 209 additions & 14 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt;

use databend_common_base::display::display_option::DisplayOptionExt;
use databend_common_base::display::display_slice::DisplaySliceExt;
use databend_common_exception::ErrorCode;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -25,6 +26,42 @@ pub struct ComputeQuota {
memory_usage: Option<usize>,
}

#[derive(Debug, Clone, Eq, Ord, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ClusterQuota {
pub(crate) max_clusters: Option<usize>,
pub(crate) max_nodes_per_cluster: Option<usize>,
}

impl ClusterQuota {
pub fn un_limit() -> ClusterQuota {
ClusterQuota {
max_clusters: None,
max_nodes_per_cluster: None,
}
}

pub fn limit_clusters(max_clusters: usize) -> ClusterQuota {
ClusterQuota {
max_nodes_per_cluster: None,
max_clusters: Some(max_clusters),
}
}

pub fn limit_nodes(nodes: usize) -> ClusterQuota {
ClusterQuota {
max_clusters: None,
max_nodes_per_cluster: Some(nodes),
}
}

pub fn limit_full(max_clusters: usize, nodes: usize) -> ClusterQuota {
ClusterQuota {
max_clusters: Some(max_clusters),
max_nodes_per_cluster: Some(nodes),
}
}
}

#[derive(Debug, Clone, Eq, Ord, PartialOrd, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct StorageQuota {
pub storage_usage: Option<usize>,
Expand Down Expand Up @@ -70,6 +107,8 @@ pub enum Feature {
ComputeQuota(ComputeQuota),
#[serde(alias = "storage_quota", alias = "STORAGE_QUOTA")]
StorageQuota(StorageQuota),
#[serde(alias = "cluster_quota", alias = "CLUSTER_QUOTA")]
ClusterQuota(ClusterQuota),
#[serde(alias = "amend_table", alias = "AMEND_TABLE")]
AmendTable,
#[serde(other)]
Expand Down Expand Up @@ -102,19 +141,34 @@ impl fmt::Display for Feature {

write!(f, ", memory_usage: ")?;
match v.memory_usage {
None => write!(f, "unlimited,")?,
Some(memory_usage) => write!(f, "{}", memory_usage)?,
}
None => write!(f, "memory_usage: unlimited,")?,
Some(memory_usage) => write!(f, "memory_usage: {}", memory_usage)?,
};
write!(f, ")")
}
Feature::StorageQuota(v) => {
write!(f, "storage_quota(")?;

write!(f, "storage_usage: ")?;
match v.storage_usage {
None => write!(f, "unlimited,")?,
Some(storage_usage) => write!(f, "{}", storage_usage)?,
}
None => write!(f, "storage_usage: unlimited,")?,
Some(storage_usage) => write!(f, "storage_usage: {}", storage_usage)?,
};

write!(f, ")")
}
Feature::ClusterQuota(v) => {
write!(f, "cluster_quota(")?;

match &v.max_clusters {
None => write!(f, "max_clusters: unlimited,")?,
Some(v) => write!(f, "max_clusters: {}", v)?,
};

match v.max_nodes_per_cluster {
None => write!(f, "max_nodes_per_cluster: unlimited,")?,
Some(v) => write!(f, "max_nodes_per_cluster: {}", v)?,
};
write!(f, ")")
}
Feature::AmendTable => write!(f, "amend_table"),
Expand All @@ -124,31 +178,75 @@ impl fmt::Display for Feature {
}

impl Feature {
pub fn verify(&self, feature: &Feature) -> bool {
pub fn verify_default(&self, message: impl Into<String>) -> Result<(), ErrorCode> {
match self {
Feature::ClusterQuota(cluster_quote) => {
if matches!(cluster_quote.max_clusters, Some(x) if x > 1) {
return Err(ErrorCode::LicenseKeyInvalid(
"No license found. The default configuration of Databend Community Edition only supports 1 cluster. To use more clusters, please consider upgrading to Databend Enterprise Edition. Learn more at https://docs.databend.com/guides/overview/editions/dee/",
));
}

if matches!(cluster_quote.max_nodes_per_cluster, Some(x) if x > 1) {
return Err(ErrorCode::LicenseKeyInvalid(
"No license found. The default configuration of Databend Community Edition only supports up to 1 nodes per cluster. To use more nodes per cluster, please consider upgrading to Databend Enterprise Edition. Learn more at https://docs.databend.com/guides/overview/editions/dee/",
));
}

Ok(())
}
_ => Err(ErrorCode::LicenseKeyInvalid(message.into())),
}
}

pub fn verify(&self, feature: &Feature) -> Result<bool, ErrorCode> {
match (self, feature) {
(Feature::ComputeQuota(c), Feature::ComputeQuota(v)) => {
if let Some(thread_num) = c.threads_num {
if thread_num <= v.threads_num.unwrap_or(usize::MAX) {
return false;
return Ok(false);
}
}

if let Some(max_memory_usage) = c.memory_usage {
if max_memory_usage <= v.memory_usage.unwrap_or(usize::MAX) {
return false;
return Ok(false);
}
}

true
Ok(true)
}
(Feature::StorageQuota(c), Feature::StorageQuota(v)) => {
if let Some(max_storage_usage) = c.storage_usage {
if max_storage_usage <= v.storage_usage.unwrap_or(usize::MAX) {
return false;
return Ok(false);
}
}

Ok(true)
}
(Feature::ClusterQuota(c), Feature::ClusterQuota(v)) => {
if let Some(max_clusters) = c.max_clusters {
if max_clusters < v.max_clusters.unwrap_or(usize::MAX) {
return Err(ErrorCode::LicenseKeyInvalid(format!(
"The number of clusters exceeds the quota specified in the Databend Enterprise Edition license. Maximum allowed: {}, Requested: {}. Please contact Databend to review your licensing options. Learn more at https://docs.databend.com/guides/overview/editions/dee/",
max_clusters,
v.max_clusters.unwrap_or(usize::MAX)
)));
}
}

true
if let Some(max_nodes_per_cluster) = c.max_nodes_per_cluster {
if max_nodes_per_cluster < v.max_nodes_per_cluster.unwrap_or(usize::MAX) {
return Err(ErrorCode::LicenseKeyInvalid(format!(
"The number of nodes per cluster exceeds the quota specified in the Databend Enterprise Edition license. Maximum allowed: {}, Requested: {}. Please contact Databend to review your licensing options. Learn more at https://docs.databend.com/guides/overview/editions/dee/",
max_nodes_per_cluster,
v.max_nodes_per_cluster.unwrap_or(usize::MAX)
)));
}
}

Ok(true)
}
(Feature::Test, Feature::Test)
| (Feature::AggregateIndex, Feature::AggregateIndex)
Expand All @@ -161,8 +259,8 @@ impl Feature {
| (Feature::InvertedIndex, Feature::InvertedIndex)
| (Feature::VirtualColumn, Feature::VirtualColumn)
| (Feature::AttacheTable, Feature::AttacheTable)
| (Feature::StorageEncryption, Feature::StorageEncryption) => true,
(_, _) => false,
| (Feature::StorageEncryption, Feature::StorageEncryption) => Ok(true),
(_, _) => Ok(false),
}
}
}
Expand Down Expand Up @@ -324,6 +422,15 @@ mod tests {
serde_json::from_str::<Feature>("{\"StorageQuota\":{\"storage_usage\":1}}").unwrap()
);

assert_eq!(
Feature::ClusterQuota(ClusterQuota {
max_clusters: None,
max_nodes_per_cluster: Some(1),
}),
serde_json::from_str::<Feature>("{\"ClusterQuota\":{\"max_nodes_per_cluster\":1}}")
.unwrap()
);

assert_eq!(
Feature::AmendTable,
serde_json::from_str::<Feature>("\"amend_table\"").unwrap()
Expand All @@ -336,6 +443,94 @@ mod tests {
}

#[test]
fn test_cluster_quota_verify_default() {
assert!(
Feature::ClusterQuota(ClusterQuota::limit_clusters(1))
.verify_default("")
.is_ok()
);
assert!(
Feature::ClusterQuota(ClusterQuota::limit_nodes(1))
.verify_default("")
.is_ok()
);
assert!(
Feature::ClusterQuota(ClusterQuota::limit_nodes(2))
.verify_default("")
.is_err()
);

for nodes in 0..2 {
assert!(
Feature::ClusterQuota(ClusterQuota::limit_full(1, nodes))
.verify_default("")
.is_ok()
);
}

assert!(
Feature::ClusterQuota(ClusterQuota::limit_clusters(2))
.verify_default("")
.is_err()
);
assert!(
Feature::ClusterQuota(ClusterQuota::limit_nodes(4))
.verify_default("")
.is_err()
);
assert!(
Feature::ClusterQuota(ClusterQuota::limit_full(2, 1))
.verify_default("")
.is_err()
);
assert!(
Feature::ClusterQuota(ClusterQuota::limit_full(1, 4))
.verify_default("")
.is_err()
);
}

#[test]
fn test_cluster_quota_verify() -> Result<(), ErrorCode> {
let unlimit_feature = Feature::ClusterQuota(ClusterQuota::un_limit());

for cluster_num in 0..1000 {
for node_num in 0..1000 {
let feature =
Feature::ClusterQuota(ClusterQuota::limit_full(cluster_num, node_num));
assert!(unlimit_feature.verify(&feature)?);
}
}

let unlimit_cluster_feature = Feature::ClusterQuota(ClusterQuota::limit_nodes(1));

for cluster_num in 0..1000 {
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(cluster_num, 1));
assert!(unlimit_cluster_feature.verify(&feature)?);
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(cluster_num, 2));
assert!(unlimit_cluster_feature.verify(&feature).is_err());
}

let unlimit_nodes_feature = Feature::ClusterQuota(ClusterQuota::limit_clusters(1));

for nodes_num in 0..1000 {
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(1, nodes_num));
assert!(unlimit_nodes_feature.verify(&feature)?);
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(2, nodes_num));
assert!(unlimit_nodes_feature.verify(&feature).is_err());
}

let limit_full = Feature::ClusterQuota(ClusterQuota::limit_full(1, 1));
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(1, 1));
assert!(limit_full.verify(&feature)?);
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(2, 1));
assert!(limit_full.verify(&feature).is_err());
let feature = Feature::ClusterQuota(ClusterQuota::limit_full(1, 2));
assert!(limit_full.verify(&feature).is_err());

Ok(())
}

fn test_display_license_info() {
let license_info = LicenseInfo {
r#type: Some("enterprise".to_string()),
Expand Down
7 changes: 3 additions & 4 deletions src/common/license/src/license_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ impl LicenseManager for OssLicenseManager {
GlobalInstance::get()
}

fn check_enterprise_enabled(&self, _license_key: String, _feature: Feature) -> Result<()> {
Err(ErrorCode::LicenseKeyInvalid(
"Need Commercial License".to_string(),
))
fn check_enterprise_enabled(&self, _license_key: String, feature: Feature) -> Result<()> {
// oss ignore license key.
feature.verify_default("Need Commercial License".to_string())
}

fn parse_license(&self, _raw: &str) -> Result<JWTClaims<LicenseInfo>> {
Expand Down
Loading
Loading