Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for jobs in OCI Container #255

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ CranedMutexFilePath: craned/craned.lock
# whether craned runs in the foreground
CranedForeground: true

# Container Runtime Options
CranedContainer:
Enable: true
TempDir: craned/containers/
RuntimeState: /usr/bin/runc --root=/run/user/%U/ state %u.%U.%j.%x
RuntimeKill: /usr/bin/runc --rootless=true --root=/run/user/%U/ kill -a %u.%U.%j.%x SIGTERM
RuntimeDelete: /usr/bin/runc --rootless=true --root=/run/user/%U/ delete --force %u.%U.%j.%x
RuntimeRun: /usr/bin/runc --rootless=true --root=/run/user/%U/ run -b %b %u.%U.%j.%x

# Scheduling settings
PriorityType: priority/multifactor

Expand All @@ -57,14 +66,14 @@ PriorityWeightJobSize: 0
PriorityWeightPartition: 1000
PriorityWeightQ0S: 1000000

# list of configuration information of the computing machine
# Nodes and partitions settings
# node list
Nodes:
- name: "cn[15-18]"
cpu: 2
memory: 2G

# partition information list
# partition list
Partitions:
- name: CPU
nodes: "cn[15-16]"
Expand All @@ -75,9 +84,7 @@ Partitions:

DefaultPartition: CPU


# Advanced options:

# Advanced settings:
# Maximum size of the pending queue; must be <= 900000.
PendingQueueMaxSize: 900000

Expand Down
6 changes: 6 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ message TaskToCtld {
InteractiveTaskAdditionalMeta interactive_meta = 22;
}

string container = 25;

string cmd_line = 31;
string cwd = 32; // Current working directory
map<string, string> env = 33;
Expand Down Expand Up @@ -164,10 +166,12 @@ message TaskToD {
double cpus_per_task = 23;

bool get_user_env = 24;
string container = 25;
}

// Additional metadata attached to batch tasks. On the controller side it is
// filled by CranedStub::NewExecuteTasksRequest when the task type is Batch.
message BatchTaskAdditionalMeta {
// Batch script carried with the task.
string sh_script = 1;
// NOTE(review): presumably the program used to run sh_script — confirm
// against the craned-side consumer.
string interpreter = 2;
// NOTE(review): presumably the file name pattern for the task's standard
// output — confirm against the craned-side consumer.
string output_file_pattern = 3;
// NOTE(review): presumably the file name pattern for the task's standard
// error — confirm against the craned-side consumer.
string error_file_pattern = 4;
}
Expand Down Expand Up @@ -196,6 +200,8 @@ message TaskInfo {
string username = 15;
string qos = 16;

string container = 25;

// Dynamic task information
TaskStatus status = 31;
string craned_list = 32;
Expand Down
41 changes: 19 additions & 22 deletions scripts/wipe_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,25 @@ fi
mode=$1

# Read the account credentials and the unqlite file path from the config file
confFile=/etc/crane/database.yaml
username=$(grep 'DbUser' "$confFile" | awk '{print $2}')
username=${username//\"/}
password=$(grep 'DbPassword' "$confFile" | awk '{print $2}')
password=${password//\"/}
embedded_db_path=$(grep 'CraneCtldDbPath' "$confFile" | awk '{print $2}')
parent_dir="${embedded_db_path%/*}"
env_path="${parent_dir}/CraneEnv"

# Address and port of the MongoDB server
host="localhost"
port="27017"
conf_file=/etc/crane/database.yaml
base_dir=/var/crane/
username=$(grep 'DbUser:' "$conf_file" | awk '{print $2}')
password=$(grep 'DbPassword:' "$conf_file" | awk -F\" '{print $2}')
host=$(grep 'DbHost:' "$conf_file" | awk '{print $2}')
port=$(grep 'DbPort:' "$conf_file" | awk '{print $2}')
dbname=$(grep 'DbName:' "$conf_file" | awk '{print $2}')
embedded_db_path="$base_dir$(grep 'CraneCtldDbPath:' "$conf_file" | awk '{print $2}')"

# Connect to the MongoDB server with the mongo shell and empty the given collection

function wipe_collection() {
mongosh --username "$username" --password "$password" --host "$host" --port "$port" <<EOF
use crane_db
mongosh --username "$username" --password "$password" --host "$host" --port "$port" --authenticationDatabase admin <<EOF
use $dbname
db.$1.deleteMany({})
exit
EOF
}

# Wipe collections; which ones are wiped depends on mode
if [ "$mode" -eq 1 ] || [ "$mode" -eq 5 ] || [ "$mode" -eq 6 ]; then
wipe_collection acct_table
fi
Expand All @@ -42,13 +38,14 @@ fi
if [ "$mode" -eq 3 ] || [ "$mode" -eq 5 ]; then
wipe_collection task_table

if [ -e "$embedded_db_path" ]; then
echo "Removing file $embedded_db_path ..."
rm "$embedded_db_path"
fi
if [ -e "$env_path" ]; then
echo "Removing env folder $env_path ..."
rm -rf "$env_path"
# Get the directory that holds the Unqlite database file and the file name prefix
db_dir=$(dirname "$embedded_db_path")
db_filename=$(basename "$embedded_db_path")

# Delete every file in that directory whose name starts with the prefix
if [ -d "$db_dir" ]; then
echo "Removing files like $db_filename* in $db_dir ..."
rm -f "$db_dir"/"$db_filename"*
fi
fi
if [ "$mode" -eq 4 ] || [ "$mode" -eq 5 ] || [ "$mode" -eq 6 ]; then
Expand Down
6 changes: 3 additions & 3 deletions src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void ParseConfig(int argc, char** argv) {

g_config.PriorityConfig.MaxAge = kPriorityDefaultMaxAge;
if (config["PriorityMaxAge"]) {
std::string max_age = config["PriorityMaxAge"].as<std::string>();
auto max_age = config["PriorityMaxAge"].as<std::string>();

std::regex pattern_hour_min_sec(R"((\d+):(\d+):(\d+))");
std::regex pattern_day_hour(R"((\d+)-(\d+))");
Expand Down Expand Up @@ -220,7 +220,7 @@ void ParseConfig(int argc, char** argv) {
}

if (config["PriorityType"]) {
std::string priority_type = config["PriorityType"].as<std::string>();
auto priority_type = config["PriorityType"].as<std::string>();
if (priority_type == "priority/multifactor")
g_config.PriorityConfig.Type = Ctld::Config::Priority::MultiFactor;
else
Expand Down Expand Up @@ -576,7 +576,7 @@ void InitializeCtldGlobalVariables() {

g_craned_keeper->SetCranedIsDownCb([](const CranedId& craned_id) {
CRANE_TRACE(
"CranedNode #{} is down now."
"CranedNode #{} is down now. "
"Remove its resource from the global resource pool.",
craned_id);
g_meta_container->CranedDown(craned_id);
Expand Down
3 changes: 3 additions & 0 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest(
mutable_task->set_cwd(task->cwd);
mutable_task->set_get_user_env(task->get_user_env);

mutable_task->set_container(task->container);

for (const auto &hostname : task->CranedIds())
mutable_task->mutable_allocated_nodes()->Add()->assign(hostname);

Expand All @@ -278,6 +280,7 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest(
if (task->type == crane::grpc::Batch) {
auto &meta_in_ctld = std::get<BatchMetaInTask>(task->meta);
auto *mutable_meta = mutable_task->mutable_batch_meta();
mutable_meta->set_interpreter(meta_in_ctld.interpreter);
mutable_meta->set_output_file_pattern(meta_in_ctld.output_file_pattern);
mutable_meta->set_error_file_pattern(meta_in_ctld.error_file_pattern);
mutable_meta->set_sh_script(meta_in_ctld.sh_script);
Expand Down
5 changes: 5 additions & 0 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ struct InteractiveMetaInTask {

// Batch-specific task metadata kept by the controller. It is one alternative
// of the TaskInCtld::meta variant and is filled from the request's batch_meta()
// when the task type is crane::grpc::Batch.
struct BatchMetaInTask {
// Copied from batch_meta().sh_script().
std::string sh_script;
// Copied from batch_meta().interpreter().
std::string interpreter;
// Copied from batch_meta().output_file_pattern().
std::string output_file_pattern;
// Copied from batch_meta().error_file_pattern().
std::string error_file_pattern;
};
Expand Down Expand Up @@ -254,6 +255,8 @@ struct TaskInCtld {
std::unordered_map<std::string, std::string> env;
std::string cwd;

std::string container;

std::variant<InteractiveMetaInTask, BatchMetaInTask> meta;

private:
Expand Down Expand Up @@ -416,6 +419,7 @@ struct TaskInCtld {
if (type == crane::grpc::Batch) {
meta.emplace<BatchMetaInTask>(BatchMetaInTask{
.sh_script = val.batch_meta().sh_script(),
.interpreter = val.batch_meta().interpreter(),
.output_file_pattern = val.batch_meta().output_file_pattern(),
.error_file_pattern = val.batch_meta().error_file_pattern(),
});
Expand All @@ -438,6 +442,7 @@ struct TaskInCtld {
qos = val.qos();

get_user_env = val.get_user_env();
container = val.container();
}

void SetFieldsByRuntimeAttr(crane::grpc::RuntimeAttrOfTask const& val) {
Expand Down
2 changes: 2 additions & 0 deletions src/Craned/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ add_executable(craned
ResourceAllocators.cpp
TaskManager.h
TaskManager.cpp
TaskExecutor.h
TaskExecutor.cpp
CranedServer.h
CranedServer.cpp
CranedPublicDefs.h
Expand Down
38 changes: 31 additions & 7 deletions src/Craned/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,39 @@ void ParseConfig(int argc, char** argv) {

g_config.Partitions.emplace(std::move(name), std::move(part));
}
}

if (config["CranedForeground"]) {
auto val = config["CranedForeground"].as<std::string>();
if (val == "true")
g_config.CranedForeground = true;
else
g_config.CranedForeground = false;
}
if (config["CranedForeground"]) {
auto val = config["CranedForeground"].as<std::string>();
if (val == "true")
g_config.CranedForeground = true;
else
g_config.CranedForeground = false;
}

// Container support is turned on only when the CranedContainer section
// exists, has an Enable key, and that key converts to true.
if (config["CranedContainer"] && config["CranedContainer"]["Enable"] &&
config["CranedContainer"]["Enable"].as<bool>()) {
// NOTE(review): TempDir and the four Runtime* keys below are read without
// an existence check — presumably as<>() throws when a key is missing;
// confirm that an enclosing handler covers that case.
// The configured TempDir is appended to CraneBaseDir by plain string
// concatenation; no path separator is inserted here.
g_config.CranedContainer.TempDir =
g_config.CraneBaseDir +
config["CranedContainer"]["TempDir"].as<std::string>();
// The runtime command strings are stored exactly as written in the config.
g_config.CranedContainer.RunTimeState =
config["CranedContainer"]["RuntimeState"].as<std::string>();
g_config.CranedContainer.RuntimeDelete =
config["CranedContainer"]["RuntimeDelete"].as<std::string>();
g_config.CranedContainer.RuntimeKill =
config["CranedContainer"]["RuntimeKill"].as<std::string>();
g_config.CranedContainer.RuntimeRun =
config["CranedContainer"]["RuntimeRun"].as<std::string>();
g_config.CranedContainer.Enable = true;
CRANE_DEBUG("Container support is enabled");
CRANE_TRACE("OCI Runtime set to {}",
g_config.CranedContainer.RuntimeRun);
} else {
// Section absent, Enable absent, or Enable false: reset every container
// setting to its default and mark support as off.
g_config.CranedContainer = {};
g_config.CranedContainer.Enable = false;
CRANE_DEBUG("Container support is disabled");
}

} catch (YAML::BadFile& e) {
CRANE_CRITICAL("Can't open config file {}: {}", kDefaultConfigPath,
e.what());
Expand Down
11 changes: 11 additions & 0 deletions src/Craned/CranedPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ struct Config {
std::string UnixSocketListenAddr;
};

// Container runtime settings, filled by ParseConfig from the "CranedContainer"
// section of the config file. The sample config writes the runtime commands
// with %-placeholders (%u, %U, %j, %x, %b); their expansion is not performed
// in this struct.
struct CranedContainerConf {
// Set to true only when the section exists and its Enable key is true.
bool Enable{false};
// CraneBaseDir concatenated with the configured TempDir value.
std::string TempDir;
// Command string read from the RuntimeState key.
// NOTE(review): spelled "RunTime" here, unlike the sibling "Runtime*" members.
std::string RunTimeState;
// Command string read from the RuntimeKill key.
std::string RuntimeKill;
// Command string read from the RuntimeDelete key.
std::string RuntimeDelete;
// Command string read from the RuntimeRun key.
std::string RuntimeRun;
};

CranedListenConf ListenConf;
bool CompressedRpc{};

Expand All @@ -78,6 +87,8 @@ struct Config {

bool CranedForeground{};

CranedContainerConf CranedContainer;

std::string Hostname;
CranedId CranedIdOfThisNode;

Expand Down
Loading