@@ -240,10 +240,7 @@ protected boolean hasAckToSend() {
240
240
* Override point, this returns the result of hasMessagesToSend.
241
241
*/
242
242
protected boolean hasSomethingToSend () {
243
- if (ackToSend >= 0 ) {
244
- return true ;
245
- }
246
- if (hasPingToSend ()) {
243
+ if (hasAckToSend () || hasPingToSend ()) {
247
244
return true ;
248
245
}
249
246
if (waitingForAcks ()) {
@@ -282,8 +279,9 @@ protected void onConnected() {
282
279
connected = true ;
283
280
lastTimeRecv = lastTimeSend = System .currentTimeMillis ();
284
281
readThread = new ReadThread (getConnection ().getLink ().getLinkName () + " Reader" );
285
- writeThread = new WriteThread (getConnection ().getLink ().getLinkName () + " Writer" );
286
282
readThread .start ();
283
+ Thread .yield ();
284
+ writeThread = new WriteThread (getConnection ().getLink ().getLinkName () + " Writer" );
287
285
writeThread .start ();
288
286
}
289
287
@@ -296,11 +294,6 @@ protected void onDisconnected() {
296
294
outgoingResponses .clear ();
297
295
outgoingMutex .notifyAll ();
298
296
}
299
- try {
300
- writeThread .join ();
301
- } catch (Exception x ) {
302
- debug (getPath (), x );
303
- }
304
297
try {
305
298
readThread .join ();
306
299
} catch (Exception x ) {
@@ -319,6 +312,13 @@ protected void onDisconnecting() {
319
312
}
320
313
connected = false ;
321
314
notifyOutgoing ();
315
+ try {
316
+ writeThread .join ();
317
+ } catch (Exception x ) {
318
+ debug (getPath (), x );
319
+ }
320
+ //Attempt to exit cleanly, try to get acks for sent messages.
321
+ waitForAcks (1000 );
322
322
}
323
323
324
324
protected void requeueOutgoingRequest (OutboundMessage arg ) {
@@ -385,6 +385,25 @@ private void verifyLastSend() throws IOException {
385
385
}
386
386
}
387
387
388
+ /* Try to exit cleanly, wait for all acks for sent messages. */
389
+ private void waitForAcks (long timeout ) {
390
+ long start = System .currentTimeMillis ();
391
+ synchronized (outgoingMutex ) {
392
+ while (getMissingAcks () > 0 ) {
393
+ try {
394
+ outgoingMutex .wait (500 );
395
+ } catch (InterruptedException x ) {
396
+ warn (getPath (), x );
397
+ }
398
+ if ((System .currentTimeMillis () - start ) > timeout ) {
399
+ debug (debug () ? String .format ("witForAcks timeout (%s / %s)" ,ackRcvd ,messageId )
400
+ : null );
401
+ break ;
402
+ }
403
+ }
404
+ }
405
+ }
406
+
388
407
///////////////////////////////////////////////////////////////////////////
389
408
// Inner Classes
390
409
///////////////////////////////////////////////////////////////////////////
@@ -400,6 +419,7 @@ private class ReadThread extends Thread {
400
419
}
401
420
402
421
public void run () {
422
+ debug ("Enter DSSession.ReadThread" );
403
423
DSLinkConnection conn = getConnection ();
404
424
try {
405
425
while (connected ) {
@@ -415,6 +435,7 @@ public void run() {
415
435
conn .connDown (DSException .makeMessage (x ));
416
436
}
417
437
}
438
+ debug ("Exit DSSession.ReadThread" );
418
439
}
419
440
}
420
441
@@ -430,6 +451,7 @@ private class WriteThread extends Thread {
430
451
431
452
public void run () {
432
453
DSLinkConnection conn = getConnection ();
454
+ debug ("Enter DSSession.WriteThread" );
433
455
try {
434
456
while (connected ) {
435
457
verifyLastRead ();
@@ -454,6 +476,7 @@ public void run() {
454
476
conn .connDown (DSException .makeMessage (x ));
455
477
}
456
478
}
479
+ debug ("Exit DSSession.WriteThread" );
457
480
}
458
481
}
459
482
0 commit comments