Skip to content

refactor(interactive): Support complex graph schema #4538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ jobs:
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG && sudo make -j$(nproc)
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG -DFLEX_LABEL_TYPE="uint16_t" && sudo make -j$(nproc)
# package the build artifacts
cd .. && tar -zcf build.tar.gz build

@@ -278,17 +278,20 @@ jobs:
--ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml -o=${PLUGIN_DIR} \
--procedure_name=plus_one \
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml \
--procedure_desc="This is test procedure, and the input is a number, and the output is the number plus one."
--procedure_desc="This is test procedure, and the input is a number, and the output is the number plus one." \
--label_type="uint16_t"

./load_plan_and_gen.sh -e=hqps -i=../interactive/sdk/java/src/test/resources/sample_app.cc -w=/tmp/codegen \
--ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml -o=${PLUGIN_DIR} \
--procedure_name=sample_app \
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml \
--label_type="uint16_t"

./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/count_vertex_num.cypher -w=/tmp/codegen \
--ir_conf=${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml -o=${PLUGIN_DIR} \
--procedure_name=count_vertex_num \
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml
--graph_schema_path=../interactive/examples/modern_graph/graph.yaml \
--label_type="uint16_t"

# Among the above procedures, the correct input format for each is:
# count_vertex_num: () -> (num: int64), CypherProcedure.
@@ -502,7 +505,7 @@ jobs:
git submodule update --init
cd flex/engines/graph_db/grin
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
cmake .. -DFLEX_LABEL_TYPE="uint16_t" && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../interactive/examples/modern_graph/
${GITHUB_WORKSPACE}/flex/build/bin/bulk_loader -g ../../../../interactive/examples/modern_graph/graph.yaml -l ../../../../interactive/examples/modern_graph/bulk_load.yaml -d ./data/
rm -r ./data/wal
8 changes: 8 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
# option(FLEX_LABEL_TYPE "The label type of the graph, valid values: uint16, uint8" "uint8") # The type of the label in the graph
SET(FLEX_LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")

#print options
message(STATUS "Build test: ${BUILD_TEST}")
@@ -58,6 +60,12 @@ if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

# check label_type in {uint8, uint16}
if(NOT FLEX_LABEL_TYPE STREQUAL "uint8_t" AND NOT FLEX_LABEL_TYPE STREQUAL "uint16_t")
message(FATAL_ERROR "FLEX_LABEL_TYPE must be uint8 or uint16, but got ${FLEX_LABEL_TYPE}")
endif()
add_compile_definitions(FLEX_LABEL_TYPE=${FLEX_LABEL_TYPE})

set(DEFAULT_BUILD_TYPE "Release")
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")
39 changes: 29 additions & 10 deletions flex/bin/load_plan_and_gen.sh
Original file line number Diff line number Diff line change
@@ -200,35 +200,41 @@ EOM

compile_hqps_so() {
#check input params size eq 2 or 3
if [ $# -gt 8 ] || [ $# -lt 4 ]; then
if [ $# -gt 9 ] || [ $# -lt 4 ]; then
echo "Usage: $0 <input_file> <work_dir> <ir_compiler_properties_file> <graph_schema_file> "
echo " [statistic_path] [output_dir] [stored_procedure_name] [stored_procedure_description]"
echo " [label_type] [output_dir] [stored_procedure_name] [stored_procedure_description] [statistic_path]"
exit 1
fi
input_path=$1
work_dir=$2
ir_compiler_properties=$3
graph_schema_path=$4
if [ $# -ge 5 ]; then
output_dir=$5
label_type=$5
else
output_dir=${work_dir}
label_type=""
fi

if [ $# -ge 6 ]; then
procedure_name=$6
output_dir=$6
else
procedure_name=""
output_dir=${work_dir}
fi

if [ $# -ge 7 ]; then
procedure_description=$7
procedure_name=$7
else
procedure_description=""
procedure_name=""
fi

if [ $# -ge 8 ]; then
statistic_path=$8
procedure_description=$8
else
procedure_description=""
fi

if [ $# -ge 9 ]; then
statistic_path=$9
else
statistic_path=""
fi
@@ -237,6 +243,7 @@ compile_hqps_so() {
info "Work dir = ${work_dir}"
info "ir compiler properties = ${ir_compiler_properties}"
info "graph schema path = ${graph_schema_path}"
info "Label type = ${label_type}"
info "Output dir = ${output_dir}"
info "Procedure name = ${procedure_name}"
info "Procedure description = ${procedure_description}"
@@ -327,6 +334,9 @@ compile_hqps_so() {
# run cmake and make in output path.
pushd ${cur_dir}
cmd="cmake . -DQUERY_NAME=${procedure_name} -DFLEX_INCLUDE_PREFIX=${FLEX_INCLUDE_PREFIX}"
if [ ! -z ${label_type} ]; then
cmd="${cmd} -DFLEX_LABEL_TYPE=${label_type}"
fi
# if CMAKE_CXX_COMPILER is set, use it.
if [ ! -z ${CMAKE_CXX_COMPILER} ]; then
cmd="${cmd} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}"
@@ -573,6 +583,10 @@ run() {
STATISTIC_PATH="${i#*=}"
shift # past argument=value
;;
--label_type=*)
LABEL_TYPE="${i#*=}"
shift # past argument=value
;;
-* | --*)
err "Unknown option $i"
exit 1
@@ -582,6 +596,10 @@ run() {
esac
done

if [ -z "${LABEL_TYPE}" ]; then
LABEL_TYPE="uint8_t"
fi

echo "Engine type ="${ENGINE_TYPE}
echo "Input ="${INPUT}
echo "Work dir ="${WORK_DIR}
@@ -591,6 +609,7 @@ run() {
echo "Procedure name ="${PROCEDURE_NAME}
echo "Procedure description ="${PROCEDURE_DESCRIPTION}
echo "Statistic path ="${STATISTIC_PATH}
echo "Label type ="${LABEL_TYPE}

find_resources

@@ -625,7 +644,7 @@ run() {
PROCEDURE_NAME="${PROCEDURE_NAME%.cc}"
PROCEDURE_NAME="${PROCEDURE_NAME%.pb}"
fi
compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${OUTPUT_DIR} ${PROCEDURE_NAME} "${PROCEDURE_DESCRIPTION}" ${STATISTIC_PATH}
compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${LABEL_TYPE} ${OUTPUT_DIR} ${PROCEDURE_NAME} "${PROCEDURE_DESCRIPTION}" ${STATISTIC_PATH}

# else if engine_type equals pegasus
elif [ ${ENGINE_TYPE} == "pegasus" ]; then
Original file line number Diff line number Diff line change
@@ -127,7 +127,8 @@ bool SingleEdgeInsertTransaction::Commit() {
{
arc.SetSlice(arc_.GetBuffer() + sizeof(WalHeader),
arc_.GetSize() - sizeof(WalHeader));
label_t op_type, label;
uint8_t op_type;
label_t label;
Any temp;
arc >> op_type;
deserialize_oid(graph_, arc, temp);
161 changes: 79 additions & 82 deletions flex/engines/graph_db/database/update_transaction.cc
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ UpdateTransaction::UpdateTransaction(const GraphDBSession& session,
extra_vertex_properties_[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_;
size_t csr_num = 2 * schema().get_edge_triplet_num();
added_edges_.resize(csr_num);
updated_edge_data_.resize(csr_num);
}
@@ -560,7 +560,6 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
updated_edge_data;

size_t vertex_label_num = graph.schema().vertex_label_num();
size_t edge_label_num = graph.schema().edge_label_num();

for (label_t idx = 0; idx < vertex_label_num; ++idx) {
if (graph.lf_indexers_[idx].get_type() == PropertyType::kInt64) {
@@ -603,7 +602,7 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
extra_vertex_properties[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num * vertex_label_num * edge_label_num;
size_t csr_num = 2 * graph.schema().get_edge_triplet_num();
added_edges.resize(csr_num);
updated_edge_data.resize(csr_num);

@@ -681,16 +680,14 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,

size_t UpdateTransaction::get_in_csr_index(label_t src_label, label_t dst_label,
label_t edge_label) const {
return src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label;
return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label);
}

size_t UpdateTransaction::get_out_csr_index(label_t src_label,
label_t dst_label,
label_t edge_label) const {
return src_label * vertex_label_num_ * edge_label_num_ +
dst_label * edge_label_num_ + edge_label +
vertex_label_num_ * vertex_label_num_ * edge_label_num_;
return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label) +
graph_.schema().get_edge_triplet_num();
}

bool UpdateTransaction::oid_to_lid(label_t label, const Any& oid,
@@ -820,89 +817,89 @@ void UpdateTransaction::applyVerticesUpdates() {
}

void UpdateTransaction::applyEdgesUpdates() {
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t oe_csr_index =
get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t oe_csr_index = get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}

std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
}

for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;

if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
timestamp_, oarc, alloc_);
}
}
if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label, timestamp_,
oarc, alloc_);
}
}
}

for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t ie_csr_index =
get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t ie_csr_index = get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
Loading