线程资源管理 线程池状态
一直受困于ThreadPoolExecutor的内部实现, 今天就拿出点时间解决几点自己的疑问
ThreadPoolExecutor是如何重复利用线程资源的
ThreadPoolExecutor reject 策略
线程资源管理 首先拿出一段运行代码
1 2 3 4 5 6 public class TestPool { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5 ); pool.execute(() -> System.out.println("task is running" )); } }
我们从构造函数入手
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
我们看到了在构造函数中, 只是进行了初始化的操作, 并没有运行任何逻辑代码, 那么下来我们从execute(Runnable )这个方法入手
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
这个方法的重点一个是addWorker()它会启动一个新的线程, 如果指定了first task(addWorker(command, true)), 那么新的worker线程就从first task开始执行. 如果没有指定的话(addWorker(null, false)), 就从任务队列里取出任务依次执行.
另一个重点是workQueue.offer(command)通过这个方法向任务队列里添加任务, 然后在Worker的runWorker()里依次执行任务.
下来我们看一下addWorker()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) ) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
根据线程池的当前状态和指定的bound 检查新的worker能否添加.如果新的worker能被添加的话, 就会创建出一个新的worker, 同时增加worker count的数量。然后在这个新的worker运行firstTask
如果线程池是停止状态或者准备停止的话, 这个方法会返回一个false.如果ThreadFactory创建线程失败的话,也会返回一个false.当创建线程失败时,不管是ThreadFactory返回null还是产生错误(一般是在Thread.start()时抛出OutOfMemoryError), 我们将 执行回滚操作
当新创建一个线程时, firstTask会成为它第一个执行的任务。当线程池线程数量小于corePoolSize或者队列满的时候, 创建出的worker内部会自动创建一个first task ,忽略掉从任务队列中出列的任务.
我们需要着重看一下t.start();方法, 这个方法是开始运行Worker对象里的thread线程对象(其本身也是Worker类型). 最终也是执行到Worker的runWorker()方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
看到这里我们可以看出, ThreadPoolExecutor其内部也是通过while来不断轮训任务队列, 执行任务的task.run();方法, 不开启新线程的方式, 来达到线程资源管理的目的.
那么任务执行完之后, 线程就被干掉了吗? 我们重点看processWorkerExit(w, completedAbruptly);这个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
线程池状态 从上面的分析, 我们看到了很多这种代码
1 2 workerCountOf(c) isRunning(c)
看到这里可能会有很多疑问, 贴一下ThreadPoolExecutor部分内部成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; }
ctl 内部封装了俩个关键性的字段
workerCount, 工作的线程数
runState, 线程池状态 为了将这俩个值都存储进ctl里,workerCount的最大值是500万左右((1 << (Integer.SIZE - 3)) - 1),而不是2亿(queue的最大值是Integer的最大值)。 如果将来需要更高的任务量的话,可以采用AtomLong作为ctl的类型,但是现在采用int可以带来更快的运行速度和更简单
下面的几个值表示了整个线程池的运行状态
RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don’t accept new tasks, but process queued tasks
STOP: Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed
随着线程池运行, 上面几个状态是依次递增的, 但是在整个线程池生命周期中不一定会达到每个状态. 线程池的状态转换过程如下:
RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
SHUTDOWN -> TIDYING When both queue and pool are empty
STOP -> TIDYING When pool is empty
TIDYING -> TERMINATED When the terminated() hook method has completed