Skip to content

Commit 42f8d17

Browse files
committed
init
0 parents  commit 42f8d17

File tree

11 files changed

+733
-0
lines changed

11 files changed

+733
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
.DS_Store

README.md

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
## Time-series upsert performance & compression
2+
3+
A small check to benchmark the performance of upsert queries for time-series data in:
4+
5+
- mongodb
6+
- postgresql
7+
- postgresql with timescaledb extension
8+
9+
**NOTE** that this was written in a hurry and the end results should be taken with a grain of salt.
10+
11+
### Setup
12+
13+
- mongodb on localhost:27017, no auth
14+
- postgresql on localhost:5432, user: postgres, password: postgres, db: test
15+
- install timescaledb extension on postgresql
16+
17+
### Result
18+
19+
```
20+
go test -benchmem -run=^$ -bench ^BenchmarkDbs$ ts-benchmark
21+
22+
BenchmarkDbs/MongoDB-compound-single-upsert-10 1 1394460875 ns/op 76327976 B/op 1140562 allocs/op
23+
BenchmarkDbs/Postgres-native-single-10 1 1376155583 ns/op 2488368 B/op 90038 allocs/op
24+
BenchmarkDbs/Postgres-timescale-single_uspert-10 1 1849769834 ns/op 2480400 B/op 89993 allocs/op
25+
BenchmarkDbs/Postgres-timescale-batch_upsert-10 3 412889889 ns/op 11025413 B/op 90033 allocs/op
26+
27+
Table size summary for 10000 records:
28+
------------------------
29+
MongoDB: 1308 kb
30+
Postgres (native): 2168 kb
31+
Postgres (timescale): 24 kb
32+
```

benchmark_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"testing"
8+
9+
"ts-benchmark/dbs/data"
10+
"ts-benchmark/dbs/mongodb"
11+
"ts-benchmark/dbs/postgres"
12+
13+
"go.mongodb.org/mongo-driver/bson"
14+
)
15+
16+
const (
17+
TABLE = "timeseries"
18+
)
19+
20+
func BenchmarkDbs(b *testing.B) {
21+
22+
pgdb, err := postgres.NewConn("localhost", 5432, "postgres", "postgres", "test")
23+
if err != nil {
24+
panic(err)
25+
}
26+
defer pgdb.Close(context.Background())
27+
28+
const NUM_RECORDS = 10000
29+
30+
// Generate fake data which is reused for both MongoDB and Postgres
31+
fake := data.CreateFakeData(NUM_RECORDS)
32+
33+
var kbMongo, kbPostgresNative, kbPostgresTimescale int32
34+
35+
b.Run("MongoDB-compound-single-upsert", func(b *testing.B) {
36+
db, err := mongodb.NewConn("localhost", 27017, "", "")
37+
if err != nil {
38+
b.Fatalf("Failed to connect to MongoDB: %v", err)
39+
}
40+
defer db.Disconnect(context.Background())
41+
42+
coll := db.Database("test").Collection(TABLE)
43+
if err := mongodb.Setup(coll); err != nil {
44+
b.Fatalf("Failed to setup MongoDB: %v", err)
45+
}
46+
47+
b.ResetTimer()
48+
for i := 0; i < b.N; i++ {
49+
t, err := mongodb.Upsert(coll, fake)
50+
if err != nil {
51+
b.Fatalf("Failed to Upsert in MongoDB: %v", err)
52+
}
53+
_ = t
54+
}
55+
b.StopTimer()
56+
57+
// Get the statistics of the collection
58+
var stats bson.M
59+
command := bson.D{{Key: "collStats", Value: "timeseries"}}
60+
err = db.Database("test").RunCommand(context.TODO(), command).Decode(&stats)
61+
if err != nil {
62+
log.Fatal(err)
63+
}
64+
65+
bytes, ok := stats["totalSize"].(int32)
66+
if !ok {
67+
log.Fatal("Failed to convert to int32")
68+
}
69+
70+
kbMongo = bytes / 1024
71+
})
72+
73+
b.Run("Postgres-native-single", func(b *testing.B) {
74+
75+
if err := postgres.SetupVanilla(pgdb); err != nil {
76+
b.Fatalf("Failed to setup Postgres: %v", err)
77+
}
78+
79+
b.ResetTimer()
80+
for i := 0; i < b.N; i++ {
81+
t, err := postgres.UpsertVanilla(pgdb, fake)
82+
if err != nil {
83+
b.Fatalf("Failed to Upsert in Postgres: %v", err)
84+
}
85+
_ = t
86+
}
87+
b.StopTimer()
88+
89+
tableSize, err := postgres.GetTableSize(pgdb, "timeseries")
90+
if err != nil {
91+
log.Fatal(err)
92+
}
93+
94+
kbPostgresNative = int32(tableSize)
95+
})
96+
97+
b.Run("Postgres-timescale-single uspert", func(b *testing.B) {
98+
if err := postgres.SetupTimescaleDB(pgdb); err != nil {
99+
b.Fatalf("Failed to setup Postgres: %v", err)
100+
}
101+
102+
b.ResetTimer()
103+
for i := 0; i < b.N; i++ {
104+
t, err := postgres.UpsertTimescaleDB(pgdb, fake)
105+
if err != nil {
106+
b.Fatalf("Failed to Upsert in Postgres: %v", err)
107+
}
108+
109+
_ = t
110+
}
111+
b.StopTimer()
112+
113+
tableSize, err := postgres.GetTableSize(pgdb, "timeseries_timescale")
114+
if err != nil {
115+
log.Fatal(err)
116+
}
117+
kbPostgresTimescale = int32(tableSize)
118+
})
119+
120+
b.Run("Postgres-timescale-batch upsert", func(b *testing.B) {
121+
if err := postgres.SetupTimescaleDB(pgdb); err != nil {
122+
b.Fatalf("Failed to setup Postgres: %v", err)
123+
}
124+
125+
b.ResetTimer()
126+
for i := 0; i < b.N; i++ {
127+
t, err := postgres.UpsertBatchTimescaleDB(pgdb, fake)
128+
if err != nil {
129+
b.Fatalf("Failed to Upsert in Postgres: %v", err)
130+
}
131+
_ = t
132+
}
133+
b.StopTimer()
134+
135+
})
136+
137+
fmt.Printf(`
138+
Table size summary for %v records:
139+
------------------------
140+
MongoDB: %v kb
141+
Postgres (native): %v kb
142+
Postgres (timescale): %v kb
143+
`, NUM_RECORDS, kbMongo, kbPostgresNative, kbPostgresTimescale)
144+
}

dbs/data/main.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package data
2+
3+
import (
4+
"math"
5+
"time"
6+
)
7+
8+
type Timeseries struct {
9+
StartTime time.Time `json:"start_time" bson:"start_time"`
10+
Interval int `json:"interval" bson:"interval"`
11+
Area string `json:"area" bson:"area"`
12+
Value float64 `json:"value" bson:"value"`
13+
}
14+
15+
func CreateFakeData(numRows int) []Timeseries {
16+
var docs []Timeseries
17+
18+
for i := 0; i < numRows; i++ {
19+
// random value
20+
val := math.Round(math.Sin(float64(i)) * 100)
21+
22+
doc := Timeseries{
23+
StartTime: time.Now().Add(time.Duration(i) * time.Hour).Truncate(time.Minute),
24+
Interval: 36000000,
25+
Area: "area1",
26+
Value: val,
27+
}
28+
docs = append(docs, doc)
29+
}
30+
31+
return docs
32+
}

dbs/influxdb/main.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package influxdb
2+
3+
/*
4+
5+
brew install influxdb
6+
influxd
7+
8+
brew services start influxdb
9+
10+
*/
11+
12+
import (
13+
"context"
14+
"fmt"
15+
"time"
16+
"ts-benchmark/dbs/data"
17+
18+
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
19+
)
20+
21+
func NewConn(url, token string) influxdb2.Client {
22+
client := influxdb2.NewClient(url, token)
23+
return client
24+
}
25+
26+
func Setup(client influxdb2.Client, org, bucket string) error {
27+
deleteAPI := client.DeleteAPI()
28+
29+
// Define the time range for deletion (from the beginning of time to now)
30+
start := time.Unix(0, 0)
31+
end := time.Now().AddDate(2, 0, 0)
32+
33+
// Delete previous data
34+
err := deleteAPI.DeleteWithName(context.Background(), org, bucket, start, end, `_measurement="timeseries"`)
35+
if err != nil {
36+
return fmt.Errorf("failed to delete previous data: %v", err)
37+
}
38+
39+
return nil
40+
}
41+
42+
func Upsert(client influxdb2.Client, org, bucket string, docs []data.Timeseries) error {
43+
writeAPI := client.WriteAPIBlocking(org, bucket)
44+
45+
// Upsert new data
46+
for _, doc := range docs {
47+
p := influxdb2.NewPointWithMeasurement("timeseries").
48+
// AddTag("interval", fmt.Sprintf("%d", doc.Interval)).
49+
AddTag("area", doc.Area).
50+
AddField("value", doc.Value).
51+
SetTime(doc.StartTime)
52+
53+
err := writeAPI.WritePoint(context.Background(), p)
54+
if err != nil {
55+
return err
56+
}
57+
}
58+
59+
return nil
60+
}

dbs/mongodb/main.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package mongodb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
"ts-benchmark/dbs/data"
8+
9+
"go.mongodb.org/mongo-driver/bson"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
"go.mongodb.org/mongo-driver/mongo/options"
12+
)
13+
14+
func NewConn(host string, port int, username, password string) (*mongo.Client, error) {
15+
opt := options.Client().
16+
SetMaxPoolSize(20). // Set the maximum number of connections in the connection pool
17+
SetMaxConnIdleTime(10 * time.Minute) // Close idle connections after the specified time
18+
19+
// If both the username and password exists, use it as the credentials. Else use the non-authenticated url.
20+
var url string
21+
if username != "" && password != "" {
22+
opt.SetAuth(options.Credential{Username: username, Password: password})
23+
url = fmt.Sprintf("mongodb://%s:%s@%s:%d", username, password, host, port)
24+
} else {
25+
url = fmt.Sprintf("mongodb://%s:%d", host, port)
26+
}
27+
28+
opt.ApplyURI(url)
29+
30+
conn, err := mongo.Connect(context.Background(), opt)
31+
if err != nil {
32+
return nil, err
33+
}
34+
35+
if err := conn.Ping(context.Background(), nil); err != nil {
36+
return nil, fmt.Errorf("failed to ping mongodb: %v", err)
37+
}
38+
39+
return conn, nil
40+
}
41+
42+
func Setup(coll *mongo.Collection) error {
43+
// Delete previous data
44+
if _, err := coll.DeleteMany(context.Background(), bson.M{}); err != nil {
45+
return err
46+
}
47+
48+
// Create index on the "start_time", "interval" and "area" fields
49+
indexModel := mongo.IndexModel{
50+
Keys: bson.D{
51+
{Key: "start_time", Value: -1},
52+
{Key: "interval", Value: -1},
53+
{Key: "area", Value: -1},
54+
},
55+
}
56+
57+
if _, err := coll.Indexes().CreateOne(context.Background(), indexModel); err != nil {
58+
return err
59+
}
60+
61+
return nil
62+
}
63+
64+
func Upsert(coll *mongo.Collection, docs []data.Timeseries) (time.Duration, error) {
65+
now := time.Now()
66+
67+
for _, doc := range docs {
68+
filter := map[string]interface{}{
69+
"start_time": doc.StartTime,
70+
"interval": doc.Interval,
71+
"area": doc.Area,
72+
}
73+
74+
update := map[string]interface{}{
75+
"$set": map[string]interface{}{
76+
"value": doc.Value,
77+
},
78+
}
79+
80+
_, err := coll.UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true))
81+
if err != nil {
82+
return 0, err
83+
}
84+
}
85+
86+
return time.Since(now), nil
87+
}

0 commit comments

Comments
 (0)