Skip to content

Commit 53d5a1a

Browse files
committed
feat: Allow for pubsub connections to optionally be taken from pool
- After use, they are removed from the pool - Only applicable to non-cluster client
1 parent 531f068 commit 53d5a1a

File tree

4 files changed

+144
-6
lines changed

4 files changed

+144
-6
lines changed

options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ type Options struct {
147147

148148
// Add suffix to client name. Default is empty.
149149
IdentitySuffix string
150+
151+
// Use connections from pool instead of creating new ones. Note that after use these connections will not be
152+
// returned to the pool. Useful for managing the total Redis connection limit for a mix of Pubsub & other commands.
153+
// Applies only to non-cluster client. Default is false.
154+
PubsubFromPool bool
150155
}
151156

152157
func (opt *Options) init() {

pubsub.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import (
1212
"github.com/redis/go-redis/v9/internal/proto"
1313
)
1414

15+
type PubsubNewConnFunc func(ctx context.Context, channels []string) (*pool.Conn, error)
16+
type PubsubCloseConnFunc func(*pool.Conn) error
17+
1518
// PubSub implements Pub/Sub commands as described in
1619
// http://redis.io/topics/pubsub. Message receiving is NOT safe
1720
// for concurrent use by multiple goroutines.
@@ -21,8 +24,8 @@ import (
2124
type PubSub struct {
2225
opt *Options
2326

24-
newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
25-
closeConn func(*pool.Conn) error
27+
newConn PubsubNewConnFunc
28+
closeConn PubsubCloseConnFunc
2629

2730
mu sync.Mutex
2831
cn *pool.Conn

pubsub_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,4 +567,104 @@ var _ = Describe("PubSub", func() {
567567
Expect(msg.Channel).To(Equal("mychannel"))
568568
Expect(msg.Payload).To(Equal(text))
569569
})
570+
571+
It("should not use connections from pool", func() {
572+
statsBefore := client.PoolStats()
573+
574+
pubsub := client.Subscribe(ctx, "mychannel")
575+
defer pubsub.Close()
576+
577+
stats := client.PoolStats()
578+
// A connection has been created
579+
Expect(stats.TotalConns - statsBefore.TotalConns).To(Equal(uint32(1)))
580+
// But it's not taken from the pool
581+
poolFetchesBefore := statsBefore.Hits + statsBefore.Misses
582+
poolFetchesAfter := stats.Hits + stats.Misses
583+
Expect(poolFetchesAfter - poolFetchesBefore).To(Equal(uint32(0)))
584+
585+
pubsub.Close()
586+
587+
stats = client.PoolStats()
588+
// The connection no longer exists
589+
Expect(stats.TotalConns - statsBefore.TotalConns).To(Equal(uint32(0)))
590+
Expect(stats.IdleConns - statsBefore.IdleConns).To(Equal(uint32(0)))
591+
})
592+
})
593+
594+
var _ = Describe("PubSub with PubsubFromPool set", func() {
595+
var client *redis.Client
596+
597+
BeforeEach(func() {
598+
opt := redisOptions()
599+
opt.MinIdleConns = 0
600+
opt.ConnMaxLifetime = 0
601+
opt.PubsubFromPool = true
602+
// zero value ends up using default so set small instead
603+
opt.PoolTimeout = time.Microsecond
604+
client = redis.NewClient(opt)
605+
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
606+
})
607+
608+
AfterEach(func() {
609+
Expect(client.Close()).NotTo(HaveOccurred())
610+
})
611+
612+
It("should use connection from pool", func() {
613+
statsBefore := client.PoolStats()
614+
615+
pubsub := client.Subscribe(ctx, "mychannel")
616+
defer pubsub.Close()
617+
618+
stats := client.PoolStats()
619+
// A connection has been taken from the pool
620+
Expect(stats.Hits - statsBefore.Hits).To(Equal(uint32(1)))
621+
statsDuring := client.PoolStats()
622+
623+
pubsub.Close()
624+
625+
stats = client.PoolStats()
626+
// It's not returned to the idle pool ..
627+
Expect(statsDuring.IdleConns - stats.IdleConns).To(Equal(uint32(0)))
628+
// .. and has been terminated
629+
Expect(statsDuring.TotalConns - stats.TotalConns).To(Equal(uint32(1)))
630+
})
631+
632+
It("should respect pool size limit", func() {
633+
poolSize := client.Options().PoolSize
634+
statsBefore := client.PoolStats()
635+
636+
var pubsubs []*redis.PubSub
637+
for i := 0; i < poolSize; i++ {
638+
pubsub := client.Subscribe(ctx, "mychannel")
639+
defer pubsub.Close()
640+
pubsubs = append(pubsubs, pubsub)
641+
}
642+
643+
statsDuring := client.PoolStats()
644+
poolFetchesBefore := statsBefore.Hits + statsBefore.Misses
645+
poolFetchesAfter := statsDuring.Hits + statsDuring.Misses
646+
647+
// A total of poolSize connections should have been taken from the pool (new or existing)
648+
Expect(poolFetchesAfter - poolFetchesBefore).To(Equal(uint32(poolSize)))
649+
650+
// The next pubsub connection should fail to connect (waiting for pool)
651+
extraPubsub := client.Subscribe(ctx, "mychannel")
652+
defer extraPubsub.Close()
653+
Expect(client.PoolStats().Timeouts - statsDuring.Timeouts).To(Equal(uint32(1)))
654+
655+
// As should retries
656+
err := extraPubsub.Ping(ctx)
657+
Expect(err).To(MatchError(ContainSubstring("connection pool timeout")))
658+
Expect(client.PoolStats().Timeouts - statsDuring.Timeouts).To(Equal(uint32(2)))
659+
660+
for _, pubsub := range pubsubs {
661+
pubsub.Close()
662+
}
663+
664+
stats := client.PoolStats()
665+
// Connections are not returned to the idle pool ..
666+
Expect(statsDuring.IdleConns - stats.IdleConns).To(Equal(uint32(0)))
667+
// .. and have been terminated
668+
Expect(statsDuring.TotalConns - stats.TotalConns).To(Equal(uint32(poolSize)))
669+
})
570670
})

redis.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ type baseClient struct {
199199
opt *Options
200200
connPool pool.Pooler
201201

202+
pubsubNewConn PubsubNewConnFunc
203+
pubsubCloseConn PubsubCloseConnFunc
204+
202205
onClose func() error // hook called when client is closed
203206
}
204207

@@ -368,6 +371,13 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
368371
}
369372
}
370373

374+
func (c *baseClient) removeConn(ctx context.Context, cn *pool.Conn, err error) {
375+
if c.opt.Limiter != nil {
376+
c.opt.Limiter.ReportResult(err)
377+
}
378+
c.connPool.Remove(ctx, cn, err)
379+
}
380+
371381
func (c *baseClient) withConn(
372382
ctx context.Context, fn func(context.Context, *pool.Conn) error,
373383
) error {
@@ -649,6 +659,28 @@ func (c *Client) init() {
649659
pipeline: c.baseClient.processPipeline,
650660
txPipeline: c.baseClient.processTxPipeline,
651661
})
662+
663+
if c.opt.PubsubFromPool {
664+
// Take connections from pool and remove them from pool afterwards. (Pubsub & other connections are managed
665+
// together.)
666+
c.pubsubNewConn = func(ctx context.Context, channels []string) (*pool.Conn, error) {
667+
return c.getConn(ctx)
668+
}
669+
c.pubsubCloseConn = func(conn *pool.Conn) error {
670+
c.removeConn(context.TODO(), conn, nil)
671+
return nil
672+
}
673+
} else {
674+
// Make brand new connection from pool and close it afterwards. (Pubsub & other connections are managed
675+
// independently other than that pubsub connection can no longer be created once the pool is full.)
676+
c.pubsubNewConn = func(ctx context.Context, channels []string) (*pool.Conn, error) {
677+
return c.newConn(ctx)
678+
}
679+
// wrapping in closure since pool has not been initialised yet
680+
c.pubsubCloseConn = func(conn *pool.Conn) error {
681+
return c.connPool.CloseConn(conn)
682+
}
683+
}
652684
}
653685

654686
func (c *Client) WithTimeout(timeout time.Duration) *Client {
@@ -720,10 +752,8 @@ func (c *Client) pubSub() *PubSub {
720752
pubsub := &PubSub{
721753
opt: c.opt,
722754

723-
newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
724-
return c.newConn(ctx)
725-
},
726-
closeConn: c.connPool.CloseConn,
755+
newConn: c.pubsubNewConn,
756+
closeConn: c.pubsubCloseConn,
727757
}
728758
pubsub.init()
729759
return pubsub

0 commit comments

Comments
 (0)