@@ -765,8 +765,8 @@ public boolean includeAllServers() {
765
765
@ Test
766
766
public void testForceReconnectQueueBehaviorCheck () throws Exception {
767
767
runInJsCluster ((nc0 , nc1 , nc2 ) -> {
768
- int pubCount = 1000000 ;
769
- int subscribeTime = 4000 ;
768
+ int pubCount = 100_000 ;
769
+ int subscribeTime = 5000 ;
770
770
int flushWait = 2500 ;
771
771
int port = nc0 .getServerInfo ().getPort ();
772
772
@@ -803,8 +803,11 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount,
803
803
froBuilder .forceClose ();
804
804
}
805
805
806
+ ReconnectQueueCheckConnectionListener listener = new ReconnectQueueCheckConnectionListener ();
807
+
806
808
Options options = Options .builder ()
807
809
.server (getNatsLocalhostUri (port ))
810
+ .connectionListener (listener )
808
811
.dataPortType (ForceReconnectQueueCheckDataPort .class .getCanonicalName ())
809
812
.build ();
810
813
@@ -815,6 +818,8 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount,
815
818
816
819
nc .forceReconnect (froBuilder .build ());
817
820
821
+ assertTrue (listener .latch .await (subscribeTime , TimeUnit .MILLISECONDS ));
822
+
818
823
long maxTime = subscribeTime ;
819
824
while (!subscriber .subscriberDone .get () && maxTime > 0 ) {
820
825
//noinspection BusyWait
@@ -834,6 +839,17 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount,
834
839
}
835
840
}
836
841
842
+ static class ReconnectQueueCheckConnectionListener implements ConnectionListener {
843
+ public CountDownLatch latch = new CountDownLatch (1 );
844
+
845
+ @ Override
846
+ public void connectionEvent (Connection conn , Events type ) {
847
+ if (type == Events .RECONNECTED ) {
848
+ latch .countDown ();
849
+ }
850
+ }
851
+ }
852
+
837
853
static class ReconnectQueueCheckSubscriber implements Runnable {
838
854
final AtomicBoolean subscriberDone ;
839
855
final String subject ;
0 commit comments