diff --git a/src/main/java/me/ronshapiro/rx/priority/PriorityScheduler.java b/src/main/java/me/ronshapiro/rx/priority/PriorityScheduler.java index ef4a048..a788317 100644 --- a/src/main/java/me/ronshapiro/rx/priority/PriorityScheduler.java +++ b/src/main/java/me/ronshapiro/rx/priority/PriorityScheduler.java @@ -1,18 +1,19 @@ package me.ronshapiro.rx.priority; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscription; -import rx.functions.Action0; -import rx.internal.schedulers.ScheduledAction; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; +import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.schedulers.ScheduledRunnable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -24,8 +25,7 @@ */ public final class PriorityScheduler { - private final PriorityBlockingQueue queue = - new PriorityBlockingQueue(); + private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); private final AtomicInteger workerCount = new AtomicInteger(); private final int concurrency; private ExecutorService executorService; @@ -33,7 +33,7 @@ public final class PriorityScheduler { /** * Creates a {@link PriorityScheduler} with as many threads as the machine's available * processors. - * + *

* Note: this does not ensure that the priorities will be adheared to exactly, as the * JVM's threading policy might allow one thread to dequeue an action, then let a second thread * dequeue the next action, run it, dequeue another, run it, etc. before the first thread runs @@ -46,7 +46,7 @@ public static PriorityScheduler create() { /** * Creates a {@link PriorityScheduler} using at most {@code concurrency} concurrent actions. - * + *

* Note: this does not ensure that the priorities will be adheared to exactly, as the * JVM's threading policy might allow one thread to dequeue an action, then let a second thread * dequeue the next action, run it, dequeue another, run it, etc. before the first thread runs @@ -62,9 +62,17 @@ private PriorityScheduler(int concurrency) { this.concurrency = concurrency; } + public static PriorityScheduler get() { + return Holder.INSTANCE; + } + + private static class Holder { + static PriorityScheduler INSTANCE = create(); + } + /** - * Prioritize {@link rx.functions.Action action}s with a numerical priority value. The higher - * the priority, the sooner it will run. + * Prioritize {@link io.reactivex.functions.Action action}s with a numerical priority + * value. The higher the priority, the sooner it will run. */ public Scheduler priority(final int priority) { return new InnerPriorityScheduler(priority); @@ -78,18 +86,16 @@ private InnerPriorityScheduler(int priority) { this.priority = priority; } - @Override - public Worker createWorker() { + @Override public Worker createWorker() { synchronized (workerCount) { if (workerCount.get() < concurrency) { workerCount.incrementAndGet(); executorService.submit(new Runnable() { - @Override - public void run() { + @Override public void run() { while (true) { try { - ComparableAction action = queue.take(); - action.call(); + ComparableRunnable runnable = queue.take(); + runnable.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -103,74 +109,82 @@ public void run() { } } - private static final class PriorityWorker extends Worker { + private static final class PriorityWorker extends Scheduler.Worker { - private final CompositeSubscription compositeSubscription = new CompositeSubscription(); - private final PriorityBlockingQueue queue; + private final CompositeDisposable compositeDisposable = new CompositeDisposable(); + private final PriorityBlockingQueue queue; private final int priority; - private PriorityWorker(PriorityBlockingQueue queue, int priority) { + private PriorityWorker(PriorityBlockingQueue queue, int priority) { this.queue = queue; this.priority = priority; } - @Override - public Subscription schedule(Action0 action) { + @Override public Disposable schedule(Runnable action) { return schedule(action, 0, MILLISECONDS); } /** * inspired by HandlerThreadScheduler.InnerHandlerThreadScheduler#schedule. - * * @see InnerHandlerThreadScheduler */ - @Override - public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { - final ComparableAction comparableAction = new ComparableAction(action, priority); - - final ScheduledAction scheduledAction = new ScheduledAction(comparableAction); - scheduledAction.add(Subscriptions.create(new Action0() { - @Override - public void call() { - queue.remove(comparableAction); + @Override public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeUnit unit) { + final ComparableRunnable runnable = new ComparableRunnable(run, priority); + + final ScheduledRunnable scheduledRunnable = new ScheduledRunnable(runnable, compositeDisposable); + scheduledRunnable.setFuture(new Future() { + @Override public boolean cancel(boolean b) { + return queue.remove(runnable); + } + + @Override public boolean isCancelled() { + return false; + } + + @Override public boolean isDone() { + return false; + } + + @Override public Object get() throws InterruptedException, ExecutionException { + return null; + } + + @Override public Object get(long l, @NonNull TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + return null; } - })); - scheduledAction.addParent(compositeSubscription); - compositeSubscription.add(scheduledAction); + }); + compositeDisposable.add(scheduledRunnable); - queue.offer(comparableAction, delayTime, unit); - return scheduledAction; + queue.offer(runnable, delayTime, unit); + return scheduledRunnable; } - @Override - public void unsubscribe() { - compositeSubscription.unsubscribe(); + @Override public void dispose() { + compositeDisposable.dispose(); } - @Override - public boolean isUnsubscribed() { - return compositeSubscription.isUnsubscribed(); + @Override public boolean isDisposed() { + return compositeDisposable.isDisposed(); } } - private static final class ComparableAction implements Action0, Comparable { + private static final class ComparableRunnable implements Runnable, Comparable { - private final Action0 action; + private final Runnable runnable; private final int priority; - private ComparableAction(Action0 action, int priority) { - this.action = action; + private ComparableRunnable(Runnable runnable, int priority) { + this.runnable = runnable; this.priority = priority; } - @Override - public void call() { - action.call(); + @Override public void run() { + runnable.run(); } - @Override - public int compareTo(ComparableAction o) { + @Override public int compareTo(ComparableRunnable o) { return o.priority - priority; } }