diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index 15f5558c..a521b2d8 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -17,7 +17,19 @@ import dev.failsafe.spi.Scheduler; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.ForkJoinPool.commonPool; /** * A {@link Scheduler} implementation that schedules delays on an internal, common ScheduledExecutorService and executes @@ -33,52 +45,83 @@ * @author Ben Manes */ public final class DelegatingScheduler implements Scheduler { - public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(); - private static volatile ForkJoinPool FORK_JOIN_POOL; - private static volatile ScheduledThreadPoolExecutor DELAYER; + public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(null,null); private final ExecutorService executorService; + private final ScheduledExecutorService scheduler; + private final int executorType; + + private static final int EX_FORK_JOIN = 1; + private static final int EX_COMMON = 4; + private static final int EX_INTERNAL = 8; - private DelegatingScheduler() { - this.executorService = null; - } public DelegatingScheduler(ExecutorService executor) { - this.executorService = executor; + this(executor, null); + } + + public DelegatingScheduler(ExecutorService executor, ScheduledExecutorService scheduler) { + if (executor == null || executor == commonPool()) { + if (ForkJoinPool.getCommonPoolParallelism() > 1) {// @see CompletableFuture#useCommonPool + executorService = commonPool(); + executorType = EX_COMMON | EX_FORK_JOIN; + + } else {// don't use commonPool(): cannot support parallelism + executorService = null; + executorType = EX_INTERNAL | EX_FORK_JOIN; + } + } else { + executorService = executor; + executorType = executor instanceof ForkJoinPool ? EX_FORK_JOIN + : 0; + } + this.scheduler = scheduler; } - private static final class DelayerThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); + DelegatingScheduler (byte flags) { + executorService = null; executorType = flags; scheduler = null; + }//new for tests + + private static final class LazyDelayerHolder extends ScheduledThreadPoolExecutor implements ThreadFactory { + private static final ScheduledThreadPoolExecutor DELAYER = new LazyDelayerHolder(); + + public LazyDelayerHolder(){ + super(1); + setThreadFactory(this); + setRemoveOnCancelPolicy(true); + } + + @Override public Thread newThread(Runnable r) { + Thread t = new Thread(r, "FailsafeDelayScheduler"); t.setDaemon(true); - t.setName("FailsafeDelayScheduler"); return t; } } + private static final class LazyForkJoinPoolHolder { + private static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool( + Math.max(Runtime.getRuntime().availableProcessors(), 2), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true/*asyncMode*/); + } + static final class ScheduledCompletableFuture extends CompletableFuture implements ScheduledFuture { // Guarded by this volatile Future delegate; // Guarded by this Thread forkJoinPoolThread; - private final long time; - - ScheduledCompletableFuture(long delay, TimeUnit unit) { - this.time = System.nanoTime() + unit.toNanos(delay); - } @Override - public long getDelay(TimeUnit unit) { - return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); + public long getDelay(TimeUnit unit){ + Future f = delegate; + return f instanceof Delayed ? ((Delayed) f).getDelay(unit) + : 0; // we are executing now } @Override public int compareTo(Delayed other) { - if (other == this) { + if (other == this)// ScheduledFuture gives no extra info return 0; - } else if (other instanceof ScheduledCompletableFuture) { - return Long.compare(time, ((ScheduledCompletableFuture) other).time); - } return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS)); } @@ -93,75 +136,83 @@ public boolean cancel(boolean mayInterruptIfRunning) { } return result; } - } + }//ScheduledCompletableFuture - private static ScheduledExecutorService delayer() { - if (DELAYER == null) { - synchronized (DelegatingScheduler.class) { - if (DELAYER == null) { - ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, new DelayerThreadFactory()); - delayer.setRemoveOnCancelPolicy(true); - DELAYER = delayer; - } - } - } - return DELAYER; + private ScheduledExecutorService delayer() { + return scheduler != null ? scheduler + : LazyDelayerHolder.DELAYER; } private ExecutorService executorService() { - if (executorService != null) - return executorService; - if (FORK_JOIN_POOL == null) { - synchronized (DelegatingScheduler.class) { - if (FORK_JOIN_POOL == null) { - if (ForkJoinPool.getCommonPoolParallelism() > 1) - FORK_JOIN_POOL = ForkJoinPool.commonPool(); - else - FORK_JOIN_POOL = new ForkJoinPool(2); - } - } - } - return FORK_JOIN_POOL; + return executorService != null ? executorService + : LazyForkJoinPoolHolder.FORK_JOIN_POOL; } - @Override - @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>(delay, unit); - ExecutorService es = executorService(); - boolean isForkJoinPool = es instanceof ForkJoinPool; - Callable completingCallable = () -> { - try { - if (isForkJoinPool) { - // Guard against race with promise.cancel + ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>(); + final Callable completingCallable = (executorType & EX_FORK_JOIN) == EX_FORK_JOIN + ? () -> { + try { + // Guard against race with promise.cancel synchronized (promise) { promise.forkJoinPoolThread = Thread.currentThread(); } - } - promise.complete(callable.call()); - } catch (Throwable t) { - promise.completeExceptionally(t); - } finally { - if (isForkJoinPool) { + promise.complete(callable.call()); + } catch (Throwable t) { + promise.completeExceptionally(t); + } finally { synchronized (promise) { promise.forkJoinPoolThread = null; } } - } - return null; - }; - - if (delay == 0) - promise.delegate = es.submit(completingCallable); - else - promise.delegate = delayer().schedule(() -> { - // Guard against race with promise.cancel - synchronized (promise) { + return null; + }// else not ForkJoin BTW: but why? Other ExecutorServices also support cancellation + : () ->{ + try { + promise.complete(callable.call()); + } catch (Throwable t) { + promise.completeExceptionally(t); + } + return null; + }; + + if (delay <= 0) { + promise.delegate = executorService().submit(completingCallable); + return promise; + } + + final Callable r;// use less memory: don't capture variable with commonPool + + if ((executorType & EX_COMMON) == EX_COMMON) + r = ()->{ // Guard against race with promise.cancel + synchronized(promise) { if (!promise.isCancelled()) - promise.delegate = es.submit(completingCallable); + promise.delegate = commonPool().submit(completingCallable); + } + return null; + }; + + else if ((executorType & EX_INTERNAL) == EX_INTERNAL) + r = ()->{// Guard against race with promise.cancel + synchronized(promise) { + if (!promise.isCancelled()) + promise.delegate = LazyForkJoinPoolHolder.FORK_JOIN_POOL.submit(completingCallable); } - }, delay, unit); + return null; + }; + else { + final ExecutorService es = executorService(); + r = ()->{// Guard against race with promise.cancel + synchronized(promise){ + if (!promise.isCancelled()) + promise.delegate = es.submit(completingCallable); + } + return null; + }; + } + promise.delegate = delayer().schedule(r, delay, unit); return promise; } } \ No newline at end of file diff --git a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java index 0a3f8559..795e450f 100644 --- a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java +++ b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java @@ -15,18 +15,31 @@ */ package dev.failsafe.internal.util; -import net.jodah.concurrentunit.Waiter; -import dev.failsafe.testing.Asserts; import dev.failsafe.spi.Scheduler; +import dev.failsafe.testing.Asserts; +import net.jodah.concurrentunit.Waiter; import org.testng.annotations.Test; import java.io.IOException; import java.time.Duration; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; +import static org.testng.Assert.fail; @Test public class DelegatingSchedulerTest { @@ -42,7 +55,7 @@ public void shouldSchedule() throws Throwable { scheduler.schedule(() -> { waiter.resume(); return null; - }, delay.toMillis(), TimeUnit.MILLISECONDS); + }, delay.toMillis(), MILLISECONDS); // Then waiter.await(1000); @@ -52,11 +65,11 @@ public void shouldSchedule() throws Throwable { public void shouldWrapCheckedExceptions() { Asserts.assertThrows(() -> scheduler.schedule(() -> { throw new IOException(); - }, 1, TimeUnit.MILLISECONDS).get(), ExecutionException.class, IOException.class); + }, 1, MILLISECONDS).get(), ExecutionException.class, IOException.class); } public void shouldNotInterruptAlreadyDoneTask() throws Throwable { - Future future1 = scheduler.schedule(() -> null, 0, TimeUnit.MILLISECONDS); + Future future1 = scheduler.schedule(() -> null, 0, MILLISECONDS); Thread.sleep(100); assertFalse(future1.cancel(true)); } @@ -75,7 +88,7 @@ public void shouldClearInterruptFlagInForkJoinPoolThreads() throws Throwable { waiter.resume(); Thread.sleep(10000); return null; - }, 0, TimeUnit.MILLISECONDS); + }, 0, MILLISECONDS); waiter.await(1000); threadRef.get().interrupt(); @@ -84,7 +97,170 @@ public void shouldClearInterruptFlagInForkJoinPoolThreads() throws Throwable { waiter.assertFalse(Thread.currentThread().isInterrupted()); waiter.resume(); return null; - }, 0, TimeUnit.MILLISECONDS); + }, 0, MILLISECONDS); waiter.await(1000); } -} + + + @Test + public void testInternalPool() throws TimeoutException, ExecutionException, InterruptedException{ + DelegatingScheduler ds = new DelegatingScheduler((byte) 8);// internal, not ForkJoin + + Waiter waiter = new Waiter(); + + ScheduledFuture sf = ds.schedule(()->{ + waiter.rethrow(new IOException("OK! testInternalPool")); + return 42; + }, 5, MILLISECONDS); + + try { + waiter.await(1000); + fail(); + } catch (Throwable e) { + assertEquals(e.toString(), "java.io.IOException: OK! testInternalPool"); + } + assertTrue(sf.isDone()); + + try { + sf.get(); + fail(); + } catch (ExecutionException e) { + assertEquals(e.toString(), "java.util.concurrent.ExecutionException: java.io.IOException: OK! testInternalPool"); + } + } + + @Test + public void testExternalScheduler() throws TimeoutException, ExecutionException, InterruptedException{ + ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1); + DelegatingScheduler ds = new DelegatingScheduler(stpe, stpe); + + Waiter waiter = new Waiter(); + + ScheduledFuture sf1 = ds.schedule(()->{ + waiter.rethrow(new IOException("OK! fail 1")); + return 42; + }, 3, TimeUnit.SECONDS); + ScheduledFuture sf2 = ds.schedule(()->{ + waiter.rethrow(new IOException("OK! fail 2 fast")); + return 42; + }, 1, TimeUnit.SECONDS); + assertEquals(1, sf1.compareTo(sf2)); + assertEquals(0, sf1.compareTo(sf1)); + assertTrue(sf1.getDelay(MILLISECONDS) > 2000); + + try { + waiter.await(3200); + fail(); + } catch (Throwable e) { + assertEquals(e.toString(), "java.io.IOException: OK! fail 2 fast"); + } + assertTrue(sf2.isDone()); + Thread.sleep(2500);//3-1 = 2 for slow sf1 + assertTrue(sf1.isDone()); + + try { + sf1.get(); + fail(); + } catch (ExecutionException e) { + assertEquals(e.toString(), "java.util.concurrent.ExecutionException: java.io.IOException: OK! fail 1"); + } + try { + sf2.get(); + fail(); + } catch (ExecutionException e) { + assertEquals(e.toString(), "java.util.concurrent.ExecutionException: java.io.IOException: OK! fail 2 fast"); + } + assertEquals(stpe.shutdownNow().size(), 0); + + assertEquals(-1, sf2.compareTo(sf1)); + } + + + @Test + public void testScheduleAndWork() throws TimeoutException, ExecutionException, InterruptedException{ + Waiter w = new Waiter(); + ScheduledFuture sf1 = DelegatingScheduler.INSTANCE.schedule(()->{ + w.resume();// after ~ 1 sec + Thread.sleep(5000);//hard work + w.resume(); + return 42; + }, 1, TimeUnit.SECONDS); + + ScheduledFuture sf2 = DelegatingScheduler.INSTANCE.schedule(()->112, 3, TimeUnit.SECONDS); + + assertTrue(sf1.getDelay(MILLISECONDS) > 600); + assertTrue(sf2.getDelay(MILLISECONDS) > 2600); + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + w.await(1200, 1);// sf1 in normal executor + + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertTrue(sf2.getDelay(MILLISECONDS) > 1600); + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + w.await(5200, 1);// sf2 is done + + assertEquals(0, sf1.compareTo(sf2));// no more time info inside + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertEquals(sf2.getDelay(MILLISECONDS), 0); + assertTrue(sf1.isDone()); + assertTrue(sf2.isDone()); + assertEquals(42, sf1.get()); + assertEquals(112, sf2.get()); + } + + @Test + public void testScheduleAndCancel() throws TimeoutException, ExecutionException, InterruptedException{ + Waiter w = new Waiter(); + ScheduledFuture sf1 = DelegatingScheduler.INSTANCE.schedule(()->{ + w.resume();// after ~ 1 sec + Thread.sleep(5000);//hard work + w.resume(); + return 42; + }, 1, TimeUnit.SECONDS); + + ScheduledFuture sf2 = DelegatingScheduler.INSTANCE.schedule(()->112, 3, TimeUnit.SECONDS); + + assertTrue(sf1.getDelay(MILLISECONDS) > 600); + assertTrue(sf2.getDelay(MILLISECONDS) > 2600); + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + w.await(1200, 1);// sf1 in normal executor + + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertTrue(sf2.getDelay(MILLISECONDS) > 1600); + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + sf1.cancel(true); + sf2.cancel(true); + + assertEquals(-1, sf1.compareTo(sf2));// time info inside in sf2's delegate + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertTrue(sf2.getDelay(MILLISECONDS) > 1000); + assertTrue(sf1.isDone()); + assertTrue(sf2.isDone()); + assertTrue(sf1.isCancelled()); + assertTrue(sf2.isCancelled()); + CancellationException c1 = expectThrows(CancellationException.class, sf1::get); + CancellationException c2 = expectThrows(CancellationException.class, sf2::get); + DelegatingScheduler.ScheduledCompletableFuture scf1 = (DelegatingScheduler.ScheduledCompletableFuture) sf1; + DelegatingScheduler.ScheduledCompletableFuture scf2 = (DelegatingScheduler.ScheduledCompletableFuture) sf2; + + assertTrue(scf1.delegate instanceof ForkJoinTask);// was executing + ForkJoinTask task1 = (ForkJoinTask) scf1.delegate; + assertTrue(scf2.delegate instanceof RunnableScheduledFuture);// was in scheduler's delayQueue + RunnableScheduledFuture task2 = (RunnableScheduledFuture) scf2.delegate; + assertTrue(task1.isCompletedAbnormally()); + assertTrue(task1.isCancelled()); + assertTrue(task2.isCancelled()); + } + +} \ No newline at end of file