Skip to content

Commit

Permalink
add update_procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Sep 25, 2024
1 parent f4b3ec6 commit 7a9f382
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 36 deletions.
51 changes: 22 additions & 29 deletions src/query/management/src/procedure/procedure_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use databend_common_meta_app::principal::procedure::ProcedureInfo;
use databend_common_meta_app::principal::procedure_id_ident::ProcedureIdIdent;
use databend_common_meta_app::principal::CreateProcedureReply;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::principal::DropProcedureReq;
use databend_common_meta_app::principal::GetProcedureReply;
use databend_common_meta_app::principal::GetProcedureReq;
use databend_common_meta_app::principal::ListProcedureReq;
Expand All @@ -31,7 +30,6 @@ use databend_common_meta_app::principal::ProcedureIdToNameIdent;
use databend_common_meta_app::principal::ProcedureIdentity;
use databend_common_meta_app::principal::ProcedureMeta;
use databend_common_meta_app::principal::ProcedureNameIdent;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
Expand Down Expand Up @@ -74,49 +72,44 @@ impl ProcedureMgr {

match create_res {
Ok(id) => Ok(CreateProcedureReply { procedure_id: *id }),
Err(existent) => match req.create_option {
CreateOption::Create => {
Err(AppError::from(name_ident.exist_error(func_name!())).into())
}
CreateOption::CreateIfNotExists => Ok(CreateProcedureReply {
procedure_id: *existent.data,
}),
CreateOption::CreateOrReplace => {
let res = self
.kv_api
.update_id_value(name_ident, meta.clone())
.await?;

if let Some((id, _meta)) = res {
Ok(CreateProcedureReply { procedure_id: *id })
} else {
Err(AppError::from(name_ident.unknown_error(func_name!())).into())
}
}
},
Err(_) => Err(AppError::from(name_ident.exist_error(func_name!())).into()),
}
}

/// Drop the tenant's PROCEDURE by name, return the dropped one or None if nothing is dropped.
#[async_backtrace::framed]
pub async fn drop_procedure(
&self,
req: DropProcedureReq,
name_ident: &ProcedureNameIdent,
) -> Result<Option<(SeqV<ProcedureId>, SeqV<ProcedureMeta>)>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
let name_ident = req.name_ident;
debug!(name_ident :? =(name_ident); "SchemaApi: {}", func_name!());
let dropped = self
.kv_api
.remove_id_value(&name_ident, |id| {
.remove_id_value(name_ident, |id| {
vec![ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key()]
})
.await?;
if dropped.is_none() && !req.if_exists {
return Err(AppError::from(name_ident.unknown_error("drop procedure")).into());
}
Ok(dropped)
}

#[fastrace::trace]
#[async_backtrace::framed]
pub async fn update_procedure(
&self,
procedure_ident: &ProcedureNameIdent,
meta: ProcedureMeta,
) -> Result<ProcedureId, KVAppError> {
debug!(procedure_ident :? = (&procedure_ident), meta :? = (meta); "SchemaApi: {}", func_name!());

let res = self.kv_api.update_id_value(procedure_ident, meta).await?;

if let Some((id, _meta)) = res {
Ok(id)
} else {
Err(AppError::from(procedure_ident.unknown_error(func_name!())).into())
}
}

#[fastrace::trace]
pub async fn get_procedure(
&self,
Expand Down
26 changes: 23 additions & 3 deletions src/query/service/src/interpreters/interpreter_procedure_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::schema::CreateOption;
use databend_common_sql::plans::CreateProcedurePlan;
use databend_common_users::UserApiProvider;
use log::debug;
Expand Down Expand Up @@ -55,10 +57,28 @@ impl Interpreter for CreateProcedureInterpreter {
let tenant = self.plan.tenant.clone();

let create_procedure_req: CreateProcedureReq = self.plan.clone().into();
let _ = UserApiProvider::instance()
.add_procedure(&tenant, create_procedure_req)
.await?;

let reply = UserApiProvider::instance()
.add_procedure(&tenant, create_procedure_req)
.await;
if let Err(e) = reply {
if e.code() == ErrorCode::PROCEDURE_ALREADY_EXISTS {
return match self.plan.create_option {
CreateOption::Create => Err(ErrorCode::ProcedureAlreadyExists(format!(
"Procedure '{}' already exists",
self.plan.name,
))),
CreateOption::CreateIfNotExists => Ok(PipelineBuildResult::create()),
CreateOption::CreateOrReplace => {
let _reply = UserApiProvider::instance()
.update_procedure(&tenant, &self.plan.name, self.plan.meta.clone())
.await?;
Ok(PipelineBuildResult::create())
}
};
}
return Err(e);
}
Ok(PipelineBuildResult::create())
}
}
40 changes: 36 additions & 4 deletions src/query/users/src/user_procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::principal::CreateProcedureReply;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::principal::DropProcedureReq;
use databend_common_meta_app::principal::GetProcedureReply;
use databend_common_meta_app::principal::GetProcedureReq;
use databend_common_meta_app::principal::ProcedureMeta;
use databend_common_meta_app::principal::ProcedureNameIdent;
use databend_common_meta_app::tenant::Tenant;

use crate::UserApiProvider;
Expand All @@ -26,10 +30,15 @@ use crate::UserApiProvider;
impl UserApiProvider {
// Add a new Procedure.
#[async_backtrace::framed]
pub async fn add_procedure(&self, tenant: &Tenant, req: CreateProcedureReq) -> Result<()> {
pub async fn add_procedure(
&self,
tenant: &Tenant,
req: CreateProcedureReq,
) -> Result<CreateProcedureReply> {
let procedure_api = self.procedure_api(tenant);
let _ = procedure_api.create_procedure(req).await?;
Ok(())
let replay = procedure_api.create_procedure(req).await?;

Ok(replay)
}

#[async_backtrace::framed]
Expand All @@ -49,7 +58,30 @@ impl UserApiProvider {
// Drop a Procedure by name.
#[async_backtrace::framed]
pub async fn drop_procedure(&self, tenant: &Tenant, req: DropProcedureReq) -> Result<()> {
let _ = self.procedure_api(tenant).drop_procedure(req).await?;
let dropped = self
.procedure_api(tenant)
.drop_procedure(&req.name_ident)
.await?;
if dropped.is_none() && !req.if_exists {
return Err(ErrorCode::UnknownProcedure(format!(
"Unknown procedure '{}' while drop procedure",
req.name_ident
)));
}
Ok(())
}

// Update a Procedure by name.
#[async_backtrace::framed]
pub async fn update_procedure(
&self,
tenant: &Tenant,
name_ident: &ProcedureNameIdent,
meta: ProcedureMeta,
) -> Result<()> {
self.procedure_api(tenant)
.update_procedure(name_ident, meta)
.await?;
Ok(())
}
}

0 comments on commit 7a9f382

Please sign in to comment.