Skip to content

Commit d033d8b

Browse files
authored
Merge pull request #231 from pontusmelke/1.1-close-tx-on-reset
Automatically close on session.reset()
2 parents 91cdb30 + 51248ce commit d033d8b

File tree

7 files changed

+278
-48
lines changed

7 files changed

+278
-48
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

+33-21
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,13 @@ public class NetworkSession implements Session
4949
@Override
5050
public void run()
5151
{
52-
if ( currentTransaction != null )
52+
synchronized ( NetworkSession.this )
5353
{
54-
lastBookmark = currentTransaction.bookmark();
55-
currentTransaction = null;
54+
if ( currentTransaction != null )
55+
{
56+
lastBookmark = currentTransaction.bookmark();
57+
currentTransaction = null;
58+
}
5659
}
5760
}
5861
};
@@ -73,9 +76,9 @@ public StatementResult run( String statementText )
7376
}
7477

7578
@Override
76-
public StatementResult run( String statementText, Map<String, Object> statementParameters )
79+
public StatementResult run( String statementText, Map<String,Object> statementParameters )
7780
{
78-
Value params = statementParameters == null ? Values.EmptyMap : value(statementParameters);
81+
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
7982
return run( statementText, params );
8083
}
8184

@@ -97,21 +100,24 @@ public StatementResult run( Statement statement )
97100
{
98101
ensureConnectionIsValidBeforeRunningSession();
99102
InternalStatementResult cursor = new InternalStatementResult( connection, null, statement );
100-
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() );
103+
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ),
104+
cursor.runResponseCollector() );
101105
connection.pullAll( cursor.pullAllResponseCollector() );
102106
connection.flush();
103107
return cursor;
104108
}
105109

106-
public void reset()
110+
public synchronized void reset()
107111
{
108112
ensureSessionIsOpen();
109113
ensureNoUnrecoverableError();
110114
ensureConnectionIsOpen();
111115

112-
if( currentTransaction != null )
116+
if ( currentTransaction != null )
113117
{
114118
currentTransaction.markToClose();
119+
lastBookmark = currentTransaction.bookmark();
120+
currentTransaction = null;
115121
}
116122
connection.resetAsync();
117123
}
@@ -126,21 +132,24 @@ public boolean isOpen()
126132
public void close()
127133
{
128134
// Use atomic operation to protect from closing the connection twice (putting back to the pool twice).
129-
if( !isOpen.compareAndSet( true, false ) )
135+
if ( !isOpen.compareAndSet( true, false ) )
130136
{
131137
throw new ClientException( "This session has already been closed." );
132138
}
133139
else
134140
{
135-
if ( currentTransaction != null )
141+
synchronized ( this )
136142
{
137-
try
138-
{
139-
currentTransaction.close();
140-
}
141-
catch ( Throwable e )
143+
if ( currentTransaction != null )
142144
{
143-
// Best-effort
145+
try
146+
{
147+
currentTransaction.close();
148+
}
149+
catch ( Throwable e )
150+
{
151+
// Best-effort
152+
}
144153
}
145154
}
146155
try
@@ -167,7 +176,7 @@ public Transaction beginTransaction()
167176
}
168177

169178
@Override
170-
public Transaction beginTransaction( String bookmark )
179+
public synchronized Transaction beginTransaction( String bookmark )
171180
{
172181
ensureConnectionIsValidBeforeOpeningTransaction();
173182
currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark );
@@ -224,7 +233,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction()
224233
@Override
225234
protected void finalize() throws Throwable
226235
{
227-
if( isOpen.compareAndSet( true, false ) )
236+
if ( isOpen.compareAndSet( true, false ) )
228237
{
229238
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
230239
"method on Sessions before disposing of the objects.", null );
@@ -235,14 +244,15 @@ protected void finalize() throws Throwable
235244

236245
private void ensureNoUnrecoverableError()
237246
{
238-
if( connection.hasUnrecoverableErrors() )
247+
if ( connection.hasUnrecoverableErrors() )
239248
{
240249
throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " +
241250
"has happened. Please close the current session and re-run your statement in a" +
242251
" new session." );
243252
}
244253
}
245254

255+
//should be called from a synchronized block
246256
private void ensureNoOpenTransactionBeforeRunningSession()
247257
{
248258
if ( currentTransaction != null )
@@ -252,6 +262,7 @@ private void ensureNoOpenTransactionBeforeRunningSession()
252262
}
253263
}
254264

265+
//should be called from a synchronized block
255266
private void ensureNoOpenTransactionBeforeOpeningTransaction()
256267
{
257268
if ( currentTransaction != null )
@@ -273,12 +284,13 @@ private void ensureConnectionIsOpen()
273284

274285
private void ensureSessionIsOpen()
275286
{
276-
if( !isOpen() )
287+
if ( !isOpen() )
277288
{
278289
throw new ClientException(
279290
"No more interaction with this session is allowed " +
280291
"as the current session is already closed or marked as closed. " +
281-
"You get this error either because you have a bad reference to a session that has already be closed " +
292+
"You get this error either because you have a bad reference to a session that has already be " +
293+
"closed " +
282294
"or you are trying to reuse a session that you have called `reset` on it." );
283295
}
284296
}

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@ public void resetAsync()
208208
}
209209

210210
@Override
211-
public boolean isInterrupted()
211+
public boolean isAckFailureMuted()
212212
{
213-
return delegate.isInterrupted();
213+
return delegate.isAckFailureMuted();
214214
}
215215

216216
private void markAsAvailable()

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

+43-14
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public class SocketConnection implements Connection
4646
{
4747
private final Queue<Message> pendingMessages = new LinkedList<>();
4848
private final SocketResponseHandler responseHandler;
49-
private AtomicBoolean interrupted = new AtomicBoolean( false );
49+
private AtomicBoolean isInterrupted = new AtomicBoolean( false );
50+
private AtomicBoolean isAckFailureMuted = new AtomicBoolean( false );
5051
private final Collector.InitCollector initCollector = new Collector.InitCollector();
5152

5253
private final SocketClient socket;
@@ -115,6 +116,8 @@ public void sync()
115116
@Override
116117
public synchronized void flush()
117118
{
119+
ensureNotInterrupted();
120+
118121
try
119122
{
120123
socket.send( pendingMessages );
@@ -126,6 +129,29 @@ public synchronized void flush()
126129
}
127130
}
128131

132+
private void ensureNotInterrupted()
133+
{
134+
try
135+
{
136+
if( isInterrupted.get() )
137+
{
138+
// receive each of it and throw error immediately
139+
while ( responseHandler.collectorsWaiting() > 0 )
140+
{
141+
receiveOne();
142+
}
143+
}
144+
}
145+
catch ( Neo4jException e )
146+
{
147+
throw new ClientException(
148+
"An error has occurred due to the cancellation of executing a previous statement. " +
149+
"You received this error probably because you did not consume the result immediately after " +
150+
"running the statement which get reset in this session.", e );
151+
}
152+
153+
}
154+
129155
private void receiveAll()
130156
{
131157
try
@@ -159,6 +185,7 @@ private void assertNoServerFailure()
159185
{
160186
Neo4jException exception = responseHandler.serverFailure();
161187
responseHandler.clearError();
188+
isInterrupted.set( false );
162189
throw exception;
163190
}
164191
}
@@ -182,6 +209,8 @@ else if ( e instanceof SocketTimeoutException )
182209

183210
private synchronized void queueMessage( Message msg, Collector collector )
184211
{
212+
ensureNotInterrupted();
213+
185214
pendingMessages.add( msg );
186215
responseHandler.appendResultCollector( collector );
187216
}
@@ -211,26 +240,26 @@ public boolean hasUnrecoverableErrors()
211240
}
212241

213242
@Override
214-
public void resetAsync()
243+
public synchronized void resetAsync()
215244
{
216-
if( interrupted.compareAndSet( false, true ) )
245+
queueMessage( RESET, new Collector.ResetCollector( new Runnable()
217246
{
218-
queueMessage( RESET, new Collector.ResetCollector( new Runnable()
247+
@Override
248+
public void run()
219249
{
220-
@Override
221-
public void run()
222-
{
223-
interrupted.set( false );
224-
}
225-
} ) );
226-
flush();
227-
}
250+
isInterrupted.set( false );
251+
isAckFailureMuted.set( false );
252+
}
253+
} ) );
254+
flush();
255+
isInterrupted.set( true );
256+
isAckFailureMuted.set( true );
228257
}
229258

230259
@Override
231-
public boolean isInterrupted()
260+
public boolean isAckFailureMuted()
232261
{
233-
return interrupted.get();
262+
return isAckFailureMuted.get();
234263
}
235264

236265
@Override

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,9 @@ public void resetAsync()
226226
}
227227

228228
@Override
229-
public boolean isInterrupted()
229+
public boolean isAckFailureMuted()
230230
{
231-
return delegate.isInterrupted();
231+
return delegate.isAckFailureMuted();
232232
}
233233

234234
@Override
@@ -260,7 +260,7 @@ private void onDelegateException( RuntimeException e )
260260
{
261261
unrecoverableErrorsOccurred = true;
262262
}
263-
else if( !isInterrupted() )
263+
else if( !isAckFailureMuted() )
264264
{
265265
ackFailure();
266266
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ public interface Connection extends AutoCloseable
114114
void resetAsync();
115115

116116
/**
117-
* Return true if the current session statement execution has been interrupted by another thread, otherwise false.
118-
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
117+
* Return true if ack_failure message is temporarily muted as the failure message will be acked using reset instead
118+
* @return true if no ack_failre message should be sent when ackable failures are received.
119119
*/
120-
boolean isInterrupted();
120+
boolean isAckFailureMuted();
121121

122122
/**
123123
* Returns the version of the server connected to.

0 commit comments

Comments
 (0)