Skip to content

Commit 4314207

Browse files
author
Zhen
committed
Improved return to pool to be thread-safe
Fixed the bug where the connection is not return to pool or closed when it is invalid Fixed the bug where the connection is disposed but being put back to pool Fixed a race condition where the connection is put back to pool after the pool is cleared Added more tests to make sure that all state changes of a pooled connection related to pooling and disposing are thread-safe
1 parent fcadc2f commit 4314207

9 files changed

+258
-64
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal;
2020

2121
import java.util.Map;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.neo4j.driver.internal.spi.Connection;
2425
import org.neo4j.driver.internal.spi.Logger;
@@ -52,7 +53,7 @@ public void run()
5253
};
5354

5455
private InternalTransaction currentTransaction;
55-
private boolean isOpen = true;
56+
private AtomicBoolean isOpen = new AtomicBoolean( true );
5657

5758
public InternalSession( Connection connection, Logger logger )
5859
{
@@ -100,19 +101,19 @@ public StatementResult run( Statement statement )
100101
@Override
101102
public boolean isOpen()
102103
{
103-
return isOpen;
104+
return isOpen.get();
104105
}
105106

106107
@Override
107108
public void close()
108109
{
109-
if( !isOpen )
110+
// Use atomic operation to protect from closing the connection twice (putting back to the pool twice).
111+
if( !isOpen.compareAndSet( true, false ) )
110112
{
111113
throw new ClientException( "This session has already been closed." );
112114
}
113115
else
114116
{
115-
isOpen = false;
116117
if ( currentTransaction != null )
117118
{
118119
try
@@ -171,7 +172,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction()
171172
@Override
172173
protected void finalize() throws Throwable
173174
{
174-
if( isOpen )
175+
if( isOpen.compareAndSet( true, false ) )
175176
{
176177
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
177178
"method on Sessions before disposing of the objects.", null );

driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,16 @@ public void receiveOne()
157157

158158
@Override
159159
public void close()
160-
{try
161160
{
162-
markAsInUse();
163-
delegate.close();
164-
}
165-
finally
166-
{
167-
markAsAvailable();
168-
}
161+
try
162+
{
163+
markAsInUse();
164+
delegate.close();
165+
}
166+
finally
167+
{
168+
markAsAvailable();
169+
}
169170
}
170171

171172
@Override

driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@
4242
import static java.lang.String.format;
4343

4444
/**
45-
* A basic connection pool that optimizes for threads being long-lived, acquiring/releasing many connections.
46-
* It uses a global queue as a fallback pool, but tries to avoid coordination by storing connections in a ThreadLocal.
45+
* The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first
46+
* try to return the session into the session pool, however if we failed to return it back, either because the pool
47+
* is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the
48+
* session.
4749
*
48-
* Safety is achieved by tracking thread locals getting garbage collected, returning connections to the global pool
49-
* when this happens.
50+
* The session is NOT meat to be thread safe, each thread should have an independent session and close it (return to
51+
* pool) when the work with the session has been done.
5052
*
51-
* If threads are long-lived, this pool will achieve linearly scalable performance with overhead equivalent to a
52-
* hash-map lookup per acquire.
53-
*
54-
* If threads are short-lived, this pool is not ideal.
53+
* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool
54+
* at the same time.
5555
*/
5656
public class InternalConnectionPool implements ConnectionPool
5757
{

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

+29-17
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,26 @@
2626
import org.neo4j.driver.internal.util.Consumer;
2727
import org.neo4j.driver.v1.Value;
2828
import org.neo4j.driver.v1.exceptions.Neo4jException;
29-
29+
/**
30+
* The state of a pooledConnection from a pool point of view could be one of the following:
31+
* Created,
32+
* Available,
33+
* Claimed,
34+
* Closed,
35+
* Disposed.
36+
*
37+
* The state machine looks like:
38+
*
39+
* session.finalize
40+
* session.close failed return to pool
41+
* Created -------> Claimed ----------> Closed ---------> Disposed
42+
* ^ | ^
43+
* pool.acquire | |returned to pool |
44+
* | | |
45+
* ---- Available <----- |
46+
* | pool.close |
47+
* ---------------------------------
48+
*/
3049
public class PooledConnection implements Connection
3150
{
3251
/** The real connection who will do all the real jobs */
@@ -157,23 +176,15 @@ public void receiveOne()
157176
}
158177

159178
@Override
179+
/**
180+
* Make sure only close the connection once on each session to avoid releasing the connection twice, a.k.a.
181+
* adding back the connection twice into the pool.
182+
*/
160183
public void close()
161184
{
162-
// In case this session has an open result or transaction or something,
163-
// make sure it's reset to a nice state before we reuse it.
164-
try
165-
{
166-
reset( StreamCollector.NO_OP );
167-
sync();
168-
}
169-
catch (Exception ex)
170-
{
171-
dispose();
172-
}
173-
finally
174-
{
175-
release.accept( this );
176-
}
185+
release.accept( this );
186+
// put the full logic of deciding whether to dispose the connection or to put it back to
187+
// the pool into the release object
177188
}
178189

179190
@Override
@@ -233,6 +244,7 @@ private boolean isClientOrTransientError( RuntimeException e )
233244

234245
public long idleTime()
235246
{
236-
return clock.millis() - lastUsed;
247+
long idleTime = clock.millis() - lastUsed;
248+
return idleTime;
237249
}
238250
}

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java

+40-1
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,54 @@ else if ( validConnection( pooledConnection ) )
6464
// Otherwise, we close the connection directly here.
6565
pooledConnection.dispose();
6666
}
67+
else if ( driverStopped.get() )
68+
{
69+
// If our adding the pooledConnection to the queue was racing with the closing of the driver,
70+
// then the loop where the driver is closing all available connections might not observe our newly
71+
// added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter
72+
// which connection we get back, because other threads might be in the same situation as ours. It only
73+
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
74+
// *a* connection in response.
75+
PooledConnection conn = connections.poll();
76+
if ( conn != null )
77+
{
78+
conn.dispose();
79+
}
80+
}
81+
}
82+
else
83+
{
84+
pooledConnection.dispose();
6785
}
6886
}
6987

7088
boolean validConnection( PooledConnection pooledConnection )
7189
{
72-
return !pooledConnection.hasUnrecoverableErrors() &&
90+
return reset(pooledConnection) &&
91+
!pooledConnection.hasUnrecoverableErrors() &&
7392
(pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection ));
7493
}
7594

95+
/**
96+
* In case this session has an open result or transaction or something,
97+
* make sure it's reset to a nice state before we reuse it.
98+
* @param conn the PooledConnection
99+
* @return true if the connection is reset successfully without any error, otherwise false.
100+
*/
101+
private boolean reset( PooledConnection conn )
102+
{
103+
try
104+
{
105+
conn.reset( StreamCollector.NO_OP );
106+
conn.sync();
107+
return true;
108+
}
109+
catch ( Throwable e )
110+
{
111+
return false;
112+
}
113+
}
114+
76115
private boolean ping( PooledConnection conn )
77116
{
78117
try

driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424

2525
import org.neo4j.driver.internal.logging.DevNullLogger;
2626
import org.neo4j.driver.internal.spi.Connection;
27+
import org.neo4j.driver.internal.spi.Logger;
2728
import org.neo4j.driver.v1.Transaction;
2829
import org.neo4j.driver.v1.exceptions.ClientException;
2930

31+
import static junit.framework.Assert.fail;
3032
import static junit.framework.TestCase.assertNotNull;
33+
import static org.hamcrest.CoreMatchers.equalTo;
34+
import static org.junit.Assert.assertThat;
3135
import static org.mockito.Mockito.mock;
3236
import static org.mockito.Mockito.verify;
3337
import static org.mockito.Mockito.when;
@@ -135,4 +139,30 @@ public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throw
135139
// When
136140
sess.beginTransaction();
137141
}
142+
143+
@Test
144+
public void shouldGetExceptionIfTryingToCloseSessionMoreThanOnce() throws Throwable
145+
{
146+
// Given
147+
InternalSession sess = new InternalSession( mock(Connection.class), mock(Logger.class) );
148+
try
149+
{
150+
sess.close();
151+
}
152+
catch( Exception e )
153+
{
154+
fail("Should not get any problem to close first time");
155+
}
156+
157+
// When
158+
try
159+
{
160+
sess.close();
161+
fail( "Should have received an error to close second time" );
162+
}
163+
catch( Exception e )
164+
{
165+
assertThat( e.getMessage(), equalTo("This session has already been closed." ));
166+
}
167+
}
138168
}

driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java

+33-6
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import org.mockito.Mockito;
2323

2424
import java.io.IOException;
25+
import java.util.HashMap;
2526
import java.util.concurrent.BlockingQueue;
2627
import java.util.concurrent.atomic.AtomicBoolean;
2728

2829
import org.neo4j.driver.internal.spi.Connection;
30+
import org.neo4j.driver.internal.spi.StreamCollector;
2931
import org.neo4j.driver.internal.util.Clock;
3032
import org.neo4j.driver.internal.util.Consumers;
3133
import org.neo4j.driver.v1.Config;
34+
import org.neo4j.driver.v1.Value;
3235
import org.neo4j.driver.v1.exceptions.ClientException;
3336
import org.neo4j.driver.v1.exceptions.Neo4jException;
3437
import org.neo4j.driver.v1.exceptions.TransientException;
@@ -38,6 +41,10 @@
3841
import static org.hamcrest.MatcherAssert.assertThat;
3942
import static org.junit.Assert.assertTrue;
4043
import static org.junit.Assert.fail;
44+
import static org.mockito.Matchers.any;
45+
import static org.mockito.Matchers.anyMap;
46+
import static org.mockito.Matchers.anyString;
47+
import static org.mockito.Matchers.eq;
4148
import static org.mockito.Mockito.doThrow;
4249
import static org.mockito.Mockito.mock;
4350
import static org.mockito.Mockito.never;
@@ -57,7 +64,8 @@ public class ConnectionInvalidationTest
5764
public void shouldInvalidateConnectionThatIsOld() throws Throwable
5865
{
5966
// Given a connection that's broken
60-
Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).sync();
67+
Mockito.doThrow( new ClientException( "That didn't work" ) )
68+
.when( delegate ).run( anyString(), anyMap(), any( StreamCollector.class ) );
6169
Config config = Config.defaultConfig();
6270
when( clock.millis() ).thenReturn( 0L, config.idleTimeBeforeConnectionTest() + 1L );
6371
PooledConnection conn = new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), clock );
@@ -76,7 +84,8 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable
7684
public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable
7785
{
7886
// Given a connection that's broken
79-
Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).sync();
87+
Mockito.doThrow( new ClientException( "That didn't work" ) )
88+
.when( delegate ).run( anyString(), anyMap(), any( StreamCollector.class ) );
8089
Config config = Config.defaultConfig();
8190
when( clock.millis() ).thenReturn( 0L, config.idleTimeBeforeConnectionTest() - 1L );
8291
PooledConnection conn = new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), clock );
@@ -90,6 +99,23 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable
9099
verify( queue ).offer( conn );
91100
}
92101

102+
@Test
103+
public void shouldInvalidConnectionIfFailedToReset() throws Throwable
104+
{
105+
// Given a connection that's broken
106+
Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).reset( any( StreamCollector.class ) );
107+
Config config = Config.defaultConfig();
108+
PooledConnection conn = new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), clock );
109+
110+
// When/Then
111+
BlockingQueue<PooledConnection> queue = mock( BlockingQueue.class );
112+
PooledConnectionReleaseConsumer consumer =
113+
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), config );
114+
consumer.accept( conn );
115+
116+
verify( queue, never() ).add( conn );
117+
}
118+
93119
@Test
94120
public void shouldInvalidateOnUnrecoverableProblems() throws Throwable
95121
{
@@ -115,12 +141,13 @@ public void shouldInvalidateOnProtocolViolationExceptions() throws Throwable
115141
@SuppressWarnings( "unchecked" )
116142
private void assertUnrecoverable( Neo4jException exception )
117143
{
118-
doThrow( exception ).when( delegate ).sync();
144+
doThrow( exception ).when( delegate )
145+
.run( eq("assert unrecoverable"), anyMap(), any( StreamCollector.class ) );
119146

120147
// When
121148
try
122149
{
123-
conn.sync();
150+
conn.run( "assert unrecoverable", new HashMap<String,Value>( ), StreamCollector.NO_OP );
124151
fail( "Should've rethrown exception" );
125152
}
126153
catch ( Neo4jException e )
@@ -141,12 +168,12 @@ private void assertUnrecoverable( Neo4jException exception )
141168
@SuppressWarnings( "unchecked" )
142169
private void assertRecoverable( Neo4jException exception )
143170
{
144-
doThrow( exception ).when( delegate ).sync();
171+
doThrow( exception ).when( delegate ).run( eq("assert recoverable"), anyMap(), any( StreamCollector.class ) );
145172

146173
// When
147174
try
148175
{
149-
conn.sync();
176+
conn.run( "assert recoverable", new HashMap<String,Value>( ), StreamCollector.NO_OP );
150177
fail( "Should've rethrown exception" );
151178
}
152179
catch ( Neo4jException e )

driver/src/test/java/org/neo4j/driver/internal/pool/InternalConnectionPoolTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.neo4j.driver.v1.Config;
3232

3333
import static java.util.Collections.singletonList;
34+
import static org.hamcrest.CoreMatchers.equalTo;
35+
import static org.junit.Assert.assertThat;
3436
import static org.mockito.Matchers.any;
3537
import static org.mockito.Mockito.mock;
3638
import static org.mockito.Mockito.times;
@@ -55,10 +57,11 @@ public void shouldAcquireAndRelease() throws Throwable
5557
conn.close();
5658

5759
// When
58-
pool.acquire( uri );
60+
Connection acquired = pool.acquire( uri );
5961

6062
// Then
6163
verify( connector, times( 1 ) ).connect( uri, config, AuthTokens.none() );
64+
assertThat( acquired, equalTo(conn) );
6265
}
6366

6467
private Connector connector( String scheme )

0 commit comments

Comments
 (0)