Skip to content

Commit 364bbef

Browse files
authored
[ADDED] Force reconnect (nats-io#1624)
Signed-off-by: Piotr Piotrowski <[email protected]>
1 parent 833574b commit 364bbef

File tree

6 files changed

+297
-52
lines changed

6 files changed

+297
-52
lines changed

example_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,19 @@ func ExampleConn_Subscribe() {
8989
})
9090
}
9191

92+
func ExampleConn_ForceReconnect() {
93+
nc, _ := nats.Connect(nats.DefaultURL)
94+
defer nc.Close()
95+
96+
nc.Subscribe("foo", func(m *nats.Msg) {
97+
fmt.Printf("Received a message: %s\n", string(m.Data))
98+
})
99+
100+
// Reconnect to the server.
101+
// the subscription will be recreated after the reconnect.
102+
nc.ForceReconnect()
103+
}
104+
92105
// This Example shows a synchronous subscriber.
93106
func ExampleConn_SubscribeSync() {
94107
nc, _ := nats.Connect(nats.DefaultURL)

nats.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2161,6 +2161,47 @@ func (nc *Conn) waitForExits() {
21612161
nc.wg.Wait()
21622162
}
21632163

2164+
// ForceReconnect forces a reconnect attempt to the server.
2165+
// This is a non-blocking call and will start the reconnect
2166+
// process without waiting for it to complete.
2167+
//
2168+
// If the connection is already in the process of reconnecting,
2169+
// this call will force an immediate reconnect attempt (bypassing
2170+
// the current reconnect delay).
2171+
func (nc *Conn) ForceReconnect() error {
2172+
nc.mu.Lock()
2173+
defer nc.mu.Unlock()
2174+
2175+
if nc.isClosed() {
2176+
return ErrConnectionClosed
2177+
}
2178+
if nc.isReconnecting() {
2179+
// if we're already reconnecting, force a reconnect attempt
2180+
// even if we're in the middle of a backoff
2181+
if nc.rqch != nil {
2182+
close(nc.rqch)
2183+
}
2184+
return nil
2185+
}
2186+
2187+
// Clear any queued pongs
2188+
nc.clearPendingFlushCalls()
2189+
2190+
// Clear any queued and blocking requests.
2191+
nc.clearPendingRequestCalls()
2192+
2193+
// Stop ping timer if set.
2194+
nc.stopPingTimer()
2195+
2196+
// Go ahead and make sure we have flushed the outbound
2197+
nc.bw.flush()
2198+
nc.conn.Close()
2199+
2200+
nc.changeConnStatus(RECONNECTING)
2201+
go nc.doReconnect(nil, true)
2202+
return nil
2203+
}
2204+
21642205
// ConnectedUrl reports the connected server's URL
21652206
func (nc *Conn) ConnectedUrl() string {
21662207
if nc == nil {
@@ -2420,7 +2461,7 @@ func (nc *Conn) connect() (bool, error) {
24202461
nc.setup()
24212462
nc.changeConnStatus(RECONNECTING)
24222463
nc.bw.switchToPending()
2423-
go nc.doReconnect(ErrNoServers)
2464+
go nc.doReconnect(ErrNoServers, false)
24242465
err = nil
24252466
} else {
24262467
nc.current = nil
@@ -2720,7 +2761,7 @@ func (nc *Conn) stopPingTimer() {
27202761

27212762
// Try to reconnect using the option parameters.
27222763
// This function assumes we are allowed to reconnect.
2723-
func (nc *Conn) doReconnect(err error) {
2764+
func (nc *Conn) doReconnect(err error, forceReconnect bool) {
27242765
// We want to make sure we have the other watchers shutdown properly
27252766
// here before we proceed past this point.
27262767
nc.waitForExits()
@@ -2776,7 +2817,8 @@ func (nc *Conn) doReconnect(err error) {
27762817
break
27772818
}
27782819

2779-
doSleep := i+1 >= len(nc.srvPool)
2820+
doSleep := i+1 >= len(nc.srvPool) && !forceReconnect
2821+
forceReconnect = false
27802822
nc.mu.Unlock()
27812823

27822824
if !doSleep {
@@ -2803,6 +2845,12 @@ func (nc *Conn) doReconnect(err error) {
28032845
select {
28042846
case <-rqch:
28052847
rt.Stop()
2848+
2849+
// we need to reset the rqch channel to avoid
2850+
// closing a closed channel in the next iteration
2851+
nc.mu.Lock()
2852+
nc.rqch = make(chan struct{})
2853+
nc.mu.Unlock()
28062854
case <-rt.C:
28072855
}
28082856
}
@@ -2872,9 +2920,6 @@ func (nc *Conn) doReconnect(err error) {
28722920
// Done with the pending buffer
28732921
nc.bw.doneWithPending()
28742922

2875-
// This is where we are truly connected.
2876-
nc.status = CONNECTED
2877-
28782923
// Queue up the correct callback. If we are in initial connect state
28792924
// (using retry on failed connect), we will call the ConnectedCB,
28802925
// otherwise the ReconnectedCB.
@@ -2930,7 +2975,7 @@ func (nc *Conn) processOpErr(err error) {
29302975
// Clear any queued pongs, e.g. pending flush calls.
29312976
nc.clearPendingFlushCalls()
29322977

2933-
go nc.doReconnect(err)
2978+
go nc.doReconnect(err, false)
29342979
nc.mu.Unlock()
29352980
return
29362981
}

test/conn_test.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2946,16 +2946,6 @@ func TestRetryOnFailedConnectWithTLSError(t *testing.T) {
29462946
}
29472947

29482948
func TestConnStatusChangedEvents(t *testing.T) {
2949-
waitForStatus := func(t *testing.T, ch chan nats.Status, expected nats.Status) {
2950-
select {
2951-
case s := <-ch:
2952-
if s != expected {
2953-
t.Fatalf("Expected status: %s; got: %s", expected, s)
2954-
}
2955-
case <-time.After(5 * time.Second):
2956-
t.Fatalf("Timeout waiting for status %q", expected)
2957-
}
2958-
}
29592949
t.Run("default events", func(t *testing.T) {
29602950
s := RunDefaultServer()
29612951
nc, err := nats.Connect(s.ClientURL())
@@ -2978,15 +2968,15 @@ func TestConnStatusChangedEvents(t *testing.T) {
29782968
time.Sleep(50 * time.Millisecond)
29792969

29802970
s.Shutdown()
2981-
waitForStatus(t, newStatus, nats.RECONNECTING)
2971+
WaitOnChannel(t, newStatus, nats.RECONNECTING)
29822972

29832973
s = RunDefaultServer()
29842974
defer s.Shutdown()
29852975

2986-
waitForStatus(t, newStatus, nats.CONNECTED)
2976+
WaitOnChannel(t, newStatus, nats.CONNECTED)
29872977

29882978
nc.Close()
2989-
waitForStatus(t, newStatus, nats.CLOSED)
2979+
WaitOnChannel(t, newStatus, nats.CLOSED)
29902980

29912981
select {
29922982
case s := <-newStatus:
@@ -3019,7 +3009,7 @@ func TestConnStatusChangedEvents(t *testing.T) {
30193009
s = RunDefaultServer()
30203010
defer s.Shutdown()
30213011
nc.Close()
3022-
waitForStatus(t, newStatus, nats.CLOSED)
3012+
WaitOnChannel(t, newStatus, nats.CLOSED)
30233013

30243014
select {
30253015
case s := <-newStatus:

test/helper_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ func WaitTime(ch chan bool, timeout time.Duration) error {
5454
return errors.New("timeout")
5555
}
5656

57+
func WaitOnChannel[T comparable](t *testing.T, ch <-chan T, expected T) {
58+
t.Helper()
59+
select {
60+
case s := <-ch:
61+
if s != expected {
62+
t.Fatalf("Expected result: %v; got: %v", expected, s)
63+
}
64+
case <-time.After(5 * time.Second):
65+
t.Fatalf("Timeout waiting for result %v", expected)
66+
}
67+
}
68+
5769
func stackFatalf(t tLogger, f string, args ...any) {
5870
lines := make([]string, 0, 32)
5971
msg := fmt.Sprintf(f, args...)

0 commit comments

Comments
 (0)