Skip to content

Commit 695ef6b

Browse files
committed
make sure interactive support relative large number of vertex and edge labels
Committed-by: [email protected] from Dev container
1 parent b0b8099 commit 695ef6b

File tree

23 files changed

+778
-592
lines changed

23 files changed

+778
-592
lines changed

Diff for: flex/CMakeLists.txt

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ option(USE_PTHASH "Whether to use pthash" OFF)
1717
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
1818
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
1919
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
20+
# option(LABEL_TYPE "The label type of the graph, valid values: uint16, uint8" "uint8") # The type of the label in the graph
21+
SET(LABEL_TYPE "uint8_t" CACHE STRING "The label type of the graph, valid values: uint16_t, uint8_t")
2022

2123
#print options
2224
message(STATUS "Build test: ${BUILD_TEST}")
@@ -58,6 +60,12 @@ if(USE_PTHASH)
5860
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
5961
endif()
6062

63+
# check label_type in {uint8, uint16}
64+
if(NOT LABEL_TYPE STREQUAL "uint8_t" AND NOT LABEL_TYPE STREQUAL "uint16_t")
65+
message(FATAL_ERROR "LABEL_TYPE must be uint8 or uint16, but got ${LABEL_TYPE}")
66+
endif()
67+
add_compile_definitions(FLEX_LABEL_TYPE=${LABEL_TYPE})
68+
6169
set(DEFAULT_BUILD_TYPE "Release")
6270
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
6371
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")

Diff for: flex/engines/graph_db/database/update_transaction.cc

+79-82
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ UpdateTransaction::UpdateTransaction(const GraphDBSession& session,
9393
extra_vertex_properties_[i].resize(4096);
9494
}
9595

96-
size_t csr_num = 2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_;
96+
size_t csr_num = 2 * schema().get_edge_triplet_num();
9797
added_edges_.resize(csr_num);
9898
updated_edge_data_.resize(csr_num);
9999
}
@@ -560,7 +560,6 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
560560
updated_edge_data;
561561

562562
size_t vertex_label_num = graph.schema().vertex_label_num();
563-
size_t edge_label_num = graph.schema().edge_label_num();
564563

565564
for (label_t idx = 0; idx < vertex_label_num; ++idx) {
566565
if (graph.lf_indexers_[idx].get_type() == PropertyType::kInt64) {
@@ -603,7 +602,7 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
603602
extra_vertex_properties[i].resize(4096);
604603
}
605604

606-
size_t csr_num = 2 * vertex_label_num * vertex_label_num * edge_label_num;
605+
size_t csr_num = 2 * graph.schema().get_edge_triplet_num();
607606
added_edges.resize(csr_num);
608607
updated_edge_data.resize(csr_num);
609608

@@ -681,16 +680,14 @@ void UpdateTransaction::IngestWal(MutablePropertyFragment& graph,
681680

682681
size_t UpdateTransaction::get_in_csr_index(label_t src_label, label_t dst_label,
683682
label_t edge_label) const {
684-
return src_label * vertex_label_num_ * edge_label_num_ +
685-
dst_label * edge_label_num_ + edge_label;
683+
return graph_.schema().get_edge_triplet_id(src_label, dst_label, edge_label);
686684
}
687685

688686
size_t UpdateTransaction::get_out_csr_index(label_t src_label,
689687
label_t dst_label,
690688
label_t edge_label) const {
691-
return src_label * vertex_label_num_ * edge_label_num_ +
692-
dst_label * edge_label_num_ + edge_label +
693-
vertex_label_num_ * vertex_label_num_ * edge_label_num_;
689+
return graph_.schema().get_edge_triplet_id(dst_label, src_label, edge_label) +
690+
graph_.schema().get_edge_triplet_num();
694691
}
695692

696693
bool UpdateTransaction::oid_to_lid(label_t label, const Any& oid,
@@ -820,89 +817,89 @@ void UpdateTransaction::applyVerticesUpdates() {
820817
}
821818

822819
void UpdateTransaction::applyEdgesUpdates() {
823-
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
824-
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
825-
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
826-
size_t oe_csr_index =
827-
get_out_csr_index(src_label, dst_label, edge_label);
828-
for (auto& pair : updated_edge_data_[oe_csr_index]) {
829-
auto& updates = pair.second;
830-
if (updates.empty()) {
831-
continue;
832-
}
820+
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
821+
++index) {
822+
auto edge_triplet = graph_.schema().get_edge_triplet(index);
823+
label_t src_label = std::get<0>(edge_triplet);
824+
label_t dst_label = std::get<1>(edge_triplet);
825+
label_t edge_label = std::get<2>(edge_triplet);
826+
size_t oe_csr_index = get_out_csr_index(src_label, dst_label, edge_label);
827+
for (auto& pair : updated_edge_data_[oe_csr_index]) {
828+
auto& updates = pair.second;
829+
if (updates.empty()) {
830+
continue;
831+
}
833832

834-
std::shared_ptr<CsrEdgeIterBase> edge_iter =
835-
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
836-
edge_label);
837-
for (auto& edge : updates) {
838-
if (edge.second.second != std::numeric_limits<size_t>::max()) {
839-
auto& iter = *edge_iter;
840-
iter += edge.second.second;
841-
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
842-
iter.set_data(edge.second.first, timestamp_);
843-
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
844-
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
845-
<< " " << edge.first << "\n";
846-
} else {
847-
LOG(FATAL) << "Illegal offset: " << edge.first << " "
848-
<< edge.second.second << "\n";
849-
}
850-
}
833+
std::shared_ptr<CsrEdgeIterBase> edge_iter =
834+
graph_.get_outgoing_edges_mut(src_label, pair.first, dst_label,
835+
edge_label);
836+
for (auto& edge : updates) {
837+
if (edge.second.second != std::numeric_limits<size_t>::max()) {
838+
auto& iter = *edge_iter;
839+
iter += edge.second.second;
840+
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
841+
iter.set_data(edge.second.first, timestamp_);
842+
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
843+
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
844+
<< " " << edge.first << "\n";
845+
} else {
846+
LOG(FATAL) << "Illegal offset: " << edge.first << " "
847+
<< edge.second.second << "\n";
851848
}
852849
}
850+
}
851+
}
853852

854-
for (auto& pair : added_edges_[oe_csr_index]) {
855-
vid_t v = pair.first;
856-
auto& add_list = pair.second;
853+
for (auto& pair : added_edges_[oe_csr_index]) {
854+
vid_t v = pair.first;
855+
auto& add_list = pair.second;
857856

858-
if (add_list.empty()) {
859-
continue;
860-
}
861-
std::sort(add_list.begin(), add_list.end());
862-
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
863-
for (size_t idx = 0; idx < add_list.size(); ++idx) {
864-
if (idx && add_list[idx] == add_list[idx - 1])
865-
continue;
866-
auto u = add_list[idx];
867-
auto value = edge_data.at(u).first;
868-
grape::InArchive iarc;
869-
serialize_field(iarc, value);
870-
grape::OutArchive oarc(std::move(iarc));
871-
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
872-
timestamp_, oarc, alloc_);
873-
}
874-
}
857+
if (add_list.empty()) {
858+
continue;
859+
}
860+
std::sort(add_list.begin(), add_list.end());
861+
auto& edge_data = updated_edge_data_[oe_csr_index].at(v);
862+
for (size_t idx = 0; idx < add_list.size(); ++idx) {
863+
if (idx && add_list[idx] == add_list[idx - 1])
864+
continue;
865+
auto u = add_list[idx];
866+
auto value = edge_data.at(u).first;
867+
grape::InArchive iarc;
868+
serialize_field(iarc, value);
869+
grape::OutArchive oarc(std::move(iarc));
870+
graph_.IngestEdge(src_label, v, dst_label, u, edge_label, timestamp_,
871+
oarc, alloc_);
875872
}
876873
}
877874
}
878875

879-
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
880-
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
881-
for (label_t edge_label = 0; edge_label < edge_label_num_; ++edge_label) {
882-
size_t ie_csr_index =
883-
get_in_csr_index(src_label, dst_label, edge_label);
884-
for (auto& pair : updated_edge_data_[ie_csr_index]) {
885-
auto& updates = pair.second;
886-
if (updates.empty()) {
887-
continue;
888-
}
889-
std::shared_ptr<CsrEdgeIterBase> edge_iter =
890-
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
891-
edge_label);
892-
for (auto& edge : updates) {
893-
if (edge.second.second != std::numeric_limits<size_t>::max()) {
894-
auto& iter = *edge_iter;
895-
iter += edge.second.second;
896-
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
897-
iter.set_data(edge.second.first, timestamp_);
898-
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
899-
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
900-
<< " " << edge.first << "\n";
901-
} else {
902-
LOG(FATAL) << "Illegal offset: " << edge.first << " "
903-
<< edge.second.second << "\n";
904-
}
905-
}
876+
for (size_t index = 0; index < graph_.schema().get_edge_triplet_num();
877+
++index) {
878+
auto edge_triplet = graph_.schema().get_edge_triplet(index);
879+
label_t src_label = std::get<0>(edge_triplet);
880+
label_t dst_label = std::get<1>(edge_triplet);
881+
label_t edge_label = std::get<2>(edge_triplet);
882+
size_t ie_csr_index = get_in_csr_index(src_label, dst_label, edge_label);
883+
for (auto& pair : updated_edge_data_[ie_csr_index]) {
884+
auto& updates = pair.second;
885+
if (updates.empty()) {
886+
continue;
887+
}
888+
std::shared_ptr<CsrEdgeIterBase> edge_iter =
889+
graph_.get_incoming_edges_mut(dst_label, pair.first, src_label,
890+
edge_label);
891+
for (auto& edge : updates) {
892+
if (edge.second.second != std::numeric_limits<size_t>::max()) {
893+
auto& iter = *edge_iter;
894+
iter += edge.second.second;
895+
if (iter.is_valid() && iter.get_neighbor() == edge.first) {
896+
iter.set_data(edge.second.first, timestamp_);
897+
} else if (iter.is_valid() && iter.get_neighbor() != edge.first) {
898+
LOG(FATAL) << "Inconsistent neighbor id:" << iter.get_neighbor()
899+
<< " " << edge.first << "\n";
900+
} else {
901+
LOG(FATAL) << "Illegal offset: " << edge.first << " "
902+
<< edge.second.second << "\n";
906903
}
907904
}
908905
}

Diff for: flex/engines/graph_db/runtime/common/accessors.h

+26-33
Original file line numberDiff line numberDiff line change
@@ -419,16 +419,16 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {
419419
MultiPropsEdgePropertyPathAccessor(const GraphInterface& graph,
420420
const std::string& prop_name,
421421
const Context& ctx, int tag)
422-
: col_(*std::dynamic_pointer_cast<IEdgeColumn>(ctx.get(tag))) {
422+
: graph_(graph),
423+
col_(*std::dynamic_pointer_cast<IEdgeColumn>(ctx.get(tag))) {
423424
const auto& labels = col_.get_labels();
424425
vertex_label_num_ = graph.schema().vertex_label_num();
425426
edge_label_num_ = graph.schema().edge_label_num();
426-
prop_index_.resize(
427-
2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_,
428-
std::numeric_limits<size_t>::max());
427+
prop_index_.resize(graph.schema().get_edge_triplet_num(),
428+
std::numeric_limits<size_t>::max());
429429
for (auto& label : labels) {
430-
size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ +
431-
label.dst_label * edge_label_num_ + label.edge_label;
430+
size_t idx = graph.schema().get_edge_triplet_id(
431+
label.src_label, label.dst_label, label.edge_label);
432432
const auto& names = graph.schema().get_edge_property_names(
433433
label.src_label, label.dst_label, label.edge_label);
434434
for (size_t i = 0; i < names.size(); ++i) {
@@ -478,8 +478,8 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {
478478
bool is_optional() const override { return col_.is_optional(); }
479479

480480
size_t get_index(const LabelTriplet& label) const {
481-
size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ +
482-
label.dst_label * edge_label_num_ + label.edge_label;
481+
size_t idx = graph_.schema().get_edge_triplet_id(
482+
label.src_label, label.dst_label, label.edge_label);
483483
return prop_index_[idx];
484484
}
485485

@@ -495,6 +495,7 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor {
495495
}
496496

497497
private:
498+
const GraphInterface& graph_;
498499
const IEdgeColumn& col_;
499500
std::vector<size_t> prop_index_;
500501
size_t vertex_label_num_;
@@ -556,32 +557,23 @@ class MultiPropsEdgePropertyEdgeAccessor : public IAccessor {
556557
using elem_t = T;
557558

558559
MultiPropsEdgePropertyEdgeAccessor(const GraphInterface& graph,
559-
const std::string& name) {
560+
const std::string& name)
561+
: graph_(graph) {
560562
edge_label_num_ = graph.schema().edge_label_num();
561563
vertex_label_num_ = graph.schema().vertex_label_num();
562-
indexs.resize(2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_,
564+
indexs.resize(graph.schema().get_edge_triplet_num(),
563565
std::numeric_limits<size_t>::max());
564-
for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) {
565-
auto src = graph.schema().get_vertex_label_name(src_label);
566-
for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) {
567-
auto dst = graph.schema().get_vertex_label_name(dst_label);
568-
for (label_t edge_label = 0; edge_label < edge_label_num_;
569-
++edge_label) {
570-
auto edge = graph.schema().get_edge_label_name(edge_label);
571-
if (!graph.schema().exist(src, dst, edge)) {
572-
continue;
573-
}
574-
size_t idx = src_label * vertex_label_num_ * edge_label_num_ +
575-
dst_label * edge_label_num_ + edge_label;
576-
const std::vector<std::string>& names =
577-
graph.schema().get_edge_property_names(src_label, dst_label,
578-
edge_label);
579-
for (size_t i = 0; i < names.size(); ++i) {
580-
if (names[i] == name) {
581-
indexs[idx] = i;
582-
break;
583-
}
584-
}
566+
567+
for (size_t index = 0; index < graph.schema().get_edge_triplet_num();
568+
++index) {
569+
auto label = graph.schema().get_edge_triplet(index);
570+
const std::vector<std::string>& names =
571+
graph.schema().get_edge_property_names(
572+
std::get<0>(label), std::get<1>(label), std::get<2>(label));
573+
for (size_t i = 0; i < names.size(); ++i) {
574+
if (names[i] == name) {
575+
indexs[index] = i;
576+
break;
585577
}
586578
}
587579
}
@@ -613,12 +605,13 @@ class MultiPropsEdgePropertyEdgeAccessor : public IAccessor {
613605
}
614606

615607
size_t get_index(const LabelTriplet& label) const {
616-
size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ +
617-
label.dst_label * edge_label_num_ + label.edge_label;
608+
size_t idx = graph_.schema().get_edge_triplet_id(
609+
label.src_label, label.dst_label, label.edge_label);
618610
return indexs[idx];
619611
}
620612

621613
private:
614+
const GraphInterface& graph_;
622615
std::vector<size_t> indexs;
623616
size_t vertex_label_num_;
624617
size_t edge_label_num_;

Diff for: flex/engines/graph_db/runtime/common/rt_any.cc

+4-3
Original file line numberDiff line numberDiff line change
@@ -826,12 +826,13 @@ void RTAny::sink(const GraphReadInterface& graph, int id,
826826
auto [label, src, dst, prop, dir] = this->as_edge();
827827
e->mutable_src_label()->set_id(label.src_label);
828828
e->mutable_dst_label()->set_id(label.dst_label);
829-
auto edge_label = generate_edge_label_id(label.src_label, label.dst_label,
830-
label.edge_label);
829+
830+
auto edge_triplet_id = graph.schema().get_edge_triplet_id(
831+
label.src_label, label.dst_label, label.edge_label);
831832
e->mutable_label()->set_id(label.edge_label);
832833
e->set_src_id(encode_unique_vertex_id(label.src_label, src));
833834
e->set_dst_id(encode_unique_vertex_id(label.dst_label, dst));
834-
e->set_id(encode_unique_edge_id(edge_label, src, dst));
835+
e->set_id(encode_unique_edge_id(edge_triplet_id, src, dst));
835836
auto& prop_names = graph.schema().get_edge_property_names(
836837
label.src_label, label.dst_label, label.edge_label);
837838
if (prop_names.size() == 1) {

Diff for: flex/engines/graph_db/runtime/common/types.cc

-27
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,6 @@ std::pair<label_t, vid_t> decode_unique_vertex_id(uint64_t unique_id) {
2929
GlobalId::get_vid(unique_id)};
3030
}
3131

32-
uint32_t generate_edge_label_id(label_t src_label_id, label_t dst_label_id,
33-
label_t edge_label_id) {
34-
uint32_t unique_edge_label_id = src_label_id;
35-
static constexpr int num_bits = sizeof(label_t) * 8;
36-
static_assert(num_bits * 3 <= sizeof(uint32_t) * 8,
37-
"label_t is too large to be encoded in 32 bits");
38-
unique_edge_label_id = unique_edge_label_id << num_bits;
39-
unique_edge_label_id = unique_edge_label_id | dst_label_id;
40-
unique_edge_label_id = unique_edge_label_id << num_bits;
41-
unique_edge_label_id = unique_edge_label_id | edge_label_id;
42-
return unique_edge_label_id;
43-
}
44-
45-
std::tuple<label_t, label_t, label_t> decode_edge_label_id(
46-
uint32_t edge_label_id) {
47-
static constexpr int num_bits = sizeof(label_t) * 8;
48-
static_assert(num_bits * 3 <= sizeof(uint32_t) * 8,
49-
"label_t is too large to be encoded in 32 bits");
50-
auto mask = (1 << num_bits) - 1;
51-
label_t edge_label = edge_label_id & mask;
52-
edge_label_id = edge_label_id >> num_bits;
53-
label_t dst_label = edge_label_id & mask;
54-
edge_label_id = edge_label_id >> num_bits;
55-
label_t src_label = edge_label_id & mask;
56-
return std::make_tuple(src_label, dst_label, edge_label);
57-
}
58-
5932
int64_t encode_unique_edge_id(uint32_t label_id, vid_t src, vid_t dst) {
6033
// We assume label_id is only used by 24 bits.
6134
int64_t unique_edge_id = label_id;

0 commit comments

Comments
 (0)