From 3a41621d92200ac1ecc0ab61447644593abc914f Mon Sep 17 00:00:00 2001 From: xiafeng Date: Mon, 17 Jun 2024 11:19:31 +0800 Subject: [PATCH] format fix proc end add log bug fix bug fix bug fix proc proto TaskProcLaunch ProcLaunch --- protos/Crane.proto | 58 ++- protos/PublicDefs.proto | 14 + src/CraneCtld/CranedKeeper.cpp | 54 ++- src/CraneCtld/CranedKeeper.h | 5 + src/CraneCtld/CtldGrpcServer.cpp | 32 +- src/CraneCtld/CtldGrpcServer.h | 28 +- src/CraneCtld/TaskScheduler.cpp | 61 ++- src/CraneCtld/TaskScheduler.h | 2 + src/Craned/CforedClient.cpp | 147 ++++--- src/Craned/CforedClient.h | 54 ++- src/Craned/CranedServer.cpp | 12 + src/Craned/CranedServer.h | 3 + src/Craned/TaskManager.cpp | 392 +++++++++++------- src/Craned/TaskManager.h | 79 ++-- .../PublicHeader/include/crane/PublicHeader.h | 1 + 15 files changed, 655 insertions(+), 287 deletions(-) diff --git a/protos/Crane.proto b/protos/Crane.proto index a6a92561..f473a2e6 100644 --- a/protos/Crane.proto +++ b/protos/Crane.proto @@ -81,6 +81,14 @@ message ExecuteTasksReply { repeated uint32 failed_task_id_list = 1; } +message ExecuteProcRequest{ + ProcToD proc = 1; +} + +message ExecuteProcReply{ + bool ok = 1; +} + message CreateCgroupForTasksRequest { repeated uint32 task_id_list = 1; repeated uint32 uid_list = 2; @@ -437,13 +445,16 @@ message StreamCforedRequest { string cfored_name = 1; int32 pid = 2; TaskToCtld task = 3; + // for nested task of crun + optional uint32 task_id = 4; } message TaskCompleteReq { string cfored_name = 1; uint32 task_id = 2; - TaskStatus status = 3; - InteractiveTaskType interactive_type = 4; + uint32 proc_id = 3; + TaskStatus status = 4; + InteractiveTaskType interactive_type = 5; } message GracefulExitReq { @@ -472,7 +483,12 @@ message StreamCtldReply { int32 pid = 1; bool ok = 2; uint32 task_id = 3; - string failure_reason = 4; + uint32 proc_id = 4; + string failure_reason = 5; + message NestedTaskNodes{ + repeated string craned_ids = 1; + } + optional NestedTaskNodes nodes = 6; } message TaskResAllocatedReply { @@ -485,6 +501,7 @@ message StreamCtldReply { message TaskCancelRequest { uint32 task_id = 1; + uint32 proc_id = 2; } message TaskCompletionAckReply { @@ -522,16 +539,20 @@ message StreamCrunRequest{ message TaskReq { int32 crun_pid = 1; TaskToCtld task = 2; + // for nested task of crun + optional uint32 task_id = 3; } message TaskCompleteReq { uint32 task_id = 1; - TaskStatus status = 2; + uint32 proc_id = 2; + TaskStatus status = 3; } message TaskIOForwardReq { uint32 task_id = 1; - string msg = 2; + uint32 proc_id = 2; + string msg = 3; } CrunRequestType type = 1; @@ -551,12 +572,14 @@ message StreamCforedCrunReply { TASK_COMPLETION_ACK_REPLY = 3; TASK_IO_FORWARD = 4; TASK_IO_FORWARD_READY = 5; + Proc_FORWARD_END = 6; } message TaskIdReply { bool ok = 1; uint32 task_id = 2; - string failure_reason = 3; + uint32 proc_id = 3; + string failure_reason = 4; } message TaskResAllocatedReply { @@ -580,6 +603,10 @@ message StreamCforedCrunReply { string msg = 1; } + message ProcForwardEndReply{ + bool ok = 1; + } + CforedCrunReplyType type = 1 ; oneof payload { @@ -589,13 +616,14 @@ message StreamCforedCrunReply { TaskCompletionAckReply payload_task_completion_ack_reply = 5; TaskIOForwardReadyReply payload_task_io_forward_ready_reply = 6; TaskIOForwardReply payload_task_io_forward_reply = 7; + ProcForwardEndReply payload_proc_forward_end_reply = 8; } } message StreamCforedTaskIORequest { enum CranedRequestType{ CRANED_REGISTER = 0; - CRANED_TASK_OUTPUT = 1; + CRANED_PROC_OUTPUT = 1; CRANED_UNREGISTER = 2; } @@ -603,9 +631,11 @@ message StreamCforedTaskIORequest { string craned_id = 1; } - message CranedTaskOutputReq { + message CranedProcOutputReq { uint32 task_id = 1; - string msg = 2; + uint32 proc_id = 2; + string msg = 3; + bool end = 4; } message CranedUnRegisterReq { @@ -615,7 +645,7 @@ message StreamCforedTaskIORequest { CranedRequestType type = 1; oneof payload { CranedRegisterReq payload_register_req = 2; - CranedTaskOutputReq payload_task_output_req = 3; + CranedProcOutputReq payload_proc_output_req = 3; CranedUnRegisterReq payload_unregister_req = 4; } } @@ -631,9 +661,10 @@ message StreamCforedTaskIOReply { bool ok = 1; } - message CranedTaskInputReq { + message CranedProcInputReq { uint32 task_id = 1; - string msg = 2; + uint32 proc_id = 2; + string msg = 3; } message CranedUnregisterReply { @@ -644,7 +675,7 @@ message StreamCforedTaskIOReply { oneof payload { CranedRegisterReply payload_craned_register_reply = 2; - CranedTaskInputReq payload_task_input_req = 3; + CranedProcInputReq payload_proc_input_req = 3; CranedUnregisterReply payload_craned_unregister_reply = 4; } } @@ -694,6 +725,7 @@ service CraneCtld { service Craned { /* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */ rpc ExecuteTask(ExecuteTasksRequest) returns(ExecuteTasksReply); + rpc ExecuteProc(ExecuteProcRequest) returns(ExecuteProcReply); rpc CheckTaskStatus(CheckTaskStatusRequest) returns(CheckTaskStatusReply); diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 3ea73d82..1ece1cc9 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -76,6 +76,20 @@ enum InteractiveTaskType { Crun = 1; } +message ProcToD{ + uint32 task_id = 1; + uint32 proc_id = 2; + TaskType type = 3; + bool get_user_env = 4; + map env = 5; + string cwd = 6; + oneof payload { + BatchTaskAdditionalMeta batch_meta = 7; + InteractiveTaskAdditionalMeta interactive_meta = 8; + } + +} + message TaskToCtld { /* -------- Fields that are set at the submission time. ------- */ google.protobuf.Duration time_limit = 1; diff --git a/src/CraneCtld/CranedKeeper.cpp b/src/CraneCtld/CranedKeeper.cpp index d025fc40..8b8b0069 100644 --- a/src/CraneCtld/CranedKeeper.cpp +++ b/src/CraneCtld/CranedKeeper.cpp @@ -62,6 +62,24 @@ std::vector CranedStub::ExecuteTasks( return failed_task_ids; } +CraneErr CranedStub::ExecuteProc( + const crane::grpc::ExecuteProcRequest &request) { + using crane::grpc::ExecuteProcReply; + + ClientContext context; + Status status; + ExecuteProcReply reply; + + status = m_stub_->ExecuteProc(&context,request,&reply); + if (!status.ok()) { + CRANE_DEBUG( + "ExecuteProc RPC for Node {} returned with status not ok: {}", + m_craned_id_, status.error_message()); + return CraneErr::kRpcFailure; + } + return CraneErr::kOk; +} + CraneErr CranedStub::TerminateTasks(const std::vector &task_ids) { using crane::grpc::TerminateTasksReply; using crane::grpc::TerminateTasksRequest; @@ -81,7 +99,10 @@ CraneErr CranedStub::TerminateTasks(const std::vector &task_ids) { return CraneErr::kRpcFailure; } - return CraneErr::kOk; + if (reply.ok()) + return CraneErr::kOk; + else + return CraneErr::kGenericFailure; } CraneErr CranedStub::TerminateOrphanedTask(task_id_t task_id) { @@ -295,6 +316,37 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest( return request; } +// only for Proc launching +crane::grpc::ExecuteProcRequest CranedStub::NewExecuteProcRequest( + std::unique_ptr& task_running, + std::unique_ptr& step_submit) { + crane::grpc::ExecuteProcRequest request; + auto proc_to_d = request.mutable_proc(); + // Set type + proc_to_d->set_type(step_submit->type); + proc_to_d->set_task_id(task_running->TaskId()); + proc_to_d->mutable_env()->insert(step_submit->env.begin(), step_submit->env.end()); + + proc_to_d->set_cwd(step_submit->cwd); + proc_to_d->set_get_user_env(step_submit->get_user_env); + + if (step_submit->type == crane::grpc::Batch) { + auto &meta_in_ctld = std::get(step_submit->meta); + auto *mutable_meta = proc_to_d->mutable_batch_meta(); + mutable_meta->set_output_file_pattern(meta_in_ctld.output_file_pattern); + mutable_meta->set_error_file_pattern(meta_in_ctld.error_file_pattern); + mutable_meta->set_sh_script(meta_in_ctld.sh_script); + } else { + auto &meta_in_ctld = std::get(step_submit->meta); + auto *mutable_meta = proc_to_d->mutable_interactive_meta(); + mutable_meta->set_cfored_name(meta_in_ctld.cfored_name); + mutable_meta->set_sh_script(meta_in_ctld.sh_script); + mutable_meta->set_term_env(meta_in_ctld.term_env); + mutable_meta->set_interactive_type(meta_in_ctld.interactive_type); + } + return request; +} + CranedKeeper::CranedKeeper(uint32_t node_num) : m_cq_closed_(false) { m_pmr_pool_res_ = std::make_unique(); m_tag_sync_allocator_ = diff --git a/src/CraneCtld/CranedKeeper.h b/src/CraneCtld/CranedKeeper.h index 200de9a1..e1379479 100644 --- a/src/CraneCtld/CranedKeeper.h +++ b/src/CraneCtld/CranedKeeper.h @@ -40,8 +40,13 @@ class CranedStub { static crane::grpc::ExecuteTasksRequest NewExecuteTasksRequest( const std::vector &tasks); + static crane::grpc::ExecuteProcRequest NewExecuteProcRequest( + std::unique_ptr& task_running, + std::unique_ptr& step_submit); + std::vector ExecuteTasks( const crane::grpc::ExecuteTasksRequest &request); + CraneErr ExecuteProc(const crane::grpc::ExecuteProcRequest &request); CraneErr CreateCgroupForTasks(std::vector const &cgroup_specs); diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index dc2a02c2..e3c5256f 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -971,6 +971,23 @@ grpc::Status CraneCtldServiceImpl::CforedStream( auto const &payload = cfored_request.payload_task_req(); auto task = std::make_unique(); task->SetFieldsByTaskToCtld(payload.task()); + if (payload.has_task_id()) { + auto result = g_task_scheduler->SubmitProc( + std::move(task), payload.task_id(), payload.pid()); + const auto &[proc_id, craned_ids] = result.get(); + ok = stream_writer.WriteTaskIdReply( + payload.pid(), + result::result{payload.task_id()}, + proc_id, craned_ids); + if (!ok) { + CRANE_ERROR( + "Failed to send msg to cfored {}. Connection is broken. " + "Exiting...", + cfored_name); + state = StreamState::kCleanData; + } + break; + } auto &meta = std::get(task->meta); @@ -991,12 +1008,9 @@ grpc::Status CraneCtldServiceImpl::CforedStream( // calloc will not send TaskCompletionAckReply when task // Complete. // crun task will send TaskStatusChange from Craned, - if (meta.interactive_type == InteractiveTaskType::Crun) { - // todo: Remove this - CRANE_TRACE("Sending TaskCompletionAckReply in task_completed", - task_id); - stream_writer.WriteTaskCompletionAckReply(task_id); - } +// if (meta.interactive_type == InteractiveTaskType::Crun) { +// stream_writer.WriteTaskCompletionAckReply(task_id); +// } m_ctld_server_->m_mtx_.Lock(); // If cfored disconnected, the cfored_name should have be @@ -1020,7 +1034,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream( } else { result = result::fail(submit_result.error()); } - ok = stream_writer.WriteTaskIdReply(payload.pid(), result); + ok = stream_writer.WriteTaskIdReply(payload.pid(), result, 0, {}); if (!ok) { CRANE_ERROR( @@ -1042,8 +1056,8 @@ grpc::Status CraneCtldServiceImpl::CforedStream( auto const &payload = cfored_request.payload_task_complete_req(); g_task_scheduler->TerminateRunningTask(payload.task_id()); - - if (payload.interactive_type() != InteractiveTaskType::Crun) { + ok = true; + if (payload.interactive_type() != crane::grpc::Crun) { ok = stream_writer.WriteTaskCompletionAckReply(payload.task_id()); } if (!ok) { diff --git a/src/CraneCtld/CtldGrpcServer.h b/src/CraneCtld/CtldGrpcServer.h index d3281c58..ed9d3a42 100644 --- a/src/CraneCtld/CtldGrpcServer.h +++ b/src/CraneCtld/CtldGrpcServer.h @@ -42,9 +42,10 @@ class CforedStreamWriter { crane::grpc::StreamCforedRequest> *stream) : m_stream_(stream), m_valid_(true) {} - bool WriteTaskIdReply( - pid_t calloc_pid, - result::result res) { + bool WriteTaskIdReply(pid_t calloc_pid, + result::result res, + proc_id_t proc_id, + const std::list &craned_ids) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -55,6 +56,10 @@ class CforedStreamWriter { task_id_reply->set_ok(true); task_id_reply->set_pid(calloc_pid); task_id_reply->set_task_id(res.value()); + task_id_reply->set_proc_id(proc_id); + if (!craned_ids.empty()) + task_id_reply->mutable_nodes()->mutable_craned_ids()->Add( + craned_ids.begin(), craned_ids.end()); } else { task_id_reply->set_ok(false); task_id_reply->set_pid(calloc_pid); @@ -64,8 +69,11 @@ class CforedStreamWriter { return m_stream_->Write(reply); } - bool WriteTaskResAllocReply(task_id_t task_id, - result::result>, std::string> res) { + bool WriteTaskResAllocReply( + task_id_t task_id, + result::result>, + std::string> + res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -76,8 +84,12 @@ class CforedStreamWriter { if (res.has_value()) { task_res_alloc_reply->set_ok(true); - task_res_alloc_reply->set_allocated_craned_regex(std::move(res.value().first)); - std::ranges::for_each(res.value().second,[&task_res_alloc_reply](const auto& craned_id){task_res_alloc_reply->add_craned_ids(craned_id);}); + task_res_alloc_reply->set_allocated_craned_regex( + std::move(res.value().first)); + std::ranges::for_each(res.value().second, + [&task_res_alloc_reply](const auto &craned_id) { + task_res_alloc_reply->add_craned_ids(craned_id); + }); } else { task_res_alloc_reply->set_ok(false); task_res_alloc_reply->set_failure_reason(std::move(res.error())); @@ -89,7 +101,7 @@ class CforedStreamWriter { bool WriteTaskCompletionAckReply(task_id_t task_id) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; - CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",task_id); + StreamCtldReply reply; reply.set_type(StreamCtldReply::TASK_COMPLETION_ACK_REPLY); diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 1c324a84..0b168b2c 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -623,14 +623,16 @@ void TaskScheduler::ScheduleThread_() { task->executing_craned_ids.emplace_back(task->CranedIds().front()); } else { const auto& meta = std::get(task->meta); - if (meta.interactive_type == crane::grpc::Calloc) - // For calloc tasks we still need to execute a dummy empty task to - // set up a timer. - task->executing_craned_ids.emplace_back(task->CranedIds().front()); - else - // For crun tasks we need to execute tasks on all allocated nodes. - for (auto const& craned_id : task->CranedIds()) - task->executing_craned_ids.emplace_back(craned_id); + for (auto const& craned_id : task->CranedIds()) + task->executing_craned_ids.emplace_back(craned_id); +// if (meta.interactive_type == crane::grpc::Calloc) +// // For calloc tasks we still need to execute a dummy empty task to +// // set up a timer. +// task->executing_craned_ids.emplace_back(task->CranedIds().front()); +// else +// // For crun tasks we need to execute tasks on all allocated nodes. +// for (auto const& craned_id : task->CranedIds()) +// task->executing_craned_ids.emplace_back(craned_id); } } end = std::chrono::steady_clock::now(); @@ -940,6 +942,35 @@ std::future TaskScheduler::SubmitTaskAsync( return std::move(future); } +std::future>> +TaskScheduler::SubmitProc(std::unique_ptr task_submit, + task_id_t task_id, pid_t pid) { + std::promise>> promise; + std::future>> future = + promise.get_future(); + + m_running_task_map_mtx_.Lock(); + auto& task = m_running_task_map_[task_id]; + promise.set_value({pid, task->CranedIds()}); + auto request = CranedStub::NewExecuteProcRequest(task, task_submit); + request.mutable_proc()->set_proc_id(pid); + + CRANE_TRACE("Launch task #{} proc #{}", task_id, pid); + for (auto const& craned_id : task->CranedIds()) { + auto stub = g_craned_keeper->GetCranedStub(craned_id); + if (stub == nullptr || stub->Invalid()) { + continue; + } + CraneErr err = stub->ExecuteProc(request); + if (err != CraneErr::kOk) { + CRANE_ERROR("Failed to ExecuteProc proc {} for task {} on Node {}", pid, + task_id, craned_id); + } + } + m_running_task_map_mtx_.Unlock(); + return std::move(future); +} + CraneErr TaskScheduler::ChangeTaskTimeLimit(task_id_t task_id, int64_t secs) { bool found_running{false}; std::vector craned_ids; @@ -1411,6 +1442,13 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { task->SetStatus(new_status); } else { auto& meta = std::get(task->meta); + if (++meta.status_change_cnt < task->node_num) { + CRANE_TRACE( + "{}/{} TaskStatusChanges of Interactive task #{} were received. " + "Keep waiting...", + meta.status_change_cnt, task->node_num, task->TaskId()); + continue; + } if (meta.interactive_type == crane::grpc::Calloc) { // TaskStatusChange may indicate the time limit has been reached and // the task has been terminated. No more TerminateTask RPC should be @@ -1428,13 +1466,6 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { } meta.cb_task_completed(task->TaskId()); } else { // Crun - if (++meta.status_change_cnt < task->node_num) { - CRANE_TRACE( - "{}/{} TaskStatusChanges of Crun task #{} were received. " - "Keep waiting...", - meta.status_change_cnt, task->node_num, task->TaskId()); - continue; - } task->SetStatus(new_status); meta.cb_task_completed(task->TaskId()); diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 3cf3c975..d2dba30b 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -217,6 +217,8 @@ class TaskScheduler { /// \return The future is set to 0 if task submission is failed. /// Otherwise, it is set to newly allocated task id. std::future SubmitTaskAsync(std::unique_ptr task); + // todo: replace TaskInCtld by step instance in ctld + std::future>> SubmitProc(std::unique_ptr task_submit,task_id_t task_id,pid_t pid); CraneErr ChangeTaskTimeLimit(task_id_t task_id, int64_t secs); diff --git a/src/Craned/CforedClient.cpp b/src/Craned/CforedClient.cpp index 9dfef150..bc58b047 100644 --- a/src/Craned/CforedClient.cpp +++ b/src/Craned/CforedClient.cpp @@ -50,7 +50,7 @@ void CforedClient::CleanOutputQueueAndWriteToStreamThread_( stream, std::atomic* write_pending) { CRANE_TRACE("CleanOutputQueueThread started."); - std::pair output; + TaskOutPutElem output; bool ok = m_output_queue_.try_dequeue(output); // Make sure before exit all output has been drained. @@ -62,10 +62,13 @@ void CforedClient::CleanOutputQueueAndWriteToStreamThread_( } StreamCforedTaskIORequest request; - request.set_type(StreamCforedTaskIORequest::CRANED_TASK_OUTPUT); + request.set_type(StreamCforedTaskIORequest::CRANED_PROC_OUTPUT); - auto* payload = request.mutable_payload_task_output_req(); - payload->set_msg(output.second), payload->set_task_id(output.first); + auto* payload = request.mutable_payload_proc_output_req(); + payload->set_msg(output.msg); + payload->set_task_id(output.task_id); + payload->set_proc_id(output.proc_id); + payload->set_end(output.end); while (write_pending->load(std::memory_order::acquire)) std::this_thread::sleep_for(std::chrono::milliseconds(25)); @@ -208,12 +211,19 @@ void CforedClient::AsyncSendRecvThread_() { break; } - task_id_t task_id = reply.payload_task_input_req().task_id(); - const std::string& msg = reply.payload_task_input_req().msg(); + task_id_t task_id = reply.payload_proc_input_req().task_id(); + proc_id_t proc_id = reply.payload_proc_input_req().proc_id(); + const std::string& msg = reply.payload_proc_input_req().msg(); m_mtx_.Lock(); - if (m_task_fwd_meta_map_.contains(task_id)) { - m_task_fwd_meta_map_[task_id].input_cb(msg); + if (m_task_proc_fwd_meta_map_.contains(task_id)) { + if (m_task_proc_fwd_meta_map_[task_id].contains(proc_id)) { + m_task_proc_fwd_meta_map_[task_id][proc_id].input_cb(msg); + } else { + CRANE_ERROR( + "Cfored {} trying to send msg to unknown task #{} proc #{}", + m_cfored_name_, task_id, proc_id); + } } else { CRANE_ERROR("Cfored {} trying to send msg to unknown task #{}", m_cfored_name_, task_id); @@ -250,30 +260,34 @@ void CforedClient::AsyncSendRecvThread_() { } } -void CforedClient::InitTaskFwdAndSetInputCb( - task_id_t task_id, std::function task_input_cb) { +void CforedClient::InitProcFwdAndSetInputCb( + task_id_t task_id, proc_id_t proc_id, + std::function task_input_cb) { absl::MutexLock lock(&m_mtx_); - m_task_fwd_meta_map_[task_id].input_cb = std::move(task_input_cb); + m_task_proc_fwd_meta_map_[task_id][proc_id].input_cb = + std::move(task_input_cb); } -bool CforedClient::TaskOutputFinish(task_id_t task_id) { +bool CforedClient::ProcOutputFinish(task_id_t task_id, proc_id_t proc_id) { absl::MutexLock lock(&m_mtx_); - auto& task_fwd_meta = m_task_fwd_meta_map_.at(task_id); - task_fwd_meta.output_stopped = true; - return task_fwd_meta.output_stopped && task_fwd_meta.proc_stopped; + auto& proc_fwd_meta = m_task_proc_fwd_meta_map_.at(task_id).at(proc_id); + proc_fwd_meta.output_stopped = true; + return proc_fwd_meta.output_stopped && proc_fwd_meta.proc_stopped; }; -bool CforedClient::TaskProcessStop(task_id_t task_id) { +bool CforedClient::ProcProcessStop(task_id_t task_id, proc_id_t proc_id) { absl::MutexLock lock(&m_mtx_); - auto& task_fwd_meta = m_task_fwd_meta_map_.at(task_id); - task_fwd_meta.proc_stopped = true; - return task_fwd_meta.output_stopped && task_fwd_meta.proc_stopped; + auto& proc_fwd_meta = m_task_proc_fwd_meta_map_.at(task_id).at(proc_id); + proc_fwd_meta.proc_stopped = true; + return proc_fwd_meta.output_stopped && proc_fwd_meta.proc_stopped; }; -void CforedClient::TaskOutPutForward(task_id_t task_id, - const std::string& msg) { - CRANE_TRACE("Receive TaskOutputForward for task #{}: {}", task_id, msg); - m_output_queue_.enqueue({task_id, msg}); +void CforedClient::ProcOutPutForward(task_id_t task_id, proc_id_t proc_id, + const std::string& msg, bool end) { + CRANE_TRACE( + "Receive ProcOutputForward for task #{}, proc #{}, ending {}, msg: {}", + task_id, proc_id, end, msg); + m_output_queue_.enqueue({task_id, proc_id, msg, end}); } bool CforedManager::Init() { @@ -283,9 +297,9 @@ bool CforedManager::Init() { m_register_handle_->on( [this](const uvw::async_event&, uvw::async_handle&) { RegisterCb_(); }); - m_task_stop_handle_ = m_loop_->resource(); - m_task_stop_handle_->on( - [this](const uvw::async_event&, uvw::async_handle&) { TaskStopCb_(); }); + m_proc_stop_handle_ = m_loop_->resource(); + m_proc_stop_handle_->on( + [this](const uvw::async_event&, uvw::async_handle&) { ProcStopCb_(); }); m_unregister_handle_ = m_loop_->resource(); m_unregister_handle_->on( @@ -324,8 +338,12 @@ void CforedManager::EvLoopThread_(const std::shared_ptr& uvw_loop) { } void CforedManager::RegisterIOForward(std::string const& cfored, - task_id_t task_id, int fd) { - RegisterElem elem{.cfored = cfored, .task_id = task_id, .fd = fd}; + task_id_t task_id, proc_id_t proc_id, + int fd) { + RegisterElem elem{.cfored = cfored, + .task_id = task_id, + .proc_id = proc_id, + .fd = fd}; std::promise done; std::future done_fut = done.get_future(); @@ -348,36 +366,42 @@ void CforedManager::RegisterCb_() { m_cfored_client_ref_count_map_[elem.cfored] = 1; } - m_cfored_client_map_[elem.cfored]->InitTaskFwdAndSetInputCb( - elem.task_id, [fd = elem.fd](const std::string& msg) { + m_cfored_client_map_[elem.cfored]->InitProcFwdAndSetInputCb( + elem.task_id, elem.proc_id, + [fd = elem.fd, task_id = elem.task_id, + proc_id = elem.proc_id](const std::string& msg) { + CRANE_TRACE("Forwarding msg {} to task #{} proc #{}", msg, task_id, + proc_id); ssize_t sz_sent = 0; while (sz_sent != msg.size()) sz_sent += write(fd, msg.c_str() + sz_sent, msg.size() - sz_sent); }); - CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.fd, - elem.task_id); + CRANE_TRACE("Registering fd {} for outputs of task #{} proc #{}", elem.fd, + elem.task_id, elem.proc_id); auto poll_handle = m_loop_->resource(elem.fd); poll_handle->on([this, elem = std::move(elem)]( const uvw::poll_event&, uvw::poll_handle& h) { - CRANE_TRACE("Detect task #{} output.", elem.task_id); + CRANE_TRACE("Detect task #{} proc #{} output.", elem.task_id, + elem.proc_id); constexpr int MAX_BUF_SIZE = 4096; char buf[MAX_BUF_SIZE]; auto ret = read(elem.fd, buf, MAX_BUF_SIZE); if (ret == 0) { - CRANE_TRACE("Task #{} to cfored {} finished its output.", elem.task_id, - elem.cfored); + CRANE_TRACE("Task #{} proc #{} to cfored {} finished its output.", + elem.task_id, elem.proc_id, elem.cfored); h.close(); - bool ok_to_free = - m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id); + bool ok_to_free = m_cfored_client_map_[elem.cfored]->ProcOutputFinish( + elem.task_id, elem.proc_id); if (ok_to_free) { - CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id, - elem.cfored); - UnregisterIOForward_(elem.cfored, elem.task_id); + CRANE_TRACE("It's ok to unregister task #{} proc#{} on {}", + elem.task_id, elem.proc_id, elem.cfored); + close(elem.fd); + UnregisterIOForward_(elem.cfored, elem.task_id, elem.proc_id); } return; } @@ -386,9 +410,8 @@ void CforedManager::RegisterCb_() { CRANE_ERROR("Error when reading task #{} output", elem.task_id); std::string output(buf, ret); - CRANE_TRACE("Fwd to task #{}: {}", elem.task_id, output); - m_cfored_client_map_[elem.cfored]->TaskOutPutForward(elem.task_id, - output); + m_cfored_client_map_[elem.cfored]->ProcOutPutForward( + elem.task_id, elem.proc_id, output, false); }); int ret = poll_handle->start(uvw::poll_handle::poll_event_flags::READABLE); if (ret < 0) @@ -399,33 +422,35 @@ void CforedManager::RegisterCb_() { } void CforedManager::TaskProcOnCforedStopped(std::string const& cfored, - task_id_t task_id) { - TaskStopElem elem{.cfored = cfored, .task_id = task_id}; - m_task_stop_queue_.enqueue(std::move(elem)); - m_task_stop_handle_->send(); + task_id_t task_id, + proc_id_t proc_id) { + TaskStopElem elem{.cfored = cfored, .task_id = task_id, .proc_id = proc_id}; + m_proc_stop_queue_.enqueue(std::move(elem)); + m_proc_stop_handle_->send(); } -void CforedManager::TaskStopCb_() { +void CforedManager::ProcStopCb_() { TaskStopElem elem; - while (m_task_stop_queue_.try_dequeue(elem)) { + while (m_proc_stop_queue_.try_dequeue(elem)) { const std::string& cfored = elem.cfored; task_id_t task_id = elem.task_id; + proc_id_t proc_id = elem.proc_id; - CRANE_TRACE("Task #{} to cfored {} just stopped its process.", elem.task_id, - elem.cfored); - bool ok_to_free = - m_cfored_client_map_[elem.cfored]->TaskProcessStop(elem.task_id); + CRANE_TRACE("Task #{} Proc #{} to cfored {} just stopped its process.", + task_id, proc_id, elem.cfored); + bool ok_to_free = m_cfored_client_map_[elem.cfored]->ProcProcessStop( + elem.task_id, elem.proc_id); if (ok_to_free) { - CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id, - elem.cfored); - UnregisterIOForward_(elem.cfored, elem.task_id); + CRANE_TRACE("It's ok to unregister task #{} proc #{} on {} cfored", + elem.task_id, elem.proc_id, elem.cfored); + UnregisterIOForward_(elem.cfored, elem.task_id, elem.proc_id); } } } void CforedManager::UnregisterIOForward_(const std::string& cfored, - task_id_t task_id) { - UnregisterElem elem{.cfored = cfored, .task_id = task_id}; + task_id_t task_id, proc_id_t proc_id) { + UnregisterElem elem{.cfored = cfored, .task_id = task_id, .proc_id = proc_id}; m_unregister_queue_.enqueue(std::move(elem)); m_unregister_handle_->send(); } @@ -435,8 +460,9 @@ void CforedManager::UnregisterCb_() { while (m_unregister_queue_.try_dequeue(elem)) { const std::string& cfored = elem.cfored; task_id_t task_id = elem.task_id; - + proc_id_t proc_id = elem.proc_id; auto count = m_cfored_client_ref_count_map_[cfored]; + m_cfored_client_map_[cfored]->ProcOutPutForward(task_id, proc_id, {}, true); if (count == 1) { m_cfored_client_ref_count_map_.erase(cfored); m_cfored_client_map_.erase(cfored); @@ -444,7 +470,8 @@ void CforedManager::UnregisterCb_() { --m_cfored_client_ref_count_map_[cfored]; } - g_task_mgr->TaskStopAndDoStatusChangeAsync(task_id); + // main proc + if (proc_id == 0) g_task_mgr->TaskStopAndDoStatusChangeAsync(task_id); } } diff --git a/src/Craned/CforedClient.h b/src/Craned/CforedClient.h index baeb4333..78a22135 100644 --- a/src/Craned/CforedClient.h +++ b/src/Craned/CforedClient.h @@ -30,14 +30,16 @@ class CforedClient { void AsyncSendRecvThread_(); - void InitTaskFwdAndSetInputCb( - task_id_t task_id, std::function task_input_cb); + void InitProcFwdAndSetInputCb( + task_id_t task_id, proc_id_t proc_id, + std::function task_input_cb); - void TaskOutPutForward(task_id_t task_id, const std::string& msg); + void ProcOutPutForward(unsigned int task_id, unsigned int proc_id, + const std::string& msg, bool end); - bool TaskOutputFinish(task_id_t task_id); + bool ProcOutputFinish(task_id_t task_id, proc_id_t proc_id); - bool TaskProcessStop(task_id_t task_id); + bool ProcProcessStop(task_id_t task_id, proc_id_t proc_id); private: struct TaskFwdMeta { @@ -46,12 +48,23 @@ class CforedClient { bool proc_stopped{false}; }; + struct TaskOutPutElem { + task_id_t task_id; + proc_id_t proc_id; + std::string msg; + bool end; + TaskOutPutElem() = default; + TaskOutPutElem(task_id_t task_id, proc_id_t proc_id, std::string msg, + bool end) + : task_id(task_id), proc_id(proc_id), msg(std::move(msg)), end{end} {} + }; + void CleanOutputQueueAndWriteToStreamThread_( ClientAsyncReaderWriter* stream, std::atomic* write_pending); - ConcurrentQueue> m_output_queue_; + ConcurrentQueue m_output_queue_; std::thread m_fwd_thread_; std::atomic m_stopped_{false}; @@ -64,7 +77,8 @@ class CforedClient { grpc::CompletionQueue m_cq_; absl::Mutex m_mtx_; - std::unordered_map m_task_fwd_meta_map_; + std::unordered_map> + m_task_proc_fwd_meta_map_; }; class CforedManager { @@ -77,27 +91,41 @@ class CforedManager { bool Init(); - void RegisterIOForward(std::string const& cfored, task_id_t task_id, int fd); - void TaskProcOnCforedStopped(std::string const& cfored, task_id_t task_id); + void RegisterIOForward(std::string const& cfored, task_id_t task_id, + proc_id_t proc_id, int fd); + + /*! + * + * @param cfored + * @param task_id + * @param proc_id 0 indicate main proc exit,trigger task end. + */ + void TaskProcOnCforedStopped(std::string const& cfored, task_id_t task_id, + proc_id_t proc_id); private: struct RegisterElem { std::string cfored; task_id_t task_id; + proc_id_t proc_id; int fd; }; struct TaskStopElem { std::string cfored; task_id_t task_id; + proc_id_t proc_id; + int fd; }; struct UnregisterElem { std::string cfored; task_id_t task_id; + proc_id_t proc_id; }; - void UnregisterIOForward_(std::string const& cfored, task_id_t task_id); + void UnregisterIOForward_(std::string const& cfored, task_id_t task_id, + proc_id_t proc_id); void EvLoopThread_(const std::shared_ptr& uvw_loop); @@ -110,9 +138,9 @@ class CforedManager { m_register_queue_; void RegisterCb_(); - std::shared_ptr m_task_stop_handle_; - ConcurrentQueue m_task_stop_queue_; - void TaskStopCb_(); + std::shared_ptr m_proc_stop_handle_; + ConcurrentQueue m_proc_stop_queue_; + void ProcStopCb_(); std::shared_ptr m_unregister_handle_; ConcurrentQueue m_unregister_queue_; diff --git a/src/Craned/CranedServer.cpp b/src/Craned/CranedServer.cpp index f8d27358..a187d32d 100644 --- a/src/Craned/CranedServer.cpp +++ b/src/Craned/CranedServer.cpp @@ -41,6 +41,18 @@ grpc::Status CranedServiceImpl::ExecuteTask( return Status::OK; } +grpc::Status CranedServiceImpl::ExecuteProc( + grpc::ServerContext *context, + const crane::grpc::ExecuteProcRequest *request, + crane::grpc::ExecuteProcReply *response) { + + CraneErr err = g_task_mgr->ExecuteProcAsync(request->proc()); + if(err!=CraneErr::kOk){ + response->set_ok(true); + } + return Status::OK; +} + grpc::Status CranedServiceImpl::TerminateTasks( grpc::ServerContext *context, const crane::grpc::TerminateTasksRequest *request, diff --git a/src/Craned/CranedServer.h b/src/Craned/CranedServer.h index 9381c36b..a9afaa79 100644 --- a/src/Craned/CranedServer.h +++ b/src/Craned/CranedServer.h @@ -44,6 +44,9 @@ class CranedServiceImpl : public Craned::Service { grpc::Status ExecuteTask(grpc::ServerContext *context, const crane::grpc::ExecuteTasksRequest *request, crane::grpc::ExecuteTasksReply *response) override; + grpc::Status ExecuteProc(grpc::ServerContext *context, + const crane::grpc::ExecuteProcRequest *request, + crane::grpc::ExecuteProcReply *response) override; grpc::Status TerminateTasks( grpc::ServerContext *context, diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 5de3e5ed..139d2a6b 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -99,6 +99,18 @@ TaskManager::TaskManager() { std::terminate(); } } + { // Grpc Execute Proc Event + m_ev_grpc_execute_proc_ = event_new(m_ev_base_, -1, EV_READ | EV_PERSIST, + EvGrpcExecuteProcCb_, this); + if (!m_ev_grpc_execute_proc_) { + CRANE_ERROR("Failed to create the grpc_execute_proc event!"); + std::terminate(); + } + if (event_add(m_ev_grpc_execute_proc_, nullptr) < 0) { + CRANE_ERROR("Could not add the m_ev_grpc_execute_proc_ to base!"); + std::terminate(); + } + } { // Task Status Change Event m_ev_task_status_change_ = event_new(m_ev_base_, -1, EV_READ | EV_PERSIST, EvTaskStatusChangeCb_, this); @@ -160,6 +172,7 @@ TaskManager::~TaskManager() { if (m_ev_query_task_id_from_pid_) event_free(m_ev_query_task_id_from_pid_); if (m_ev_grpc_execute_task_) event_free(m_ev_grpc_execute_task_); + if (m_ev_grpc_execute_proc_) event_free(m_ev_grpc_execute_proc_); if (m_ev_exit_event_) event_free(m_ev_exit_event_); if (m_ev_task_status_change_) event_free(m_ev_task_status_change_); if (m_ev_task_time_limit_change_) event_free(m_ev_task_time_limit_change_); @@ -297,8 +310,13 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", pid, task_id); } else { + if (CheckIfProcessInstanceIsCrun_(*proc)) { + auto* meta = dynamic_cast( + proc->meta.get()); + g_cfored_manager->TaskProcOnCforedStopped( + meta->cfored_name, proc->proc.task_id(), proc->proc.proc_id()); + } instance->processes.erase(pr_it); - if (!instance->processes.empty()) { if (sigchld_info.is_terminated_by_signal) { // If a task is terminated by a signal and there are other @@ -306,18 +324,13 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, this_->TerminateTaskAsync(task_id); } } else { - if (instance->task.interactive_meta().interactive_type() == - crane::grpc::Crun) - // TaskStatusChange of a crun task is triggered in - // CforedManager. - g_cfored_manager->TaskProcOnCforedStopped( - instance->task.interactive_meta().cfored_name(), - instance->task.task_id()); - else /* Batch / Calloc */ { - // If the ProcessInstance has no process left, - // send TaskStatusChange for this task. - // See the comment of EvActivateTaskStatusChange_. + if (instance->task.type() == crane::grpc::Batch) { this_->TaskStopAndDoStatusChangeAsync(task_id); + } else if (CheckIfInstanceTypeIsCalloc_(instance)) { + CRANE_TRACE("Ignoring calloc all process exit"); + continue; + } else { + // for crun, do nothing here } } } @@ -340,21 +353,6 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, } } -void TaskManager::EvSubprocessReadCb_(struct bufferevent* bev, void* process) { - auto* proc = reinterpret_cast(process); - - size_t buf_len = evbuffer_get_length(bev->input); - - std::string str; - str.resize(buf_len); - int n_copy = evbuffer_remove(bev->input, str.data(), buf_len); - - CRANE_TRACE("Read {:>4} bytes from subprocess (pid: {}): {}", n_copy, - proc->GetPid(), str); - - proc->Output(std::move(str)); -} - void TaskManager::EvSigintCb_(int sig, short events, void* user_data) { auto* this_ = reinterpret_cast(user_data); @@ -484,7 +482,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( } // Create IO socket pair for crun tasks. - if (CheckIfInstanceTypeIsCrun_(instance)) { + if (CheckIfProcessInstanceIsCrun_(*process)) { if (socketpair(AF_UNIX, SOCK_STREAM, 0, io_sock_pair) != 0) { CRANE_ERROR("Failed to create socket pair for task io forward: {}", strerror(errno)); @@ -492,7 +490,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( } auto* crun_meta = - dynamic_cast(instance->meta.get()); + dynamic_cast(process->meta.get()); crun_meta->msg_forward_fd = io_sock_pair[0]; } @@ -514,22 +512,22 @@ CraneErr TaskManager::SpawnProcessInInstance_( pid_t child_pid = fork(); if (child_pid == -1) { - CRANE_ERROR("fork() failed for task #{}: {}", instance->task.task_id(), - strerror(errno)); + CRANE_ERROR("fork() failed for task #{} proc #{}: {}", + instance->task.task_id(), process->proc_id, strerror(errno)); return CraneErr::kSystemErr; } if (child_pid > 0) { // Parent proc process->SetPid(child_pid); - CRANE_DEBUG("Subprocess was created for task #{} pid: {}", - instance->task.task_id(), child_pid); + CRANE_DEBUG("Subprocess was created for task #{} proc #{} pid: {}", + instance->task.task_id(), process->proc_id, child_pid); - if (CheckIfInstanceTypeIsCrun_(instance)) { + if (CheckIfProcessInstanceIsCrun_(*process)) { + auto* meta = + dynamic_cast(process->meta.get()); g_cfored_manager->RegisterIOForward( - instance->task.interactive_meta().cfored_name(), - instance->task.task_id(), - dynamic_cast(instance->meta.get()) - ->msg_forward_fd); + meta->cfored_name, instance->task.task_id(), process->proc_id, + meta->msg_forward_fd); } // Note that the following code will move the child process into cgroup. @@ -548,6 +546,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( m_pid_proc_map_.emplace(child_pid, process.get()); m_mtx_.Unlock(); + auto proc_id = process->proc_id; // Move the ownership of ProcessInstance into the TaskInstance. instance->processes.emplace(child_pid, std::move(process)); @@ -565,23 +564,6 @@ CraneErr TaskManager::SpawnProcessInInstance_( CanStartMessage msg; ChildProcessReady child_process_ready; - // Add event for stdout/stderr of the new subprocess - // struct bufferevent* ev_buf_event; - // ev_buf_event = - // bufferevent_socket_new(m_ev_base_, fd, BEV_OPT_CLOSE_ON_FREE); - // if (!ev_buf_event) { - // CRANE_ERROR( - // "Error constructing bufferevent for the subprocess of task #!", - // instance->task.task_id()); - // err = CraneErr::kLibEventError; - // goto AskChildToSuicide; - // } - // bufferevent_setcb(ev_buf_event, EvSubprocessReadCb_, nullptr, nullptr, - // (void*)process.get()); - // bufferevent_enable(ev_buf_event, EV_READ); - // bufferevent_disable(ev_buf_event, EV_WRITE); - // process->SetEvBufEvent(ev_buf_event); - // Migrate the new subprocess to newly created cgroup if (!instance->cgroup->MigrateProcIn(child_pid)) { CRANE_ERROR( @@ -593,8 +575,8 @@ CraneErr TaskManager::SpawnProcessInInstance_( goto AskChildToSuicide; } - CRANE_TRACE("New task #{} is ready. Asking subprocess to execv...", - instance->task.task_id()); + CRANE_TRACE("New task #{} Proc #{} is ready. Asking subprocess to execv...", + instance->task.task_id(), proc_id); // Tell subprocess that the parent process is ready. Then the // subprocess should continue to exec(). @@ -618,8 +600,9 @@ CraneErr TaskManager::SpawnProcessInInstance_( ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream, nullptr); if (!msg.ok()) { - CRANE_ERROR("Failed to read protobuf from subprocess {} of task #{}", - child_pid, instance->task.task_id()); + CRANE_ERROR( + "Failed to read protobuf from subprocess {} of task #{} proc #{}", + child_pid, instance->task.task_id(), proc_id); close(ctrl_fd); // See comments above. @@ -650,7 +633,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( // reap the child process by SIGCHLD after it commits suicide. return CraneErr::kOk; } else { // Child proc - const std::string& cwd = instance->task.cwd(); + const std::string& cwd = process->proc.cwd(); rc = chdir(cwd.c_str()); if (rc == -1) { CRANE_ERROR("[Child Process] Error: chdir to {}. {}", cwd.c_str(), @@ -676,13 +659,13 @@ CraneErr TaskManager::SpawnProcessInInstance_( ParseDelimitedFromZeroCopyStream(&msg, &istream, nullptr); if (!msg.ok()) std::abort(); - if (instance->task.type() == crane::grpc::Batch) { + if (process->type == crane::grpc::Batch) { int stdout_fd, stderr_fd; - const std::string& stdout_file_path = - process->batch_meta.parsed_output_file_pattern; - const std::string& stderr_file_path = - process->batch_meta.parsed_error_file_pattern; + auto* meta = + dynamic_cast(process->meta.get()); + const std::string& stdout_file_path = meta->parsed_output_file_pattern; + const std::string& stderr_file_path = meta->parsed_error_file_pattern; stdout_fd = open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644); @@ -708,7 +691,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( } close(stdout_fd); - } else if (CheckIfInstanceTypeIsCrun_(instance)) { + } else if (CheckIfProcessInstanceIsCrun_(*process)) { close(io_sock_pair[0]); dup2(io_sock_pair[1], 0); @@ -730,17 +713,17 @@ CraneErr TaskManager::SpawnProcessInInstance_( // Close stdin for batch tasks. // If these file descriptors are not closed, a program like mpirun may // keep waiting for the input from stdin or other fds and will never end. - if (instance->task.type() == crane::grpc::Batch) close(0); + if (process->type == crane::grpc::Batch) close(0); util::os::CloseFdFrom(3); std::vector> env_vec; // Load env from the front end. - for (auto& [name, value] : instance->task.env()) { + for (auto& [name, value] : process->proc.env()) { env_vec.emplace_back(name, value); } - if (instance->task.get_user_env()) { + if (process->proc.get_user_env()) { // If --get-user-env is set, the new environment is inherited // from the execution CraneD rather than the submitting node. // @@ -778,10 +761,9 @@ CraneErr TaskManager::SpawnProcessInInstance_( env_vec.emplace_back("CRANE_JOB_ID", std::to_string(instance->task.task_id())); - if (CheckIfInstanceTypeIsCrun_(instance) && - !instance->task.interactive_meta().term_env().empty()) { - env_vec.emplace_back("TERM", - instance->task.interactive_meta().term_env()); + if (CheckIfProcessInstanceIsCrun_(*process) && + !process->proc.interactive_meta().term_env().empty()) { + env_vec.emplace_back("TERM", process->proc.interactive_meta().term_env()); } int64_t time_limit_sec = instance->task.time_limit().seconds(); @@ -808,7 +790,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( // Argv[0] is the program name which can be anything. argv.emplace_back("CraneScript"); - if (instance->task.get_user_env()) { + if (process->proc.get_user_env()) { // If --get-user-env is specified, // we need to use --login option of bash to load settings from the user's // settings. @@ -848,12 +830,6 @@ CraneErr TaskManager::ExecuteTaskAsync(crane::grpc::TaskToD const& task) { // in the corresponding handler (EvGrpcExecuteTaskCb_). instance->task = task; - // Create meta for batch or crun tasks. - if (instance->task.type() == crane::grpc::Batch) - instance->meta = std::make_unique(); - else - instance->meta = std::make_unique(); - m_grpc_execute_task_queue_.enqueue(std::move(instance)); event_active(m_ev_grpc_execute_task_, 0, 0); @@ -925,60 +901,13 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { // Calloc tasks have no scripts to run. Just return. if (CheckIfInstanceTypeIsCalloc_(instance)) return; - instance->meta->parsed_sh_script_path = - fmt::format("{}/Crane-{}.sh", g_config.CranedScriptDir, task_id); - auto& sh_path = instance->meta->parsed_sh_script_path; - - FILE* fptr = fopen(sh_path.c_str(), "w"); - if (fptr == nullptr) { - CRANE_ERROR("Failed write the script for task #{}", task_id); - EvActivateTaskStatusChange_( - task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeFileNotFound, - fmt::format("Cannot write shell script for batch task #{}", task_id)); + // init proc instance + // Create meta for batch or crun tasks. + auto process = std::make_unique(); + if (!SetProcessByTaskInstance(instance, process.get())) { return; } - if (instance->task.type() == crane::grpc::Batch) - fputs(instance->task.batch_meta().sh_script().c_str(), fptr); - else // Crun - fputs(instance->task.interactive_meta().sh_script().c_str(), fptr); - - fclose(fptr); - - chmod(sh_path.c_str(), strtol("0755", nullptr, 8)); - - auto process = - std::make_unique(sh_path, std::list()); - - // Prepare file output name for batch tasks. - if (instance->task.type() == crane::grpc::Batch) { - /* Perform file name substitutions - * %j - Job ID - * %u - Username - * %x - Job name - */ - process->batch_meta.parsed_output_file_pattern = - ParseFilePathPattern_(instance->task.batch_meta().output_file_pattern(), - instance->task.cwd(), task_id); - absl::StrReplaceAll({{"%j", std::to_string(task_id)}, - {"%u", instance->pwd_entry.Username()}, - {"%x", instance->task.name()}}, - &process->batch_meta.parsed_output_file_pattern); - - // If -e / --error is not defined, leave - // batch_meta.parsed_error_file_pattern empty; - if (!instance->task.batch_meta().error_file_pattern().empty()) { - process->batch_meta.parsed_error_file_pattern = ParseFilePathPattern_( - instance->task.batch_meta().error_file_pattern(), - instance->task.cwd(), task_id); - absl::StrReplaceAll({{"%j", std::to_string(task_id)}, - {"%u", instance->pwd_entry.Username()}, - {"%x", instance->task.name()}}, - &process->batch_meta.parsed_error_file_pattern); - } - } - // err will NOT be kOk ONLY if fork() is not called due to some failure // or fork() fails. // In this case, SIGCHLD will NOT be received for this task, and @@ -989,8 +918,83 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeSpawnProcessFail, fmt::format( - "Cannot spawn a new process inside the instance of task #{}", - task_id)); + "Cannot spawn a new proc #{} inside the instance of task #{}", + process->proc_id, task_id)); + } +} + +CraneErr TaskManager::ExecuteProcAsync(const crane::grpc::ProcToD& proc) { + if (!m_task_map_.contains(proc.task_id())) { + CRANE_DEBUG("Executing proc #{} with no existing task #{}. Ignoring it.", + proc.proc_id(), proc.task_id()); + return CraneErr::kNonExistent; + } + if (!g_cg_mgr->CheckIfCgroupForTasksExists(proc.task_id())) { + CRANE_DEBUG( + "Executing task #{} proc #{} without an allocated cgroup. Ignoring it.", + proc.task_id(), proc.proc_id()); + return CraneErr::kCgroupError; + } + CRANE_INFO("Executing task #{} proc #{}", proc.task_id(), proc.proc_id()); + + m_grpc_execute_proc_queue_.enqueue(proc); + event_active(m_ev_grpc_execute_proc_, 0, 0); + + return CraneErr::kOk; +} + +void TaskManager::EvGrpcExecuteProcCb_(int, short events, void* user_data) { + auto* this_ = reinterpret_cast(user_data); + crane::grpc::ProcToD popped_proc; + + while (this_->m_grpc_execute_proc_queue_.try_dequeue(popped_proc)) { + // Once ExecuteTask RPC is processed, the TaskInstance goes into + // m_task_map_. + proc_id_t proc_id = popped_proc.proc_id(); + task_id_t task_id = popped_proc.task_id(); + auto process = std::make_unique(); + process->type = popped_proc.type(); + // Todo: use ctld allocated proc_id + process->proc_id = proc_id; + process->proc = popped_proc; + if (process->type == crane::grpc::Batch) + process->meta = std::make_unique(); + else { + process->meta = std::make_unique(); + auto* meta = + dynamic_cast(process->meta.get()); + meta->interactive_type = + process->proc.interactive_meta().interactive_type(); + if (CheckIfProcessInstanceIsCrun_(*process)) + meta->cfored_name = process->proc.interactive_meta().cfored_name(); + } + + process->meta->executive_path = fmt::format( + "{}/Crane-{}-{}.sh", g_config.CranedScriptDir, task_id, proc_id); + auto& sh_path = process->meta->executive_path; + + FILE* fptr = fopen(sh_path.c_str(), "w"); + if (fptr == nullptr) { + CRANE_ERROR("Failed write the script for task #{} Proc #{}", task_id, + proc_id); + // todo: Notify proc launch failed + return; + } + + if (process->type == crane::grpc::Batch) + fputs(process->proc.batch_meta().sh_script().c_str(), fptr); + else // Crun + fputs(process->proc.interactive_meta().sh_script().c_str(), fptr); + + fclose(fptr); + + chmod(sh_path.c_str(), strtol("0755", nullptr, 8)); + + // Step should be interactive type,not handle task output file. + + // todo: TaskInstance may be free when SpawnProcess + this_->SpawnProcessInInstance_(this_->m_task_map_[task_id].get(), + std::move(process)); } } @@ -1035,12 +1039,6 @@ void TaskManager::EvTaskStatusChangeCb_(int efd, short events, } TaskInstance* instance = iter->second.get(); - if (instance->task.type() == crane::grpc::Batch || - CheckIfInstanceTypeIsCrun_(instance)) { - const std::string& path = instance->meta->parsed_sh_script_path; - if (!path.empty()) - g_thread_pool->detach_task([p = path]() { util::os::DeleteFile(p); }); - } bool orphaned = instance->orphaned; @@ -1312,4 +1310,120 @@ bool TaskManager::CheckIfInstanceTypeIsCalloc_(TaskInstance* instance) { crane::grpc::Calloc; } +bool TaskManager::CheckIfProcessInstanceIsCrun_( + const ProcessInstance& process) { + return process.proc.type() == crane::grpc::Interactive && + process.proc.interactive_meta().interactive_type() == + crane::grpc::Crun; +} + +bool TaskManager::CheckIfProcessInstanceIsCalloc_( + const ProcessInstance& process) { + return process.proc.type() == crane::grpc::Interactive && + process.proc.interactive_meta().interactive_type() == + crane::grpc::Calloc; +} +bool TaskManager::SetProcessByTaskInstance(TaskInstance* task, + ProcessInstance* process) { + auto& task_to_d = task->task; + task_id_t task_id = task_to_d.task_id(); + process->type = task_to_d.type(); + // Todo: use ctld allocated proc_id + process->proc_id = 0; + auto& proc_to_d = process->proc; + + proc_to_d.set_type(task_to_d.type()); + proc_to_d.set_task_id(task_to_d.task_id()); + proc_to_d.mutable_env()->insert(task_to_d.env().begin(), + task_to_d.env().end()); + + proc_to_d.set_cwd(task_to_d.cwd()); + proc_to_d.set_get_user_env(task_to_d.get_user_env()); + + if (task_to_d.type() == crane::grpc::Batch) { + auto& meta_in_task = task_to_d.batch_meta(); + auto* mutable_meta = proc_to_d.mutable_batch_meta(); + mutable_meta->set_output_file_pattern(meta_in_task.output_file_pattern()); + mutable_meta->set_error_file_pattern(meta_in_task.error_file_pattern()); + mutable_meta->set_sh_script(meta_in_task.sh_script()); + } else { + auto& meta_in_task = task_to_d.interactive_meta(); + auto* mutable_meta = proc_to_d.mutable_interactive_meta(); + mutable_meta->set_cfored_name(meta_in_task.cfored_name()); + mutable_meta->set_sh_script(meta_in_task.sh_script()); + mutable_meta->set_term_env(meta_in_task.term_env()); + mutable_meta->set_interactive_type(meta_in_task.interactive_type()); + } + + if (process->type == crane::grpc::Batch) + process->meta = std::make_unique(); + else { + process->meta = std::make_unique(); + auto* meta = + dynamic_cast(process->meta.get()); + meta->interactive_type = proc_to_d.interactive_meta().interactive_type(); + if (CheckIfProcessInstanceIsCrun_(*process)) + meta->cfored_name = proc_to_d.interactive_meta().cfored_name(); + } + + process->meta->executive_path = fmt::format( + "{}/Crane-{}-{}.sh", g_config.CranedScriptDir, task_id, process->proc_id); + auto& sh_path = process->meta->executive_path; + + FILE* fptr = fopen(sh_path.c_str(), "w"); + if (fptr == nullptr) { + CRANE_ERROR("Failed write the script for task #{}", task_id); + EvActivateTaskStatusChange_( + task_id, crane::grpc::TaskStatus::Failed, + ExitCode::kExitCodeFileNotFound, + fmt::format("Cannot write shell script for task #{} Proc #{}", task_id, + process->proc_id)); + return false; + } + + if (process->type == crane::grpc::Batch) + fputs(process->proc.batch_meta().sh_script().c_str(), fptr); + else // Crun + fputs(process->proc.interactive_meta().sh_script().c_str(), fptr); + + fclose(fptr); + + chmod(sh_path.c_str(), strtol("0755", nullptr, 8)); + + // Prepare file output name for batch tasks. + if (process->type == crane::grpc::Batch) { + /* Perform file name substitutions + * %j - Job ID + * %u - Username + * %x - Job name + */ + auto& parsed_output_file_pattern = + dynamic_cast(process->meta.get()) + ->parsed_output_file_pattern; + parsed_output_file_pattern = + ParseFilePathPattern_(task->task.batch_meta().output_file_pattern(), + task->task.cwd(), task_id); + absl::StrReplaceAll({{"%j", std::to_string(task_id)}, + {"%u", task->pwd_entry.Username()}, + {"%x", task->task.name()}}, + &parsed_output_file_pattern); + + // If -e / --error is not defined, leave + // batch_meta.parsed_error_file_pattern empty; + if (!task->task.batch_meta().error_file_pattern().empty()) { + auto& parsed_error_file_pattern = + dynamic_cast(process->meta.get()) + ->parsed_error_file_pattern; + parsed_error_file_pattern = + ParseFilePathPattern_(task->task.batch_meta().error_file_pattern(), + task->task.cwd(), task_id); + absl::StrReplaceAll({{"%j", std::to_string(task_id)}, + {"%u", task->pwd_entry.Username()}, + {"%x", task->task.name()}}, + &parsed_error_file_pattern); + } + } + return true; +} + } // namespace Craned diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index df5dcec0..f3915fdf 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -29,6 +29,7 @@ #include "CgroupManager.h" #include "CtldClient.h" +#include "crane/OS.h" #include "crane/PasswordEntry.h" #include "crane/PublicHeader.h" #include "protos/Crane.grpc.pb.h" @@ -37,22 +38,37 @@ namespace Craned { class TaskManager; +struct TaskInstance; struct EvTimerCbArg { TaskManager* task_manager; task_id_t task_id; }; -struct BatchMetaInProcessInstance { +struct MetaInProcessInstance { + std::string executive_path; + virtual ~MetaInProcessInstance() = default; +}; + +struct BatchMetaInProcessInstance : MetaInProcessInstance { std::string parsed_output_file_pattern; std::string parsed_error_file_pattern; + ~BatchMetaInProcessInstance() override = default; +}; + +struct InteractiveMetaInProcessInstance : MetaInProcessInstance { + //this fd show be closed after the process is sure to unregister io fwd. + int msg_forward_fd; + std::string cfored_name; + crane::grpc::InteractiveTaskType interactive_type; + ~InteractiveMetaInProcessInstance() override = default; }; class ProcessInstance { public: - ProcessInstance(std::string exec_path, std::list arg_list) - : m_executive_path_(std::move(exec_path)), - m_arguments_(std::move(arg_list)), + ProcessInstance() + : type(crane::grpc::Batch), + proc_id(0), m_pid_(0), m_ev_buf_event_(nullptr), m_user_data_(nullptr) {} @@ -68,10 +84,22 @@ class ProcessInstance { } if (m_ev_buf_event_) bufferevent_free(m_ev_buf_event_); + if (this->type == crane::grpc::Interactive) { + auto* task_meta = + dynamic_cast(meta.get()); + if (task_meta->interactive_type == crane::grpc::Calloc) return; + CRANE_TRACE("Close task {} proc {} forward fd {}",proc.task_id(),proc.proc_id(),task_meta->msg_forward_fd); +// close(task_meta->msg_forward_fd); + } + const std::string& path = meta->executive_path; + if (!path.empty()) + g_thread_pool->detach_task([p = path]() { util::os::DeleteFile(p); }); } + + [[nodiscard]] const std::string& GetExecPath() const { - return m_executive_path_; + return meta->executive_path; } [[nodiscard]] const std::list& GetArgList() const { return m_arguments_; @@ -105,7 +133,10 @@ class ProcessInstance { m_clean_cb_ = std::move(cb); } - BatchMetaInProcessInstance batch_meta; + crane::grpc::TaskType type; + std::unique_ptr meta; + proc_id_t proc_id; + crane::grpc::ProcToD proc; private: /* ------------- Fields set by SpawnProcessInInstance_ ---------------- */ @@ -115,7 +146,6 @@ class ProcessInstance { struct bufferevent* m_ev_buf_event_; /* ------- Fields set by the caller of SpawnProcessInInstance_ -------- */ - std::string m_executive_path_; std::list m_arguments_; /*** @@ -137,20 +167,6 @@ class ProcessInstance { std::function m_clean_cb_; }; -struct MetaInTaskInstance { - std::string parsed_sh_script_path; - virtual ~MetaInTaskInstance() = default; -}; - -struct BatchMetaInTaskInstance : MetaInTaskInstance { - ~BatchMetaInTaskInstance() override = default; -}; - -struct CrunMetaInTaskInstance : MetaInTaskInstance { - int msg_forward_fd; - ~CrunMetaInTaskInstance() override = default; -}; - struct ProcSigchldInfo { pid_t pid; bool is_terminated_by_signal; @@ -167,17 +183,11 @@ struct TaskInstance { event_free(termination_timer); termination_timer = nullptr; } - - if (this->task.type() == crane::grpc::Interactive && - this->task.interactive_meta().interactive_type() == crane::grpc::Crun) { - close(dynamic_cast(meta.get())->msg_forward_fd); - } } crane::grpc::TaskToD task; PasswordEntry pwd_entry; - std::unique_ptr meta; std::string cgroup_path; Cgroup* cgroup; @@ -207,6 +217,8 @@ class TaskManager { CraneErr ExecuteTaskAsync(crane::grpc::TaskToD const& task); + CraneErr ExecuteProcAsync(const crane::grpc::ProcToD& proc); + std::optional QueryTaskIdFromPidAsync(pid_t pid); void TerminateTaskAsync(uint32_t task_id); @@ -267,8 +279,11 @@ class TaskManager { const std::string& cwd, task_id_t task_id); + bool SetProcessByTaskInstance(TaskInstance* task,ProcessInstance* process); static bool CheckIfInstanceTypeIsCrun_(TaskInstance* instance); static bool CheckIfInstanceTypeIsCalloc_(TaskInstance* instance); + static bool CheckIfProcessInstanceIsCrun_(const ProcessInstance& process); + static bool CheckIfProcessInstanceIsCalloc_(const ProcessInstance& process); void LaunchTaskInstanceMt_(TaskInstance* instance); @@ -391,11 +406,12 @@ class TaskManager { static void EvGrpcExecuteTaskCb_(evutil_socket_t efd, short events, void* user_data); + static void EvGrpcExecuteProcCb_(evutil_socket_t efd, short events, + void* user_data); + static void EvGrpcQueryTaskIdFromPidCb_(evutil_socket_t efd, short events, void* user_data); - static void EvSubprocessReadCb_(struct bufferevent* bev, void* process); - static void EvTaskStatusChangeCb_(evutil_socket_t efd, short events, void* user_data); @@ -434,6 +450,11 @@ class TaskManager { struct event* m_ev_grpc_execute_task_{}; ConcurrentQueue> m_grpc_execute_task_queue_; + // A custom event that handles the ExecuteProc RPC. + struct event* m_ev_grpc_execute_proc_{}; + ConcurrentQueue + m_grpc_execute_proc_queue_; + // When this event is triggered, the event loop will exit. struct event* m_ev_exit_event_{}; diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 01077111..a83da80f 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -23,6 +23,7 @@ #include "protos/Crane.pb.h" using task_id_t = uint32_t; +using proc_id_t = uint32_t; enum class CraneErr : uint16_t { kOk = 0,