Wednesday, 15 October 2014

Implementation of ThreadPoolExecutor

I will explain implementation of ThreadPoolExecutor.class here!


 ThreadPoolExecutor class has four constructors, using which one can obtain ThreadPoolExecutor instance.

(1)
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


(2)
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }


(3)
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }


(4)
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }



Out of 4 constructors mentioned above, if you observer,  both 1 & 2, variable defaultHandler is passed to create the instance. So, what is defaultHandler?

private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();



public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +  e.toString());
        }



defaultHandler throws RejecctedExecutionException whenever there is no thread available to execute the job.

If you want to have metric logs for thread rejection execution exception, it is recommended to create a separate implementation for RejectedExecutionHandler.

This can be done as mentioned below.

(a) Create RejectedExecutionHandlerImpl class

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // log rejection metrics here
    }
}


(b) Create ThreadPoolExecutor

RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();

ThreadFactory threadFactory = Executors.defaultThreadFactory();

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, threadAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(poolQueueSize), threadFactory, rejectionHandler);