Skip to content

Commit 91cdb30

Browse files
authored
Merge pull request #232 from pontusmelke/1.1-acquisition-fixup
Session acquisition update
2 parents 9263030 + ccd2843 commit 91cdb30

24 files changed

+607
-288
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

+123-30
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@
2020

2121
import java.util.Collections;
2222
import java.util.Comparator;
23+
import java.util.HashSet;
24+
import java.util.List;
2325
import java.util.Set;
26+
import java.util.concurrent.atomic.AtomicLong;
2427

2528
import org.neo4j.driver.internal.net.BoltServerAddress;
2629
import org.neo4j.driver.internal.security.SecurityPlan;
2730
import org.neo4j.driver.internal.spi.Connection;
2831
import org.neo4j.driver.internal.spi.ConnectionPool;
32+
import org.neo4j.driver.internal.util.Clock;
2933
import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet;
3034
import org.neo4j.driver.internal.util.Consumer;
3135
import org.neo4j.driver.v1.AccessMode;
@@ -34,91 +38,133 @@
3438
import org.neo4j.driver.v1.Record;
3539
import org.neo4j.driver.v1.Session;
3640
import org.neo4j.driver.v1.StatementResult;
41+
import org.neo4j.driver.v1.Value;
3742
import org.neo4j.driver.v1.exceptions.ClientException;
3843
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
3944
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4045
import org.neo4j.driver.v1.util.BiFunction;
46+
import org.neo4j.driver.v1.util.Function;
4147

4248
import static java.lang.String.format;
4349

4450
public class ClusterDriver extends BaseDriver
4551
{
4652
private static final String GET_SERVERS = "dbms.cluster.routing.getServers";
53+
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
4754
private final static Comparator<BoltServerAddress> COMPARATOR = new Comparator<BoltServerAddress>()
4855
{
4956
@Override
5057
public int compare( BoltServerAddress o1, BoltServerAddress o2 )
5158
{
5259
int compare = o1.host().compareTo( o2.host() );
53-
if (compare == 0)
60+
if ( compare == 0 )
5461
{
5562
compare = Integer.compare( o1.port(), o2.port() );
5663
}
5764

5865
return compare;
5966
}
6067
};
61-
private static final int MIN_SERVERS = 2;
68+
private static final int MIN_SERVERS = 1;
6269
private final ConnectionPool connections;
63-
private final BiFunction<Connection,Logger, Session> sessionProvider;
64-
65-
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers = new ConcurrentRoundRobinSet<>(COMPARATOR);
66-
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers = new ConcurrentRoundRobinSet<>(COMPARATOR);
67-
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers = new ConcurrentRoundRobinSet<>(COMPARATOR);
70+
private final BiFunction<Connection,Logger,Session> sessionProvider;
71+
private final Clock clock;
72+
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers =
73+
new ConcurrentRoundRobinSet<>( COMPARATOR );
74+
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
75+
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
76+
private final AtomicLong expires = new AtomicLong( 0L );
6877

6978
public ClusterDriver( BoltServerAddress seedAddress,
7079
ConnectionPool connections,
7180
SecurityPlan securityPlan,
72-
BiFunction<Connection,Logger, Session> sessionProvider,
81+
BiFunction<Connection,Logger,Session> sessionProvider,
82+
Clock clock,
7383
Logging logging )
7484
{
7585
super( securityPlan, logging );
7686
routingServers.add( seedAddress );
7787
this.connections = connections;
7888
this.sessionProvider = sessionProvider;
89+
this.clock = clock;
7990
checkServers();
8091
}
8192

8293
private void checkServers()
8394
{
8495
synchronized ( routingServers )
8596
{
86-
if ( routingServers.size() < MIN_SERVERS ||
97+
if ( expires.get() < clock.millis() ||
98+
routingServers.size() < MIN_SERVERS ||
8799
readServers.isEmpty() ||
88-
writeServers.isEmpty())
100+
writeServers.isEmpty() )
89101
{
90102
getServers();
91103
}
92104
}
93105
}
94106

107+
private Set<BoltServerAddress> forgetAllServers()
108+
{
109+
final Set<BoltServerAddress> seen = new HashSet<>();
110+
seen.addAll( routingServers );
111+
seen.addAll( readServers );
112+
seen.addAll( writeServers );
113+
routingServers.clear();
114+
readServers.clear();
115+
writeServers.clear();
116+
return seen;
117+
}
118+
119+
private long calculateNewExpiry( Record record )
120+
{
121+
long ttl = record.get( "ttl" ).asLong();
122+
long nextExpiry = clock.millis() + 1000L * ttl;
123+
if ( ttl < 0 || ttl >= MAX_TTL || nextExpiry < 0 )
124+
{
125+
return Long.MAX_VALUE;
126+
}
127+
else
128+
{
129+
return nextExpiry;
130+
}
131+
}
132+
95133
//must be called from a synchronized block
96134
private void getServers()
97135
{
98136
BoltServerAddress address = null;
99137
try
100138
{
101139
boolean success = false;
102-
while ( !routingServers.isEmpty() && !success )
140+
141+
ConcurrentRoundRobinSet<BoltServerAddress> routers = new ConcurrentRoundRobinSet<>( routingServers );
142+
final Set<BoltServerAddress> seen = forgetAllServers();
143+
while ( !routers.isEmpty() && !success )
103144
{
104-
address = routingServers.hop();
145+
address = routers.hop();
105146
success = call( address, GET_SERVERS, new Consumer<Record>()
106147
{
107148
@Override
108149
public void accept( Record record )
109150
{
110-
BoltServerAddress newAddress = new BoltServerAddress( record.get( "address" ).asString() );
111-
switch ( record.get( "mode" ).asString().toUpperCase() )
151+
expires.set( calculateNewExpiry( record ) );
152+
List<ServerInfo> servers = servers( record );
153+
for ( ServerInfo server : servers )
112154
{
113-
case "READ":
114-
readServers.add( newAddress );
115-
break;
116-
case "WRITE":
117-
writeServers.add( newAddress );
118-
break;
119-
case "ROUTE":
120-
routingServers.add( newAddress );
121-
break;
155+
seen.removeAll( server.addresses() );
156+
switch ( server.role() )
157+
{
158+
case "READ":
159+
readServers.addAll( server.addresses() );
160+
break;
161+
case "WRITE":
162+
writeServers.addAll( server.addresses() );
163+
break;
164+
case "ROUTE":
165+
routingServers.addAll( server.addresses() );
166+
break;
167+
}
122168
}
123169
}
124170
} );
@@ -127,6 +173,12 @@ public void accept( Record record )
127173
{
128174
throw new ServiceUnavailableException( "Run out of servers" );
129175
}
176+
177+
//the server no longer think we should care about these
178+
for ( BoltServerAddress remove : seen )
179+
{
180+
connections.purge( remove );
181+
}
130182
}
131183
catch ( ClientException ex )
132184
{
@@ -137,7 +189,7 @@ public void accept( Record record )
137189
this.close();
138190
throw new ServiceUnavailableException(
139191
String.format( "Server %s couldn't perform discovery",
140-
address == null ? "`UNKNOWN`" : address.toString()), ex );
192+
address == null ? "`UNKNOWN`" : address.toString() ), ex );
141193
}
142194
else
143195
{
@@ -146,14 +198,55 @@ public void accept( Record record )
146198
}
147199
}
148200

201+
private static class ServerInfo
202+
{
203+
private final List<BoltServerAddress> addresses;
204+
private final String role;
205+
206+
public ServerInfo( List<BoltServerAddress> addresses, String role )
207+
{
208+
this.addresses = addresses;
209+
this.role = role;
210+
}
211+
212+
public String role()
213+
{
214+
return role;
215+
}
216+
217+
List<BoltServerAddress> addresses()
218+
{
219+
return addresses;
220+
}
221+
}
222+
223+
private List<ServerInfo> servers( Record record )
224+
{
225+
return record.get( "servers" ).asList( new Function<Value,ServerInfo>()
226+
{
227+
@Override
228+
public ServerInfo apply( Value value )
229+
{
230+
return new ServerInfo( value.get( "addresses" ).asList( new Function<Value,BoltServerAddress>()
231+
{
232+
@Override
233+
public BoltServerAddress apply( Value value )
234+
{
235+
return new BoltServerAddress( value.asString() );
236+
}
237+
} ), value.get( "role" ).asString() );
238+
}
239+
} );
240+
}
241+
149242
//must be called from a synchronized method
150243
private boolean call( BoltServerAddress address, String procedureName, Consumer<Record> recorder )
151244
{
152245
Connection acquire = null;
153246
Session session = null;
154247
try
155248
{
156-
acquire = connections.acquire(address);
249+
acquire = connections.acquire( address );
157250
session = sessionProvider.apply( acquire, log );
158251

159252
StatementResult records = session.run( format( "CALL %s", procedureName ) );
@@ -217,19 +310,19 @@ public void onWriteFailure( BoltServerAddress address )
217310
log );
218311
}
219312

220-
private Connection acquireConnection( AccessMode mode )
313+
private Connection acquireConnection( AccessMode role )
221314
{
222315
//Potentially rediscover servers if we are not happy with our current knowledge
223316
checkServers();
224317

225-
switch ( mode )
318+
switch ( role )
226319
{
227320
case READ:
228321
return connections.acquire( readServers.hop() );
229322
case WRITE:
230323
return connections.acquire( writeServers.hop() );
231324
default:
232-
throw new ClientException( mode + " is not supported for creating new sessions" );
325+
throw new ClientException( role + " is not supported for creating new sessions" );
233326
}
234327
}
235328

@@ -255,13 +348,13 @@ Set<BoltServerAddress> routingServers()
255348
//For testing
256349
Set<BoltServerAddress> readServers()
257350
{
258-
return Collections.unmodifiableSet(readServers);
351+
return Collections.unmodifiableSet( readServers );
259352
}
260353

261354
//For testing
262355
Set<BoltServerAddress> writeServers()
263356
{
264-
return Collections.unmodifiableSet( writeServers);
357+
return Collections.unmodifiableSet( writeServers );
265358
}
266359

267360
//For testing

driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public StatementResult run( Statement statement )
5353
}
5454
catch ( ClientException e )
5555
{
56-
if ( e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" ) )
56+
if ( e.code().equals( "Neo.ClientError.Cluster.NotALeader" ) )
5757
{
5858
onError.onWriteFailure( connection.address() );
5959
throw new SessionExpiredException(

driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,6 @@ private SessionExpiredException failedWrite()
256256

257257
private boolean isFailedToWrite( ClientException e )
258258
{
259-
return e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" );
259+
return e.code().equals( "Neo.ClientError.Cluster.NotALeader" );
260260
}
261261
}

0 commit comments

Comments
 (0)