Skip to content

Commit 703afe8

Browse files
author
Zhen Li
authored
Merge pull request #190 from zhenlineo/1.0-new-pool-zhen
New pool
2 parents 3e70706 + fd05f88 commit 703afe8

20 files changed

+732
-1053
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal;
2020

2121
import java.util.Map;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.neo4j.driver.internal.spi.Connection;
2425
import org.neo4j.driver.internal.spi.Logger;
@@ -52,7 +53,7 @@ public void run()
5253
};
5354

5455
private InternalTransaction currentTransaction;
55-
private boolean isOpen = true;
56+
private AtomicBoolean isOpen = new AtomicBoolean( true );
5657

5758
public InternalSession( Connection connection, Logger logger )
5859
{
@@ -100,19 +101,19 @@ public StatementResult run( Statement statement )
100101
@Override
101102
public boolean isOpen()
102103
{
103-
return isOpen;
104+
return isOpen.get();
104105
}
105106

106107
@Override
107108
public void close()
108109
{
109-
if( !isOpen )
110+
// Use atomic operation to protect from closing the connection twice (putting back to the pool twice).
111+
if( !isOpen.compareAndSet( true, false ) )
110112
{
111113
throw new ClientException( "This session has already been closed." );
112114
}
113115
else
114116
{
115-
isOpen = false;
116117
if ( currentTransaction != null )
117118
{
118119
try
@@ -124,8 +125,14 @@ public void close()
124125
// Best-effort
125126
}
126127
}
127-
connection.sync();
128-
connection.close();
128+
try
129+
{
130+
connection.sync();
131+
}
132+
finally
133+
{
134+
connection.close();
135+
}
129136
}
130137
}
131138

@@ -171,7 +178,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction()
171178
@Override
172179
protected void finalize() throws Throwable
173180
{
174-
if( isOpen )
181+
if( isOpen.compareAndSet( true, false ) )
175182
{
176183
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
177184
"method on Sessions before disposing of the objects.", null );

driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,16 @@ public void receiveOne()
157157

158158
@Override
159159
public void close()
160-
{try
161160
{
162-
markAsInUse();
163-
delegate.close();
164-
}
165-
finally
166-
{
167-
markAsAvailable();
168-
}
161+
try
162+
{
163+
markAsInUse();
164+
delegate.close();
165+
}
166+
finally
167+
{
168+
markAsAvailable();
169+
}
169170
}
170171

171172
@Override

driver/src/main/java/org/neo4j/driver/internal/pool/Allocator.java

-41
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java

+50-77
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,16 @@
2424
import java.util.LinkedList;
2525
import java.util.List;
2626
import java.util.ServiceLoader;
27+
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ConcurrentHashMap;
28-
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.atomic.AtomicBoolean;
2931

3032
import org.neo4j.driver.internal.connector.socket.SocketConnector;
3133
import org.neo4j.driver.internal.spi.Connection;
3234
import org.neo4j.driver.internal.spi.ConnectionPool;
3335
import org.neo4j.driver.internal.spi.Connector;
3436
import org.neo4j.driver.internal.util.Clock;
35-
import org.neo4j.driver.internal.util.Consumer;
3637
import org.neo4j.driver.v1.AuthToken;
3738
import org.neo4j.driver.v1.Config;
3839
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -41,16 +42,16 @@
4142
import static java.lang.String.format;
4243

4344
/**
44-
* A basic connection pool that optimizes for threads being long-lived, acquiring/releasing many connections.
45-
* It uses a global queue as a fallback pool, but tries to avoid coordination by storing connections in a ThreadLocal.
45+
* The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first
46+
* try to return the session into the session pool, however if we failed to return it back, either because the pool
47+
* is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the
48+
* session.
4649
*
47-
* Safety is achieved by tracking thread locals getting garbage collected, returning connections to the global pool
48-
* when this happens.
50+
* The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to
51+
* pool) when the work with the session has been done.
4952
*
50-
* If threads are long-lived, this pool will achieve linearly scalable performance with overhead equivalent to a
51-
* hash-map lookup per acquire.
52-
*
53-
* If threads are short-lived, this pool is not ideal.
53+
* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool
54+
* at the same time.
5455
*/
5556
public class InternalConnectionPool implements ConnectionPool
5657
{
@@ -62,36 +63,26 @@ public class InternalConnectionPool implements ConnectionPool
6263
/**
6364
* Pools, organized by URL.
6465
*/
65-
private final ConcurrentHashMap<URI,ThreadCachingPool<PooledConnection>> pools = new ConcurrentHashMap<>();
66-
67-
/**
68-
* Connections that fail this criteria will be disposed of.
69-
*/
70-
private final ValidationStrategy<PooledConnection> connectionValidation;
66+
private final ConcurrentHashMap<URI,BlockingQueue<PooledConnection>> pools = new ConcurrentHashMap<>();
7167

7268
private final AuthToken authToken;
73-
/**
74-
* Timeout in milliseconds if there are no available sessions.
75-
*/
76-
private final long acquireSessionTimeout;
77-
7869
private final Clock clock;
7970
private final Config config;
8071

72+
/** Shutdown flag */
73+
private final AtomicBoolean stopped = new AtomicBoolean( false );
74+
8175
public InternalConnectionPool( Config config, AuthToken authToken )
8276
{
83-
this( loadConnectors(), Clock.SYSTEM, config, authToken,
84-
Long.getLong( "neo4j.driver.acquireSessionTimeout", 30_000 ) );
77+
this( loadConnectors(), Clock.SYSTEM, config, authToken);
8578
}
8679

8780
public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config config,
88-
AuthToken authToken, long acquireTimeout )
81+
AuthToken authToken )
8982
{
9083
this.authToken = authToken;
91-
this.acquireSessionTimeout = acquireTimeout;
9284
this.config = config;
9385
this.clock = clock;
94-
this.connectionValidation = new PooledConnectionValidator( config.idleTimeBeforeConnectionTest() );
9586
for ( Connector connector : conns )
9687
{
9788
for ( String s : connector.supportedSchemes() )
@@ -104,37 +95,37 @@ public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config
10495
@Override
10596
public Connection acquire( URI sessionURI )
10697
{
107-
try
98+
if ( stopped.get() )
10899
{
109-
Connection conn = pool( sessionURI ).acquire( acquireSessionTimeout, TimeUnit.MILLISECONDS );
110-
if ( conn == null )
100+
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
101+
}
102+
BlockingQueue<PooledConnection> connections = pool( sessionURI );
103+
PooledConnection conn = connections.poll();
104+
if ( conn == null )
105+
{
106+
Connector connector = connectors.get( sessionURI.getScheme() );
107+
if ( connector == null )
111108
{
112109
throw new ClientException(
113-
"Failed to acquire a session with Neo4j " +
114-
"as all the connections in the connection pool are already occupied by other sessions. " +
115-
"Please close unused session and retry. " +
116-
"Current Pool size: " + config.connectionPoolSize() +
117-
". If your application requires running more sessions concurrently than the current pool " +
118-
"size, you should create a driver with a larger connection pool size." );
110+
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
111+
sessionURI.getScheme(), sessionURI, connectorSchemes() ) );
119112
}
120-
return conn;
121-
}
122-
catch ( InterruptedException e )
123-
{
124-
throw new ClientException( "Interrupted while waiting for a connection to Neo4j." );
113+
conn = new PooledConnection(connector.connect( sessionURI, config, authToken ), new
114+
PooledConnectionReleaseConsumer( connections, stopped, config ), clock);
125115
}
116+
conn.updateUsageTimestamp();
117+
return conn;
126118
}
127119

128-
private ThreadCachingPool<PooledConnection> pool( URI sessionURI )
120+
private BlockingQueue<PooledConnection> pool( URI sessionURI )
129121
{
130-
ThreadCachingPool<PooledConnection> pool = pools.get( sessionURI );
122+
BlockingQueue<PooledConnection> pool = pools.get( sessionURI );
131123
if ( pool == null )
132124
{
133-
pool = newPool( sessionURI );
125+
pool = new LinkedBlockingQueue<>(config.maxIdleConnectionPoolSize());
134126
if ( pools.putIfAbsent( sessionURI, pool ) != null )
135127
{
136128
// We lost a race to create the pool, dispose of the one we created, and recurse
137-
pool.close();
138129
return pool( sessionURI );
139130
}
140131
}
@@ -161,48 +152,30 @@ private static Collection<Connector> loadConnectors()
161152
@Override
162153
public void close() throws Neo4jException
163154
{
164-
for ( ThreadCachingPool<PooledConnection> pool : pools.values() )
155+
if( !stopped.compareAndSet( false, true ) )
165156
{
166-
pool.close();
157+
// already closed or some other thread already started close
158+
return;
167159
}
168-
pools.clear();
169-
}
170-
171-
private String connectorSchemes()
172-
{
173-
return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) );
174-
}
175-
176-
private ThreadCachingPool<PooledConnection> newPool( final URI uri )
177-
{
178160

179-
return new ThreadCachingPool<>( config.connectionPoolSize(), new Allocator<PooledConnection>()
161+
for ( BlockingQueue<PooledConnection> pool : pools.values() )
180162
{
181-
@Override
182-
public PooledConnection allocate( Consumer<PooledConnection> release )
163+
while ( !pool.isEmpty() )
183164
{
184-
Connector connector = connectors.get( uri.getScheme() );
185-
if ( connector == null )
165+
PooledConnection conn = pool.poll();
166+
if ( conn != null )
186167
{
187-
throw new ClientException(
188-
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
189-
uri.getScheme(), uri, connectorSchemes() ) );
168+
//close the underlying connection without adding it back to the queue
169+
conn.dispose();
190170
}
191-
Connection conn = connector.connect( uri, config, authToken );
192-
return new PooledConnection( conn, release );
193-
}
194-
195-
@Override
196-
public void onDispose( PooledConnection pooledConnection )
197-
{
198-
pooledConnection.dispose();
199171
}
172+
}
200173

201-
@Override
202-
public void onAcquire( PooledConnection pooledConnection )
203-
{
174+
pools.clear();
175+
}
204176

205-
}
206-
}, connectionValidation, clock );
177+
private String connectorSchemes()
178+
{
179+
return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) );
207180
}
208181
}

0 commit comments

Comments
 (0)