Fork me on GitHub

线程池-tomcat

线程池在tomcat中的创建实现为:

1
2
3
4
5
6
7
8
9
public abstract class AbstractEndpoint<S> {
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
}

  同时(重点): tomcat 的线程池扩展了 jdk 的 executor ,而且队列用的是自己的 task queue ,因此其策略与 jdk 的有所不同,需要注意一下。

tomcat线程池策略

  场景1:接受一个请求,此时 tomcat 启动的线程数还没有达到 corePoolSize ( tomcat 里头叫 minSpareThreads ), tomcat 会启动一个线程来处理该请求;

  场景2:接受一个请求,此时 tomcat 启动的线程数已经达到了 corePoolSize , tomcat 把该请求放入队列 (offer) ,如果放入队列成功,则返回,放入队列不成功,则尝试增加工作线程,在当前线程个数 < maxThreads 的时候,可以继续增加线程来处理,超过 maxThreads 的时候,则继续往等待队列里头放,等待队列放不进去,则抛出 RejectedExecutionException ;

  值得注意的是,使用 LinkedBlockingQueue 的话,默认是使用 Integer.MAX_VALUE ,即无界队列(这种情况下如果没有配置队列的 capacity 的话,队列始终不会满,那么始终无法进入开启新线程到达 maxThreads 个数的地步,则此时配置 maxThreads 其实是没有意义的)。

tomcat等待队列

  而 TaskQueue 的队列 capacity 为 maxQueueSize ,默认也是 Integer.MAX_VALUE 。但是,其重写 offer 方法,当其线程池大小 < maximumPoolSize 的时候,返回false,即在一定程度改写了队列满的逻辑,修复了使用LinkedBlockingQueue默认的 capacity为 Integer.MAX_VALUE 的时候, maxThreads 失效的 “bug” 。从而可以继续增长线程到 maxThreads ,超过之后,继续放入队列。

  tomcat 的线程池使用了自己扩展的 taskQueue ,而不是 Executors 工厂方法里头用的 LinkedBlockingQueue 。(主要是修改了 offer 的逻辑) TaskQueue 实现的offer操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.apache.tomcat.util.threads;

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
   private ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//当其线程池大小小于maximumPoolSize的时候,返回false
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
}
tomcat拒绝任务

  这里改写了 jdk 线程池默认的 Rejected 规则,即 catch 住了 RejectedExecutionException 。正常 jdk 的规则是 core 线程数+临时线程数 > maxSize 的时候,就抛出 RejectedExecutionException 。这里 catch 住的话,继续往 taskQueue 里头放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package org.apache.tomcat.util.threads;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.tomcat.util.res.StringManager;

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
/**
* {@inheritDoc}
*/
@Override
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}

public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
}

重点看下 queue.force 方法

1
2
3
4
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}

  注意的是这里调用的 super.offer(o,timeout,unit) ,即 LinkedBlockingQueue ,只有当队列满的时候,返回 false ,才会抛出重新抛出 RejectedExecutionException 。

  这里改变了 jdk 的 ThreadPoolExecutor 的 RejectedExecutionException 抛出的逻辑,也就是超出了 maxThreads 不会抛出 RejectedExecutionException ,而是继续往队列丢任务,而 taskQueue 本身是无界的,因此可以默认几乎不会抛出 RejectedExecutionException

回顾 JDK 线程池策略 
  • 每次提交任务时,如果线程数还没达到 coreSize 就创建新线程并绑定该任务。所以第 coreSize 次提交任务后线程总数必达到 coreSize ,不会重用之前的空闲线程。
  • 线程数达到 coreSize 后,新增的任务就放到工作队列里,而线程池里的线程则努力的使用 take() 从工作队列里拉活来干。
  • 如果队列是个有界队列,又如果线程池里的线程不能及时将任务取走,工作队列可能会满掉,插入任务就会失败,此时线程池就会紧急的再创建新的临时线程来补救。
  • 临时线程使用 poll(keepAliveTime,timeUnit) 来从工作队列拉活,如果时候到了仍然两手空空没拉到活,表明它太闲了,就会被解雇掉。
  • 如果 core 线程数+临时线程数 > maxSize,则不能再创建新的临时线程了,转头执行 RejectExecutionHanlder 。默认的 AbortPolicy 抛 RejectedExecutionException 异常,其他选择包括静默放弃当前任务(Discard),放弃工作队列里最老的任务(DisacardOldest),或由主线程来直接执行(CallerRuns).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
总结

tomcat 的线程池与 jdk 的使用无界 LinkedBlockingQueue 主要有如下两点区别:

举个例子:假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时

jdk有界执行任务过程
核心线程(3) 1、2、3
等待队列(10) 4、5、6、… 12、13
非核心线程(3) 14、15、16
拒绝任务 17、18、19、20
任务执行顺序 1、2、3、14、15、16、4、5、6、… 12、13
tomcat执行任务过程
核心线程 1、2、3
等待队列(无界) 7、8、9、… 19、20
非核心线程 4、5、6
拒绝任务
任务执行顺序 1、2、3、4、5、6、… 19、20
  • ThreadPoolExecutor 的线程池增长策略是:
    • 如果队列是个有界队列,又如果线程池里的线程不能及时将任务取走,工作队列可能会满掉,插入任务就会失败,此时线程池就会紧急的再创建新的临时线程来补救。
    • 而 tomcat 的 ThreadPoolExecutor 使用的 taskQueue ,是无界的 LinkedBlockingQueue ,但是通过 taskQueue 的 offer 方法覆盖了 LinkedBlockingQueue 的 offer 方法,改写了规则,使得它也走 jdk 的 ThreadPoolExecutor 的有界队列的线程增长策略。
  • ThreadPoolExecutor 拒绝任务策略:
    • jdk ,当 core线程数+临时线程数 > maxSize,则不能再创建新的临时线程了,转头执行 RejectExecutionHanlder 。
    • 而 tomcat 的ThreadPoolExecutor 则改写了这个规则,即 catch 住了 RejectExecutionHanlder ,继续往队列里头放,直到队列满了才抛出 RejectExecutionHanlder 。而默认 taskQueue 是无界的。
-------------本文结束感谢您的阅读-------------