Skip to content

porting into rxjava2 #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 69 additions & 55 deletions src/main/java/me/ronshapiro/rx/priority/PriorityScheduler.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package me.ronshapiro.rx.priority;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.schedulers.ScheduledRunnable;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand All @@ -24,16 +25,15 @@
*/
public final class PriorityScheduler {

private final PriorityBlockingQueue<ComparableAction> queue =
new PriorityBlockingQueue<ComparableAction>();
private final PriorityBlockingQueue<ComparableRunnable> queue = new PriorityBlockingQueue<>();
private final AtomicInteger workerCount = new AtomicInteger();
private final int concurrency;
private ExecutorService executorService;

/**
* Creates a {@link PriorityScheduler} with as many threads as the machine's available
* processors.
*
* <p>
* <b>Note:</b> this does not ensure that the priorities will be adheared to exactly, as the
* JVM's threading policy might allow one thread to dequeue an action, then let a second thread
* dequeue the next action, run it, dequeue another, run it, etc. before the first thread runs
Expand All @@ -46,7 +46,7 @@ public static PriorityScheduler create() {

/**
* Creates a {@link PriorityScheduler} using at most {@code concurrency} concurrent actions.
*
* <p>
* <b>Note:</b> this does not ensure that the priorities will be adheared to exactly, as the
* JVM's threading policy might allow one thread to dequeue an action, then let a second thread
* dequeue the next action, run it, dequeue another, run it, etc. before the first thread runs
Expand All @@ -62,9 +62,17 @@ private PriorityScheduler(int concurrency) {
this.concurrency = concurrency;
}

public static PriorityScheduler get() {
return Holder.INSTANCE;
}

private static class Holder {
static PriorityScheduler INSTANCE = create();
}

/**
* Prioritize {@link rx.functions.Action action}s with a numerical priority value. The higher
* the priority, the sooner it will run.
* Prioritize {@link io.reactivex.functions.Action action}s with a numerical priority
* value. The higher the priority, the sooner it will run.
*/
public Scheduler priority(final int priority) {
return new InnerPriorityScheduler(priority);
Expand All @@ -78,18 +86,16 @@ private InnerPriorityScheduler(int priority) {
this.priority = priority;
}

@Override
public Worker createWorker() {
@Override public Worker createWorker() {
synchronized (workerCount) {
if (workerCount.get() < concurrency) {
workerCount.incrementAndGet();
executorService.submit(new Runnable() {
@Override
public void run() {
@Override public void run() {
while (true) {
try {
ComparableAction action = queue.take();
action.call();
ComparableRunnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
Expand All @@ -103,74 +109,82 @@ public void run() {
}
}

private static final class PriorityWorker extends Worker {
private static final class PriorityWorker extends Scheduler.Worker {

private final CompositeSubscription compositeSubscription = new CompositeSubscription();
private final PriorityBlockingQueue<ComparableAction> queue;
private final CompositeDisposable compositeDisposable = new CompositeDisposable();
private final PriorityBlockingQueue<ComparableRunnable> queue;
private final int priority;

private PriorityWorker(PriorityBlockingQueue<ComparableAction> queue, int priority) {
private PriorityWorker(PriorityBlockingQueue<ComparableRunnable> queue, int priority) {
this.queue = queue;
this.priority = priority;
}

@Override
public Subscription schedule(Action0 action) {
@Override public Disposable schedule(Runnable action) {
return schedule(action, 0, MILLISECONDS);
}

/**
* inspired by HandlerThreadScheduler.InnerHandlerThreadScheduler#schedule.
*
* @see <a href="https://github.com/ReactiveX/RxAndroid/blob/53bc70785b1c8f150c2be871a5b85979ad8b233a/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java">InnerHandlerThreadScheduler</a>
*/
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
final ComparableAction comparableAction = new ComparableAction(action, priority);

final ScheduledAction scheduledAction = new ScheduledAction(comparableAction);
scheduledAction.add(Subscriptions.create(new Action0() {
@Override
public void call() {
queue.remove(comparableAction);
@Override public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeUnit unit) {
final ComparableRunnable runnable = new ComparableRunnable(run, priority);

final ScheduledRunnable scheduledRunnable = new ScheduledRunnable(runnable, compositeDisposable);
scheduledRunnable.setFuture(new Future<Object>() {
@Override public boolean cancel(boolean b) {
return queue.remove(runnable);
}

@Override public boolean isCancelled() {
return false;
}

@Override public boolean isDone() {
return false;
}

@Override public Object get() throws InterruptedException, ExecutionException {
return null;
}

@Override public Object get(long l, @NonNull TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}));
scheduledAction.addParent(compositeSubscription);
compositeSubscription.add(scheduledAction);
});
compositeDisposable.add(scheduledRunnable);

queue.offer(comparableAction, delayTime, unit);
return scheduledAction;
queue.offer(runnable, delayTime, unit);
return scheduledRunnable;
}

@Override
public void unsubscribe() {
compositeSubscription.unsubscribe();
@Override public void dispose() {
compositeDisposable.dispose();
}

@Override
public boolean isUnsubscribed() {
return compositeSubscription.isUnsubscribed();
@Override public boolean isDisposed() {
return compositeDisposable.isDisposed();
}

}

private static final class ComparableAction implements Action0, Comparable<ComparableAction> {
private static final class ComparableRunnable implements Runnable, Comparable<ComparableRunnable> {

private final Action0 action;
private final Runnable runnable;
private final int priority;

private ComparableAction(Action0 action, int priority) {
this.action = action;
private ComparableRunnable(Runnable runnable, int priority) {
this.runnable = runnable;
this.priority = priority;
}

@Override
public void call() {
action.call();
@Override public void run() {
runnable.run();
}

@Override
public int compareTo(ComparableAction o) {
@Override public int compareTo(ComparableRunnable o) {
return o.priority - priority;
}
}
Expand Down