diff --git a/src/query/management/src/procedure/procedure_mgr.rs b/src/query/management/src/procedure/procedure_mgr.rs index 927df50073ba..bb3e9ff6ff38 100644 --- a/src/query/management/src/procedure/procedure_mgr.rs +++ b/src/query/management/src/procedure/procedure_mgr.rs @@ -22,7 +22,6 @@ use databend_common_meta_app::principal::procedure::ProcedureInfo; use databend_common_meta_app::principal::procedure_id_ident::ProcedureIdIdent; use databend_common_meta_app::principal::CreateProcedureReply; use databend_common_meta_app::principal::CreateProcedureReq; -use databend_common_meta_app::principal::DropProcedureReq; use databend_common_meta_app::principal::GetProcedureReply; use databend_common_meta_app::principal::GetProcedureReq; use databend_common_meta_app::principal::ListProcedureReq; @@ -31,7 +30,6 @@ use databend_common_meta_app::principal::ProcedureIdToNameIdent; use databend_common_meta_app::principal::ProcedureIdentity; use databend_common_meta_app::principal::ProcedureMeta; use databend_common_meta_app::principal::ProcedureNameIdent; -use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; @@ -74,26 +72,7 @@ impl ProcedureMgr { match create_res { Ok(id) => Ok(CreateProcedureReply { procedure_id: *id }), - Err(existent) => match req.create_option { - CreateOption::Create => { - Err(AppError::from(name_ident.exist_error(func_name!())).into()) - } - CreateOption::CreateIfNotExists => Ok(CreateProcedureReply { - procedure_id: *existent.data, - }), - CreateOption::CreateOrReplace => { - let res = self - .kv_api - .update_id_value(name_ident, meta.clone()) - .await?; - - if let Some((id, _meta)) = res { - Ok(CreateProcedureReply { procedure_id: *id }) - } else { - Err(AppError::from(name_ident.unknown_error(func_name!())).into()) - } - } - }, + Err(_) => Err(AppError::from(name_ident.exist_error(func_name!())).into()), } } @@ -101,22 +80,36 @@ impl ProcedureMgr { #[async_backtrace::framed] pub async fn drop_procedure( &self, - req: DropProcedureReq, + name_ident: &ProcedureNameIdent, ) -> Result, SeqV)>, KVAppError> { - debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - let name_ident = req.name_ident; + debug!(name_ident :? =(name_ident); "SchemaApi: {}", func_name!()); let dropped = self .kv_api - .remove_id_value(&name_ident, |id| { + .remove_id_value(name_ident, |id| { vec![ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key()] }) .await?; - if dropped.is_none() && !req.if_exists { - return Err(AppError::from(name_ident.unknown_error("drop procedure")).into()); - } Ok(dropped) } + #[fastrace::trace] + #[async_backtrace::framed] + pub async fn update_procedure( + &self, + procedure_ident: &ProcedureNameIdent, + meta: ProcedureMeta, + ) -> Result { + debug!(procedure_ident :? = (&procedure_ident), meta :? = (meta); "SchemaApi: {}", func_name!()); + + let res = self.kv_api.update_id_value(procedure_ident, meta).await?; + + if let Some((id, _meta)) = res { + Ok(id) + } else { + Err(AppError::from(procedure_ident.unknown_error(func_name!())).into()) + } + } + #[fastrace::trace] pub async fn get_procedure( &self, diff --git a/src/query/service/src/interpreters/interpreter_procedure_create.rs b/src/query/service/src/interpreters/interpreter_procedure_create.rs index 31f7a7022810..1119e75baf5a 100644 --- a/src/query/service/src/interpreters/interpreter_procedure_create.rs +++ b/src/query/service/src/interpreters/interpreter_procedure_create.rs @@ -14,8 +14,10 @@ use std::sync::Arc; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::CreateProcedureReq; +use databend_common_meta_app::schema::CreateOption; use databend_common_sql::plans::CreateProcedurePlan; use databend_common_users::UserApiProvider; use log::debug; @@ -55,10 +57,28 @@ impl Interpreter for CreateProcedureInterpreter { let tenant = self.plan.tenant.clone(); let create_procedure_req: CreateProcedureReq = self.plan.clone().into(); - let _ = UserApiProvider::instance() - .add_procedure(&tenant, create_procedure_req) - .await?; + let reply = UserApiProvider::instance() + .add_procedure(&tenant, create_procedure_req) + .await; + if let Err(e) = reply { + if e.code() == ErrorCode::PROCEDURE_ALREADY_EXISTS { + return match self.plan.create_option { + CreateOption::Create => Err(ErrorCode::ProcedureAlreadyExists(format!( + "Procedure '{}' already exists", + self.plan.name, + ))), + CreateOption::CreateIfNotExists => Ok(PipelineBuildResult::create()), + CreateOption::CreateOrReplace => { + let _reply = UserApiProvider::instance() + .update_procedure(&tenant, &self.plan.name, self.plan.meta.clone()) + .await?; + Ok(PipelineBuildResult::create()) + } + }; + } + return Err(e); + } Ok(PipelineBuildResult::create()) } } diff --git a/src/query/users/src/user_procedure.rs b/src/query/users/src/user_procedure.rs index 3266c8364be3..1f1b510cd0ab 100644 --- a/src/query/users/src/user_procedure.rs +++ b/src/query/users/src/user_procedure.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::principal::CreateProcedureReply; use databend_common_meta_app::principal::CreateProcedureReq; use databend_common_meta_app::principal::DropProcedureReq; use databend_common_meta_app::principal::GetProcedureReply; use databend_common_meta_app::principal::GetProcedureReq; +use databend_common_meta_app::principal::ProcedureMeta; +use databend_common_meta_app::principal::ProcedureNameIdent; use databend_common_meta_app::tenant::Tenant; use crate::UserApiProvider; @@ -26,10 +30,15 @@ use crate::UserApiProvider; impl UserApiProvider { // Add a new Procedure. #[async_backtrace::framed] - pub async fn add_procedure(&self, tenant: &Tenant, req: CreateProcedureReq) -> Result<()> { + pub async fn add_procedure( + &self, + tenant: &Tenant, + req: CreateProcedureReq, + ) -> Result { let procedure_api = self.procedure_api(tenant); - let _ = procedure_api.create_procedure(req).await?; - Ok(()) + let replay = procedure_api.create_procedure(req).await?; + + Ok(replay) } #[async_backtrace::framed] @@ -49,7 +58,30 @@ impl UserApiProvider { // Drop a Procedure by name. #[async_backtrace::framed] pub async fn drop_procedure(&self, tenant: &Tenant, req: DropProcedureReq) -> Result<()> { - let _ = self.procedure_api(tenant).drop_procedure(req).await?; + let dropped = self + .procedure_api(tenant) + .drop_procedure(&req.name_ident) + .await?; + if dropped.is_none() && !req.if_exists { + return Err(ErrorCode::UnknownProcedure(format!( + "Unknown procedure '{}' while drop procedure", + req.name_ident + ))); + } + Ok(()) + } + + // Update a Procedure by name. + #[async_backtrace::framed] + pub async fn update_procedure( + &self, + tenant: &Tenant, + name_ident: &ProcedureNameIdent, + meta: ProcedureMeta, + ) -> Result<()> { + self.procedure_api(tenant) + .update_procedure(name_ident, meta) + .await?; Ok(()) } }