Skip to content

Commit

Permalink
fix(interactive): Reuse the graph schema preprocessing to create the …
Browse files Browse the repository at this point in the history
…default graph schema when the service starts (#4239)

- When starting the service from the default graph without specifying
`property_id` and `type_id` in `graph.yaml`, we should be able to
generate them as needed.
- We will also reuse the logic for creating the graph upon receiving a
request. Additionally, we need to resolve the problem of using multiple
`ASSIGN_AND_RETURN_IF_RESULT_NOT_OK` macros within a single code
snippet.
  • Loading branch information
zhanglei1949 authored Sep 20, 2024
1 parent 56963cd commit 87804de
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 152 deletions.
131 changes: 8 additions & 123 deletions flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,80 +104,6 @@ std::string merge_graph_and_plugin_meta(
return res.Empty() ? "{}" : gs::rapidjson_stringify(res, 2);
}

// Makes sure every entry under `root[type_name]` has an `x_csr_params`
// section. An entry without one gets `x_csr_params.max_vertex_num` set to
// 8192: a deliberately small capacity, so that opening an empty graph does not
// use too much memory.
//
// Returns the node stored at `root[type_name]` after the defaults are applied.
gs::Result<YAML::Node> preprocess_vertex_schema(YAML::Node root,
                                                const std::string& type_name) {
  auto type_list = root[type_name];
  for (auto entry : type_list) {
    if (entry["x_csr_params"]) {
      // The section is already present; leave it untouched.
      continue;
    }
    entry["x_csr_params"]["max_vertex_num"] = 8192;
  }
  return type_list;
}

// Validates and completes the ids of the types listed under `root[type_name]`.
//
// For each type, in order of appearance:
//   - if `type_id` is present it must equal the type's position (0-based),
//     otherwise it is assigned that position;
//   - the same rule is applied to the `property_id` of each entry in the
//     type's `properties` list, counted from 0 within that type.
//
// Returns the node stored at `root[type_name]` on success. Returns an
// INVALID_SCHEMA status when an explicit id does not match its position, or
// when the YAML is malformed (for example a non-integer id, or a type entry
// that is not a map). yaml-cpp reports those cases by throwing, so the
// exception is converted into a status here instead of escaping to the caller.
gs::Result<YAML::Node> preprocess_vertex_edge_types(
    YAML::Node root, const std::string& type_name) {
  auto types = root[type_name];
  try {
    int32_t cur_type_id = 0;
    for (auto type : types) {
      if (type["type_id"]) {
        auto type_id = type["type_id"].as<int32_t>();
        if (type_id != cur_type_id) {
          return gs::Status(gs::StatusCode::INVALID_SCHEMA,
                            "Invalid " + type_name +
                                " type_id: " + std::to_string(type_id) +
                                ", expect: " + std::to_string(cur_type_id));
        }
      } else {
        type["type_id"] = cur_type_id;
      }
      cur_type_id++;
      int32_t cur_prop_id = 0;
      if (type["properties"]) {
        for (auto prop : type["properties"]) {
          if (prop["property_id"]) {
            auto prop_id = prop["property_id"].as<int32_t>();
            if (prop_id != cur_prop_id) {
              // Use a fallback for the name: a missing `type_name` must not
              // mask the property_id error that is being reported.
              return gs::Status(
                  gs::StatusCode::INVALID_SCHEMA,
                  "Invalid " + type_name + " property_id: " +
                      type["type_name"].as<std::string>(
                          std::string("<unknown>")) +
                      " : " + std::to_string(prop_id) +
                      ", expect: " + std::to_string(cur_prop_id));
            }
          } else {
            prop["property_id"] = cur_prop_id;
          }
          cur_prop_id++;
        }
      }
    }
  } catch (const YAML::Exception& e) {
    return gs::Status(gs::StatusCode::INVALID_SCHEMA,
                      "Invalid " + type_name + " schema: " + e.what());
  }
  return types;
}

// Preprocess the schema to be compatible with the current storage.
// 1. If a property_id or type_id is given explicitly, it is validated by
//    preprocess_vertex_edge_types.
// 2. If a property_id or type_id is not set, it is assigned according to the
//    order in which the types/properties appear.
// 3. Vertex types additionally go through preprocess_vertex_schema.
//
// `schema.vertex_types` is required; `schema.edge_types` is optional.
// Returns the processed node, or an INVALID_SCHEMA status when the required
// sections are missing or one of the preprocessing steps fails.
gs::Result<YAML::Node> preprocess_graph_schema(YAML::Node&& node) {
  // yaml-cpp throws when a scalar node is subscripted, so check the shape of
  // each level before indexing into it.
  if (!node.IsMap()) {
    return gs::Status(gs::StatusCode::INVALID_SCHEMA,
                      "Invalid graph schema: expect a map at the top level");
  }
  auto schema = node["schema"];
  if (!schema || !schema.IsMap() || !schema["vertex_types"]) {
    return gs::Status(gs::StatusCode::INVALID_SCHEMA,
                      "Invalid graph schema: schema.vertex_types is required");
  }

  // `schema` shares its underlying data with `node`, so the changes made by
  // the helpers below are visible through `node` as well.
  RETURN_IF_NOT_OK(preprocess_vertex_edge_types(schema, "vertex_types"));
  RETURN_IF_NOT_OK(preprocess_vertex_schema(schema, "vertex_types"));
  if (schema["edge_types"]) {
    // edge_types is optional.
    RETURN_IF_NOT_OK(preprocess_vertex_edge_types(schema, "edge_types"));
  }
  return node;
}

void add_runnable_info(gs::PluginMeta& plugin_meta) {
const auto& graph_db = gs::GraphDB::get();
const auto& schema = graph_db.schema();
Expand Down Expand Up @@ -411,59 +337,18 @@ seastar::future<admin_query_result> admin_actor::run_create_graph(
query_param&& query_param) {
LOG(INFO) << "Creating Graph: " << query_param.content;

YAML::Node yaml;

try {
rapidjson::Document doc;
if (doc.Parse(query_param.content.c_str()).HasParseError()) {
throw std::runtime_error("Fail to parse json: " +
std::to_string(doc.GetParseError()));
}
std::stringstream json_ss;
json_ss << query_param.content;
yaml = YAML::Load(json_ss);
} catch (std::exception& e) {
LOG(ERROR) << "Fail to parse json: " << e.what();
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::INVALID_SCHEMA,
"Fail to parse json: " + std::string(e.what()))));
} catch (...) {
LOG(ERROR) << "Fail to parse json: " << query_param.content;
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(gs::Status(gs::StatusCode::INVALID_SCHEMA,
"Fail to parse json: ")));
}
// preprocess the schema yaml,
auto res_yaml = preprocess_graph_schema(std::move(yaml));
if (!res_yaml.ok()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(res_yaml.status()));
}
auto& yaml_value = res_yaml.value();
// set default value
if (!yaml_value["store_type"]) {
yaml_value["store_type"] = "mutable_csr";
}

auto parse_schema_res = gs::Schema::LoadFromYamlNode(yaml_value);
if (!parse_schema_res.ok()) {
auto request = gs::CreateGraphMetaRequest::FromJson(query_param.content);
if (!request.ok()) {
LOG(ERROR) << "Fail to parse graph meta: "
<< request.status().error_message();
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(parse_schema_res.status()));
gs::Result<seastar::sstring>(request.status()));
}

auto real_schema_json = gs::get_json_string_from_yaml(yaml_value);
if (!real_schema_json.ok()) {
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(real_schema_json.status()));
}

auto result = metadata_store_->CreateGraphMeta(
gs::CreateGraphMetaRequest::FromJson(real_schema_json.value()));
auto result = metadata_store_->CreateGraphMeta(request.value());
// we also need to store a graph.yaml on disk, for other services to read.
if (result.ok()) {
auto dump_res =
WorkDirManipulator::DumpGraphSchema(result.value(), res_yaml.value());
auto dump_res = WorkDirManipulator::DumpGraphSchema(
result.value(), request.value().ToString());
if (!dump_res.ok()) {
LOG(ERROR) << "Fail to dump graph schema: "
<< dump_res.status().error_message();
Expand Down
8 changes: 7 additions & 1 deletion flex/engines/http_server/graph_db_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,13 @@ gs::GraphId GraphDBService::insert_default_graph_meta() {
LOG(FATAL) << "Failed to get graph schema string: "
<< schema_str_res.status().error_message();
}
auto request = gs::CreateGraphMetaRequest::FromJson(schema_str_res.value());
auto request_res =
gs::CreateGraphMetaRequest::FromJson(schema_str_res.value());
if (!request_res.ok()) {
LOG(FATAL) << "Failed to parse graph schema string: "
<< request_res.status().error_message();
}
auto request = request_res.value();
request.data_update_time = gs::GetCurrentTimeStamp();

auto res = metadata_store_->CreateGraphMeta(request);
Expand Down
15 changes: 15 additions & 0 deletions flex/engines/http_server/workdir_manipulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ void WorkDirManipulator::SetWorkspace(const std::string& path) {

std::string WorkDirManipulator::GetWorkspace() { return workspace; }

// Convenience overload: parses `json_str` with YAML::Load and forwards the
// resulting node to the YAML::Node overload of DumpGraphSchema.
//
// Returns an INVALID_SCHEMA status, carrying the offending text and the parser
// error, when `json_str` cannot be parsed; otherwise returns whatever the
// YAML::Node overload returns.
gs::Result<seastar::sstring> WorkDirManipulator::DumpGraphSchema(
    const gs::GraphId& graph_id, const std::string& json_str) {
  YAML::Node parsed_schema;
  try {
    parsed_schema = YAML::Load(json_str);
  } catch (const std::exception& ex) {
    std::string message = "Fail to parse graph schema: ";
    message += json_str;
    message += ", error: ";
    message += ex.what();
    return gs::Result<seastar::sstring>(
        gs::Status(gs::StatusCode::INVALID_SCHEMA, message));
  }
  // Only the parse is guarded; the forwarded call stays outside the try block.
  return DumpGraphSchema(graph_id, parsed_schema);
}

// GraphName can be specified in the config file or in the argument.
gs::Result<seastar::sstring> WorkDirManipulator::DumpGraphSchema(
const gs::GraphId& graph_id, const YAML::Node& yaml_config) {
Expand Down Expand Up @@ -135,6 +149,7 @@ gs::Result<bool> WorkDirManipulator::DumpGraphSchema(
VLOG(10) << "Plugin is not enabled: " << plugin.name;
}
}
yaml_node["stored_procedures"] = procedures_node;
auto dump_res = dump_graph_schema(yaml_node, graph_id);
if (!dump_res.ok()) {
return gs::Result<bool>(gs::Status(gs::StatusCode::PERMISSION_DENIED,
Expand Down
5 changes: 4 additions & 1 deletion flex/engines/http_server/workdir_manipulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>

#include <rapidjson/document.h>
#include <yaml-cpp/yaml.h>
#include <boost/process.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <rapidjson/document.h>

namespace server {

Expand Down Expand Up @@ -67,6 +67,9 @@ class WorkDirManipulator {

static std::string GetWorkspace();

static gs::Result<seastar::sstring> DumpGraphSchema(
const gs::GraphId& graph_id, const std::string& json_string);

/**
* @brief Create a graph with a given name and config.
* @param yaml_node The config of the graph.
Expand Down
1 change: 1 addition & 0 deletions flex/openapi/openapi_coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ components:
$ref: '#/components/schemas/GetEdgeType'

GetGraphResponse:
additionalProperties: true
required:
- id
- name
Expand Down
2 changes: 2 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,8 @@ components:
x-body-name: get_graph_response
type: object
properties:
version:
type: string
id:
type: string
name:
Expand Down
Loading

0 comments on commit 87804de

Please sign in to comment.