diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java index a45b99bfcf88..34cb26eb2522 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java @@ -16,6 +16,7 @@ package org.springframework.web.servlet.config.annotation; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -32,6 +33,7 @@ * Helps with configuring options for asynchronous request processing. * * @author Rossen Stoyanchev + * @author Réda Housni Alaoui * @since 3.2 */ public class AsyncSupportConfigurer { @@ -44,6 +46,8 @@ public class AsyncSupportConfigurer { private final List deferredResultInterceptors = new ArrayList<>(); + private @Nullable Duration sseHeartbeatPeriod; + /** * The provided task executor is used for the following: @@ -99,6 +103,14 @@ public AsyncSupportConfigurer registerDeferredResultInterceptors( return this; } + /** + * Configure the SSE heartbeat period. + * @param sseHeartbeatPeriod The SSE heartbeat period + */ + public AsyncSupportConfigurer setSseHeartbeatPeriod(Duration sseHeartbeatPeriod) { + this.sseHeartbeatPeriod = sseHeartbeatPeriod; + return this; + } protected @Nullable AsyncTaskExecutor getTaskExecutor() { return this.taskExecutor; @@ -116,4 +128,8 @@ protected List getDeferredResultInterceptor return this.deferredResultInterceptors; } + protected @Nullable Duration getSseHeartbeatPeriod() { + return this.sseHeartbeatPeriod; + } + } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/WebMvcConfigurationSupport.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/WebMvcConfigurationSupport.java index 8d031b7990a8..207a9ddd34f3 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/WebMvcConfigurationSupport.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/WebMvcConfigurationSupport.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import jakarta.servlet.ServletContext; import org.jspecify.annotations.Nullable; @@ -693,6 +694,7 @@ public RequestMappingHandlerAdapter requestMappingHandlerAdapter( } adapter.setCallableInterceptors(configurer.getCallableInterceptors()); adapter.setDeferredResultInterceptors(configurer.getDeferredResultInterceptors()); + Optional.ofNullable(configurer.getSseHeartbeatPeriod()).ifPresent(adapter::setSseHeartbeatPeriod); return adapter; } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/RequestMappingHandlerAdapter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/RequestMappingHandlerAdapter.java index d20d9559ff69..8a3aa2ea1399 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/RequestMappingHandlerAdapter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/RequestMappingHandlerAdapter.java @@ -17,11 +17,13 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.lang.reflect.Method; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -54,6 +56,8 @@ import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.support.AllEncompassingFormHttpMessageConverter; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.ui.ModelMap; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -123,6 +127,7 @@ * @author Rossen Stoyanchev * @author Juergen Hoeller * @author Sebastien Deleuze + * @author Réda Housni Alaoui * @since 3.1 * @see HandlerMethodArgumentResolver * @see HandlerMethodReturnValueHandler @@ -201,6 +206,9 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter private final Map> modelAttributeAdviceCache = new LinkedHashMap<>(); + private TaskScheduler taskScheduler = new SimpleAsyncTaskScheduler(); + + private @Nullable Duration sseHeartbeatPeriod; /** * Provide resolvers for custom argument types. Custom resolvers are ordered @@ -526,6 +534,20 @@ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDisc this.parameterNameDiscoverer = parameterNameDiscoverer; } + /** + * Set the {@link TaskScheduler} + */ + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.taskScheduler = taskScheduler; + } + + /** + * Sets the heartbeat period that will be used to periodically prob the SSE connection health + */ + public void setSseHeartbeatPeriod(@Nullable Duration sseHeartbeatPeriod) { + this.sseHeartbeatPeriod = sseHeartbeatPeriod; + } + /** * A {@link ConfigurableBeanFactory} is expected for resolving expressions * in method argument default values. @@ -733,9 +755,12 @@ private List getDefaultReturnValueHandlers() { handlers.add(new ModelAndViewMethodReturnValueHandler()); handlers.add(new ModelMethodProcessor()); handlers.add(new ViewMethodReturnValueHandler()); + + SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor = Optional.ofNullable(sseHeartbeatPeriod) + .map(period -> new SseEmitterHeartbeatExecutor(taskScheduler, period)).orElse(null); handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(), this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager, - initViewResolvers(), initLocaleResolver())); + initViewResolvers(), initLocaleResolver(), sseEmitterHeartbeatExecutor)); handlers.add(new StreamingResponseBodyReturnValueHandler()); handlers.add(new HttpEntityMethodProcessor(getMessageConverters(), this.contentNegotiationManager, this.requestResponseBodyAdvice, this.errorResponseInterceptors)); diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java index 5f59fcdf9440..84812ee20325 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -89,6 +90,7 @@ * * * @author Rossen Stoyanchev + * @author Réda Housni Alaoui * @since 4.2 */ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler { @@ -101,6 +103,8 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur private final LocaleResolver localeResolver; + @Nullable + private final SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor; /** * Simple constructor with reactive type support based on a default instance of @@ -143,11 +147,32 @@ public ResponseBodyEmitterReturnValueHandler( ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager, List viewResolvers, @Nullable LocaleResolver localeResolver) { + this(messageConverters, registry, executor, manager, viewResolvers, localeResolver, null); + } + + /** + * Constructor that with added arguments for view rendering. + * @param messageConverters converters to write emitted objects with + * @param registry for reactive return value type support + * @param executor for blocking I/O writes of items emitted from reactive types + * @param manager for detecting streaming media types + * @param viewResolvers resolvers for fragment stream rendering + * @param localeResolver the {@link LocaleResolver} for fragment stream rendering + * @param sseEmitterHeartbeatExecutor for sending periodic events to SSE clients + * @since 6.2 + */ + public ResponseBodyEmitterReturnValueHandler( + List> messageConverters, + ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager, + List viewResolvers, @Nullable LocaleResolver localeResolver, + @Nullable SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor) { + Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty"); this.sseMessageConverters = initSseConverters(messageConverters); this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager, null); this.viewResolvers = viewResolvers; this.localeResolver = (localeResolver != null ? localeResolver : new AcceptHeaderLocaleResolver()); + this.sseEmitterHeartbeatExecutor = sseEmitterHeartbeatExecutor; } private static List> initSseConverters(List> converters) { @@ -239,6 +264,9 @@ public void handleReturnValue(@Nullable Object returnValue, MethodParameter retu } emitter.initialize(emitterHandler); + if (emitter instanceof SseEmitter sseEmitter) { + Optional.ofNullable(sseEmitterHeartbeatExecutor).ifPresent(handler -> handler.register(sseEmitter)); + } } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java index 6cfda938f34e..23b06ee3233a 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java @@ -18,14 +18,18 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; @@ -41,10 +45,13 @@ * @author Juergen Hoeller * @author Sam Brannen * @author Brian Clozel + * @author Réda Housni Alaoui * @since 4.2 */ public class SseEmitter extends ResponseBodyEmitter { + private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitter.class); + private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8); /** @@ -52,6 +59,8 @@ public class SseEmitter extends ResponseBodyEmitter { */ private final Lock writeLock = new ReentrantLock(); + private volatile @Nullable Long lastEmissionNanoTime; + /** * Create a new SseEmitter instance. */ @@ -134,12 +143,31 @@ public void send(SseEventBuilder builder) throws IOException { this.writeLock.lock(); try { super.send(dataToSend); + this.lastEmissionNanoTime = System.nanoTime(); } finally { this.writeLock.unlock(); } } + void notifyOfHeartbeatTick(Duration heartbeatPeriod) { + boolean skip = Optional.ofNullable(lastEmissionNanoTime) + .map(lastEmissionNanoTime -> System.nanoTime() - lastEmissionNanoTime) + .map(nanoTimeElapsedSinceLastEmission -> nanoTimeElapsedSinceLastEmission < heartbeatPeriod.toNanos()) + .orElse(false); + if (skip) { + return; + } + LOGGER.trace("Sending heartbeat to {}", this); + SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name("ping").data("ping", MediaType.TEXT_PLAIN); + try { + send(eventBuilder); + } catch (IOException | RuntimeException e) { + // According to SseEmitter's Javadoc, the container itself will call SseEmitter#completeWithError + LOGGER.debug(e.getMessage()); + } + } + @Override public String toString() { return "SseEmitter@" + ObjectUtils.getIdentityHexString(this); diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutor.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutor.java new file mode 100644 index 000000000000..084ec89c3a36 --- /dev/null +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutor.java @@ -0,0 +1,89 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.servlet.mvc.method.annotation; + + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; + +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.TaskScheduler; + +/** + * @author Réda Housni Alaoui + */ +class SseEmitterHeartbeatExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterHeartbeatExecutor.class); + + private final TaskScheduler taskScheduler; + private final Set emitters = ConcurrentHashMap.newKeySet(); + + private final Object lifecycleMonitor = new Object(); + + private final Duration period; + + @Nullable + private volatile ScheduledFuture taskFuture; + + public SseEmitterHeartbeatExecutor(TaskScheduler taskScheduler, Duration period) { + this.taskScheduler = taskScheduler; + this.period = period; + } + + public void register(SseEmitter emitter) { + startIfNeeded(); + + Runnable closeCallback = () -> emitters.remove(emitter); + emitter.onCompletion(closeCallback); + emitter.onError(t -> closeCallback.run()); + emitter.onTimeout(closeCallback); + + emitters.add(emitter); + } + + boolean isRegistered(SseEmitter emitter) { + return emitters.contains(emitter); + } + + private void startIfNeeded() { + if (taskFuture != null) { + return; + } + synchronized (lifecycleMonitor) { + if (taskFuture != null) { + return; + } + taskFuture = taskScheduler.scheduleAtFixedRate(this::notifyEmitters, period); + } + } + + private void notifyEmitters() { + LOGGER.atDebug().log(() -> "Notifying %s emitter(s)".formatted(emitters.size())); + + for (SseEmitter emitter : emitters) { + if (Thread.currentThread().isInterrupted()) { + return; + } + emitter.notifyOfHeartbeatTick(period); + } + } +} diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutorTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutorTests.java new file mode 100644 index 000000000000..d3b39c30a122 --- /dev/null +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutorTests.java @@ -0,0 +1,329 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.servlet.mvc.method.annotation; + +import static org.assertj.core.api.Assertions.*; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.jetbrains.annotations.NotNull; +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.http.MediaType; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.Trigger; + +/** + * @author Réda Housni Alaoui + */ +class SseEmitterHeartbeatExecutorTests { + + private static final MediaType TEXT_PLAIN_UTF8 = new MediaType("text", "plain", StandardCharsets.UTF_8); + + private TestTaskScheduler taskScheduler; + + @BeforeEach + void beforeEach() { + this.taskScheduler = new TestTaskScheduler(); + } + + @Test + @DisplayName("It sends heartbeat at a fixed rate") + void test1() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5)); + + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + assertThat(taskScheduler.fixedRateTask).isNotNull(); + assertThat(taskScheduler.fixedRatePeriod).isEqualTo(Duration.ofSeconds(5)); + taskScheduler.fixedRateTask.run(); + + emitter.handler.assertSentObjectCount(3); + emitter.handler.assertObject(0, "event:ping\ndata:", TEXT_PLAIN_UTF8); + emitter.handler.assertObject(1, "ping", MediaType.TEXT_PLAIN); + emitter.handler.assertObject(2, "\n\n", TEXT_PLAIN_UTF8); + emitter.handler.assertWriteCount(1); + } + + @Test + @DisplayName("Emitter is unregistered on completion") + void test2() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5)); + + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + + assertThat(executor.isRegistered(emitter.emitter)).isTrue(); + emitter.emitter.complete(); + assertThat(executor.isRegistered(emitter.emitter)).isFalse(); + } + + @Test + @DisplayName("Emitter is unregistered on error") + void test3() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5)); + + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + + assertThat(executor.isRegistered(emitter.emitter)).isTrue(); + emitter.emitter.completeWithError(new RuntimeException()); + assertThat(executor.isRegistered(emitter.emitter)).isFalse(); + } + + @Test + @DisplayName("Emitter is unregistered on timeout") + void test4() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5)); + + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + + assertThat(executor.isRegistered(emitter.emitter)).isTrue(); + emitter.handler.completeWithTimeout(); + assertThat(executor.isRegistered(emitter.emitter)).isFalse(); + } + + @Test + @DisplayName("Emitters are unregistered on executor shutdown") + void test5() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5)); + + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + + assertThat(executor.isRegistered(emitter.emitter)).isTrue(); + } + + @Test + @DisplayName("The task never throws") + void test6() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5)); + + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + assertThat(taskScheduler.fixedRateTask).isNotNull(); + emitter.handler.exceptionToThrowOnSend = new RuntimeException(); + + assertThatCode(() -> taskScheduler.fixedRateTask.run()).doesNotThrowAnyException(); + } + + @Test + @DisplayName("The heartbeat rate can be customized") + void test7() { + SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(30)); + TestEmitter emitter = createEmitter(); + executor.register(emitter.emitter()); + assertThat(taskScheduler.fixedRateTask).isNotNull(); + assertThat(taskScheduler.fixedRatePeriod).isEqualTo(Duration.ofSeconds(30)); + } + + private TestEmitter createEmitter() { + SseEmitter sseEmitter = new SseEmitter(); + TestEmitterHandler handler = new TestEmitterHandler(); + try { + sseEmitter.initialize(handler); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new TestEmitter(sseEmitter, handler); + } + + private record TestEmitter(SseEmitter emitter, TestEmitterHandler handler) { + + } + + private static class TestEmitterHandler implements ResponseBodyEmitter.Handler { + + private final List objects = new ArrayList<>(); + + private final List<@Nullable MediaType> mediaTypes = new ArrayList<>(); + + private final List timeoutCallbacks = new ArrayList<>(); + private final List completionCallbacks = new ArrayList<>(); + private final List> errorCallbacks = new ArrayList<>(); + + private int writeCount; + @Nullable + private RuntimeException exceptionToThrowOnSend; + + public void assertSentObjectCount(int size) { + assertThat(this.objects).hasSize(size); + } + + public void assertObject(int index, Object object, MediaType mediaType) { + assertThat(index).isLessThanOrEqualTo(this.objects.size()); + assertThat(this.objects.get(index)).isEqualTo(object); + assertThat(this.mediaTypes.get(index)).isEqualTo(mediaType); + } + + public void assertWriteCount(int writeCount) { + assertThat(this.writeCount).isEqualTo(writeCount); + } + + @Override + public void send(Object data, @Nullable MediaType mediaType) { + failSendIfNeeded(); + this.objects.add(data); + this.mediaTypes.add(mediaType); + this.writeCount++; + } + + @Override + public void send(Set items) { + failSendIfNeeded(); + for (ResponseBodyEmitter.DataWithMediaType item : items) { + this.objects.add(item.getData()); + this.mediaTypes.add(item.getMediaType()); + } + this.writeCount++; + } + + private void failSendIfNeeded() { + Optional.ofNullable(exceptionToThrowOnSend) + .ifPresent(e -> { + throw e; + }); + } + + @Override + public void onCompletion(Runnable callback) { + completionCallbacks.add(callback); + } + + @Override + public void onTimeout(Runnable callback) { + timeoutCallbacks.add(callback); + } + + @Override + public void onError(Consumer callback) { + errorCallbacks.add(callback); + } + + @Override + public void complete() { + completionCallbacks.forEach(Runnable::run); + } + + @Override + public void completeWithError(Throwable failure) { + errorCallbacks.forEach(consumer -> consumer.accept(failure)); + } + + public void completeWithTimeout() { + timeoutCallbacks.forEach(Runnable::run); + } + } + + private static class TestTaskScheduler implements TaskScheduler { + + @Nullable + private Runnable fixedRateTask; + @Nullable + private Duration fixedRatePeriod; + private final TestScheduledFuture fixedRateFuture = new TestScheduledFuture<>(); + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period) { + this.fixedRateTask = task; + this.fixedRatePeriod = period; + return fixedRateFuture; + } + + @Override + public ScheduledFuture schedule(Runnable task, Trigger trigger) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture schedule(Runnable task, Instant startTime) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay) { + throw new UnsupportedOperationException(); + } + } + + private static class TestScheduledFuture implements ScheduledFuture { + + private boolean canceled; + private boolean interrupted; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + canceled = true; + interrupted = mayInterruptIfRunning; + return true; + } + + @Override + public long getDelay(@NotNull TimeUnit timeUnit) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(@NotNull Delayed delayed) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public T get() { + throw new UnsupportedOperationException(); + } + + @Override + public T get(long l, @NotNull TimeUnit timeUnit) { + throw new UnsupportedOperationException(); + } + } +}