@@ -18,7 +18,8 @@ import (
18
18
"time"
19
19
)
20
20
21
- func benchmarkRoutine (radixClient Client , ruedisClient rueidis.Client , useRuedis , useCSC , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , cmdS [][]string , commandsCDF []float32 , keyspacelen , datasize , number_samples uint64 , loop bool , debug_level int , wg * sync.WaitGroup , keyplace , dataplace []int , readOnly []bool , useLimiter bool , rateLimiter * rate.Limiter , waitReplicas , waitReplicasMs int , cscDuration time.Duration ) {
21
+ func benchmarkRoutine (radixClient Client , ruedisClient rueidis.Client , useRuedis , useCSC , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , cmdS [][]string , commandsCDF []float32 , keyspacelen , datasize , number_samples uint64 , loop bool , debug_level int , wg * sync.WaitGroup , keyplace , dataplace []int , readOnly []bool , useLimiter bool , rateLimiter * rate.Limiter , waitReplicas , waitReplicasMs int , cacheOptions * rueidis.CacheOptions ) {
22
+
22
23
defer wg .Done ()
23
24
for i := 0 ; uint64 (i ) < number_samples || loop ; i ++ {
24
25
cmdPos := sample (commandsCDF )
@@ -32,15 +33,15 @@ func benchmarkRoutine(radixClient Client, ruedisClient rueidis.Client, useRuedis
32
33
time .Sleep (r .Delay ())
33
34
}
34
35
if useRuedis {
35
- sendCmdLogicRuedis (ruedisClient , newCmdS , enableMultiExec , datapointsChan , continueOnError , debug_level , useCSC , isReadOnly , cscDuration , waitReplicas , waitReplicasMs )
36
+ sendCmdLogicRuedis (ruedisClient , newCmdS , enableMultiExec , datapointsChan , continueOnError , debug_level , useCSC , isReadOnly , cacheOptions , waitReplicas , waitReplicasMs )
36
37
} else {
37
38
sendCmdLogicRadix (radixClient , newCmdS , enableMultiExec , key , datapointsChan , continueOnError , debug_level , waitReplicas , waitReplicasMs )
38
39
39
40
}
40
41
}
41
42
}
42
43
43
- func sendCmdLogicRuedis (ruedisClient rueidis.Client , newCmdS []string , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , debug_level int , useCSC , isReadOnly bool , cscDuration time. Duration , waitReplicas , waitReplicasMs int ) {
44
+ func sendCmdLogicRuedis (ruedisClient rueidis.Client , newCmdS []string , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , debug_level int , useCSC , isReadOnly bool , cacheOptions * rueidis. CacheOptions , waitReplicas , waitReplicasMs int ) {
44
45
ctx := context .Background ()
45
46
var startT time.Time
46
47
var endT time.Time
@@ -55,9 +56,15 @@ func sendCmdLogicRuedis(ruedisClient rueidis.Client, newCmdS []string, enableMul
55
56
}
56
57
}
57
58
if useCSC && isReadOnly {
58
- startT = time .Now ()
59
- redisResult = ruedisClient .DoCache (ctx , arbitrary .Cache (), cscDuration )
60
- endT = time .Now ()
59
+ if cacheOptions .UseMultiExec && cacheOptions .UseServerPTTL {
60
+ startT = time .Now ()
61
+ redisResult = ruedisClient .DoCache (ctx , arbitrary .Cache (), cacheOptions .ClientTTL )
62
+ endT = time .Now ()
63
+ } else {
64
+ startT = time .Now ()
65
+ redisResult = ruedisClient .DoCacheWithOptions (ctx , arbitrary .Cache (), * cacheOptions )
66
+ endT = time .Now ()
67
+ }
61
68
} else if enableMultiExec {
62
69
cmds := make (rueidis.Commands , 0 , 3 )
63
70
cmds = append (cmds , ruedisClient .B ().Multi ().Build ())
@@ -190,8 +197,13 @@ func main() {
190
197
betweenClientsDelay := flag .Duration ("between-clients-duration" , time .Millisecond * 0 , "Between each client creation, wait this time." )
191
198
version := flag .Bool ("v" , false , "Output version and exit" )
192
199
verbose := flag .Bool ("verbose" , false , "Output verbose info" )
193
- cscEnabled := flag .Bool ("csc" , false , "Enable client side caching" )
194
200
useRuedis := flag .Bool ("rueidis" , false , "Use rueidis as the vanilla underlying client." )
201
+ cscEnabled := flag .Bool ("csc" , false , "Enable client side caching" )
202
+ // TODO: add this feature to check locking overhead
203
+ // cscDisableTrackInvalidations := flag.Bool("csc-disable-track-invalidations", false, "Disable CSC tracking")
204
+ cscUseMultiExec := flag .Bool ("csc-use-multi-exec" , false , "Use CSC wrapped in MULTI/EXEC" )
205
+ // TODO: add this feature
206
+ // cscUseServerPTTL := flag.Bool("csc-use-server-pttl", false, "Use CSC wrapped in with PTTL expiration info")
195
207
cscDuration := flag .Duration ("csc-ttl" , time .Minute , "Client side cache ttl for cached entries" )
196
208
clientKeepAlive := flag .Duration ("client-keepalive" , time .Minute , "Client keepalive" )
197
209
cscSizeBytes := flag .Int ("csc-per-client-bytes" , rueidis .DefaultCacheBytes , "client side cache size that bind to each TCP connection to a single redis instance" )
@@ -335,27 +347,30 @@ func main() {
335
347
AlwaysRESP2 : alwaysRESP2 ,
336
348
DisableCache : ! * cscEnabled ,
337
349
BlockingPoolSize : 0 ,
338
- PipelineMultiplex : 0 ,
350
+ PipelineMultiplex : - 1 ,
339
351
RingScaleEachConn : 1 ,
340
352
ReadBufferEachConn : 1024 ,
341
353
WriteBufferEachConn : 1024 ,
342
354
CacheSizeEachConn : * cscSizeBytes ,
343
355
OnInvalidations : invalidationFunction ,
356
+ ForceSingleClient : ! * clusterMode ,
344
357
}
345
358
clientOptions .Dialer .KeepAlive = * clientKeepAlive
346
359
ruedisClient , err = rueidis .NewClient (clientOptions )
360
+ cacheOptions := rueidis.CacheOptions {UseMultiExec : * cscUseMultiExec , UseServerPTTL : * cscUseMultiExec , ClientTTL : * cscDuration }
361
+
347
362
if err != nil {
348
363
panic (err )
349
364
}
350
- go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , * cscDuration )
365
+ go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , & cacheOptions )
351
366
} else {
352
367
// legacy radix code
353
368
if * clusterMode {
354
369
cluster = getOSSClusterConn (connectionStr , opts , 1 )
355
- go benchmarkRoutine (cluster , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , * cscDuration )
370
+ go benchmarkRoutine (cluster , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , nil )
356
371
} else {
357
372
radixStandalone = getStandaloneConn (connectionStr , opts , 1 )
358
- go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , * cscDuration )
373
+ go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , nil )
359
374
}
360
375
}
361
376
0 commit comments