Skip to content

Commit 3a12b7a

Browse files
authored
Merge pull request #1301 from nats-io/kv-max-age-vs-duplicate-window
KV TTL (stream max_age) versus stream duplicate_window
2 parents 0238c9a + c072e81 commit 3a12b7a

File tree

4 files changed

+56
-3
lines changed

4 files changed

+56
-3
lines changed

Diff for: src/main/java/io/nats/client/api/FeatureConfiguration.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ public Map<String, String> getMetadata() {
137137
}
138138

139139
protected static abstract class Builder<B, FC> {
140-
String name;
141-
StreamConfiguration.Builder scBuilder;
140+
protected String name;
141+
protected Duration ttl = Duration.ZERO;
142+
protected StreamConfiguration.Builder scBuilder;
142143
protected abstract B getThis();
143144

144145
/**
@@ -177,7 +178,8 @@ protected B maxBucketSize(long maxBucketSize) {
177178
* @return Builder
178179
*/
179180
protected B ttl(Duration ttl) {
180-
scBuilder.maxAge(ttl);
181+
this.ttl = ttl == null ? Duration.ZERO : ttl;
182+
scBuilder.maxAge(this.ttl);
181183
return getThis();
182184
}
183185

Diff for: src/main/java/io/nats/client/api/KeyValueConfiguration.java

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.*;
2121

22+
import static io.nats.client.support.NatsJetStreamConstants.SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
2223
import static io.nats.client.support.NatsKeyValueUtil.*;
2324
import static io.nats.client.support.Validator.*;
2425

@@ -420,6 +421,18 @@ else if (!sources.isEmpty()) {
420421
scBuilder.subjects(toStreamSubject(name));
421422
}
422423

424+
// When stream's MaxAge is not set, server uses 2 minutes as the default
425+
// for the duplicate window. If MaxAge is set, and lower than 2 minutes,
426+
// then the duplicate window will be set to that. If MaxAge is greater,
427+
// we will cap the duplicate window to 2 minutes (to be consistent with
428+
// previous behavior).
429+
long ttlMs = ttl.toMillis();
430+
long dupeMs = SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
431+
if (ttlMs > 0 && ttlMs < SERVER_DEFAULT_DUPLICATE_WINDOW_MS) {
432+
dupeMs = ttlMs;
433+
}
434+
scBuilder.duplicateWindow(dupeMs);
435+
423436
return new KeyValueConfiguration(scBuilder.build());
424437
}
425438
}

Diff for: src/main/java/io/nats/client/support/NatsJetStreamConstants.java

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public interface NatsJetStreamConstants {
1212
*/
1313
int MAX_HISTORY_PER_KEY = 64;
1414

15+
long SERVER_DEFAULT_DUPLICATE_WINDOW_MS = 120_000; // 1000ms/sec * 60sec/min * 2 min
16+
1517
String PREFIX_DOLLAR_JS_DOT = "$JS.";
1618
String PREFIX_API = "API";
1719
String DEFAULT_API_PREFIX = "$JS.API.";

Diff for: src/test/java/io/nats/client/impl/KeyValueTests.java

+36
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static io.nats.client.api.KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS;
3131
import static io.nats.client.api.KeyValueWatchOption.*;
3232
import static io.nats.client.support.NatsConstants.DOT;
33+
import static io.nats.client.support.NatsJetStreamConstants.SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
3334
import static org.junit.jupiter.api.Assertions.*;
3435

3536
public class KeyValueTests extends JetStreamTestBase {
@@ -1709,5 +1710,40 @@ public void testSubjectFiltersAgainst209OptOut() throws Exception {
17091710
assertKeys(kv.keys(Arrays.asList("one", "two")), "one", "two");
17101711
});
17111712
}
1713+
1714+
@Test
1715+
public void testTtlAndDuplicateWindow() throws Exception {
1716+
jsServer.run(TestBase::atLeast2_10, nc -> {
1717+
KeyValueManagement kvm = nc.keyValueManagement();
1718+
String bucket = bucket();
1719+
KeyValueConfiguration config = KeyValueConfiguration.builder()
1720+
.name(bucket)
1721+
.storageType(StorageType.Memory)
1722+
.build();
1723+
KeyValueStatus status = kvm.create(config);
1724+
1725+
StreamConfiguration sc = status.getBackingStreamInfo().getConfiguration();
1726+
assertEquals(0, sc.getMaxAge().toMillis());
1727+
assertEquals(SERVER_DEFAULT_DUPLICATE_WINDOW_MS, sc.getDuplicateWindow().toMillis());
1728+
1729+
config = KeyValueConfiguration.builder(status.getConfiguration()).ttl(Duration.ofSeconds(10)).build();
1730+
status = kvm.update(config);
1731+
sc = status.getBackingStreamInfo().getConfiguration();
1732+
assertEquals(10_000, sc.getMaxAge().toMillis());
1733+
assertEquals(10_000, sc.getDuplicateWindow().toMillis());
1734+
1735+
bucket = bucket();
1736+
config = KeyValueConfiguration.builder()
1737+
.name(bucket)
1738+
.storageType(StorageType.Memory)
1739+
.ttl(Duration.ofMinutes(30))
1740+
.build();
1741+
status = kvm.create(config);
1742+
1743+
sc = status.getBackingStreamInfo().getConfiguration();
1744+
assertEquals(30, sc.getMaxAge().toMinutes());
1745+
assertEquals(SERVER_DEFAULT_DUPLICATE_WINDOW_MS, sc.getDuplicateWindow().toMillis());
1746+
});
1747+
}
17121748
}
17131749

0 commit comments

Comments
 (0)