Skip to content

Commit 8f728aa

Browse files
Ensure mutual exclusion on cluster nodes/slots update when starting benchmark (#16)
* Removed radix usage from --oss-cluster-api-distribute-subscribers. Using clusterClient.MasterForKey() in case of cluster API and SSUBSCRIBE * Fixd --pool_size option description * Ensure mutual exclusion on cluster nodes/slots update when starting benchmark * Remove spurious Address string info from addresses slice -- pure stetic
1 parent 9210538 commit 8f728aa

File tree

1 file changed

+35
-17
lines changed

1 file changed

+35
-17
lines changed

subscriber.go

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
)
3030

3131
var totalMessages uint64
32+
var clusterSlicesMu sync.Mutex
3233

3334
type testResult struct {
3435
StartTime int64 `json:"StartTime"`
@@ -126,9 +127,9 @@ func main() {
126127

127128
ctx := context.Background()
128129
nodeCount := 0
129-
poolSize := *poolSizePtr
130130
var nodesAddresses []string
131131
var nodeClients []*redis.Client
132+
poolSize := *poolSizePtr
132133
var clusterClient *redis.ClusterClient
133134
var standaloneClient *redis.Client
134135

@@ -161,22 +162,7 @@ func main() {
161162
if err != nil {
162163
log.Fatal(err)
163164
}
164-
log.Println("Getting cluster slots info.")
165-
slots, err := clusterClient.ClusterSlots(ctx).Result()
166-
if err != nil {
167-
log.Fatal(err)
168-
}
169-
nodeCount = len(slots)
170-
log.Println(fmt.Sprintf("Detailing cluster slots info. Total nodes: %d", nodeCount))
171-
for _, slotInfo := range slots {
172-
log.Println(fmt.Sprintf("\tSlot range start %d end %d. Nodes: %v", slotInfo.Start, slotInfo.End, slotInfo.Nodes))
173-
nodesAddresses = append(nodesAddresses, slotInfo.Nodes[0].Addr)
174-
}
175-
fn := func(ctx context.Context, client *redis.Client) (err error) {
176-
nodeClients = append(nodeClients, client)
177-
return
178-
}
179-
clusterClient.ForEachMaster(ctx, fn)
165+
nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx)
180166
} else {
181167
nodeCount = 1
182168
nodesAddresses = append(nodesAddresses, fmt.Sprintf("%s:%s", *host, *port))
@@ -214,6 +200,7 @@ func main() {
214200
if err != nil {
215201
log.Fatal(err)
216202
}
203+
nodeCount, nodeClients, nodesAddresses = updateSecondarySlicesCluster(clusterClient, ctx)
217204
}
218205
// trap Ctrl+C and call cancel on the context
219206
// We Use this instead of the previous stopChannel + chan radix.PubSubMessage
@@ -334,6 +321,37 @@ func main() {
334321
wg.Wait()
335322
}
336323

324+
func updateSecondarySlicesCluster(clusterClient *redis.ClusterClient, ctx context.Context) (int, []*redis.Client, []string) {
325+
var nodeCount = 0
326+
var nodesAddresses []string
327+
var nodeClients []*redis.Client
328+
log.Println("Getting cluster slots info.")
329+
slots, err := clusterClient.ClusterSlots(ctx).Result()
330+
if err != nil {
331+
log.Fatal(err)
332+
}
333+
334+
log.Println(fmt.Sprintf("Detailing cluster slots info. Total slot groups: %d", len(slots)))
335+
for _, slotInfo := range slots {
336+
log.Println(fmt.Sprintf("\tSlot range start %d end %d. Nodes: %v", slotInfo.Start, slotInfo.End, slotInfo.Nodes))
337+
}
338+
log.Println(fmt.Sprintf("Detailing cluster node info"))
339+
fn := func(ctx context.Context, client *redis.Client) (err error) {
340+
clusterSlicesMu.Lock()
341+
nodeClients = append(nodeClients, client)
342+
addr := client.Conn().String()
343+
addrS := strings.Split(addr, ":")
344+
finalAddr := fmt.Sprintf("%s:%s", addrS[0][len(" Redis<")-1:], addrS[1][:len(addrS[1])-3])
345+
log.Println(fmt.Sprintf("Cluster node pos #%d. Address: %s.", len(nodeClients), finalAddr))
346+
nodesAddresses = append(nodesAddresses, finalAddr)
347+
clusterSlicesMu.Unlock()
348+
return
349+
}
350+
clusterClient.ForEachMaster(ctx, fn)
351+
nodeCount = len(slots)
352+
return nodeCount, nodeClients, nodesAddresses
353+
}
354+
337355
func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabwriter.Writer, test_time int) (bool, time.Time, time.Duration, uint64, []float64) {
338356

339357
start := time.Now()

0 commit comments

Comments
 (0)