Java 类java.util.concurrent.RunnableFuture 实例源码
项目:pgAutomator-agent
文件:ThreadFactory.java
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
{
if (runnable instanceof CancellableRunnable)
{
return new FutureTask<T>(runnable, value)
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
boolean return_value = super.cancel(mayInterruptIfRunning);
CancellableRunnable.class.cast(runnable).cancelTask();
return return_value;
}
};
}
else
{
return super.newTaskFor(runnable, value);
}
}
项目:pgAutomator-agent
文件:ThreadFactory.java
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
if (callable instanceof CancellableCallable)
{
return new FutureTask<T>(callable)
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
CancellableCallable.class.cast(callable).cancelTask();
return super.cancel(mayInterruptIfRunning);
}
};
}
else
{
return super.newTaskFor(callable);
}
}
项目:incubator-netbeans
文件:JUnitProjectOpenedHook.java
@Override
public java.util.concurrent.Future<ProjectProblemsProvider.Result> resolve() {
ProjectProblemsProvider.Result res;
if (action != null) {
action.actionPerformed(null);
String text = (String) action.getValue(ACT_START_MESSAGE);
if (text != null) {
res = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED, text);
} else {
res = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED);
}
} else {
res = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.UNRESOLVED, "No resolution for the problem");
}
RunnableFuture<ProjectProblemsProvider.Result> f = new FutureTask(new Runnable() {
@Override
public void run() {
}
}, res);
f.run();
return f;
}
项目:fort_j
文件:Executor.java
public synchronized void submit(final RunnableFuture task, long delay)
{
getTimer().schedule(new TimerTask()
{
@Override
public void run()
{
Thread t = new Thread(new Runnable()
{
@Override
public void run()
{
submit(task);
}
});
t.setDaemon(daemon);
t.start();
}
}, delay);
}
项目:fort_j
文件:Executor.java
void put(RunnableFuture task)
{
synchronized (queue)
{
while (queue.size() >= queueMax)
{
try
{
queue.wait();
}
catch (Exception ex)
{
}
}
queue.add(task);
queue.notifyAll();
}
}
项目:fort_j
文件:Executor.java
RunnableFuture take()
{
RunnableFuture t = null;
synchronized (queue)
{
while (queue.size() == 0)
{
try
{
queue.wait();
}
catch (InterruptedException ex)
{
}
}
t = queue.removeFirst();
queue.notifyAll();
}
return t;
}
项目:RetrofitAppArchitecture
文件:RequestExecutor.java
/**
* 客户端使用本方法提交一个绑定到指定tag的任务,不同情况下可以根据tag取消任务
*
* @param requestId
* @param task
* @return
*/
public synchronized void submitRequest(String requestId, RequestManager.RequestRunnable task) {
//执行同步操作
RunnableFuture<String> future = newTaskFor(task, requestId);
if (!TextUtils.isEmpty(requestId)) {
List<Future<String>> futures = mFutures.get(requestId);
if (futures == null) {
futures = new ArrayList<>();
mFutures.put(requestId, futures);
}
futures.add(future);
mRequests.put(future, task.getRequest());
}
//执行异步任务
execute(future);
}
项目:MiniDownloader
文件:MiniDownloader.java
/**
* Initial MiniDownloader.
*
* @param context
*/
public void init(Context context) {
this.appContext = context.getApplicationContext();
/** Create work executor. */
this.workExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>()) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CustomFutureCallable) {
return ((CustomFutureCallable) callable).newTaskFor();
}
return super.newTaskFor(callable);
}
};
/** Create command executor. */
this.commandExecutor = Executors.newSingleThreadExecutor();
/** Create and initial task manager. */
taskManager = new TaskManager();
taskManager.init(context);
/** Create and start ProgressUpdater. */
progressUpdater = new ProgressUpdater();
progressUpdater.start();
}
项目:PackageTemplates
文件:FileWriter.java
public static PsiDirectory writeDirectory(PsiDirectory dir, DirectoryWrapper dirWrapper, Project project) {
if (dir == null) {
//todo print error
return null;
}
RunnableFuture<PsiDirectory> runnableFuture = new FutureTask<>(() ->
ApplicationManager.getApplication().runWriteAction(new Computable<PsiDirectory>() {
@Override
public PsiDirectory compute() {
return writeDirectoryAction(dir, dirWrapper, project);
}
}));
ApplicationManager.getApplication().invokeLater(runnableFuture);
try {
return runnableFuture.get();
} catch (InterruptedException | ExecutionException e) {
Logger.log("runnableFuture " + e.getMessage());
Logger.printStack(e);
}
return null;
}
项目:RealArchitecture
文件:RequestExecutor.java
/**
* 客户端使用本方法提交一个绑定到指定tag的任务,不同情况下可以根据tag取消任务
*
* @param tag
* @param task
* @return
*/
public synchronized void submit(String tag, Runnable task) {
//执行同步操作
RunnableFuture<String> future = newTaskFor(task, tag);
if (!TextUtils.isEmpty(tag)) {
List<Future<String>> list = mFutures.get(tag);
if (list == null) {
list = new ArrayList<>();
mFutures.put(tag, list);
}
list.add(future);
}
//执行异步任务
execute(future);
}
项目:flink
文件:RocksDBStateBackendTest.java
@Test
public void testDismissingSnapshotNotRunnable() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
snapshot.cancel(true);
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
try {
snapshot.get();
fail();
} catch (Exception ignored) {
}
asyncSnapshotThread.join();
verifyRocksObjectsReleased();
} finally {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
}
项目:flink
文件:RocksDBStateBackendTest.java
@Test
public void testCompletingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
waiter.reset();
runStateUpdates();
blocker.trigger(); // allow checkpointing to start writing
waiter.await(); // wait for snapshot stream writing to run
KeyedStateHandle keyedStateHandle = snapshot.get();
assertNotNull(keyedStateHandle);
assertTrue(keyedStateHandle.getStateSize() > 0);
assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
asyncSnapshotThread.join();
verifyRocksObjectsReleased();
} finally {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
}
项目:flink
文件:StateUtil.java
/**
* Discards the given state future by first trying to cancel it. If this is not possible, then
* the state object contained in the future is calculated and afterwards discarded.
*
* @param stateFuture to be discarded
* @throws Exception if the discard operation failed
*/
public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
if (null != stateFuture) {
if (!stateFuture.cancel(true)) {
try {
// We attempt to get a result, in case the future completed before cancellation.
StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
if (null != stateObject) {
stateObject.discardState();
}
} catch (CancellationException | ExecutionException ex) {
LOG.debug("Cancelled execution of snapshot future runnable. Cancellation produced the following " +
"exception, which is expected an can be ignored.", ex);
}
}
}
}
项目:flink
文件:OperatorStateBackendTest.java
@SuppressWarnings("unchecked")
@Test
public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
final Environment env = createMockEnvironment();
OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
AtomicInteger copyCounter = new AtomicInteger(0);
TypeSerializer<Integer> serializer = new VerifyingIntSerializer(env.getUserClassLoader(), copyCounter);
// write some state
ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test", serializer);
ListState<Integer> listState = operatorStateBackend.getListState(stateDescriptor);
listState.add(42);
CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
FutureUtil.runIfNotDoneAndGet(runnableFuture);
// make sure that the copy method has been called
assertTrue(copyCounter.get() > 0);
}
项目:flink
文件:OperatorStateBackendTest.java
@Test
public void testSnapshotEmpty() throws Exception {
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
final OperatorStateBackend operatorStateBackend =
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
CheckpointStreamFactory streamFactory =
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
RunnableFuture<OperatorStateHandle> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
assertNull(stateHandle);
}
项目:flink
文件:TaskCheckpointingBehaviourTest.java
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
return new DefaultOperatorStateBackend(
env.getUserClassLoader(),
env.getExecutionConfig(),
true) {
@Override
public RunnableFuture<OperatorStateHandle> snapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
throw new Exception("Sync part snapshot exception.");
}
};
}
项目:flink
文件:TaskCheckpointingBehaviourTest.java
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
return new DefaultOperatorStateBackend(
env.getUserClassLoader(),
env.getExecutionConfig(),
true) {
@Override
public RunnableFuture<OperatorStateHandle> snapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
return new FutureTask<>(new Callable<OperatorStateHandle>() {
@Override
public OperatorStateHandle call() throws Exception {
throw new Exception("Async part snapshot exception.");
}
});
}
};
}
项目:sstable-adaptor
文件:DebuggableThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result)
{
if (!(runnable instanceof LocalSessionWrapper))
{
return new LocalSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}
项目:sstable-adaptor
文件:DebuggableThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
if (!(callable instanceof LocalSessionWrapper))
{
return new LocalSessionWrapper<T>(callable);
}
return super.newTaskFor(callable);
}
项目:incubator-netbeans
文件:JFXProjectProblems.java
@NbBundle.Messages({"LBL_ResolveFXJDK=Choose FX-enabled Java Platform - \"{0}\" Project"})
@Override
public Future<Result> resolve() {
final ChooseOtherPlatformPanel choosePlatform = new ChooseOtherPlatformPanel(type);
final DialogDescriptor dd = new DialogDescriptor(choosePlatform, Bundle.LBL_ResolveFXJDK(ProjectUtils.getInformation(project).getDisplayName()));
if (DialogDisplayer.getDefault().notify(dd) == DialogDescriptor.OK_OPTION) {
final Callable<ProjectProblemsProvider.Result> resultFnc =
new Callable<Result>() {
@Override
public Result call() throws Exception {
final JavaPlatform jp = choosePlatform.getSelectedPlatform();
if(jp != null) {
try {
ProjectManager.mutex().writeAccess(new Mutex.ExceptionAction<Void>() {
@Override
public Void run() throws IOException {
platformSetter.setProjectPlatform(jp);
JFXProjectUtils.updateClassPathExtension(project);
return null;
}
});
} catch (MutexException e) {
throw (IOException) e.getCause();
}
LOGGER.info("Set " + PLATFORM_ACTIVE + " to platform " + jp);
return ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED);
}
return ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.UNRESOLVED);
}
};
final RunnableFuture<Result> result = new FutureTask<Result>(resultFnc);
RP.post(result);
return result;
}
return new JFXProjectProblems.Done(
Result.create(ProjectProblemsProvider.Status.UNRESOLVED));
}
项目:incubator-netbeans
文件:Hinter.java
Context(Document doc, LayerHandle layer, FileObject file, RunnableFuture<Map<String,Integer>> lines, List<? super ErrorDescription> errors) {
this.doc = doc;
this.layer = layer;
this.file = file;
this.lines = lines;
this.errors = errors;
}
项目:elasticsearch_my
文件:PrioritizedEsThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
}
Priority priority = ((PrioritizedRunnable) runnable).priority();
return new PrioritizedFutureTask<>(runnable, priority, value, insertionOrder.incrementAndGet());
}
项目:elasticsearch_my
文件:PrioritizedEsThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedCallable)callable, insertionOrder.incrementAndGet());
}
项目:fort_j
文件:Executor.java
void processQueue()
{
try
{
while (true && !shutdown && !checkEndThread())
{
do
{
RunnableFuture task = take();
task.run();
}
while (queue.size() > 0);
// if (!shutdown)
// {
// Thread.sleep(delay);
// }
//
// if (queue.size() == 0)
// break;
}
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
项目:Elasticsearch
文件:PrioritizedEsThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet());
}
项目:Elasticsearch
文件:PrioritizedEsThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet());
}
项目:agroal
文件:PriorityScheduledExecutor.java
@Override
protected void beforeExecute(Thread thread, Runnable lowPriorityTask) {
// Run all high priority tasks in queue first, then low priority
RunnableFuture<?> priorityTask;
while ( ( priorityTask = priorityTasks.poll() ) != null ) {
priorityTask.run();
}
super.beforeExecute( thread, lowPriorityTask );
}
项目:skeletoid
文件:ThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
if (callable instanceof Important)
return new PriorityTask<>(((Important) callable).getPriority(), callable);
else
return new PriorityTask<>(0, callable);
}
项目:skeletoid
文件:ThreadPoolExecutor.java
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
if (runnable instanceof Important)
return new PriorityTask<>(((Important) runnable).getPriority(), runnable, value);
else
return new PriorityTask<>(0, runnable, value);
}
项目:skeletoid
文件:ThreadPoolExecutor.java
@Override
public void execute(Runnable task) {
if (task == null) {
LOG.e(LOG_TAG, "Executing null runnable... ignoring");
return;
}
if (task instanceof PriorityTask) {
super.execute(task);
} else {
final RunnableFuture<Object> futureTask = newTaskFor(task, null);
super.execute(futureTask);
}
}
项目:skeletoid
文件:ThreadPoolExecutor.java
@Override
public Future<?> submit(final Runnable task) {
if (task == null) {
LOG.e(LOG_TAG, "Submitting null runnable... ignoring");
return null;
}
final RunnableFuture<Object> futureTask = newTaskFor(task, null);
execute(futureTask);
return futureTask;
}
项目:com.zsmartsystems.zigbee
文件:ZigBeeNode.java
/**
* Request an update of the binding table for this node.
* <p>
* This method returns a future to a boolean. Upon success the caller should call {@link #getBindingTable()}
*
* @return {@link Future} returning a {@link Boolean}
*/
public Future<Boolean> updateBindingTable() {
RunnableFuture<Boolean> future = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int index = 0;
int tableSize = 0;
List<BindingTable> bindingTable = new ArrayList<BindingTable>();
do {
ManagementBindRequest bindingRequest = new ManagementBindRequest();
bindingRequest.setDestinationAddress(new ZigBeeEndpointAddress(networkAddress));
bindingRequest.setStartIndex(index);
CommandResult result = networkManager.unicast(bindingRequest, new ManagementBindRequest()).get();
if (result.isError()) {
return false;
}
ManagementBindResponse response = (ManagementBindResponse) result.getResponse();
if (response.getStartIndex() == index) {
tableSize = response.getBindingTableEntries();
index += response.getBindingTableList().size();
bindingTable.addAll(response.getBindingTableList());
}
} while (index < tableSize);
setBindingTable(bindingTable);
return true;
}
});
// start the thread to execute it
new Thread(future).start();
return future;
}
项目:hollow
文件:SimultaneousExecutor.java
@Override
public void execute(Runnable command) {
if(command instanceof RunnableFuture) {
super.execute(command);
} else {
super.execute(newTaskFor(command, Boolean.TRUE));
}
}
项目:ThreadDebugger
文件:ThreadExecutor.java
@TargetApi(Build.VERSION_CODES.GINGERBREAD)
@Override
public <T> Future<T> submit(String name, Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> fTask = null;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.GINGERBREAD) {
fTask = new FutureTask(task, result);
execute(name, fTask);
}
return fTask;
}
项目:ThreadDebugger
文件:ThreadExecutor.java
@TargetApi(Build.VERSION_CODES.GINGERBREAD)
@Override
public Future<?> submit(String name, Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> fTask = null;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.GINGERBREAD) {
fTask = new FutureTask(task, null);
execute(name, fTask);
}
return fTask;
}
项目:ThreadDebugger
文件:ThreadExecutor.java
@TargetApi(Build.VERSION_CODES.GINGERBREAD)
@Override
public <T> Future<T> submit(String name, Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> fTask = null;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.GINGERBREAD) {
fTask = new FutureTask(task);
execute(name, fTask);
}
return fTask;
}
项目:flink
文件:RocksDBKeyedStateBackend.java
/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
* be called by the same thread.
*
* @param checkpointId The Id of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
* @param checkpointOptions Options for how to perform this checkpoint.
* @return Future to the state handle of the snapshot data.
* @throws Exception
*/
@Override
public RunnableFuture<KeyedStateHandle> snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT &&
enableIncrementalCheckpointing) {
return snapshotIncrementally(checkpointId, timestamp, streamFactory);
} else {
return snapshotFully(checkpointId, timestamp, streamFactory);
}
}
项目:flink
文件:RocksDBStateBackendTest.java
@Test
public void testDismissingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
snapshot.cancel(true);
verifyRocksObjectsReleased();
} finally {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
}
项目:flink
文件:RocksDBStateBackendTest.java
@Test
public void testCancelRunningSnapshot() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
waiter.reset();
runStateUpdates();
snapshot.cancel(true);
blocker.trigger(); // allow checkpointing to start writing
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
waiter.await(); // wait for snapshot stream writing to run
try {
snapshot.get();
fail();
} catch (Exception ignored) {
}
asyncSnapshotThread.join();
verifyRocksObjectsReleased();
} finally {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
}
项目:flink
文件:FutureUtil.java
public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
if (null == future) {
return null;
}
if (!future.isDone()) {
future.run();
}
return future.get();
}