tangguo

无法在RejectionHandler中获取CallableThread

java

我有线程池,它将带一个Callable工作线程RejectionHandler。我需要获得此Callable任务,RejectionHandler但无法获得它。

在下面的示例中,我需要为RejectionHandler执行的Callable任务的uniqueId。在RejecitonHandler中,Runnable浇铸FutureTask,我希望它应该被强制转换为Callable工作线程。

请帮助我在中获取CallableWorker线程实例RejectionHandler

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    RejectionDemo(){
        Random random = new Random();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new RejectionHandlerImpl());
        CallableWorkerThread workers[] = 
                new CallableWorkerThread[10];
        for (int i=0; i< workers.length; i++){
            workers[i] = new CallableWorkerThread(random.nextInt(100));
            FutureTask<Integer> task = new FutureTask<Integer>(workers[i]);
            executor.submit(task);
        }
    }

    public static void main(String args[]){
        RejectionDemo demo = new RejectionDemo();
    }
    public class CallableWorkerThread implements
        Callable<Integer> {
        private int uniqueId;

        CallableWorkerThread(int uniqueId) {
            this.uniqueId = uniqueId;
        }

        public Integer call() {
            System.out.println("Unique id="+uniqueId);
            return uniqueId;
        }
        public String toString(){
            return ""+uniqueId;
        }
    }
    class RejectionHandlerImpl implements RejectedExecutionHandler{
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try{
                System.out.println(r);
            }catch(Throwable t){
                t.printStackTrace();
            }
        }
    }
}

输出量

java.util.concurrent.FutureTask@70036428
Unique id=68
java.util.concurrent.FutureTask@6ea4b78b
java.util.concurrent.FutureTask@e3f6d
java.util.concurrent.FutureTask@1ce84763
java.util.concurrent.FutureTask@55a6c368
java.util.concurrent.FutureTask@4e77b794
java.util.concurrent.FutureTask@15b57dcb
Unique id=55
Unique id=83

我期待CallableWorkerThread而不是FutureTask。帮助我获取WorkerThread实例。


阅读 347

收藏
2020-12-06

共1个答案

一尘不染

在你的代码中

workers[i] = new CallableWorkerThread(random.nextInt(100));
FutureTask<Integer> task = new FutureTask<Integer>(workers[i]);
executor.submit(task);

您创建了一个FutureTask包装CallableWorkerThread实例的,但是您正在使用submit接受一个任意值Runnable并返回FutureTask包装一个的Runnable。

换句话说,您将自己包装FutureTask在另一个中FutureTask。有两种解决方法

  1. 使用
workers[i] = new CallableWorkerThread(random.nextInt(100));
executor.submit(workers[i]);

让ExecutorService你包在Callable里面FutureTask。

  1. 使用
workers[i] = new CallableWorkerThread(random.nextInt(100));
executor.execute(new FutureTask<Integer>(workers[i]));

Callable手动包装并排队,Runnable无需进一步包装(请注意使用execute而不是submit)

由于您要启用对original的检索Callable,因此第二个选项适合您,因为它可以让您完全控制FutureTask实例:

static class MyFutureTask<T> extends FutureTask<T> {
    final Callable<T> theCallable;

    public MyFutureTask(Callable<T> callable) {
        super(callable);
        theCallable=callable;
    }
}

提交代码:

    for (int i=0; i< workers.length; i++){
        workers[i] = new CallableWorkerThread(random.nextInt(100));
        executor.execute(new MyFutureTask<Integer>(workers[i]));
    }

RejectedExecutionHandler:

class RejectionHandlerImpl implements RejectedExecutionHandler{
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if(r instanceof MyFutureTask) {
            MyFutureTask<?> myFutureTask = (MyFutureTask)r;
            Callable<?> c=myFutureTask.theCallable;
            System.out.println(c);
        }
        else System.out.println(r);
    }
}
2020-12-06