Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
fix proc end
add log
bug fix
bug fix
bug fix
proc proto
TaskProcLaunch
ProcLaunch
  • Loading branch information
L-Xiafeng committed Jun 20, 2024
1 parent 63a3024 commit 3a41621
Show file tree
Hide file tree
Showing 15 changed files with 655 additions and 287 deletions.
58 changes: 45 additions & 13 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ message ExecuteTasksReply {
repeated uint32 failed_task_id_list = 1;
}

message ExecuteProcRequest{
ProcToD proc = 1;
}

message ExecuteProcReply{
bool ok = 1;
}

message CreateCgroupForTasksRequest {
repeated uint32 task_id_list = 1;
repeated uint32 uid_list = 2;
Expand Down Expand Up @@ -437,13 +445,16 @@ message StreamCforedRequest {
string cfored_name = 1;
int32 pid = 2;
TaskToCtld task = 3;
// for nested task of crun
optional uint32 task_id = 4;
}

message TaskCompleteReq {
string cfored_name = 1;
uint32 task_id = 2;
TaskStatus status = 3;
InteractiveTaskType interactive_type = 4;
uint32 proc_id = 3;
TaskStatus status = 4;
InteractiveTaskType interactive_type = 5;
}

message GracefulExitReq {
Expand Down Expand Up @@ -472,7 +483,12 @@ message StreamCtldReply {
int32 pid = 1;
bool ok = 2;
uint32 task_id = 3;
string failure_reason = 4;
uint32 proc_id = 4;
string failure_reason = 5;
message NestedTaskNodes{
repeated string craned_ids = 1;
}
optional NestedTaskNodes nodes = 6;
}

message TaskResAllocatedReply {
Expand All @@ -485,6 +501,7 @@ message StreamCtldReply {

message TaskCancelRequest {
uint32 task_id = 1;
uint32 proc_id = 2;
}

message TaskCompletionAckReply {
Expand Down Expand Up @@ -522,16 +539,20 @@ message StreamCrunRequest{
message TaskReq {
int32 crun_pid = 1;
TaskToCtld task = 2;
// for nested task of crun
optional uint32 task_id = 3;
}

message TaskCompleteReq {
uint32 task_id = 1;
TaskStatus status = 2;
uint32 proc_id = 2;
TaskStatus status = 3;
}

message TaskIOForwardReq {
uint32 task_id = 1;
string msg = 2;
uint32 proc_id = 2;
string msg = 3;
}

CrunRequestType type = 1;
Expand All @@ -551,12 +572,14 @@ message StreamCforedCrunReply {
TASK_COMPLETION_ACK_REPLY = 3;
TASK_IO_FORWARD = 4;
TASK_IO_FORWARD_READY = 5;
Proc_FORWARD_END = 6;
}

message TaskIdReply {
bool ok = 1;
uint32 task_id = 2;
string failure_reason = 3;
uint32 proc_id = 3;
string failure_reason = 4;
}

message TaskResAllocatedReply {
Expand All @@ -580,6 +603,10 @@ message StreamCforedCrunReply {
string msg = 1;
}

message ProcForwardEndReply{
bool ok = 1;
}

CforedCrunReplyType type = 1 ;

oneof payload {
Expand All @@ -589,23 +616,26 @@ message StreamCforedCrunReply {
TaskCompletionAckReply payload_task_completion_ack_reply = 5;
TaskIOForwardReadyReply payload_task_io_forward_ready_reply = 6;
TaskIOForwardReply payload_task_io_forward_reply = 7;
ProcForwardEndReply payload_proc_forward_end_reply = 8;
}
}

message StreamCforedTaskIORequest {
enum CranedRequestType{
CRANED_REGISTER = 0;
CRANED_TASK_OUTPUT = 1;
CRANED_PROC_OUTPUT = 1;
CRANED_UNREGISTER = 2;
}

message CranedRegisterReq {
string craned_id = 1;
}

message CranedTaskOutputReq {
message CranedProcOutputReq {
uint32 task_id = 1;
string msg = 2;
uint32 proc_id = 2;
string msg = 3;
bool end = 4;
}

message CranedUnRegisterReq {
Expand All @@ -615,7 +645,7 @@ message StreamCforedTaskIORequest {
CranedRequestType type = 1;
oneof payload {
CranedRegisterReq payload_register_req = 2;
CranedTaskOutputReq payload_task_output_req = 3;
CranedProcOutputReq payload_proc_output_req = 3;
CranedUnRegisterReq payload_unregister_req = 4;
}
}
Expand All @@ -631,9 +661,10 @@ message StreamCforedTaskIOReply {
bool ok = 1;
}

message CranedTaskInputReq {
message CranedProcInputReq {
uint32 task_id = 1;
string msg = 2;
uint32 proc_id = 2;
string msg = 3;
}

message CranedUnregisterReply {
Expand All @@ -644,7 +675,7 @@ message StreamCforedTaskIOReply {

oneof payload {
CranedRegisterReply payload_craned_register_reply = 2;
CranedTaskInputReq payload_task_input_req = 3;
CranedProcInputReq payload_proc_input_req = 3;
CranedUnregisterReply payload_craned_unregister_reply = 4;
}
}
Expand Down Expand Up @@ -694,6 +725,7 @@ service CraneCtld {
service Craned {
/* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */
rpc ExecuteTask(ExecuteTasksRequest) returns(ExecuteTasksReply);
rpc ExecuteProc(ExecuteProcRequest) returns(ExecuteProcReply);

rpc CheckTaskStatus(CheckTaskStatusRequest) returns(CheckTaskStatusReply);

Expand Down
14 changes: 14 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ enum InteractiveTaskType {
Crun = 1;
}

message ProcToD{
uint32 task_id = 1;
uint32 proc_id = 2;
TaskType type = 3;
bool get_user_env = 4;
map<string, string> env = 5;
string cwd = 6;
oneof payload {
BatchTaskAdditionalMeta batch_meta = 7;
InteractiveTaskAdditionalMeta interactive_meta = 8;
}

}

message TaskToCtld {
/* -------- Fields that are set at the submission time. ------- */
google.protobuf.Duration time_limit = 1;
Expand Down
54 changes: 53 additions & 1 deletion src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ std::vector<task_id_t> CranedStub::ExecuteTasks(
return failed_task_ids;
}

CraneErr CranedStub::ExecuteProc(
const crane::grpc::ExecuteProcRequest &request) {
using crane::grpc::ExecuteProcReply;

ClientContext context;
Status status;
ExecuteProcReply reply;

status = m_stub_->ExecuteProc(&context,request,&reply);
if (!status.ok()) {
CRANE_DEBUG(
"ExecuteProc RPC for Node {} returned with status not ok: {}",
m_craned_id_, status.error_message());
return CraneErr::kRpcFailure;
}
return CraneErr::kOk;
}

CraneErr CranedStub::TerminateTasks(const std::vector<task_id_t> &task_ids) {
using crane::grpc::TerminateTasksReply;
using crane::grpc::TerminateTasksRequest;
Expand All @@ -81,7 +99,10 @@ CraneErr CranedStub::TerminateTasks(const std::vector<task_id_t> &task_ids) {
return CraneErr::kRpcFailure;
}

return CraneErr::kOk;
if (reply.ok())
return CraneErr::kOk;
else
return CraneErr::kGenericFailure;
}

CraneErr CranedStub::TerminateOrphanedTask(task_id_t task_id) {
Expand Down Expand Up @@ -295,6 +316,37 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest(
return request;
}

// only for Proc launching
crane::grpc::ExecuteProcRequest CranedStub::NewExecuteProcRequest(
std::unique_ptr<TaskInCtld>& task_running,
std::unique_ptr<TaskInCtld>& step_submit) {
crane::grpc::ExecuteProcRequest request;
auto proc_to_d = request.mutable_proc();
// Set type
proc_to_d->set_type(step_submit->type);
proc_to_d->set_task_id(task_running->TaskId());
proc_to_d->mutable_env()->insert(step_submit->env.begin(), step_submit->env.end());

proc_to_d->set_cwd(step_submit->cwd);
proc_to_d->set_get_user_env(step_submit->get_user_env);

if (step_submit->type == crane::grpc::Batch) {
auto &meta_in_ctld = std::get<BatchMetaInTask>(step_submit->meta);
auto *mutable_meta = proc_to_d->mutable_batch_meta();
mutable_meta->set_output_file_pattern(meta_in_ctld.output_file_pattern);
mutable_meta->set_error_file_pattern(meta_in_ctld.error_file_pattern);
mutable_meta->set_sh_script(meta_in_ctld.sh_script);
} else {
auto &meta_in_ctld = std::get<InteractiveMetaInTask>(step_submit->meta);
auto *mutable_meta = proc_to_d->mutable_interactive_meta();
mutable_meta->set_cfored_name(meta_in_ctld.cfored_name);
mutable_meta->set_sh_script(meta_in_ctld.sh_script);
mutable_meta->set_term_env(meta_in_ctld.term_env);
mutable_meta->set_interactive_type(meta_in_ctld.interactive_type);
}
return request;
}

CranedKeeper::CranedKeeper(uint32_t node_num) : m_cq_closed_(false) {
m_pmr_pool_res_ = std::make_unique<std::pmr::synchronized_pool_resource>();
m_tag_sync_allocator_ =
Expand Down
5 changes: 5 additions & 0 deletions src/CraneCtld/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ class CranedStub {
static crane::grpc::ExecuteTasksRequest NewExecuteTasksRequest(
const std::vector<TaskInCtld *> &tasks);

static crane::grpc::ExecuteProcRequest NewExecuteProcRequest(
std::unique_ptr<TaskInCtld>& task_running,
std::unique_ptr<TaskInCtld>& step_submit);

std::vector<task_id_t> ExecuteTasks(
const crane::grpc::ExecuteTasksRequest &request);
CraneErr ExecuteProc(const crane::grpc::ExecuteProcRequest &request);

CraneErr CreateCgroupForTasks(std::vector<CgroupSpec> const &cgroup_specs);

Expand Down
32 changes: 23 additions & 9 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,23 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
auto const &payload = cfored_request.payload_task_req();
auto task = std::make_unique<TaskInCtld>();
task->SetFieldsByTaskToCtld(payload.task());
if (payload.has_task_id()) {
auto result = g_task_scheduler->SubmitProc(
std::move(task), payload.task_id(), payload.pid());
const auto &[proc_id, craned_ids] = result.get();
ok = stream_writer.WriteTaskIdReply(
payload.pid(),
result::result<task_id_t, std::string>{payload.task_id()},
proc_id, craned_ids);
if (!ok) {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
"Exiting...",
cfored_name);
state = StreamState::kCleanData;
}
break;
}

auto &meta = std::get<InteractiveMetaInTask>(task->meta);

Expand All @@ -991,12 +1008,9 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
// calloc will not send TaskCompletionAckReply when task
// Complete.
// crun task will send TaskStatusChange from Craned,
if (meta.interactive_type == InteractiveTaskType::Crun) {
// todo: Remove this
CRANE_TRACE("Sending TaskCompletionAckReply in task_completed",
task_id);
stream_writer.WriteTaskCompletionAckReply(task_id);
}
// if (meta.interactive_type == InteractiveTaskType::Crun) {
// stream_writer.WriteTaskCompletionAckReply(task_id);
// }
m_ctld_server_->m_mtx_.Lock();

// If cfored disconnected, the cfored_name should have be
Expand All @@ -1020,7 +1034,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
} else {
result = result::fail(submit_result.error());
}
ok = stream_writer.WriteTaskIdReply(payload.pid(), result);
ok = stream_writer.WriteTaskIdReply(payload.pid(), result, 0, {});

if (!ok) {
CRANE_ERROR(
Expand All @@ -1042,8 +1056,8 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
auto const &payload = cfored_request.payload_task_complete_req();

g_task_scheduler->TerminateRunningTask(payload.task_id());

if (payload.interactive_type() != InteractiveTaskType::Crun) {
ok = true;
if (payload.interactive_type() != crane::grpc::Crun) {
ok = stream_writer.WriteTaskCompletionAckReply(payload.task_id());
}
if (!ok) {
Expand Down
Loading

0 comments on commit 3a41621

Please sign in to comment.