Skip to content

refactor(interactive): Support complex graph schema #4538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG && sudo make -j$(nproc)
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG -DLABEL_TYPE="uint16_t" && sudo make -j$(nproc)
# package the build artifacts
cd .. && tar -zcf build.tar.gz build

Expand Down Expand Up @@ -502,7 +502,7 @@ jobs:
git submodule update --init
cd flex/engines/graph_db/grin
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
cmake .. -DLABEL_TYPE="uint16_t" && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../interactive/examples/modern_graph/
${GITHUB_WORKSPACE}/flex/build/bin/bulk_loader -g ../../../../interactive/examples/modern_graph/graph.yaml -l ../../../../interactive/examples/modern_graph/bulk_load.yaml -d ./data/
rm -r ./data/wal
Expand Down
8 changes: 8 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
# LABEL_TYPE selects the integer type used for graph labels. It is declared as a string cache variable because option() only supports ON/OFF values.
SET(LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand Down Expand Up @@ -58,6 +60,12 @@ if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

# LABEL_TYPE must be one of {uint8_t, uint16_t}; the value is forwarded verbatim
# to the compiler as the FLEX_LABEL_TYPE macro, so it has to be a real type name.
if(NOT LABEL_TYPE STREQUAL "uint8_t" AND NOT LABEL_TYPE STREQUAL "uint16_t")
  message(FATAL_ERROR "LABEL_TYPE must be uint8_t or uint16_t, but got ${LABEL_TYPE}")
endif()
add_compile_definitions(FLEX_LABEL_TYPE=${LABEL_TYPE})

set(DEFAULT_BUILD_TYPE "Release")
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")
Expand Down
161 changes: 79 additions & 82 deletions flex/engines/graph_db/database/update_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ UpdateTransaction::UpdateTransaction(const GraphDBSession& session,
extra_vertex_properties_[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_;
size_t csr_num = 2 * schema().get_edge_triplet_num();
added_edges_.resize(csr_num);
updated_edge_data_.resize(csr_num);
}
Expand Down Expand Up @@ -560,7 +560,6 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
updated_edge_data;

size_t vertex_label_num = graph.schema().vertex_label_num();
size_t edge_label_num = graph.schema().edge_label_num();

for (label_t idx = 0; idx < vertex_label_num; ++idx) {
if (graph.lf_indexers_[idx].get_type() == PropertyType::kInt64) {
Expand Down Expand Up @@ -603,7 +602,7 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
extra_vertex_properties[i].resize(4096);
}

size_t csr_num = 2 * vertex_label_num * vertex_label_num * edge_label_num;
size_t csr_num = 2 * graph.schema().get_edge_triplet_num();
added_edges.resize(csr_num);
updated_edge_data.resize(csr_num);

Expand Down Expand Up @@ -681,16 +680,14 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,

// Returns the index of the incoming-edge slot for the given
// (src_label, dst_label, edge_label) triplet in added_edges_ /
// updated_edge_data_.
//
// Those containers are sized to 2 * get_edge_triplet_num(), so the slot must be
// the schema's triplet id rather than a dense product of label counts.
// NOTE(review): the result of get_edge_triplet_id for a triplet that is not
// declared in the schema is not visible here — confirm callers only pass
// declared triplets.
size_t UpdateTransaction::get_in_csr_index(label_t src_label, label_t dst_label,
                                           label_t edge_label) const {
  return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label);
}

// Returns the index of the outgoing-edge slot for the given
// (src_label, dst_label, edge_label) triplet in added_edges_ /
// updated_edge_data_.
//
// Outgoing slots follow the incoming ones: the schema's triplet id is shifted
// by get_edge_triplet_num(), which matches containers sized to
// 2 * get_edge_triplet_num().
// NOTE(review): the result of get_edge_triplet_id for a triplet that is not
// declared in the schema is not visible here — confirm callers only pass
// declared triplets.
size_t UpdateTransaction::get_out_csr_index(label_t src_label,
                                            label_t dst_label,
                                            label_t edge_label) const {
  return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label) +
         graph_.schema().get_edge_triplet_num();
}

bool UpdateTransaction::oid_to_lid(label_t label, const Any& oid,
Expand Down Expand Up @@ -820,89 +817,89 @@ void UpdateTransaction::applyVerticesUpdates() {
}

void UpdateTransaction::applyEdgesUpdates() {
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t oe_csr_index =
get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t oe_csr_index = get_out_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[oe_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}

std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
}

for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;

if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
timestamp_, oarc, alloc_);
}
}
if (add_list.empty()) {
continue;
}
std::sort(add_list.begin(), add_list.end());
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
for (size_t idx = 0; idx < add_list.size(); ++idx) {
if (idx && add_list[idx] == add_list[idx - 1])
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label, timestamp_,
oarc, alloc_);
}
}
}

for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
size_t ie_csr_index =
get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
++index) {
auto edge_triplet = graph_.schema().get_edge_triplet(index);
label_t src_label = std::get<0>(edge_triplet);
label_t dst_label = std::get<1>(edge_triplet);
label_t edge_label = std::get<2>(edge_triplet);
size_t ie_csr_index = get_in_csr_index(src_label, dst_label, edge_label);
for (auto& pair : updated_edge_data_[ie_csr_index]) {
auto& updates = pair.second;
if (updates.empty()) {
continue;
}
std::shared_ptr<CsrEdgeIterBase> edge_iter =
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
edge_label);
for (auto& edge : updates) {
if (edge.second.second != std::numeric_limits<size_t>::max()) {
auto& iter = *edge_iter;
iter += edge.second.second;
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
iter.set_data(edge.second.first, timestamp_);
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
<< " " << edge.first << "\n";
} else {
LOG(FATAL) << "Illegal offset: " << edge.first << " "
<< edge.second.second << "\n";
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions flex/engines/graph_db/grin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ set(GRIN_READER_VERSION ${GRIN_READER_MAJOR_VERSION}.${GRIN_READER_MINOR_VERSION

project(grin_reader LANGUAGES C CXX VERSION ${GRIN_READER_VERSION})

# This option should be aligned with flex/CMakeLists.txt
SET(LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")

# LABEL_TYPE must be one of {uint8_t, uint16_t}; the value is forwarded verbatim
# to the compiler as the FLEX_LABEL_TYPE macro, so it has to be a real type name.
if(NOT LABEL_TYPE STREQUAL "uint8_t" AND NOT LABEL_TYPE STREQUAL "uint16_t")
  message(FATAL_ERROR "LABEL_TYPE must be uint8_t or uint16_t, but got ${LABEL_TYPE}")
endif()
add_compile_definitions(FLEX_LABEL_TYPE=${LABEL_TYPE})

# Set flags
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/graph_db/grin/src/property/property.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,12 @@ void grin_destroy_edge_property(GRIN_GRAPH g, GRIN_EDGE_PROPERTY ep) {}
GRIN_DATATYPE grin_get_edge_property_datatype(GRIN_GRAPH g,
GRIN_EDGE_PROPERTY ep) {
auto _g = static_cast<GRIN_GRAPH_T*>(g);
auto src_label_i = (ep >> 16) & 0xff;
auto edge_triplet_tuple = _g->g.schema().get_edge_triplet(ep);
auto src_label_i = std::get<0>(edge_triplet_tuple);
auto dst_label_i = std::get<1>(edge_triplet_tuple);
auto edge_label_i = std::get<2>(edge_triplet_tuple);
const auto& src_label = _g->g.schema().get_vertex_label_name(src_label_i);
auto dst_label_i = (ep >> 8) & 0xff;
const auto& dst_label = _g->g.schema().get_vertex_label_name(dst_label_i);
auto edge_label_i = ep & 0xff;
const auto& edge_label = _g->g.schema().get_edge_label_name(edge_label_i);
const auto& type =
_g->g.schema().get_edge_properties(src_label, dst_label, edge_label);
Expand Down
8 changes: 5 additions & 3 deletions flex/engines/graph_db/grin/src/property/propertylist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ GRIN_EDGE_PROPERTY_LIST grin_get_edge_property_list_by_type(GRIN_GRAPH g,
GRIN_EDGE_PROPERTY_LIST_T* p = new GRIN_EDGE_PROPERTY_LIST_T();

auto _g = static_cast<GRIN_GRAPH_T*>(g);
auto src_label_i = et >> 16;
auto edge_triplet_tuple = _g->g.schema().get_edge_triplet(et);
auto src_label_i = std::get<0>(edge_triplet_tuple);
auto dst_label_i = std::get<1>(edge_triplet_tuple);
auto edge_label_i = std::get<2>(edge_triplet_tuple);

auto src_label = _g->g.schema().get_vertex_label_name(src_label_i);
auto dst_label_i = (et >> 8) & (0xff);
auto dst_label = _g->g.schema().get_vertex_label_name(dst_label_i);
auto edge_label_i = et & 0xff;
auto edge_label = _g->g.schema().get_edge_label_name(edge_label_i);
auto sz = _g->g.schema()
.get_edge_properties(src_label, dst_label, edge_label)
Expand Down
8 changes: 4 additions & 4 deletions flex/engines/graph_db/grin/src/property/topology.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ size_t grin_get_vertex_num_by_type(GRIN_GRAPH g, GRIN_VERTEX_TYPE vt) {

#ifdef GRIN_WITH_EDGE_PROPERTY
size_t grin_get_edge_num_by_type(GRIN_GRAPH g, GRIN_EDGE_TYPE et) {
auto src_label = et >> 16;
auto dst_label = (et >> 8) & (0xff);
auto edge_label = et & (0xff);
auto _g = static_cast<GRIN_GRAPH_T*>(g);

auto edge_triplet_tuple = _g->g.schema().get_edge_triplet(et);
auto src_label = std::get<0>(edge_triplet_tuple);
auto dst_label = std::get<1>(edge_triplet_tuple);
auto edge_label = std::get<2>(edge_triplet_tuple);
auto oe = _g->g.get_oe_csr(src_label, dst_label, edge_label);
auto vertex_num = _g->g.vertex_num(src_label);
size_t edge_num = 0;
Expand Down
Loading
Loading