forked from gaolk/graph-database-benchmark
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathsingleThreadEdgeImporter.java
121 lines (97 loc) · 3.76 KB
/
singleThreadEdgeImporter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
* Copyright (c) 2015-now, TigerGraph Inc.
* All rights reserved
* It is provided as it is for benchmark reproducible purpose.
* anyone can use it for benchmark purpose with the
* acknowledgement to TigerGraph.
* Author: Litong Shen [email protected]
*/
import java.util.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.janusgraph.core.schema.*;
import org.janusgraph.util.datastructures.CompactMap;
import org.janusgraph.core.util.*;
import org.janusgraph.core.*;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.management.*;
import org.janusgraph.graphdb.vertices.AbstractVertex;
import org.janusgraph.graphdb.internal.ElementLifeCycle;
import org.janusgraph.graphdb.vertices.StandardVertex;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import java.io.FileWriter;
public class singleThreadEdgeImporter {
public static JanusGraph JanusG;
public static int commitBatch = 1;
private static HashMap<String, Long> idset1= new HashMap<String, Long>();
public static void main(String[] args){
String datasetDir = args[0];
String confPath = args[1];
commitBatch = Integer.parseInt(args[2]);
String hashmapDir = args[3];
BaseConfiguration config = new BaseConfiguration();
JanusG = JanusGraphFactory.open(confPath);
try {
// edge reader
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(datasetDir)));
String line;
// hashmap reader
BufferedReader hashmapReader = new BufferedReader(new InputStreamReader(new FileInputStream(hashmapDir)));
String line2 = hashmapReader.readLine();
//create vertex hashmap without new vertex
while((line2 = hashmapReader.readLine()) != null) {
String[] kvPairs = line2.split("\t");
Long internalId = Long.parseLong(kvPairs[1]);
idset1.put(kvPairs[0],internalId);
}
hashmapReader.close();
System.out.println("finished construct vertex hashmap");
//add edge
long lineCounter = 0;
long startTime = System.nanoTime();
// add edge with idset1(hashmap without create vertex)
JanusGraphTransaction tx = JanusG.newTransaction();
while((line = reader.readLine()) != null) {
try {
String[] parts = line.split("\t");
addEdge(parts[0], parts[1], tx);
lineCounter ++;
if(lineCounter % commitBatch == 0) {
System.out.println("---- commit ----: " + Long.toString(lineCounter / commitBatch));
tx.commit();
tx = JanusG.newTransaction();
}
} catch (Exception e) {
e.printStackTrace();
}
}
tx.commit();
long endTime = System.nanoTime();
long duration = (endTime - startTime);
System.out.println("######## loading time ####### " + Long.toString(duration/1000000) + " ms");
reader.close();
}
catch(IOException ioe) {
ioe.printStackTrace();
}
System.out.println("---- done ----");
System.exit(0);
}
/** this function add edge to graph
* @para srcId the sourceId(internal id) of source vertex
* @para dstId the dextionationId(internal id) of destination vertex
* @para tx the current JanusGraph transaction
*/
private static void addEdge(String srcId, String dstId, JanusGraphTransaction tx) {
Long srcInternalId = idset1.get(srcId);
Long dstInternalId = idset1.get(dstId);
JanusGraphVertex srcVertex, dstVertex;
srcVertex = tx.getVertex(srcInternalId);
dstVertex = tx.getVertex(dstInternalId);
srcVertex.addEdge("MyEdge", dstVertex);
}
}