@@ -22,13 +22,14 @@ import org.apache.kafka.common.acl._
22
22
import org .apache .kafka .common .acl .AclOperation .{ALL , ALTER , ALTER_CONFIGS , CLUSTER_ACTION , CREATE , DELETE , DESCRIBE , IDEMPOTENT_WRITE }
23
23
import org .apache .kafka .common .acl .AclPermissionType .{ALLOW , DENY }
24
24
import org .apache .kafka .common .config .{ConfigResource , SaslConfigs , TopicConfig }
25
- import org .apache .kafka .common .errors .{ClusterAuthorizationException , InvalidRequestException , TopicAuthorizationException , UnknownTopicOrPartitionException }
25
+ import org .apache .kafka .common .errors .{ClusterAuthorizationException , DelegationTokenExpiredException , DelegationTokenNotFoundException , InvalidRequestException , TopicAuthorizationException , UnknownTopicOrPartitionException }
26
26
import org .apache .kafka .common .resource .PatternType .LITERAL
27
27
import org .apache .kafka .common .resource .ResourceType .{GROUP , TOPIC }
28
28
import org .apache .kafka .common .resource .{PatternType , Resource , ResourcePattern , ResourcePatternFilter , ResourceType }
29
29
import org .apache .kafka .common .security .auth .{KafkaPrincipal , SecurityProtocol }
30
+ import org .apache .kafka .common .security .token .delegation .DelegationToken
30
31
import org .apache .kafka .security .authorizer .AclEntry .{WILDCARD_HOST , WILDCARD_PRINCIPAL_STRING }
31
- import org .apache .kafka .server .config .{ServerConfigs , ZkConfigs }
32
+ import org .apache .kafka .server .config .{DelegationTokenManagerConfigs , ServerConfigs , ZkConfigs }
32
33
import org .apache .kafka .metadata .authorizer .StandardAuthorizer
33
34
import org .apache .kafka .storage .internals .log .LogConfig
34
35
import org .junit .jupiter .api .Assertions ._
@@ -67,6 +68,10 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
67
68
this .serverConfig.setProperty(AclAuthorizer .SuperUsersProp , kafkaPrincipal.toString)
68
69
}
69
70
71
+ // Enable delegationTokenControlManager
72
+ serverConfig.setProperty(DelegationTokenManagerConfigs .DELEGATION_TOKEN_SECRET_KEY_CONFIG , " 123" )
73
+ serverConfig.setProperty(DelegationTokenManagerConfigs .DELEGATION_TOKEN_MAX_LIFETIME_CONFIG , " 5000" )
74
+
70
75
setUpSasl()
71
76
super .setUp(testInfo)
72
77
setInitialAcls()
@@ -520,6 +525,50 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
520
525
}
521
526
}
522
527
528
+ @ ParameterizedTest
529
+ @ ValueSource (strings = Array (" zk" , " kraft" ))
530
+ def testExpireDelegationToken (quorum : String ): Unit = {
531
+ client = createAdminClient
532
+ val createDelegationTokenOptions = new CreateDelegationTokenOptions ()
533
+
534
+ // Test expiration for non-exists token
535
+ TestUtils .assertFutureExceptionTypeEquals(
536
+ client.expireDelegationToken(" " .getBytes()).expiryTimestamp(),
537
+ classOf [DelegationTokenNotFoundException ]
538
+ )
539
+
540
+ // Test expiring the token immediately
541
+ val token1 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get()
542
+ TestUtils .retry(maxWaitMs = 1000 ) { assertTrue(expireTokenOrFailWithAssert(token1, - 1 ) < System .currentTimeMillis()) }
543
+
544
+ // Test expiring the expired token
545
+ val token2 = client.createDelegationToken(createDelegationTokenOptions.maxlifeTimeMs(1000 )).delegationToken().get()
546
+ // Ensure current time > maxLifeTimeMs of token
547
+ Thread .sleep(1000 )
548
+ TestUtils .assertFutureExceptionTypeEquals(
549
+ client.expireDelegationToken(token2.hmac(), new ExpireDelegationTokenOptions ().expiryTimePeriodMs(1 )).expiryTimestamp(),
550
+ classOf [DelegationTokenExpiredException ]
551
+ )
552
+
553
+ // Ensure expiring the expired token with negative expiryTimePeriodMs will not throw exception
554
+ assertDoesNotThrow(() => expireTokenOrFailWithAssert(token2, - 1 ))
555
+
556
+ // Test shortening the expiryTimestamp
557
+ val token3 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get()
558
+ TestUtils .retry(1000 ) { assertTrue(expireTokenOrFailWithAssert(token3, 200 ) < token3.tokenInfo().expiryTimestamp()) }
559
+ }
560
+
561
+ private def expireTokenOrFailWithAssert (token : DelegationToken , expiryTimePeriodMs : Long ): Long = {
562
+ try {
563
+ client.expireDelegationToken(token.hmac(), new ExpireDelegationTokenOptions ().expiryTimePeriodMs(expiryTimePeriodMs))
564
+ .expiryTimestamp().get()
565
+ } catch {
566
+ // If metadata is not synced yet, the response will contain an errorCode, causing an exception to be thrown.
567
+ // This wrapper is designed to work with TestUtils.retry
568
+ case _ : ExecutionException => throw new AssertionError (" Metadata not sync yet." )
569
+ }
570
+ }
571
+
523
572
private def describeConfigs (topic : String ): Iterable [ConfigEntry ] = {
524
573
val topicResource = new ConfigResource (ConfigResource .Type .TOPIC , topic)
525
574
var configEntries : Iterable [ConfigEntry ] = null
0 commit comments