/**
 * Returns the value produced by {@code supplier}, computing it inside
 * {@link ForkJoinPool#managedBlock} so the potentially blocking call is
 * announced to the pool.
 *
 * <p>The supplier is skipped when {@code result} already differs from the
 * {@code NULL} sentinel (see {@code isReleasable()} below).
 *
 * @return the current value of {@code result} after the managed block completes
 * @throws RuntimeException wrapping the {@link InterruptedException} if the
 *         managed block is interrupted; the thread's interrupt status is
 *         restored before the exception is thrown
 */
@Override
public T get() {
    try {
        ForkJoinPool.managedBlock(new ManagedBlocker() {
            @Override
            public boolean block() throws InterruptedException {
                result = supplier.get();
                // A single supplier call is all the blocking that is needed.
                return true;
            }

            @Override
            public boolean isReleasable() {
                // Anything other than the sentinel means a value is already available.
                return result != NULL;
            }
        });
    } catch (InterruptedException e) {
        // Restore the flag so code further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    return result;
}
/**
 * Creates the worker and installs the {@link ManagedBlocker} stored in
 * {@code blocker}.
 *
 * <p>The blocker waits on {@code workQueue.awaitNextReadyWork()} and is
 * considered releasable when work is already ready, or when no work is
 * waiting at all.
 */
private ForkJoinContextWorker() {
    blocker = new ManagedBlocker() {
        @Override
        public boolean isReleasable() {
            // Ready work means there is nothing to wait for.
            if (workQueue.isWorkReady()) {
                return true;
            }
            // Otherwise blocking is only useful while some work is still waiting.
            return !workQueue.isWorkWaiting();
        }

        @Override
        public boolean block() throws InterruptedException {
            workQueue.awaitNextReadyWork();
            return true;
        }
    };
}
@Override protected Future<Optional<Boolean>> signal(final String key, final Map<String, String> param, final int deferredCount, final ForkJoinPool fjp) { final String intervalStr = param.get("interval"); final String repeatStr = param.get("repeat"); final long interval = intervalStr != null ? Long.valueOf(intervalStr) : DEFAULT_INTERVAL; final int repeat = repeatStr != null ? Integer.valueOf(repeatStr) : DEFAULT_REPEAT; return fjp.submit(new Callable<Optional<Boolean>>() { @Override public Optional<Boolean> call() { try { ForkJoinPool.managedBlock(new ManagedBlocker() { private long elapsed = 0; @Override public boolean isReleasable() { return elapsed >= interval; } @Override public boolean block() throws InterruptedException { if (!isReleasable()) { Thread.sleep(UNIT); elapsed += UNIT; } return isReleasable(); } }); } catch (final InterruptedException e) { Logger.getLogger().error("evant loop abort", e); return Optional.absent(); } return Optional.of(deferredCount + 1 >= repeat); //emit continue signal } }); }