diff --git a/etc/config.yaml b/etc/config.yaml index 13eef3d7..d572b75b 100644 --- a/etc/config.yaml +++ b/etc/config.yaml @@ -41,6 +41,15 @@ CranedMutexFilePath: craned/craned.lock # whether the craned is running in the background CranedForeground: true +# Container Runtime Options +CranedContainer: + Enable: true + TempDir: craned/containers/ + RuntimeState: /usr/bin/runc --root=/run/user/%U/ state %u.%U.%j.%x + RuntimeKill: /usr/bin/runc --rootless=true --root=/run/user/%U/ kill -a %u.%U.%j.%x SIGTERM + RuntimeDelete: /usr/bin/runc --rootless=true --root=/run/user/%U/ delete --force %u.%U.%j.%x + RuntimeRun: /usr/bin/runc --rootless=true --root=/run/user/%U/ run -b %b %u.%U.%j.%x + # Scheduling settings PriorityType: priority/multifactor @@ -57,14 +66,14 @@ PriorityWeightJobSize: 0 PriorityWeightPartition: 1000 PriorityWeightQ0S: 1000000 -# list of configuration information of the computing machine # Nodes and partitions settings +# node list Nodes: - name: "cn[15-18]" cpu: 2 memory: 2G -# partition information list +# partition list Partitions: - name: CPU nodes: "cn[15-16]" @@ -75,9 +84,7 @@ Partitions: DefaultPartition: CPU - -# Advanced options: - +# Advanced settings: # Maximum size of Pending Queue and must <=900000. PendingQueueMaxSize: 900000 diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 855ed560..07b8fda8 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -95,6 +95,8 @@ message TaskToCtld { InteractiveTaskAdditionalMeta interactive_meta = 22; } + string container = 25; + string cmd_line = 31; string cwd = 32; // Current working directory map env = 33; @@ -164,10 +166,12 @@ message TaskToD { double cpus_per_task = 23; bool get_user_env = 24; + string container = 25; } message BatchTaskAdditionalMeta { string sh_script = 1; + string interpreter = 2; string output_file_pattern = 3; string error_file_pattern = 4; } @@ -196,6 +200,8 @@ message TaskInfo { string username = 15; string qos = 16; + string container = 25; + // Dynamic task information TaskStatus status = 31; string craned_list = 32; diff --git a/scripts/wipe_data.sh b/scripts/wipe_data.sh index 4a52eace..e820ee34 100755 --- a/scripts/wipe_data.sh +++ b/scripts/wipe_data.sh @@ -10,29 +10,25 @@ fi mode=$1 # 读取配置文件中的账号密码以及unqlite文件路径 -confFile=/etc/crane/database.yaml -username=$(grep 'DbUser' "$confFile" | awk '{print $2}') -username=${username//\"/} -password=$(grep 'DbPassword' "$confFile" | awk '{print $2}') -password=${password//\"/} -embedded_db_path=$(grep 'CraneCtldDbPath' "$confFile" | awk '{print $2}') -parent_dir="${embedded_db_path%/*}" -env_path="${parent_dir}/CraneEnv" - -# MongoDB服务器的地址和端口 -host="localhost" -port="27017" +conf_file=/etc/crane/database.yaml +base_dir=/var/crane/ +username=$(grep 'DbUser:' "$conf_file" | awk '{print $2}') +password=$(grep 'DbPassword:' "$conf_file" | awk -F\" '{print $2}') +host=$(grep 'DbHost:' "$conf_file" | awk '{print $2}') +port=$(grep 'DbPort:' "$conf_file" | awk '{print $2}') +dbname=$(grep 'DbName:' "$conf_file" | awk '{print $2}') +embedded_db_path="$base_dir$(grep 'CraneCtldDbPath:' "$conf_file" | awk '{print $2}')" # 使用mongo shell连接到MongoDB服务器并清空指定的集合 - function wipe_collection() { - mongosh --username "$username" --password "$password" --host "$host" --port "$port" <(); + auto max_age = config["PriorityMaxAge"].as(); std::regex pattern_hour_min_sec(R"((\d+):(\d+):(\d+))"); std::regex pattern_day_hour(R"((\d+)-(\d+))"); @@ -220,7 +220,7 @@ void ParseConfig(int argc, char** argv) { } if (config["PriorityType"]) { - std::string priority_type = config["PriorityType"].as(); + auto priority_type = config["PriorityType"].as(); if (priority_type == "priority/multifactor") g_config.PriorityConfig.Type = Ctld::Config::Priority::MultiFactor; else @@ -576,7 +576,7 @@ void InitializeCtldGlobalVariables() { g_craned_keeper->SetCranedIsDownCb([](const CranedId& craned_id) { CRANE_TRACE( - "CranedNode #{} is down now." + "CranedNode #{} is down now. " "Remove its resource from the global resource pool.", craned_id); g_meta_container->CranedDown(craned_id); diff --git a/src/CraneCtld/CranedKeeper.cpp b/src/CraneCtld/CranedKeeper.cpp index 23a3ee88..73bc6bca 100644 --- a/src/CraneCtld/CranedKeeper.cpp +++ b/src/CraneCtld/CranedKeeper.cpp @@ -267,6 +267,8 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest( mutable_task->set_cwd(task->cwd); mutable_task->set_get_user_env(task->get_user_env); + mutable_task->set_container(task->container); + for (const auto &hostname : task->CranedIds()) mutable_task->mutable_allocated_nodes()->Add()->assign(hostname); @@ -278,6 +280,7 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest( if (task->type == crane::grpc::Batch) { auto &meta_in_ctld = std::get(task->meta); auto *mutable_meta = mutable_task->mutable_batch_meta(); + mutable_meta->set_interpreter(meta_in_ctld.interpreter); mutable_meta->set_output_file_pattern(meta_in_ctld.output_file_pattern); mutable_meta->set_error_file_pattern(meta_in_ctld.error_file_pattern); mutable_meta->set_sh_script(meta_in_ctld.sh_script); diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index 37609009..2dbda0ca 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -222,6 +222,7 @@ struct InteractiveMetaInTask { struct BatchMetaInTask { std::string sh_script; + std::string interpreter; std::string output_file_pattern; std::string error_file_pattern; }; @@ -254,6 +255,8 @@ struct TaskInCtld { std::unordered_map env; std::string cwd; + std::string container; + std::variant meta; private: @@ -416,6 +419,7 @@ struct TaskInCtld { if (type == crane::grpc::Batch) { meta.emplace(BatchMetaInTask{ .sh_script = val.batch_meta().sh_script(), + .interpreter = val.batch_meta().interpreter(), .output_file_pattern = val.batch_meta().output_file_pattern(), .error_file_pattern = val.batch_meta().error_file_pattern(), }); @@ -438,6 +442,7 @@ struct TaskInCtld { qos = val.qos(); get_user_env = val.get_user_env(); + container = val.container(); } void SetFieldsByRuntimeAttr(crane::grpc::RuntimeAttrOfTask const& val) { diff --git a/src/Craned/CMakeLists.txt b/src/Craned/CMakeLists.txt index 8f061d96..478320e8 100644 --- a/src/Craned/CMakeLists.txt +++ b/src/Craned/CMakeLists.txt @@ -5,6 +5,8 @@ add_executable(craned ResourceAllocators.cpp TaskManager.h TaskManager.cpp + TaskExecutor.h + TaskExecutor.cpp CranedServer.h CranedServer.cpp CranedPublicDefs.h diff --git a/src/Craned/Craned.cpp b/src/Craned/Craned.cpp index beb3b878..ef47e531 100644 --- a/src/Craned/Craned.cpp +++ b/src/Craned/Craned.cpp @@ -317,15 +317,39 @@ void ParseConfig(int argc, char** argv) { g_config.Partitions.emplace(std::move(name), std::move(part)); } + } - if (config["CranedForeground"]) { - auto val = config["CranedForeground"].as(); - if (val == "true") - g_config.CranedForeground = true; - else - g_config.CranedForeground = false; - } + if (config["CranedForeground"]) { + auto val = config["CranedForeground"].as(); + if (val == "true") + g_config.CranedForeground = true; + else + g_config.CranedForeground = false; + } + + if (config["CranedContainer"] && config["CranedContainer"]["Enable"] && + config["CranedContainer"]["Enable"].as()) { + g_config.CranedContainer.TempDir = + g_config.CraneBaseDir + + config["CranedContainer"]["TempDir"].as(); + g_config.CranedContainer.RunTimeState = + config["CranedContainer"]["RuntimeState"].as(); + g_config.CranedContainer.RuntimeDelete = + config["CranedContainer"]["RuntimeDelete"].as(); + g_config.CranedContainer.RuntimeKill = + config["CranedContainer"]["RuntimeKill"].as(); + g_config.CranedContainer.RuntimeRun = + config["CranedContainer"]["RuntimeRun"].as(); + g_config.CranedContainer.Enable = true; + CRANE_DEBUG("Container support is enabled"); + CRANE_TRACE("OCI Runtime set to {}", + g_config.CranedContainer.RuntimeRun); + } else { + g_config.CranedContainer = {}; + g_config.CranedContainer.Enable = false; + CRANE_DEBUG("Container support is disabled"); } + } catch (YAML::BadFile& e) { CRANE_CRITICAL("Can't open config file {}: {}", kDefaultConfigPath, e.what()); diff --git a/src/Craned/CranedPublicDefs.h b/src/Craned/CranedPublicDefs.h index 0e266716..4e03e0a0 100644 --- a/src/Craned/CranedPublicDefs.h +++ b/src/Craned/CranedPublicDefs.h @@ -63,6 +63,15 @@ struct Config { std::string UnixSocketListenAddr; }; + struct CranedContainerConf { + bool Enable{false}; + std::string TempDir; + std::string RunTimeState; + std::string RuntimeKill; + std::string RuntimeDelete; + std::string RuntimeRun; + }; + CranedListenConf ListenConf; bool CompressedRpc{}; @@ -78,6 +87,8 @@ struct Config { bool CranedForeground{}; + CranedContainerConf CranedContainer; + std::string Hostname; CranedId CranedIdOfThisNode; diff --git a/src/Craned/TaskExecutor.cpp b/src/Craned/TaskExecutor.cpp new file mode 100644 index 00000000..bff2bfb1 --- /dev/null +++ b/src/Craned/TaskExecutor.cpp @@ -0,0 +1,830 @@ +/** + * Copyright (c) 2023 Peking University and Peking University + * Changsha Institute for Computing and Digital Economy + * + * CraneSched is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of + * the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, + * WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +#include "TaskExecutor.h" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "CranedPublicDefs.h" +#include "TaskManager.h" +#include "crane/Logger.h" +#include "crane/OS.h" +#include "protos/CraneSubprocess.pb.h" + +namespace Craned { + +/** + * Generate CRANE_* vars using task information and + * extract task.env() from frontend. + */ +TaskExecutor::EnvironVars TaskExecutor::GetEnvironVarsFromTask( + const TaskInstance& instance) { + TaskExecutor::EnvironVars env_vec{}; + env_vec.emplace_back("CRANE_JOB_NODELIST", + absl::StrJoin(instance.task.allocated_nodes(), ";")); + env_vec.emplace_back("CRANE_EXCLUDES", + absl::StrJoin(instance.task.excludes(), ";")); + env_vec.emplace_back("CRANE_JOB_NAME", instance.task.name()); + env_vec.emplace_back("CRANE_ACCOUNT", instance.task.account()); + env_vec.emplace_back("CRANE_PARTITION", instance.task.partition()); + env_vec.emplace_back("CRANE_QOS", instance.task.qos()); + env_vec.emplace_back("CRANE_MEM_PER_NODE", + std::to_string(instance.task.resources() + .allocatable_resource() + .memory_limit_bytes() / + (1024 * 1024))); + env_vec.emplace_back("CRANE_JOB_ID", std::to_string(instance.task.task_id())); + + int64_t time_limit_sec = instance.task.time_limit().seconds(); + int hours = time_limit_sec / 3600; + int minutes = (time_limit_sec % 3600) / 60; + int seconds = time_limit_sec % 60; + std::string time_limit = + fmt::format("{:0>2}:{:0>2}:{:0>2}", hours, minutes, seconds); + env_vec.emplace_back("CRANE_TIMELIMIT", time_limit); + + // Add env from user + auto& env_from_user = instance.task.env(); + for (auto&& [name, value] : env_from_user) { + env_vec.emplace_back(name, value); + } + + return env_vec; +} + +std::string ProcessInstance::WriteBatchScript(const std::string_view script) { + m_executive_path_ = + fmt::format("{}/Crane-{}.sh", g_config.CranedScriptDir, m_meta_.id); + + FILE* fptr = fopen(m_executive_path_.c_str(), "w"); + if (fptr == nullptr) return ""; + fputs(script.data(), fptr); + fclose(fptr); + + chmod(m_executive_path_.c_str(), strtol("0755", nullptr, 8)); + + return m_executive_path_; +} + +CraneErr ProcessInstance::Spawn(util::Cgroup* cgroup) { + using google::protobuf::io::FileInputStream; + using google::protobuf::io::FileOutputStream; + using google::protobuf::util::ParseDelimitedFromZeroCopyStream; + using google::protobuf::util::SerializeDelimitedToZeroCopyStream; + + using crane::grpc::subprocess::CanStartMessage; + using crane::grpc::subprocess::ChildProcessReady; + + int socket_pair[2]; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair) != 0) { + CRANE_ERROR("Failed to create socket pair: {}", strerror(errno)); + return CraneErr::kSystemErr; + } + + // save the current uid/gid + std::pair saved_priv{getuid(), getgid()}; + + int rc = setegid(m_meta_.pwd.Gid()); + if (rc == -1) { + CRANE_ERROR("error: setegid. {}", strerror(errno)); + return CraneErr::kSystemErr; + } + __gid_t gid_a[1] = {m_meta_.pwd.Gid()}; + setgroups(1, gid_a); + rc = seteuid(m_meta_.pwd.Uid()); + if (rc == -1) { + CRANE_ERROR("error: seteuid. {}", strerror(errno)); + return CraneErr::kSystemErr; + } + + pid_t child_pid = fork(); + if (child_pid > 0) { // Parent proc + close(socket_pair[1]); + int fd = socket_pair[0]; + bool ok; + CraneErr err; + + setegid(saved_priv.second); + seteuid(saved_priv.first); + setgroups(0, nullptr); + + FileInputStream istream(fd); + FileOutputStream ostream(fd); + CanStartMessage msg; + ChildProcessReady child_process_ready; + + CRANE_DEBUG("Subprocess was created for task #{} pid: {}", m_meta_.id, + child_pid); + + SetPid(child_pid); + + // Add event for stdout/stderr of the new subprocess + // struct bufferevent* ev_buf_event; + // ev_buf_event = + // bufferevent_socket_new(m_ev_base_, fd, BEV_OPT_CLOSE_ON_FREE); + // if (!ev_buf_event) { + // CRANE_ERROR( + // "Error constructing bufferevent for the subprocess of task #!", + // instance->task.task_id()); + // err = CraneErr::kLibEventError; + // goto AskChildToSuicide; + // } + // bufferevent_setcb(ev_buf_event, EvSubprocessReadCb_, nullptr, nullptr, + // (void*)process.get()); + // bufferevent_enable(ev_buf_event, EV_READ); + // bufferevent_disable(ev_buf_event, EV_WRITE); + // process->SetEvBufEvent(ev_buf_event); + + // Migrate the new subprocess to newly created cgroup + if (!cgroup->MigrateProcIn(GetPid())) { + CRANE_ERROR( + "Terminate the subprocess of task #{} due to failure of cgroup " + "migration.", + m_meta_.id); + + err = CraneErr::kCgroupError; + goto AskChildToSuicide; + } + + CRANE_TRACE("New task #{} is ready. Asking subprocess to execv...", + m_meta_.id); + + // Tell subprocess that the parent process is ready. Then the + // subprocess should continue to exec(). + msg.set_ok(true); + ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); + ok &= ostream.Flush(); + if (!ok) { + CRANE_ERROR("Failed to send ok=true to subprocess {} for task #{}", + child_pid, m_meta_.id); + close(fd); + return CraneErr::kProtobufError; + } + + ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream, nullptr); + if (!msg.ok()) { + CRANE_ERROR("Failed to read protobuf from subprocess {} of task #{}", + child_pid, m_meta_.id); + close(fd); + return CraneErr::kProtobufError; + } + + close(fd); + return CraneErr::kOk; + + AskChildToSuicide: + msg.set_ok(false); + + ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); + close(fd); + if (!ok) { + CRANE_ERROR("Failed to ask subprocess {} to suicide for task #{}", + child_pid, m_meta_.id); + return CraneErr::kProtobufError; + } + return err; + } else { // Child proc + rc = chdir(m_cwd_.c_str()); + if (rc == -1) { + CRANE_ERROR("[Child Process] Error: chdir to {}. {}", m_cwd_.c_str(), + strerror(errno)); + std::abort(); + } + + setreuid(m_meta_.pwd.Uid(), m_meta_.pwd.Uid()); + setregid(m_meta_.pwd.Gid(), m_meta_.pwd.Gid()); + + // Set pgid to the pid of task root process. + setpgid(0, 0); + + close(socket_pair[0]); + int fd = socket_pair[1]; + + FileInputStream istream(fd); + FileOutputStream ostream(fd); + CanStartMessage msg; + ChildProcessReady child_process_ready; + bool ok; + + ParseDelimitedFromZeroCopyStream(&msg, &istream, nullptr); + if (!msg.ok()) std::abort(); + + const std::string& stdout_file_path = + m_batch_meta_.parsed_output_file_pattern; + const std::string& stderr_file_path = + m_batch_meta_.parsed_error_file_pattern; + + int stdout_fd = + open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644); + if (stdout_fd == -1) { + CRANE_ERROR("[Child Process] Error: open {}. {}", stdout_file_path, + strerror(errno)); + std::abort(); + } + dup2(stdout_fd, 1); // stdout -> output file + + if (stderr_file_path.empty()) { + // if stderr filename is not specified + dup2(stdout_fd, 2); // stderr -> output file + } else { + int stderr_fd = + open(stderr_file_path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644); + if (stderr_fd == -1) { + CRANE_ERROR("[Child Process] Error: open {}. {}", stderr_file_path, + strerror(errno)); + std::abort(); + } + dup2(stderr_fd, 2); // stderr -> error file + close(stderr_fd); + } + + close(stdout_fd); + + child_process_ready.set_ok(true); + ok = SerializeDelimitedToZeroCopyStream(child_process_ready, &ostream); + ok &= ostream.Flush(); + if (!ok) { + CRANE_ERROR("[Child Process] Error: Failed to flush."); + std::abort(); + } + + close(fd); + + // If these file descriptors are not closed, a program like mpirun may + // keep waiting for the input from stdin or other fds and will never end. + close(0); // close stdin + util::os::CloseFdFrom(3); + + // Use bash to create the executing environment + std::vector argv{"/bin/bash"}; + + // If get_user_env, will load envs using --login + // Maybe should use `su` instead. + if (m_meta_.get_user_env) { + // If --get-user-env is set, the new environment is inherited + // from the execution CraneD rather than the submitting node. + // + // Since we want to reinitialize the environment variables of the user + // by reloading the settings in something like .bashrc or /etc/profile, + // we are actually performing two steps: login -> start shell. + // Shell starting is done by calling "bash --login". + // + // During shell starting step, the settings in + // /etc/profile, ~/.bash_profile, ... are loaded. + // + // During login step, "HOME" and "SHELL" are set. + // Here we are just mimicking the login module. + + // Slurm uses `su -c /usr/bin/env` to retrieve + // all the environment variables. + // We use a more tidy way. + m_env_.emplace_back("HOME", m_meta_.pwd.HomeDir()); + m_env_.emplace_back("SHELL", m_meta_.pwd.Shell()); + argv.emplace_back("--login"); + } + + // Set environment variables + if (clearenv()) { + fmt::print("clearenv() failed!\n"); + } + + for (const auto& [name, value] : m_env_) { + if (setenv(name.c_str(), value.c_str(), 1)) { + fmt::print("setenv for {}={} failed!\n", name, value); + } + } + + // Add arguments + // TODO: Arguments for the interpreter or executive? + std::string arguments{}; + for (const auto& arg : m_arguments_) { + arguments += arg + " "; + } + + // e.g., /bin/bash -c "/bin/zsh script.sh --arg1 --arg2 ..." + auto cmd = fmt::format("{} {} {}", m_batch_meta_.interpreter, + m_executive_path_, arguments); + + argv.emplace_back("-c"); + argv.emplace_back(cmd.c_str()); + argv.emplace_back(nullptr); + + execv(argv[0], const_cast(argv.data())); + + // Error occurred since execv returned. At this point, errno is set. + // Ctld use SIGABRT to inform the client of this failure. + fmt::print(stderr, "[Craned Subprocess Error] Failed to execv. Error: {}\n", + strerror(errno)); + // Todo: See https://tldp.org/LDP/abs/html/exitcodes.html, return standard + // exit codes + abort(); + } +} + +CraneErr ProcessInstance::Kill(int signum) { + // TODO: Add timer which sends SIGTERM for those tasks who + // will not quit when receiving SIGINT. + if (m_pid_) { + // Send the signal to the whole process group. + int err = kill(-m_pid_, signum); + + if (err == 0) + return CraneErr::kOk; + else { + CRANE_TRACE("kill pid {} failed. error: {}", m_pid_, strerror(errno)); + return CraneErr::kGenericFailure; + } + } + + return CraneErr::kNonExistent; +} + +TaskExecutor::ChldStatus ProcessInstance::CheckChldStatus(pid_t pid, + int status) { + ChldStatus chld_status{}; + + if (WIFEXITED(status)) { + // Exited with status WEXITSTATUS(status) + chld_status = {pid, false, WEXITSTATUS(status)}; + CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}", + pid, WEXITSTATUS(status)); + + } else if (WIFSIGNALED(status)) { + // Killed by signal WTERMSIG(status) + chld_status = {pid, true, WTERMSIG(status)}; + CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}", pid, + WTERMSIG(status)); + } + + return chld_status; +} + +CraneErr ContainerInstance::ModifyBundleConfig_(const std::string& src, + const std::string& dst) { + using json = nlohmann::json; + + // Check if bundle is valid + auto src_config = std::filesystem::path(src) / "config.json"; + auto src_rootfs = std::filesystem::path(src) / "rootfs"; + if (!std::filesystem::exists(src_config) || + !std::filesystem::exists(src_rootfs)) { + CRANE_ERROR("Bundle provided by Task #{} not exists : {}", m_meta_.id, + src_config.string()); + return CraneErr::kInvalidParam; + } + + std::ifstream fin{src_config}; + if (!fin) { + CRANE_ERROR("Failed to open bundle config provided by Task #{}: {}", + m_meta_.id, src_config.string()); + return CraneErr::kSystemErr; + } + + json config = json::parse(fin, nullptr, false); + if (config.is_discarded()) { + CRANE_ERROR("Bundle config provided by Task #{} is invalid: {}", m_meta_.id, + src_config.string()); + return CraneErr::kInvalidParam; + } + + try { + // Set root object, see: + // https://github.com/opencontainers/runtime-spec/blob/main/config.md#root + // Set real rootfs path in the modified config. + config["root"]["path"] = src_rootfs; + + // Set mounts array, see: + // https://github.com/opencontainers/runtime-spec/blob/main/config.md#mounts + // Bind mount script into the container + auto mounts = config["mounts"].get>(); + std::string mounted_executive = "/tmp/crane/script.sh"; + mounts.emplace_back(json::object({ + {"destination", mounted_executive}, + {"source", m_executive_path_}, + {"options", json::array({"bind", "ro"})}, + })); + config["mounts"] = mounts; + + // Set process object, see: + // https://github.com/opencontainers/runtime-spec/blob/main/config.md#process + auto& process = config["process"]; + // Set pass-through mode, see runc docs for detail. + process["terminal"] = false; + // Write environment variables in IEEE format. + process["env"] = [this]() { + // TODO: Will discard `--get-user-env` + auto env_str = std::vector(); + for (const auto& [name, value] : m_env_) + env_str.emplace_back(fmt::format("{}={}", name, value)); + return env_str; + }(); + // TODO: Support more arguments. + process["args"] = { + m_batch_meta_.interpreter, + mounted_executive, + }; + } catch (json::exception& e) { + CRANE_ERROR("Failed to generate bundle config for Task #{}: {}", m_meta_.id, + e.what()); + return CraneErr::kGenericFailure; + } + + // Write the modified config + auto dst_config = std::filesystem::path(dst) / "config.json"; + std::ofstream fout{dst_config}; + if (!fout) { + CRANE_ERROR("Failed to write bundle config for Task #{}: {}", m_meta_.id, + dst_config.string()); + return CraneErr::kSystemErr; + } + fout << config.dump(4); + fout.flush(); + + return CraneErr::kOk; +} + +std::string ContainerInstance::WriteBatchScript(const std::string_view script) { + // Create temp folder + if (AssureContainerTempDir_() != CraneErr::kOk) return ""; + + // Write into the temp folder + m_executive_path_ = std::filesystem::path(m_temp_path_) / + fmt::format("Crane-{}.sh", m_meta_.id); + + FILE* fptr = fopen(m_executive_path_.c_str(), "w"); + if (fptr == nullptr) return ""; + fputs(script.data(), fptr); + fclose(fptr); + + chmod(m_executive_path_.c_str(), strtol("0755", nullptr, 8)); + + return m_executive_path_; +} + +CraneErr ContainerInstance::Spawn(util::Cgroup* cgroup) { + using google::protobuf::io::FileInputStream; + using google::protobuf::io::FileOutputStream; + using google::protobuf::util::ParseDelimitedFromZeroCopyStream; + using google::protobuf::util::SerializeDelimitedToZeroCopyStream; + + using crane::grpc::subprocess::CanStartMessage; + using crane::grpc::subprocess::ChildProcessReady; + + int socket_pair[2]; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair) != 0) { + CRANE_ERROR("Failed to create socket pair: {}", strerror(errno)); + return CraneErr::kSystemErr; + } + + // save the current uid/gid + std::pair saved_id{getuid(), getgid()}; + + int rc = setegid(m_meta_.pwd.Gid()); + if (rc == -1) { + CRANE_ERROR("error: setegid. {}", strerror(errno)); + return CraneErr::kSystemErr; + } + __gid_t gid_a[1] = {m_meta_.pwd.Gid()}; + setgroups(1, gid_a); + rc = seteuid(m_meta_.pwd.Uid()); + if (rc == -1) { + CRANE_ERROR("error: seteuid. {}", strerror(errno)); + return CraneErr::kSystemErr; + } + + pid_t child_pid = fork(); + if (child_pid > 0) { // Parent proc + close(socket_pair[1]); + int fd = socket_pair[0]; + bool ok; + CraneErr err; + + setegid(saved_id.second); + seteuid(saved_id.first); + setgroups(0, nullptr); + + FileInputStream istream(fd); + FileOutputStream ostream(fd); + CanStartMessage msg; + ChildProcessReady child_process_ready; + + CRANE_DEBUG("Subprocess was created for task #{} pid: {}", m_meta_.id, + child_pid); + + SetPid(child_pid); + + // Add event for stdout/stderr of the new subprocess + // struct bufferevent* ev_buf_event; + // ev_buf_event = + // bufferevent_socket_new(m_ev_base_, fd, BEV_OPT_CLOSE_ON_FREE); + // if (!ev_buf_event) { + // CRANE_ERROR( + // "Error constructing bufferevent for the subprocess of task #!", + // instance->task.task_id()); + // err = CraneErr::kLibEventError; + // goto AskChildToSuicide; + // } + // bufferevent_setcb(ev_buf_event, EvSubprocessReadCb_, nullptr, nullptr, + // (void*)process.get()); + // bufferevent_enable(ev_buf_event, EV_READ); + // bufferevent_disable(ev_buf_event, EV_WRITE); + // process->SetEvBufEvent(ev_buf_event); + + // Migrate the new subprocess to newly created cgroup + if (!cgroup->MigrateProcIn(GetPid())) { + CRANE_ERROR( + "Terminate the subprocess of task #{} due to failure of cgroup " + "migration.", + m_meta_.id); + + err = CraneErr::kCgroupError; + goto AskChildToSuicide; + } + + CRANE_TRACE("New task #{} is ready. Asking subprocess to execv...", + m_meta_.id); + + // Tell subprocess that the parent process is ready. Then the + // subprocess should continue to exec(). + msg.set_ok(true); + ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); + ok &= ostream.Flush(); + if (!ok) { + CRANE_ERROR("Failed to send ok=true to subprocess {} for task #{}", + child_pid, m_meta_.id); + close(fd); + return CraneErr::kProtobufError; + } + + ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream, nullptr); + if (!msg.ok()) { + CRANE_ERROR("Failed to read protobuf from subprocess {} of task #{}", + child_pid, m_meta_.id); + close(fd); + return CraneErr::kProtobufError; + } + + close(fd); + return CraneErr::kOk; + + AskChildToSuicide: + msg.set_ok(false); + + ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); + close(fd); + if (!ok) { + CRANE_ERROR("Failed to ask subprocess {} to suicide for task #{}", + child_pid, m_meta_.id); + return CraneErr::kProtobufError; + } + return err; + } else { // Child proc + // Change work dir to execute + rc = chdir(m_cwd_.c_str()); + if (rc == -1) { + fmt::print("[Child Process] Error: chdir to {}. {}", m_cwd_.c_str(), + strerror(errno)); + std::abort(); + } + + setreuid(m_meta_.pwd.Uid(), m_meta_.pwd.Uid()); + setregid(m_meta_.pwd.Gid(), m_meta_.pwd.Gid()); + + // Set pgid to the pid of task root process. + setpgid(0, 0); + + close(socket_pair[0]); + int fd = socket_pair[1]; + + FileInputStream istream(fd); + FileOutputStream ostream(fd); + CanStartMessage msg; + ChildProcessReady child_process_ready; + bool ok; + + ParseDelimitedFromZeroCopyStream(&msg, &istream, nullptr); + if (!msg.ok()) std::abort(); + + const std::string& stdout_file_path = + m_batch_meta_.parsed_output_file_pattern; + const std::string& stderr_file_path = + m_batch_meta_.parsed_error_file_pattern; + + int stdout_fd = + open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644); + if (stdout_fd == -1) { + fmt::print("[Child Process] Error: open {}. {}", stdout_file_path, + strerror(errno)); + std::abort(); + } + dup2(stdout_fd, 1); // stdout -> output file + + if (stderr_file_path.empty()) { + // if stderr filename is not specified + dup2(stdout_fd, 2); // stderr -> output file + } else { + int stderr_fd = + open(stderr_file_path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644); + if (stderr_fd == -1) { + fmt::print("[Child Process] Error: open {}. {}", stderr_file_path, + strerror(errno)); + std::abort(); + } + dup2(stderr_fd, 2); // stderr -> error file + close(stderr_fd); + } + + close(stdout_fd); + + child_process_ready.set_ok(true); + ok = SerializeDelimitedToZeroCopyStream(child_process_ready, &ostream); + ok &= ostream.Flush(); + if (!ok) { + fmt::print("[Child Process] Error: Failed to flush."); + std::abort(); + } + + close(fd); + + // If these file descriptors are not closed, a program like mpirun may + // keep waiting for the input from stdin or other fds and will never end. + close(0); // close stdin + util::os::CloseFdFrom(3); + + // Generate modified bundle config. + CraneErr err = ModifyBundleConfig_(m_bundle_path_, m_temp_path_); + if (err != CraneErr::kOk) { + fmt::print("[Child Process] Error: Failed to spawn container."); + std::abort(); + } + + // Parse the pattern in run command. + // we provide m_temp_path_ to runtime so that it would use the modified + // config.json + auto run_cmd = ParseContainerCmdPattern_( + g_config.CranedContainer.RuntimeRun, m_meta_.id, m_meta_.name, + m_meta_.pwd, m_temp_path_); + + std::vector split = absl::StrSplit(std::move(run_cmd), " "); + auto split_view = + split | std::views::transform([](auto& s) { return s.c_str(); }); + auto argv = std::vector(split_view.begin(), split_view.end()); + argv.push_back(nullptr); + + // Call OCI Runtime to run container. + execv(argv[0], const_cast(argv.data())); + + // Error occurred since execv returned. At this point, errno is set. + // Ctld use SIGABRT to inform the client of this failure. + fmt::print(stderr, "[Craned Subprocess Error] Failed to execv. Error: {}\n", + strerror(errno)); + // Todo: See https://tldp.org/LDP/abs/html/exitcodes.html, return standard + // exit codes + std::abort(); + } +} + +CraneErr ContainerInstance::Kill(int signum) { + using json = nlohmann::json; + if (m_pid_) { + // If m_pid_ not exists, no further operation. + int rc = 0; + std::array buffer; + std::string cmd, ret, status; + json jret; + + // Check the state of the container + cmd = ParseContainerCmdPattern_(g_config.CranedContainer.RunTimeState, + m_meta_.id, m_meta_.name, m_meta_.pwd, + m_temp_path_); + + std::unique_ptr pipe(popen(cmd.c_str(), "r"), + pclose); + if (!pipe) { + CRANE_TRACE("Error in getting container status: popen() failed."); + goto ProcessKill; + } + while (fgets(buffer.data(), static_cast(buffer.size()), pipe.get()) != + nullptr) { + ret += buffer.data(); + } + + jret = json::parse(std::move(ret), nullptr, false); + if (jret.is_discarded() || !jret.contains("status") || + !jret["status"].is_string()) { + CRANE_TRACE("Error in parsing container status: {}", cmd); + goto ProcessKill; + } + + // Take action according to OCI container states, see: + // https://github.com/opencontainers/runtime-spec/blob/main/runtime.md#state + status = std::move(jret["status"]); + if (status == "creating") { + goto ProcessKill; + } else if (status == "created") { + goto ContainerDelete; + } else if (status == "running") { + goto ContainerKill; + } else if (status == "stopped") { + goto ContainerDelete; + } else { + CRANE_WARN("Unknown container status received: {}", status); + goto ProcessKill; + } + + ContainerKill: + // Try to stop gracefully. + // Note: Signum is configured in config.yaml instead of in the param. + cmd = ParseContainerCmdPattern_(g_config.CranedContainer.RuntimeKill, + m_meta_.id, m_meta_.name, m_meta_.pwd, + m_temp_path_); + rc = system(cmd.c_str()); + if (rc) { + CRANE_TRACE("Failed to kill container for Task #{}: error in {}", + m_meta_.id, cmd); + } + + ContainerDelete: + // Delete the container + // Note: Admin could choose if --force is configured or not. + cmd = ParseContainerCmdPattern_(g_config.CranedContainer.RuntimeDelete, + m_meta_.id, m_meta_.name, m_meta_.pwd, + m_temp_path_); + rc = system(cmd.c_str()); + if (rc) { + CRANE_TRACE("Failed to delete container for Task #{}: error in {}", + m_meta_.id, cmd); + } + + ProcessKill: + // Kill runc process as the last resort. + // Note: If runc is launched in `detached` mode, this will not work. + rc = kill(-m_pid_, signum); + if (rc && (errno != ESRCH)) { + CRANE_TRACE("Failed to kill pid {}. error: {}", m_pid_, strerror(errno)); + return CraneErr::kGenericFailure; + } + + return CraneErr::kOk; + } + + return CraneErr::kNonExistent; +} + +TaskExecutor::ChldStatus ContainerInstance::CheckChldStatus(pid_t pid, + int status) { + ChldStatus chld_status{}; + + if (WIFEXITED(status)) { + // Exited with status WEXITSTATUS(status) + chld_status = {pid, false, WEXITSTATUS(status)}; + if (chld_status.value > 128) { + // Note: When cancel a container task, + // the OCI runtime will signal the process inside the container. + // In this case, WIFSIGNALED is false and WEXITSTATUS is set to `128 + + // signum` (e.g., 128 + SIGTERM = 143). + chld_status.signaled = true; + chld_status.value -= 128; + CRANE_TRACE( + "Receiving SIGCHLD for pid {}. Signaled: true (in container), " + "Signal: {}", + pid, chld_status.value); + } else { + CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}", + pid, WEXITSTATUS(status)); + } + } else if (WIFSIGNALED(status)) { + // Killed by signal WTERMSIG(status) + // Note: This could happen when the OCI runtime itself got killed. + chld_status = {pid, true, WTERMSIG(status)}; + CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}", pid, + WTERMSIG(status)); + } + + return chld_status; +} + +} // namespace Craned \ No newline at end of file diff --git a/src/Craned/TaskExecutor.h b/src/Craned/TaskExecutor.h new file mode 100644 index 00000000..b7563bee --- /dev/null +++ b/src/Craned/TaskExecutor.h @@ -0,0 +1,381 @@ +/** + * Copyright (c) 2023 Peking University and Peking University + * Changsha Institute for Computing and Digital Economy + * + * CraneSched is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of + * the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, + * WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +#pragma once + +#include "CranedPublicDefs.h" +// Precompiled header comes first. + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "crane/PasswordEntry.h" +#include "crane/PublicHeader.h" + +namespace Craned { + +struct TaskInstance; + +/** + * This struct stores some info in TaskInstance for convenience. + * Fields stored should be immutable. + */ +struct TaskMetaInExecutor { + const PasswordEntry& pwd; // pwd_entry of submitter + const task_id_t id; // Task id + const std::string name; // Task name + const bool get_user_env; // If the task should inherit user's env +}; + +struct BatchMetaInTaskExecutor { + std::string interpreter; + std::string parsed_output_file_pattern; + std::string parsed_error_file_pattern; +}; + +/** + * TaskExecutor handles task's execution procedure, e.g., + * - Prepare environment variables, + * - Write bash script / mount scripts in container, + * - Modify OCI container configs, + * - Call bash/OCI runtime... + * Note: Resource allocation (CGroups) is not in this scope. + */ +class TaskExecutor { + public: + using EnvironVars = std::vector>; + + struct ChldStatus { + pid_t pid; + bool signaled; + int value; // the signal number if signaled is true, + // the return value otherwise + }; + + TaskExecutor() : m_ev_buf_event_(nullptr) {} + virtual ~TaskExecutor() { + if (m_ev_buf_event_) { + bufferevent_free(m_ev_buf_event_); + } + }; + + /* --- Abstract Interfaces --- */ + + [[nodiscard]] virtual const std::string& GetExecPath() const = 0; + + virtual void SetPid(pid_t pid) = 0; + [[nodiscard]] virtual pid_t GetPid() const = 0; + + virtual void SetBatchMeta(BatchMetaInTaskExecutor batch_meta) = 0; + [[nodiscard]] virtual const BatchMetaInTaskExecutor& GetBatchMeta() const = 0; + + virtual void Output(std::string&& buf) = 0; + + virtual void Finish(bool is_killed, int val) = 0; + + /** + * Spawn process or container in the task. + * EvActivateTaskStatusChange_ must NOT be called in this method and should be + * called in the caller method after checking the return value of this + * method. + * @param task_id + * @param pwd_entry + * @param cgroup + * @param task_envs + * @return kSystemErr if the socket pair between the parent process and child + * process cannot be created, and the caller should call strerror() to check + * the unix error code. kLibEventError* if bufferevent_socket_new() fails. + * kCgroupError if CgroupManager cannot move the process to the cgroup bound + * to the TaskInstance. kProtobufError if the communication between the parent + * and the child process fails. + */ + [[nodiscard]] virtual CraneErr Spawn(util::Cgroup* cgroup) = 0; + + /** + * Kill running process or container. + * @param signum the value of signal. + * @return if the signal is sent successfully, kOk is returned. + * if the task name doesn't exist, kNonExistent is returned. + * if the signal is invalid, kInvalidParam is returned. + * otherwise, kGenericFailure is returned. + */ + virtual CraneErr Kill(int signum) = 0; + + /** + * Check the status of the managed process when receiving SIGCHLD. + * @param pid the pid causing SIGCHLD. + * @param status the status code returned by waitpid(). + * @return the exit status of the child process. + * Note: As processes and containers behaves differently when + * quitting, this method should be implemented in the derived class. + */ + virtual ChldStatus CheckChldStatus(pid_t pid, int status) = 0; + + /** + * Write script to a file and return the path to the file. + * The file path is dependent on implementation and will be + * stored in m_executive_path. + * @param script the script content to write. + * @return the path to the file. If failed, empty string will be returned. + */ + [[nodiscard]] virtual std::string WriteBatchScript( + const std::string_view script) = 0; + + /* --- Implemented in TaskExecutor --- */ + + virtual void SetEvBufEvent(struct bufferevent* ev_buf_event) { + m_ev_buf_event_ = ev_buf_event; + } + + virtual void SetOutputCb(std::function cb) { + m_output_cb_ = std::move(cb); + } + + virtual void SetFinishCb(std::function cb) { + m_finish_cb_ = std::move(cb); + } + + // Extract environment variables from a TaskInstance. + // This method does NOT get environment variables from the node. + static EnvironVars GetEnvironVarsFromTask(const TaskInstance& instance); + + protected: + // The underlying event that handles the output of the task. + struct bufferevent* m_ev_buf_event_{}; + + /*** + * The callback function called when a task writes to stdout or stderr. + * @param[in] buf a slice of output buffer. + */ + std::function m_output_cb_; + + /*** + * The callback function called when a task is finished. + * @param[in] bool true if the task is terminated by a signal, false + * otherwise. + * @param[in] int the number of signal if bool is true, the return value + * otherwise. + */ + std::function m_finish_cb_; +}; // namespace Craned + +class ProcessInstance final : public TaskExecutor { + public: + ProcessInstance(TaskMetaInExecutor meta, std::string cwd, + std::list args, EnvironVars env) + : m_meta_(std::move(meta)), + m_cwd_(std::move(cwd)), + m_arguments_(std::move(args)), + m_env_(std::move(env)), + m_executive_path_(""), + m_pid_(0), + m_user_data_(nullptr) {} + + ~ProcessInstance() override { + if (m_user_data_) { + if (m_clean_cb_) { + CRANE_TRACE("Clean Callback for pid {} is called.", m_pid_); + m_clean_cb_(m_user_data_); + } else + CRANE_ERROR( + "user_data in ProcessInstance is set, but clean_cb is not set!"); + } + } + + void SetBatchMeta(BatchMetaInTaskExecutor batch_meta) override { + this->m_batch_meta_ = std::move(batch_meta); + } + + [[nodiscard]] const BatchMetaInTaskExecutor& GetBatchMeta() const override { + return m_batch_meta_; + } + + [[nodiscard]] const std::string& GetExecPath() const override { + return m_executive_path_; + } + + [[nodiscard]] const std::list& GetArgList() const { + return m_arguments_; + } + + void SetPid(pid_t pid) override { m_pid_ = pid; } + [[nodiscard]] pid_t GetPid() const override { return m_pid_; } + + void Output(std::string&& buf) override { + if (m_output_cb_) m_output_cb_(std::move(buf), m_user_data_); + } + + void Finish(bool is_killed, int val) override { + if (m_finish_cb_) m_finish_cb_(is_killed, val, m_user_data_); + } + + [[nodiscard]] CraneErr Spawn(util::Cgroup* cgroup) override; + + CraneErr Kill(int signum) override; + + [[nodiscard]] ChldStatus CheckChldStatus(pid_t pid, int status) override; + + [[nodiscard]] std::string WriteBatchScript( + const std::string_view script) override; + + void SetUserDataAndCleanCb(void* data, std::function cb) { + m_user_data_ = data; + m_clean_cb_ = std::move(cb); + } + + private: + std::string m_cwd_; + EnvironVars m_env_; + + TaskMetaInExecutor m_meta_; + BatchMetaInTaskExecutor m_batch_meta_; + + pid_t m_pid_; + + std::string m_executive_path_; // script path + std::list m_arguments_; // Not used + + void* m_user_data_; + std::function m_clean_cb_; +}; + +class ContainerInstance : public TaskExecutor { + public: + ContainerInstance(TaskMetaInExecutor meta, std::string cwd, + std::string bundle_path, EnvironVars env) + : m_meta_(std::move(meta)), + m_cwd_(std::move(cwd)), + m_bundle_path_(std::move(bundle_path)), + m_env_(std::move(env)), + m_pid_(0) {} + + ~ContainerInstance() override = default; + + void SetPid(pid_t pid) override { m_pid_ = pid; } + [[nodiscard]] pid_t GetPid() const override { return m_pid_; } + + void SetBatchMeta(BatchMetaInTaskExecutor batch_meta) override { + this->m_batch_meta_ = std::move(batch_meta); + } + + [[nodiscard]] const BatchMetaInTaskExecutor& GetBatchMeta() const override { + return m_batch_meta_; + } + + [[nodiscard]] const std::string& GetExecPath() const override { + return m_executive_path_; + } + + [[nodiscard]] const std::string& GetBundlePath() const { + return m_bundle_path_; + } + + void Output(std::string&& buf) override { + if (m_output_cb_) m_output_cb_(std::move(buf), nullptr); + } + + void Finish(bool is_killed, int val) override { + if (m_finish_cb_) m_finish_cb_(is_killed, val, nullptr); + } + + [[nodiscard]] CraneErr Spawn(util::Cgroup* cgroup) override; + + CraneErr Kill(int signum) override; + + [[nodiscard]] ChldStatus CheckChldStatus(pid_t pid, int status) override; + + [[nodiscard]] std::string WriteBatchScript( + const std::string_view script) override; + + private: + /*** + * Parse the command in config to get the real command for OCI runtime. + * @param cmd_to_parse the command to parse + * @param task_id the task id (%j) + * @param task_name for task name (%x) + * @param pwd for uid/username (%u/%U) + * @param bundle the path to the OCI bundle (%b) + * @return the parsed command in string + */ + static std::string ParseContainerCmdPattern_( + std::string cmd_pattern, task_id_t task_id, const std::string& task_name, + const PasswordEntry& pwd, const std::string& bundle) noexcept { + // TODO: What if there is a space after replacing? + absl::StrReplaceAll({{"%b", bundle}, + {"%j", std::to_string(task_id)}, + {"%x", task_name}, + {"%u", pwd.Username()}, + {"%U", std::to_string(pwd.Uid())}}, + &cmd_pattern); + return cmd_pattern; + } + + /*** + * Check and create temporary dir for the container. + * m_temp_dir will be set. + * @return kOk if the directory is created successfully, kSystemErr otherwise. + */ + CraneErr AssureContainerTempDir_() { + m_temp_path_ = std::filesystem::path(g_config.CranedContainer.TempDir) / + fmt::format("container-{}/", m_meta_.id); + try { + if (!std::filesystem::exists(m_temp_path_)) + std::filesystem::create_directories(m_temp_path_); + } catch (const std::exception& e) { + CRANE_ERROR("Failed to create container temp directory: {}", e.what()); + return CraneErr::kSystemErr; + } + + return CraneErr::kOk; + } + + /** + * Create a modified config.json based on original one. + * Modified config.json will include user provided scripts, + * mount points, environments, process settings and so on. + * @param src: Folder the original config.json resides + * @param dst: Folder the modified one resides, will be + * used in launching. + */ + CraneErr ModifyBundleConfig_(const std::string& src, const std::string& dst); + + std::string m_temp_path_; // temp files for container, + // e.g., modified config.json + std::string m_bundle_path_; // original rootfs and config.json + std::string m_executive_path_; // script path on host to be mounted + + std::string m_cwd_; // cwd to *execute OCI commands* + EnvironVars m_env_; // environment variables in container + pid_t m_pid_; // pid of the runtime process + + // Note: We don't store container id as it's admin's responsibility to + // use consistent patterns in configured OCI commands. + + TaskMetaInExecutor m_meta_; + BatchMetaInTaskExecutor m_batch_meta_; +}; + +} // namespace Craned diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 70192752..4bbbc738 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -16,14 +16,21 @@ #include "TaskManager.h" +#include #include #include #include #include +#include + +#include "CranedPublicDefs.h" +#include "CtldClient.h" #include "ResourceAllocators.h" +#include "TaskExecutor.h" #include "crane/OS.h" -#include "protos/CraneSubprocess.pb.h" +#include "crane/PasswordEntry.h" +#include "crane/PublicHeader.h" namespace Craned { @@ -199,8 +206,6 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, assert(m_instance_ptr_->m_instance_ptr_ != nullptr); auto* this_ = reinterpret_cast(user_data); - SigchldInfo sigchld_info{}; - int status; pid_t pid; while (true) { @@ -208,99 +213,84 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, /* TODO(More status tracing): | WUNTRACED | WCONTINUED */); if (pid > 0) { - if (WIFEXITED(status)) { - // Exited with status WEXITSTATUS(status) - sigchld_info = {pid, false, WEXITSTATUS(status)}; - CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}", - pid, WEXITSTATUS(status)); - } else if (WIFSIGNALED(status)) { - // Killed by signal WTERMSIG(status) - sigchld_info = {pid, true, WTERMSIG(status)}; - CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}", - pid, WTERMSIG(status)); - } - /* Todo(More status tracing): - else if (WIFSTOPPED(status)) { - printf("stopped by signal %d\n", WSTOPSIG(status)); - } else if (WIFCONTINUED(status)) { - printf("continued\n"); - } */ - this_->m_mtx_.Lock(); auto task_iter = this_->m_pid_task_map_.find(pid); - auto proc_iter = this_->m_pid_proc_map_.find(pid); + auto exec_iter = this_->m_pid_exec_map_.find(pid); if (task_iter == this_->m_pid_task_map_.end() || - proc_iter == this_->m_pid_proc_map_.end()) { + exec_iter == this_->m_pid_exec_map_.end()) { CRANE_WARN("Failed to find task id for pid {}.", pid); this_->m_mtx_.Unlock(); } else { TaskInstance* instance = task_iter->second; - ProcessInstance* proc = proc_iter->second; + TaskExecutor* exec = exec_iter->second; uint32_t task_id = instance->task.task_id(); - // Remove indexes from pid to ProcessInstance* - this_->m_pid_proc_map_.erase(proc_iter); + // Remove indexes from pid to TaskExecutor* + this_->m_pid_exec_map_.erase(exec_iter); this_->m_pid_task_map_.erase(task_iter); this_->m_mtx_.Unlock(); - proc->Finish(sigchld_info.is_terminated_by_signal, sigchld_info.value); - - // Free the ProcessInstance. ITask struct is not freed here because - // the ITask for an Interactive task can have no ProcessInstance. - auto pr_it = instance->processes.find(pid); - if (pr_it == instance->processes.end()) { - CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", - pid, task_id); + // Get the exit status of the child process. + auto chld_status = exec->CheckChldStatus(pid, status); + exec->Finish(chld_status.signaled, chld_status.value); + + // Free the TaskExecutor. ITask struct is not freed here because + // the ITask for an Interactive task can have no TaskExecutor. + auto exec_it = instance->executors.find(pid); + if (exec_it == instance->executors.end()) { + // TODO: Potential leak in not freeing TaskExecutor* exec? + CRANE_ERROR("Failed to find pid {} in task #{}'s TaskExecutors", pid, + task_id); } else { - instance->processes.erase(pr_it); + instance->executors.erase(exec_it); - if (!instance->processes.empty()) { - if (sigchld_info.is_terminated_by_signal) { + if (!instance->executors.empty()) { + if (chld_status.signaled) { // If a task is terminated by a signal and there are other - // running processes belonging to this task, kill them. + // running processes belonging to this task, kill them. this_->TerminateTaskAsync(task_id); } } else { if (!instance->orphaned) { - // If the ProcessInstance has no process left and the task was not + // If the TaskExecutor has no process left and the task was not // marked as an orphaned task, send TaskStatusChange for this // task. See the comment of EvActivateTaskStatusChange_. if (instance->task.type() == crane::grpc::Batch) { // For a Batch task, the end of the process means it is done. - if (sigchld_info.is_terminated_by_signal) { + if (chld_status.signaled) { if (instance->cancelled_by_user) this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Cancelled, - sigchld_info.value + ExitCode::kTerminationSignalBase, + chld_status.value + ExitCode::kTerminationSignalBase, std::nullopt); else if (instance->terminated_by_timeout) this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::ExceedTimeLimit, - sigchld_info.value + ExitCode::kTerminationSignalBase, + chld_status.value + ExitCode::kTerminationSignalBase, std::nullopt); else this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Failed, - sigchld_info.value + ExitCode::kTerminationSignalBase, + chld_status.value + ExitCode::kTerminationSignalBase, std::nullopt); } else this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Completed, - sigchld_info.value, std::nullopt); + chld_status.value, std::nullopt); } else { // For a COMPLETING Interactive task with a process running, the // end of this process means that this task is done. - if (sigchld_info.is_terminated_by_signal) { + if (chld_status.signaled) { this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Completed, - sigchld_info.value + ExitCode::kTerminationSignalBase, + chld_status.value + ExitCode::kTerminationSignalBase, std::nullopt); } else { this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Completed, - sigchld_info.value, std::nullopt); + chld_status.value, std::nullopt); } } } @@ -360,12 +350,12 @@ void TaskManager::EvSigintCb_(int sig, short events, void* user_data) { TaskInstance* task_instance = task_it->second.get(); if (task_instance->task.type() == crane::grpc::Batch) { - for (auto&& [pid, pr_instance] : task_instance->processes) { + for (auto&& [pid, pr_instance] : task_instance->executors) { CRANE_INFO( "Sending SIGINT to the process group of task #{} with root " "process pid {}", task_id, pr_instance->GetPid()); - KillProcessInstance_(pr_instance.get(), SIGKILL); + pr_instance->Kill(SIGKILL); } task_it++; } else { @@ -393,12 +383,12 @@ void TaskManager::EvSigintCb_(int sig, short events, void* user_data) { this_->EvActivateShutdown_(); } else { for (auto&& [task_id, task_instance] : this_->m_task_map_) { - for (auto&& [pid, pr_instance] : task_instance->processes) { + for (auto&& [pid, pr_instance] : task_instance->executors) { CRANE_INFO( "Sending SIGKILL to the process group of task #{} with root " "process pid {}", task_id, pr_instance->GetPid()); - KillProcessInstance_(pr_instance.get(), SIGKILL); + pr_instance->Kill(SIGKILL); } } } @@ -424,314 +414,10 @@ void TaskManager::Wait() { if (m_ev_loop_thread_.joinable()) m_ev_loop_thread_.join(); } -CraneErr TaskManager::KillProcessInstance_(const ProcessInstance* proc, - int signum) { - // Todo: Add timer which sends SIGTERM for those tasks who - // will not quit when receiving SIGINT. - if (proc) { - // Send the signal to the whole process group. - int err = kill(-proc->GetPid(), signum); - - if (err == 0) - return CraneErr::kOk; - else { - CRANE_TRACE("kill failed. error: {}", strerror(errno)); - return CraneErr::kGenericFailure; - } - } - - return CraneErr::kNonExistent; -} - void TaskManager::SetSigintCallback(std::function cb) { m_sigint_cb_ = std::move(cb); } -CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, - ProcessInstance* process) { - using google::protobuf::io::FileInputStream; - using google::protobuf::io::FileOutputStream; - using google::protobuf::util::ParseDelimitedFromZeroCopyStream; - using google::protobuf::util::SerializeDelimitedToZeroCopyStream; - - using crane::grpc::subprocess::CanStartMessage; - using crane::grpc::subprocess::ChildProcessReady; - - int socket_pair[2]; - - if (socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair) != 0) { - CRANE_ERROR("Failed to create socket pair: {}", strerror(errno)); - return CraneErr::kSystemErr; - } - - // save the current uid/gid - savedPrivilege saved_priv{getuid(), getgid()}; - - int rc = setegid(instance->pwd_entry.Gid()); - if (rc == -1) { - CRANE_ERROR("error: setegid. {}", strerror(errno)); - return CraneErr::kSystemErr; - } - __gid_t gid_a[1] = {instance->pwd_entry.Gid()}; - setgroups(1, gid_a); - rc = seteuid(instance->pwd_entry.Uid()); - if (rc == -1) { - CRANE_ERROR("error: seteuid. {}", strerror(errno)); - return CraneErr::kSystemErr; - } - - pid_t child_pid = fork(); - if (child_pid > 0) { // Parent proc - close(socket_pair[1]); - int fd = socket_pair[0]; - bool ok; - CraneErr err; - - setegid(saved_priv.gid); - seteuid(saved_priv.uid); - setgroups(0, nullptr); - - FileInputStream istream(fd); - FileOutputStream ostream(fd); - CanStartMessage msg; - ChildProcessReady child_process_ready; - - CRANE_DEBUG("Subprocess was created for task #{} pid: {}", - instance->task.task_id(), child_pid); - - process->SetPid(child_pid); - - // Add event for stdout/stderr of the new subprocess - // struct bufferevent* ev_buf_event; - // ev_buf_event = - // bufferevent_socket_new(m_ev_base_, fd, BEV_OPT_CLOSE_ON_FREE); - // if (!ev_buf_event) { - // CRANE_ERROR( - // "Error constructing bufferevent for the subprocess of task #!", - // instance->task.task_id()); - // err = CraneErr::kLibEventError; - // goto AskChildToSuicide; - // } - // bufferevent_setcb(ev_buf_event, EvSubprocessReadCb_, nullptr, nullptr, - // (void*)process.get()); - // bufferevent_enable(ev_buf_event, EV_READ); - // bufferevent_disable(ev_buf_event, EV_WRITE); - // process->SetEvBufEvent(ev_buf_event); - - // Migrate the new subprocess to newly created cgroup - if (!instance->cgroup->MigrateProcIn(process->GetPid())) { - CRANE_ERROR( - "Terminate the subprocess of task #{} due to failure of cgroup " - "migration.", - instance->task.task_id()); - - err = CraneErr::kCgroupError; - goto AskChildToSuicide; - } - - CRANE_TRACE("New task #{} is ready. Asking subprocess to execv...", - instance->task.task_id()); - - // Tell subprocess that the parent process is ready. Then the - // subprocess should continue to exec(). - msg.set_ok(true); - ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); - ok &= ostream.Flush(); - if (!ok) { - CRANE_ERROR("Failed to send ok=true to subprocess {} for task #{}", - child_pid, instance->task.task_id()); - close(fd); - return CraneErr::kProtobufError; - } - - ParseDelimitedFromZeroCopyStream(&child_process_ready, &istream, nullptr); - if (!msg.ok()) { - CRANE_ERROR("Failed to read protobuf from subprocess {} of task #{}", - child_pid, instance->task.task_id()); - close(fd); - return CraneErr::kProtobufError; - } - - close(fd); - return CraneErr::kOk; - - AskChildToSuicide: - msg.set_ok(false); - - ok = SerializeDelimitedToZeroCopyStream(msg, &ostream); - close(fd); - if (!ok) { - CRANE_ERROR("Failed to ask subprocess {} to suicide for task #{}", - child_pid, instance->task.task_id()); - return CraneErr::kProtobufError; - } - return err; - } else { // Child proc - const std::string& cwd = instance->task.cwd(); - rc = chdir(cwd.c_str()); - if (rc == -1) { - CRANE_ERROR("[Child Process] Error: chdir to {}. {}", cwd.c_str(), - strerror(errno)); - std::abort(); - } - - setreuid(instance->pwd_entry.Uid(), instance->pwd_entry.Uid()); - setregid(instance->pwd_entry.Gid(), instance->pwd_entry.Gid()); - - // Set pgid to the pid of task root process. - setpgid(0, 0); - - close(socket_pair[0]); - int fd = socket_pair[1]; - - FileInputStream istream(fd); - FileOutputStream ostream(fd); - CanStartMessage msg; - ChildProcessReady child_process_ready; - bool ok; - - ParseDelimitedFromZeroCopyStream(&msg, &istream, nullptr); - if (!msg.ok()) std::abort(); - - const std::string& stdout_file_path = - process->batch_meta.parsed_output_file_pattern; - const std::string& stderr_file_path = - process->batch_meta.parsed_error_file_pattern; - - int stdout_fd = - open(stdout_file_path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644); - if (stdout_fd == -1) { - CRANE_ERROR("[Child Process] Error: open {}. {}", stdout_file_path, - strerror(errno)); - std::abort(); - } - dup2(stdout_fd, 1); // stdout -> output file - - if (stderr_file_path.empty()) { // if stderr filename is not specified - dup2(stdout_fd, 2); // stderr -> output file - } else { - int stderr_fd = - open(stderr_file_path.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644); - if (stderr_fd == -1) { - CRANE_ERROR("[Child Process] Error: open {}. {}", stderr_file_path, - strerror(errno)); - std::abort(); - } - dup2(stderr_fd, 2); // stderr -> error file - close(stderr_fd); - } - - close(stdout_fd); - - child_process_ready.set_ok(true); - ok = SerializeDelimitedToZeroCopyStream(child_process_ready, &ostream); - ok &= ostream.Flush(); - if (!ok) { - CRANE_ERROR("[Child Process] Error: Failed to flush."); - std::abort(); - } - - close(fd); - - // If these file descriptors are not closed, a program like mpirun may - // keep waiting for the input from stdin or other fds and will never end. - close(0); // close stdin - util::os::CloseFdFrom(3); - - std::vector> env_vec; - - // Load env from the front end. - for (auto& [name, value] : instance->task.env()) { - env_vec.emplace_back(name, value); - } - - if (instance->task.get_user_env()) { - // If --get-user-env is set, the new environment is inherited - // from the execution CraneD rather than the submitting node. - // - // Since we want to reinitialize the environment variables of the user - // by reloading the settings in something like .bashrc or /etc/profile, - // we are actually performing two steps: login -> start shell. - // Shell starting is done by calling "bash --login". - // - // During shell starting step, the settings in - // /etc/profile, ~/.bash_profile, ... are loaded. - // - // During login step, "HOME" and "SHELL" are set. - // Here we are just mimicking the login module. - - // Slurm uses `su -c /usr/bin/env` to retrieve - // all the environment variables. - // We use a more tidy way. - env_vec.emplace_back("HOME", instance->pwd_entry.HomeDir()); - env_vec.emplace_back("SHELL", instance->pwd_entry.Shell()); - } - - env_vec.emplace_back("CRANE_JOB_NODELIST", - absl::StrJoin(instance->task.allocated_nodes(), ";")); - env_vec.emplace_back("CRANE_EXCLUDES", - absl::StrJoin(instance->task.excludes(), ";")); - env_vec.emplace_back("CRANE_JOB_NAME", instance->task.name()); - env_vec.emplace_back("CRANE_ACCOUNT", instance->task.account()); - env_vec.emplace_back("CRANE_PARTITION", instance->task.partition()); - env_vec.emplace_back("CRANE_QOS", instance->task.qos()); - env_vec.emplace_back("CRANE_MEM_PER_NODE", - std::to_string(instance->task.resources() - .allocatable_resource() - .memory_limit_bytes() / - (1024 * 1024))); - env_vec.emplace_back("CRANE_JOB_ID", - std::to_string(instance->task.task_id())); - - int64_t time_limit_sec = instance->task.time_limit().seconds(); - int hours = time_limit_sec / 3600; - int minutes = (time_limit_sec % 3600) / 60; - int seconds = time_limit_sec % 60; - std::string time_limit = - fmt::format("{:0>2}:{:0>2}:{:0>2}", hours, minutes, seconds); - env_vec.emplace_back("CRANE_TIMELIMIT", time_limit); - - if (clearenv()) { - fmt::print("clearenv() failed!\n"); - } - - for (const auto& [name, value] : env_vec) { - if (setenv(name.c_str(), value.c_str(), 1)) { - fmt::print("setenv for {}={} failed!\n", name, value); - } - } - - // Prepare the command line arguments. - std::vector argv; - - // Argv[0] is the program name which can be anything. - argv.emplace_back("CraneScript"); - - if (instance->task.get_user_env()) { - // If --get-user-env is specified, - // we need to use --login option of bash to load settings from the user's - // settings. - argv.emplace_back("--login"); - } - - argv.emplace_back(process->GetExecPath().c_str()); - for (auto&& arg : process->GetArgList()) { - argv.push_back(arg.c_str()); - } - argv.push_back(nullptr); - - execv("/bin/bash", const_cast(argv.data())); - - // Error occurred since execv returned. At this point, errno is set. - // Ctld use SIGABRT to inform the client of this failure. - fmt::print(stderr, "[Craned Subprocess Error] Failed to execv. Error: {}\n", - strerror(errno)); - // Todo: See https://tldp.org/LDP/abs/html/exitcodes.html, return standard - // exit codes - abort(); - } -} - CraneErr TaskManager::ExecuteTaskAsync(crane::grpc::TaskToD const& task) { if (!m_task_id_to_cg_map_.Contains(task.task_id())) { CRANE_DEBUG("Executing task #{} without an allocated cgroup. Ignoring it.", @@ -837,84 +523,116 @@ void TaskManager::EvGrpcExecuteTaskCb_(int, short events, void* user_data) { // If this is a batch task, run it now. if (instance->task.type() == crane::grpc::Batch) { - instance->batch_meta.parsed_sh_script_path = - fmt::format("{}/Crane-{}.sh", g_config.CranedScriptDir, task_id); - auto& sh_path = instance->batch_meta.parsed_sh_script_path; + std::unique_ptr executor = nullptr; + + // Store some meta data in executor for convenience + auto meta = + TaskMetaInExecutor{.pwd = instance->pwd_entry, + .id = task_id, + .name = instance->task.name(), + .get_user_env = instance->task.get_user_env()}; + + // Generate environment variables + auto env = TaskExecutor::GetEnvironVarsFromTask(*instance); + + // Prepare arguments + // Note: Currently no arguments will be accepted in container. + auto args = std::list{}; + + // Instantiate ProcessInstance/ContainerInstance + if (instance->task.container().empty()) { + // use ProcessInstance + executor = std::make_unique( + std::move(meta), instance->task.cwd(), std::move(args), + std::move(env)); + } else if (g_config.CranedContainer.Enable) { + // use ContainerInstance + executor = std::make_unique( + std::move(meta), instance->task.cwd(), instance->task.container(), + std::move(env)); + } else { + // not supported by this node + CRANE_ERROR("Container support is disabled but requested by task #{}", + task_id); + this_->EvActivateTaskStatusChange_( + task_id, crane::grpc::TaskStatus::Failed, + ExitCode::kExitCodeSpawnExecutorFail, + fmt::format( + "Container support is disabled but requested by task #{}", + task_id)); + return; + } - FILE* fptr = fopen(sh_path.c_str(), "w"); - if (fptr == nullptr) { - CRANE_ERROR("Cannot write shell script for batch task #{}", - instance->task.task_id()); + if (executor == nullptr) { + CRANE_ERROR("Failed to create executor for task #{}", task_id); this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeFileNotFound, - fmt::format("Cannot write shell script for batch task #{}", - task_id)); + ExitCode::kExitCodeSpawnExecutorFail, + fmt::format("Failed to create executor for task #{}", task_id)); return; } - fputs(instance->task.batch_meta().sh_script().c_str(), fptr); - fclose(fptr); - - chmod(sh_path.c_str(), strtol("0755", nullptr, 8)); - - CraneErr err = CraneErr::kOk; - auto process = std::make_unique( - sh_path, std::list()); - - /* Perform file name substitutions - * %j - Job ID - * %u - Username - * %x - Job name - */ - process->batch_meta.parsed_output_file_pattern = ParseFilePathPattern_( - instance->task.batch_meta().output_file_pattern(), - instance->task.cwd(), task_id); - absl::StrReplaceAll({{"%j", std::to_string(task_id)}, - {"%u", instance->pwd_entry.Username()}, - {"%x", instance->task.name()}}, - &process->batch_meta.parsed_output_file_pattern); - - // If -e / --error is not defined, leave - // batch_meta.parsed_error_file_pattern empty; - if (!instance->task.batch_meta().error_file_pattern().empty()) { - process->batch_meta.parsed_error_file_pattern = ParseFilePathPattern_( - instance->task.batch_meta().error_file_pattern(), - instance->task.cwd(), task_id); - absl::StrReplaceAll({{"%j", std::to_string(task_id)}, - {"%u", instance->pwd_entry.Username()}, - {"%x", instance->task.name()}}, - &process->batch_meta.parsed_error_file_pattern); + + // Write the script to the file + instance->batch_meta.parsed_sh_script_path = + executor->WriteBatchScript(instance->task.batch_meta().sh_script()); + if (instance->batch_meta.parsed_sh_script_path.empty()) { + CRANE_ERROR("Cannot write shell script for task #{}", task_id); + this_->EvActivateTaskStatusChange_( + task_id, crane::grpc::TaskStatus::Failed, + ExitCode::kExitCodeFileNotFound, + fmt::format("Cannot write shell script for task #{}", task_id)); + return; } + auto batch_meta = BatchMetaInTaskExecutor{ + .interpreter = instance->task.batch_meta().interpreter(), + .parsed_output_file_pattern = + instance->task.batch_meta().output_file_pattern(), + .parsed_error_file_pattern = + instance->task.batch_meta().error_file_pattern(), + }; + + // Set interpreter, if not specified, use /bin/sh + if (batch_meta.interpreter.empty()) batch_meta.interpreter = "/bin/sh"; + + // Parse the result file patterns + ParseResultPathPattern_(task_id, instance->task.name(), + instance->task.cwd(), instance->pwd_entry, + batch_meta.parsed_output_file_pattern, + batch_meta.parsed_error_file_pattern); + + // Set the parsed patterns to batch_meta in executor. + executor->SetBatchMeta(std::move(batch_meta)); + // auto output_cb = [](std::string&& buf, void* data) { // CRANE_TRACE("Read output from subprocess: {}", buf); // }; // // process->SetOutputCb(std::move(output_cb)); - err = SpawnProcessInInstance_(instance, process.get()); + // Spawn the process/container + CraneErr err = executor->Spawn(instance->cgroup); if (err == CraneErr::kOk) { this_->m_mtx_.Lock(); - // Child process may finish or abort before we put its pid into maps. - // However, it doesn't matter because SIGCHLD will be handled after - // this function or event ends. - // Add indexes from pid to TaskInstance*, ProcessInstance* - this_->m_pid_task_map_.emplace(process->GetPid(), instance); - this_->m_pid_proc_map_.emplace(process->GetPid(), process.get()); + // Child process may finish or abort before we put its pid into + // maps. However, it doesn't matter because SIGCHLD will be handled + // after this function or event ends. Add indexes from pid to + // TaskInstance*, TaskExecutor* + this_->m_pid_task_map_.emplace(executor->GetPid(), instance); + this_->m_pid_exec_map_.emplace(executor->GetPid(), executor.get()); this_->m_mtx_.Unlock(); - // Move the ownership of ProcessInstance into the - // TaskInstance. - instance->processes.emplace(process->GetPid(), std::move(process)); + // Move the ownership of TaskExecutor into the TaskInstance. + instance->executors.emplace(executor->GetPid(), std::move(executor)); } else { this_->EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Failed, - ExitCode::kExitCodeSpawnProcessFail, + ExitCode::kExitCodeSpawnExecutorFail, fmt::format( - "Cannot spawn a new process inside the instance of task #{}", + "Cannot spawn an executor inside the instance of task #{}", task_id)); } } @@ -922,29 +640,47 @@ void TaskManager::EvGrpcExecuteTaskCb_(int, short events, void* user_data) { } } -std::string TaskManager::ParseFilePathPattern_(const std::string& path_pattern, - const std::string& cwd, - task_id_t task_id) { - std::string resolved_path_pattern; +void TaskManager::ParseResultPathPattern_(const task_id_t task_id, + const std::string& task_name, + const std::string& cwd, + const PasswordEntry& pwd, + std::string& stdout_pattern, + std::string& stderr_pattern) { + // Resolve the path + auto path_resolver = [&](std::string& path) { + if (path.empty()) { + // if not specified, assume cwd. + path = fmt::format("{}/", cwd); + } else if (path[0] != '/') { + // If an absolute path, do nothing. + // If a relative path, prepend cwd to the path. + path = fmt::format("{}/{}", cwd, path); + } + }; - if (path_pattern.empty()) { - // If file path is not specified, first set it to cwd. - resolved_path_pattern = fmt::format("{}/", cwd); - } else { - if (path_pattern[0] == '/') - // If output file path is an absolute path, do nothing. - resolved_path_pattern = path_pattern; - else - // If output file path is a relative path, prepend cwd to the path. - resolved_path_pattern = fmt::format("{}/{}", cwd, path_pattern); - } + // Resolve the pattern + // %j - Job ID, %u - Username, %x - Job name + auto pattern_filler = [&](std::string& pattern) { + absl::StrReplaceAll({{"%j", std::to_string(task_id)}, + {"%u", pwd.Username()}, + {"%x", task_name}}, + &pattern); + }; - // Path ends with a directory, append default stdout file name + // stdout + path_resolver(stdout_pattern); + pattern_filler(stdout_pattern); + // If ends with a directory, append default stdout file name // `Crane-.out` to the path. - if (absl::EndsWith(resolved_path_pattern, "/")) - resolved_path_pattern += fmt::format("Crane-{}.out", task_id); - - return resolved_path_pattern; + if (absl::EndsWith(stdout_pattern, "/")) + stdout_pattern += fmt::format("Crane-{}.out", task_id); + + // stderr, if not defined, leave empty; + if (stderr_pattern.empty()) return; + path_resolver(stderr_pattern); + pattern_filler(stderr_pattern); + if (absl::EndsWith(stderr_pattern, "/")) + stderr_pattern += fmt::format("Crane-{}.err", task_id); } void TaskManager::EvTaskStatusChangeCb_(int efd, short events, @@ -961,6 +697,7 @@ void TaskManager::EvTaskStatusChangeCb_(int efd, short events, if (iter->second->task.type() == crane::grpc::Batch) { g_thread_pool->detach_task( [p = iter->second->batch_meta.parsed_sh_script_path]() { + // FIXME: Refactor this in TaskExecutor's destructor. util::os::DeleteFile(p); }); } @@ -1043,20 +780,27 @@ void TaskManager::EvGrpcSpawnInteractiveTaskCb_(int efd, short events, return; } + // TODO: Add container support + // FIXME: Didn't passing executive path auto process = std::make_unique( - std::move(elem.executive_path), std::move(elem.arguments)); + TaskMetaInExecutor{ + .pwd = task_iter->second->pwd_entry, + .id = task_iter->second->task.task_id(), + .name = task_iter->second->task.name(), + }, + std::move(task_iter->second->task.cwd()), std::move(elem.arguments), + TaskExecutor::GetEnvironVarsFromTask(*task_iter->second)); process->SetOutputCb(std::move(elem.output_cb)); process->SetFinishCb(std::move(elem.finish_cb)); CraneErr err; - err = - this_->SpawnProcessInInstance_(task_iter->second.get(), process.get()); + err = process->Spawn(task_iter->second->cgroup); elem.err_promise.set_value(err); if (err != CraneErr::kOk) this_->EvActivateTaskStatusChange_(elem.task_id, crane::grpc::Failed, - ExitCode::kExitCodeSpawnProcessFail, + ExitCode::kExitCodeSpawnExecutorFail, std::string(CraneErrStr(err))); } } @@ -1132,8 +876,9 @@ void TaskManager::EvTerminateTaskCb_(int efd, short events, void* user_data) { CRANE_DEBUG("Terminating a non-existent task #{}.", elem.task_id); // Note if Ctld wants to terminate some tasks that are not running, - // it might indicate other nodes allocated to the task might have crashed. - // We should mark the task as kind of not runnable by removing its cgroup. + // it might indicate other nodes allocated to the task might have + // crashed. We should mark the task as kind of not runnable by + // removing its cgroup. // // Considering such a situation: // In Task Scheduler of Ctld, @@ -1148,8 +893,8 @@ void TaskManager::EvTerminateTaskCb_(int efd, short events, void* user_data) { // In order to give Ctld kind of feedback without adding complicated // synchronizing mechanism in ScheduleThread_(), // we just remove the cgroup for such task, Ctld will fail in the - // following ExecuteTasks and the task will go to the right place as well - // as the completed queue. + // following ExecuteTasks and the task will go to the right place as + // well as the completed queue. uid_t uid; { @@ -1158,7 +903,8 @@ void TaskManager::EvTerminateTaskCb_(int efd, short events, void* user_data) { if (!vp) return; CRANE_DEBUG( - "Remove cgroup for task #{} for potential crashes of other craned.", + "Remove cgroup for task #{} for potential crashes of other " + "craned.", elem.task_id); uid = *vp; } @@ -1176,13 +922,14 @@ void TaskManager::EvTerminateTaskCb_(int efd, short events, void* user_data) { int sig = SIGTERM; // For BatchTask if (task_instance->task.type() == crane::grpc::Interactive) sig = SIGHUP; - if (!task_instance->processes.empty()) { - // For an Interactive task with a process running or a Batch task, we just - // send a kill signal here. - for (auto&& [pid, pr_instance] : task_instance->processes) - KillProcessInstance_(pr_instance.get(), sig); + if (!task_instance->executors.empty()) { + // For an Interactive task with a process running or a Batch task, we + // just send a kill signal here. + for (auto&& [pid, executor] : task_instance->executors) + executor->Kill(sig); } else if (task_instance->task.type() == crane::grpc::Interactive) { - // For an Interactive task with no process running, it ends immediately. + // For an Interactive task with no process running, it ends + // immediately. this_->EvActivateTaskStatusChange_(elem.task_id, crane::grpc::Completed, ExitCode::kExitCodeTerminated, std::nullopt); @@ -1238,7 +985,8 @@ bool TaskManager::CreateCgroupsAsync( bool TaskManager::ReleaseCgroupAsync(uint32_t task_id, uid_t uid) { if (!this->m_uid_to_task_ids_map_.Contains(uid)) { CRANE_DEBUG( - "Trying to release a non-existent cgroup for uid #{}. Ignoring it...", + "Trying to release a non-existent cgroup for uid #{}. Ignoring " + "it...", uid); return false; } @@ -1252,16 +1000,17 @@ bool TaskManager::ReleaseCgroupAsync(uint32_t task_id, uid_t uid) { if (!this->m_task_id_to_cg_map_.Contains(task_id)) { CRANE_DEBUG( - "Trying to release a non-existent cgroup for task #{}. Ignoring it...", + "Trying to release a non-existent cgroup for task #{}. Ignoring " + "it...", task_id); return false; } else { - // The termination of all processes in a cgroup is a time-consuming work. - // Therefore, once we are sure that the cgroup for this task exists, we - // let gRPC call return and put the termination work into the thread pool - // to avoid blocking the event loop of TaskManager. - // Kind of async behavior. + // The termination of all processes in a cgroup is a time-consuming + // work. Therefore, once we are sure that the cgroup for this task + // exists, we let gRPC call return and put the termination work into the + // thread pool to avoid blocking the event loop of TaskManager. Kind of + // async behavior. // avoid deadlock by Erase at next line util::Cgroup* cgroup = this->m_task_id_to_cg_map_[task_id]->release(); @@ -1339,8 +1088,8 @@ void TaskManager::EvCheckTaskStatusCb_(int, short events, void* user_data) { } // If a task id can be found in g_ctld_client, the task has ended. - // Now if CraneCtld check the status of these tasks, there is no need to - // send to TaskStatusChange again. Just cancel them. + // Now if CraneCtld check the status of these tasks, there is no need + // to send to TaskStatusChange again. Just cancel them. crane::grpc::TaskStatus status; bool exist = g_ctld_client->CancelTaskStatusChangeByTaskId(task_id, &status); diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index 2fc1ca04..31257cb4 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -31,12 +31,10 @@ #include #include -#include "CtldClient.h" +#include "TaskExecutor.h" #include "crane/AtomicHashMap.h" #include "crane/PasswordEntry.h" #include "crane/PublicHeader.h" -#include "protos/Crane.grpc.pb.h" -#include "protos/Crane.pb.h" namespace Craned { @@ -47,100 +45,6 @@ struct EvTimerCbArg { task_id_t task_id; }; -struct BatchMetaInProcessInstance { - std::string parsed_output_file_pattern; - std::string parsed_error_file_pattern; -}; - -class ProcessInstance { - public: - ProcessInstance(std::string exec_path, std::list arg_list) - : m_executive_path_(std::move(exec_path)), - m_arguments_(std::move(arg_list)), - m_pid_(0), - m_ev_buf_event_(nullptr), - m_user_data_(nullptr) {} - - ~ProcessInstance() { - if (m_user_data_) { - if (m_clean_cb_) { - CRANE_TRACE("Clean Callback for pid {} is called.", m_pid_); - m_clean_cb_(m_user_data_); - } else - CRANE_ERROR( - "user_data in ProcessInstance is set, but clean_cb is not set!"); - } - - if (m_ev_buf_event_) bufferevent_free(m_ev_buf_event_); - } - - [[nodiscard]] const std::string& GetExecPath() const { - return m_executive_path_; - } - [[nodiscard]] const std::list& GetArgList() const { - return m_arguments_; - } - - void SetPid(pid_t pid) { m_pid_ = pid; } - [[nodiscard]] pid_t GetPid() const { return m_pid_; } - - void SetEvBufEvent(struct bufferevent* ev_buf_event) { - m_ev_buf_event_ = ev_buf_event; - } - - void SetOutputCb(std::function cb) { - m_output_cb_ = std::move(cb); - } - - void SetFinishCb(std::function cb) { - m_finish_cb_ = std::move(cb); - } - - void Output(std::string&& buf) { - if (m_output_cb_) m_output_cb_(std::move(buf), m_user_data_); - } - - void Finish(bool is_killed, int val) { - if (m_finish_cb_) m_finish_cb_(is_killed, val, m_user_data_); - } - - void SetUserDataAndCleanCb(void* data, std::function cb) { - m_user_data_ = data; - m_clean_cb_ = std::move(cb); - } - - BatchMetaInProcessInstance batch_meta; - - private: - /* ------------- Fields set by SpawnProcessInInstance_ ---------------- */ - pid_t m_pid_; - - // The underlying event that handles the output of the task. - struct bufferevent* m_ev_buf_event_; - - /* ------- Fields set by the caller of SpawnProcessInInstance_ -------- */ - std::string m_executive_path_; - std::list m_arguments_; - - /*** - * The callback function called when a task writes to stdout or stderr. - * @param[in] buf a slice of output buffer. - */ - std::function m_output_cb_; - - /*** - * The callback function called when a task is finished. - * @param[in] bool true if the task is terminated by a signal, false - * otherwise. - * @param[in] int the number of signal if bool is true, the return value - * otherwise. - */ - std::function m_finish_cb_; - - void* m_user_data_; - std::function m_clean_cb_; -}; - struct BatchMetaInTaskInstance { std::string parsed_sh_script_path; }; @@ -171,7 +75,7 @@ struct TaskInstance { util::Cgroup* cgroup; struct event* termination_timer{nullptr}; - absl::flat_hash_map> processes; + absl::flat_hash_map> executors; }; /** @@ -227,18 +131,8 @@ class TaskManager { template using ConcurrentQueue = moodycamel::ConcurrentQueue; - struct SigchldInfo { - pid_t pid; - bool is_terminated_by_signal; - int value; - }; - - struct savedPrivilege { - uid_t uid; - gid_t gid; - }; - struct EvQueueGrpcInteractiveTask { + // TODO: Add support for container std::promise err_promise; uint32_t task_id; std::string executive_path; @@ -274,26 +168,18 @@ class TaskManager { static std::string CgroupStrByTaskId_(task_id_t task_id); - static std::string ParseFilePathPattern_(const std::string& path_pattern, - const std::string& cwd, - task_id_t task_id); - - /** - * EvActivateTaskStatusChange_ must NOT be called in this method and should be - * called in the caller method after checking the return value of this - * method. - * @return kSystemErr if the socket pair between the parent process and child - * process cannot be created, and the caller should call strerror() to check - * the unix error code. kLibEventError if bufferevent_socket_new() fails. - * kCgroupError if CgroupManager cannot move the process to the cgroup bound - * to the TaskInstance. kProtobufError if the communication between the - * parent and the child process fails. - */ - static CraneErr SpawnProcessInInstance_(TaskInstance* instance, - ProcessInstance* process); + static void ParseResultPathPattern_(const task_id_t task_id, + const std::string& task_name, + const std::string& cwd, + const PasswordEntry& pwd, + std::string& stdout_pattern, + std::string& stderr_pattern); const TaskInstance* FindInstanceByTaskId_(uint32_t task_id); + [[deprecated]] CraneErr SpawnProcessInInstance_(TaskInstance* instance, + ProcessInstance* process); + // Ask TaskManager to stop its event loop. void EvActivateShutdown_(); @@ -357,19 +243,6 @@ class TaskManager { instance->termination_timer = nullptr; } - /** - * Send a signal to the process group to which the processes in - * ProcessInstance belongs. - * This function ASSUMES that ALL processes belongs to the process group with - * the PGID set to the PID of the first process in this ProcessInstance. - * @param signum the value of signal. - * @return if the signal is sent successfully, kOk is returned. - * if the task name doesn't exist, kNonExistent is returned. - * if the signal is invalid, kInvalidParam is returned. - * otherwise, kGenericFailure is returned. - */ - static CraneErr KillProcessInstance_(const ProcessInstance* proc, int signum); - // Note: the three maps below are NOT protected by any mutex. // They should be modified in libev callbacks to avoid races. @@ -392,7 +265,7 @@ class TaskManager { // A TaskInstance may contain more than one ProcessInstance. absl::flat_hash_map m_pid_task_map_ GUARDED_BY(m_mtx_); - absl::flat_hash_map m_pid_proc_map_ + absl::flat_hash_map m_pid_exec_map_ GUARDED_BY(m_mtx_); absl::Mutex m_mtx_; diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index b750185e..de666730 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -103,7 +103,7 @@ enum ExitCodeEnum : uint16_t { kExitCodePermissionDenied, kExitCodeCgroupError, kExitCodeFileNotFound, - kExitCodeSpawnProcessFail, + kExitCodeSpawnExecutorFail, kExitCodeExceedTimeLimit, kExitCodeCranedDown, kExitCodeExecutionError,