Skip to content

Commit 74d3b85

Browse files
Add RESP3 support ( --resp parameter ) (#19)
* move from radix v3 to v4 * Created Client Interface to avoid using radix.Client (which radix.Cluster now does not implement all methods) * fix resp 3 support --------- Co-authored-by: filipecosta90 <[email protected]>
1 parent 0e2ab41 commit 74d3b85

File tree

6 files changed

+75
-45
lines changed

6 files changed

+75
-45
lines changed

cluster_conn.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,31 @@
11
package main
22

33
import (
4-
"github.com/mediocregopher/radix/v3"
4+
"context"
5+
"github.com/mediocregopher/radix/v4"
56
"log"
67
)
78

8-
func getOSSClusterConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Cluster {
9-
var vanillaCluster *radix.Cluster
9+
func getOSSClusterConn(addr string, opts radix.Dialer, clients uint64) *radix.Cluster {
1010
var err error
11+
var vanillaCluster *radix.Cluster
12+
var size int = int(clients)
13+
ctx := context.Background()
14+
laddr := make([]string, 1)
15+
laddr[0] = addr
16+
poolConfig := radix.PoolConfig{}
17+
poolConfig.Dialer = opts
18+
poolConfig.Size = size
1119

12-
customConnFunc := func(network, addr string) (radix.Conn, error) {
13-
return radix.Dial(network, addr, opts...,
14-
)
15-
}
16-
17-
// this cluster will use the ClientFunc to create a pool to each node in the
18-
// cluster.
19-
poolFunc := func(network, addr string) (radix.Client, error) {
20-
return radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0))
21-
}
20+
clusterConfig := radix.ClusterConfig{}
21+
clusterConfig.PoolConfig = poolConfig
2222

23-
vanillaCluster, err = radix.NewCluster([]string{addr}, radix.ClusterPoolFunc(poolFunc))
23+
vanillaCluster, err = clusterConfig.New(ctx, laddr)
2424
if err != nil {
2525
log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err)
2626
}
2727
// Issue CLUSTER SLOTS command
28-
err = vanillaCluster.Sync()
28+
err = vanillaCluster.Sync(ctx)
2929
if err != nil {
3030
log.Fatalf("Error preparing for benchmark, while issuing CLUSTER SLOTS. error = %v", err)
3131
}

common.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package main
2+
3+
import (
4+
"context"
5+
radix "github.com/mediocregopher/radix/v4"
6+
)
7+
8+
type Client interface {
9+
10+
// Do performs an Action on a Conn connected to the redis instance.
11+
Do(context.Context, radix.Action) error
12+
13+
// Once Close() is called all future method calls on the Client will return
14+
// an error
15+
Close() error
16+
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.14
55
require (
66
github.com/HdrHistogram/hdrhistogram-go v1.1.0
77
github.com/google/go-cmp v0.5.5 // indirect
8-
github.com/mediocregopher/radix/v3 v3.5.2
8+
github.com/mediocregopher/radix/v4 v4.1.2
99
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
1010
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
1111
)

go.sum

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
1919
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
2020
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
2121
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
22-
github.com/mediocregopher/radix/v3 v3.5.2 h1:A9u3G7n4+fWmDZ2ZDHtlK+cZl4q55T+7RjKjR0/MAdk=
23-
github.com/mediocregopher/radix/v3 v3.5.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
22+
github.com/mediocregopher/radix/v4 v4.1.2 h1:Pj7XnNK5WuzzFy63g98pnccainAePK+aZNQRvxSvj2I=
23+
github.com/mediocregopher/radix/v4 v4.1.2/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE=
2424
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
2525
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
2626
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -29,6 +29,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
2929
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
3030
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
3131
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
32+
github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI=
33+
github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
3234
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
3335
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
3436
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

redis-bechmark-go.go

+27-17
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
7-
"github.com/mediocregopher/radix/v3"
8+
"github.com/mediocregopher/radix/v4"
89
"golang.org/x/time/rate"
910
"log"
1011
"math"
@@ -37,15 +38,15 @@ func stringWithCharset(length int, charset string) string {
3738
return string(b)
3839
}
3940

40-
func ingestionRoutine(conn radix.Client, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
41+
func ingestionRoutine(conn Client, enableMultiExec bool, datapointsChan chan datapoint, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
4142
defer wg.Done()
4243
for i := 0; uint64(i) < number_samples || loop; i++ {
4344
rawCurrentCmd, key, _ := keyBuildLogic(keyplace, dataplace, datasize, keyspacelen, cmdS)
4445
sendCmdLogic(conn, rawCurrentCmd, enableMultiExec, key, datapointsChan, continueOnError, debug_level, useLimiter, rateLimiter, waitReplicas, waitReplicasMs)
4546
}
4647
}
4748

48-
func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string) (cmd radix.CmdAction, key string, keySlot uint16) {
49+
func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS []string) (cmd radix.Action, key string, keySlot uint16) {
4950
newCmdS := cmdS
5051
if keyPos > -1 {
5152
newCmdS[keyPos] = fmt.Sprintf("%d", rand.Int63n(int64(keyspacelen)))
@@ -54,22 +55,23 @@ func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS [
5455
newCmdS[dataPos] = stringWithCharset(int(datasize), charset)
5556
}
5657
rawCmd := radix.Cmd(nil, newCmdS[0], newCmdS[1:]...)
57-
5858
return rawCmd, key, radix.ClusterSlot([]byte(newCmdS[1]))
5959
}
6060

61-
func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, key string, datapointsChan chan datapoint, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
61+
func sendCmdLogic(conn Client, cmd radix.Action, enableMultiExec bool, key string, datapointsChan chan datapoint, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter, waitReplicas, waitReplicasMs int) {
62+
ctx := context.Background()
63+
6264
if useRateLimiter {
6365
r := rateLimiter.ReserveN(time.Now(), int(1))
6466
time.Sleep(r.Delay())
6567
}
6668
var err error
6769
startT := time.Now()
6870
if enableMultiExec {
69-
err = conn.Do(radix.WithConn(key, func(c radix.Conn) error {
71+
err = conn.Do(ctx, radix.WithConn(key, func(ctx context.Context, c radix.Conn) error {
7072

7173
// Begin the transaction with a MULTI command
72-
if err := conn.Do(radix.Cmd(nil, "MULTI")); err != nil {
74+
if err := conn.Do(ctx, radix.Cmd(nil, "MULTI")); err != nil {
7375
log.Fatalf("Received an error while preparing for MULTI: %v, error: %v", cmd, err)
7476
}
7577

@@ -83,25 +85,28 @@ func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool,
8385
// The return from DISCARD doesn't matter. If it's an error then
8486
// it's a network error and the Conn will be closed by the
8587
// client.
86-
conn.Do(radix.Cmd(nil, "DISCARD"))
88+
conn.Do(ctx, radix.Cmd(nil, "DISCARD"))
8789
log.Fatalf("Received an error while in multi: %v, error: %v", cmd, err)
8890
}
8991
}()
9092

9193
// queue up the transaction's commands
92-
err = conn.Do(cmd)
94+
err = conn.Do(ctx, cmd)
9395

9496
// execute the transaction, capturing the result in a Tuple. We only
9597
// care about the first element (the result from GET), so we discard the
9698
// second by setting nil.
97-
return conn.Do(radix.Cmd(nil, "EXEC"))
99+
return conn.Do(ctx, radix.Cmd(nil, "EXEC"))
98100
}))
99101
} else if waitReplicas > 0 {
100-
// pipeline the command + wait
101-
err = conn.Do(radix.Pipeline(cmd,
102-
radix.Cmd(nil, "WAIT", fmt.Sprintf("%d", waitReplicas), fmt.Sprintf("%d", waitReplicasMs))))
102+
// Create a new pipeline for the WAIT command
103+
p := radix.NewPipeline()
104+
// Pass both cmd and waitCmd to the original pipeline
105+
p.Append(cmd)
106+
p.Append(radix.Cmd(nil, "WAIT", fmt.Sprintf("%d", waitReplicas), fmt.Sprintf("%d", waitReplicasMs)))
107+
err = conn.Do(ctx, p)
103108
} else {
104-
err = conn.Do(cmd)
109+
err = conn.Do(ctx, cmd)
105110
}
106111
endT := time.Now()
107112
if err != nil {
@@ -134,6 +139,7 @@ func main() {
134139
clusterMode := flag.Bool("oss-cluster", false, "Enable OSS cluster mode.")
135140
loop := flag.Bool("l", false, "Loop. Run the tests forever.")
136141
version := flag.Bool("v", false, "Output version and exit")
142+
resp := flag.Int("resp", 2, "redis command response protocol (2 - RESP 2, 3 - RESP 3)")
137143
flag.Parse()
138144
git_sha := toolGitSHA1()
139145
git_dirty_str := ""
@@ -173,9 +179,14 @@ func main() {
173179
samplesPerClient := *numberRequests / *clients
174180
client_update_tick := 1
175181
latencies = hdrhistogram.New(1, 90000000, 3)
176-
opts := make([]radix.DialOpt, 0)
182+
opts := radix.Dialer{}
177183
if *password != "" {
178-
opts = append(opts, radix.DialAuthPass(*password))
184+
opts.AuthPass = *password
185+
}
186+
if *resp == 2 {
187+
opts.Protocol = "2"
188+
} else if *resp == 3 {
189+
opts.Protocol = "3"
179190
}
180191
ips, _ := net.LookupIP(*host)
181192
fmt.Printf("IPs %v\n", ips)
@@ -191,7 +202,6 @@ func main() {
191202
fmt.Printf("Using random seed: %d\n", *seed)
192203
rand.Seed(*seed)
193204
var cluster *radix.Cluster
194-
195205
datapointsChan := make(chan datapoint, *numberRequests)
196206
for channel_id := 1; uint64(channel_id) <= *clients; channel_id++ {
197207
wg.Add(1)

standalone_conn.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
package main
22

33
import (
4-
"github.com/mediocregopher/radix/v3"
4+
"context"
5+
"github.com/mediocregopher/radix/v4"
56
"log"
67
)
78

8-
func getStandaloneConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Pool {
9-
var pool *radix.Pool
9+
func getStandaloneConn(addr string, opts radix.Dialer, clients uint64) Client {
1010
var err error
11-
12-
customConnFunc := func(network, addr string) (radix.Conn, error) {
13-
return radix.Dial(network, addr, opts...,
14-
)
15-
}
11+
var size int = int(clients)
1612
network := "tcp"
17-
pool, err = radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0))
13+
ctx := context.Background()
14+
15+
poolConfig := radix.PoolConfig{}
16+
poolConfig.Dialer = opts
17+
poolConfig.Size = size
18+
19+
pool, err := poolConfig.New(ctx, network, addr)
1820
if err != nil {
19-
log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err)
21+
log.Fatalf("Error preparing for benchmark, while creating new pool. error = %v", err)
2022
}
2123
return pool
2224
}

0 commit comments

Comments
 (0)