27
27
import java .util .OptionalLong ;
28
28
import java .util .Random ;
29
29
import java .util .concurrent .CompletableFuture ;
30
+ import java .util .concurrent .CountDownLatch ;
30
31
import java .util .concurrent .RejectedExecutionException ;
31
32
import java .util .concurrent .TimeUnit ;
32
33
import java .util .function .Function ;
89
90
import org .slf4j .Logger ;
90
91
91
92
import static java .util .concurrent .TimeUnit .MICROSECONDS ;
93
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
92
94
import static java .util .concurrent .TimeUnit .NANOSECONDS ;
93
95
94
96
@@ -475,6 +477,12 @@ <T> CompletableFuture<T> appendReadEvent(String name, Supplier<T> handler) {
475
477
return event .future ();
476
478
}
477
479
480
+ <T > CompletableFuture <T > appendReadEvent (String name , long deadlineNs , Supplier <T > handler ) {
481
+ ControllerReadEvent <T > event = new ControllerReadEvent <T >(name , handler );
482
+ queue .appendWithDeadline (deadlineNs , event );
483
+ return event .future ();
484
+ }
485
+
478
486
interface ControllerWriteOperation <T > {
479
487
/**
480
488
* Generate the metadata records needed to implement this controller write
@@ -602,11 +610,10 @@ public String toString() {
602
610
}
603
611
604
612
private <T > CompletableFuture <T > appendWriteEvent (String name ,
605
- long timeoutMs ,
613
+ long deadlineNs ,
606
614
ControllerWriteOperation <T > op ) {
607
615
ControllerWriteEvent <T > event = new ControllerWriteEvent <>(name , op );
608
- queue .appendWithDeadline (time .nanoseconds () +
609
- NANOSECONDS .convert (timeoutMs , TimeUnit .MILLISECONDS ), event );
616
+ queue .appendWithDeadline (deadlineNs , event );
610
617
return event .future ();
611
618
}
612
619
@@ -961,8 +968,9 @@ public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData requ
961
968
if (request .topics ().isEmpty ()) {
962
969
return CompletableFuture .completedFuture (new CreateTopicsResponseData ());
963
970
}
964
- return appendWriteEvent ("createTopics" , () ->
965
- replicationControl .createTopics (request ));
971
+ return appendWriteEvent ("createTopics" ,
972
+ time .nanoseconds () + NANOSECONDS .convert (request .timeoutMs (), MILLISECONDS ),
973
+ () -> replicationControl .createTopics (request ));
966
974
}
967
975
968
976
@ Override
@@ -972,23 +980,26 @@ public CompletableFuture<Void> unregisterBroker(int brokerId) {
972
980
}
973
981
974
982
@ Override
975
- public CompletableFuture <Map <String , ResultOrError <Uuid >>> findTopicIds (Collection <String > names ) {
983
+ public CompletableFuture <Map <String , ResultOrError <Uuid >>> findTopicIds (long deadlineNs ,
984
+ Collection <String > names ) {
976
985
if (names .isEmpty ()) return CompletableFuture .completedFuture (Collections .emptyMap ());
977
- return appendReadEvent ("findTopicIds" ,
986
+ return appendReadEvent ("findTopicIds" , deadlineNs ,
978
987
() -> replicationControl .findTopicIds (lastCommittedOffset , names ));
979
988
}
980
989
981
990
@ Override
982
- public CompletableFuture <Map <Uuid , ResultOrError <String >>> findTopicNames (Collection <Uuid > ids ) {
991
+ public CompletableFuture <Map <Uuid , ResultOrError <String >>> findTopicNames (long deadlineNs ,
992
+ Collection <Uuid > ids ) {
983
993
if (ids .isEmpty ()) return CompletableFuture .completedFuture (Collections .emptyMap ());
984
- return appendReadEvent ("findTopicNames" ,
994
+ return appendReadEvent ("findTopicNames" , deadlineNs ,
985
995
() -> replicationControl .findTopicNames (lastCommittedOffset , ids ));
986
996
}
987
997
988
998
@ Override
989
- public CompletableFuture <Map <Uuid , ApiError >> deleteTopics (Collection <Uuid > ids ) {
999
+ public CompletableFuture <Map <Uuid , ApiError >> deleteTopics (long deadlineNs ,
1000
+ Collection <Uuid > ids ) {
990
1001
if (ids .isEmpty ()) return CompletableFuture .completedFuture (Collections .emptyMap ());
991
- return appendWriteEvent ("deleteTopics" ,
1002
+ return appendWriteEvent ("deleteTopics" , deadlineNs ,
992
1003
() -> replicationControl .deleteTopics (ids ));
993
1004
}
994
1005
@@ -1002,7 +1013,13 @@ public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids)
1002
1013
@ Override
1003
1014
public CompletableFuture <ElectLeadersResponseData >
1004
1015
electLeaders (ElectLeadersRequestData request ) {
1005
- return appendWriteEvent ("electLeaders" , request .timeoutMs (),
1016
+ // If topicPartitions is null, we will try to trigger a new leader election on
1017
+ // all partitions (!). But if it's empty, there is nothing to do.
1018
+ if (request .topicPartitions () != null && request .topicPartitions ().isEmpty ()) {
1019
+ return CompletableFuture .completedFuture (new ElectLeadersResponseData ());
1020
+ }
1021
+ return appendWriteEvent ("electLeaders" ,
1022
+ time .nanoseconds () + NANOSECONDS .convert (request .timeoutMs (), MILLISECONDS ),
1006
1023
() -> replicationControl .electLeaders (request ));
1007
1024
}
1008
1025
@@ -1016,6 +1033,9 @@ public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
1016
1033
public CompletableFuture <Map <ConfigResource , ApiError >> incrementalAlterConfigs (
1017
1034
Map <ConfigResource , Map <String , Entry <OpType , String >>> configChanges ,
1018
1035
boolean validateOnly ) {
1036
+ if (configChanges .isEmpty ()) {
1037
+ return CompletableFuture .completedFuture (Collections .emptyMap ());
1038
+ }
1019
1039
return appendWriteEvent ("incrementalAlterConfigs" , () -> {
1020
1040
ControllerResult <Map <ConfigResource , ApiError >> result =
1021
1041
configurationControl .incrementalAlterConfigs (configChanges );
@@ -1030,17 +1050,24 @@ public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
1030
1050
@ Override
1031
1051
public CompletableFuture <AlterPartitionReassignmentsResponseData >
1032
1052
alterPartitionReassignments (AlterPartitionReassignmentsRequestData request ) {
1033
- CompletableFuture <AlterPartitionReassignmentsResponseData > future = new CompletableFuture <>();
1034
- future .completeExceptionally (new UnsupportedOperationException ());
1035
- return future ;
1053
+ if (request .topics ().isEmpty ()) {
1054
+ return CompletableFuture .completedFuture (new AlterPartitionReassignmentsResponseData ());
1055
+ }
1056
+ return appendWriteEvent ("alterPartitionReassignments" ,
1057
+ time .nanoseconds () + NANOSECONDS .convert (request .timeoutMs (), MILLISECONDS ),
1058
+ () -> {
1059
+ throw new UnsupportedOperationException ();
1060
+ });
1036
1061
}
1037
1062
1038
1063
@ Override
1039
1064
public CompletableFuture <ListPartitionReassignmentsResponseData >
1040
1065
listPartitionReassignments (ListPartitionReassignmentsRequestData request ) {
1041
- CompletableFuture <ListPartitionReassignmentsResponseData > future = new CompletableFuture <>();
1042
- future .completeExceptionally (new UnsupportedOperationException ());
1043
- return future ;
1066
+ return appendReadEvent ("listPartitionReassignments" ,
1067
+ time .nanoseconds () + NANOSECONDS .convert (request .timeoutMs (), MILLISECONDS ),
1068
+ () -> {
1069
+ throw new UnsupportedOperationException ();
1070
+ });
1044
1071
}
1045
1072
1046
1073
@ Override
@@ -1118,12 +1145,12 @@ public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
1118
1145
1119
1146
@ Override
1120
1147
public CompletableFuture <List <CreatePartitionsTopicResult >>
1121
- createPartitions (List <CreatePartitionsTopic > topics ) {
1148
+ createPartitions (long deadlineNs , List <CreatePartitionsTopic > topics ) {
1122
1149
if (topics .isEmpty ()) {
1123
1150
return CompletableFuture .completedFuture (Collections .emptyList ());
1124
1151
}
1125
- return appendWriteEvent ("createPartitions" , () ->
1126
- replicationControl .createPartitions (topics ));
1152
+ return appendWriteEvent ("createPartitions" , deadlineNs ,
1153
+ () -> replicationControl .createPartitions (topics ));
1127
1154
}
1128
1155
1129
1156
@ Override
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
1165
1192
public void close () throws InterruptedException {
1166
1193
queue .close ();
1167
1194
}
1195
+
1196
+ // VisibleForTesting
1197
+ CountDownLatch pause () {
1198
+ final CountDownLatch latch = new CountDownLatch (1 );
1199
+ appendControlEvent ("pause" , () -> {
1200
+ try {
1201
+ latch .await ();
1202
+ } catch (InterruptedException e ) {
1203
+ log .info ("Interrupted while waiting for unpause." , e );
1204
+ }
1205
+ });
1206
+ return latch ;
1207
+ }
1208
+
1209
+ // VisibleForTesting
1210
+ Time time () {
1211
+ return time ;
1212
+ }
1168
1213
}
0 commit comments