一、Executors
通过Executors的静态工厂方法可以创建三个线程池的包装对象: ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors有5个核心方法:
Executors.newWorkStealingPool(); // JDK8引入,创建持有足够线程的线程池支持给定的并行度,
并通过使用多个队列减少竞争,此方法把CPU数量设置为默认的并行度。
Executors.newCachedThreadPool(); // 高度可伸缩线程池,最大线程可达Integet.MAX_VALUE。
keepLiveTime默认60s,工作线程处于空闲状态,则回收工作线程
Executors.newFixedThreadPool(5); // 创建固定线程数量的线程池。核心线程数也是最大线程数,
不存在空闲线程,keepLiveTime等于0.
Executors.newScheduledThreadPool(); // 支持定时及周期性任务执行。不会瘦工作线程
Executors.newSingleThreadExecutor(); //创建一个单线程线程池
但是阿里巴巴的java开发手册规定,不允许手动创建线程,必须使用线程池;同时线程池不能用Executors
来获取,必须通过ThreadPoolExecutor
。
二、线程池7大参数
点进上面这几个创建线程池的方法中可以发现,其背后都是创建了一个名叫ThreadPoolExecutor
的对象,该对象的创建依赖7个参数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:指定线程池的核心线程数。
- maximumPoolSize:指定线程池的最大线程数。与核心线程数的区别就是核心线程数是一直活跃的,最大线程数包括核心线程数和一部分不活跃的线程。。当ThreadPoolExecutor的allowCoreThreadTimeOut变量设置为true时,核心线程超时后也会被回收。
- unit:最大空闲时间单位。
- workQueue:阻塞队列。用于存放积压的任务。
- threadFactory:创建新线程的工厂。
- handler:拒绝策略。
线程工厂和拒绝策略应结合实际业务,返回相应的提示或友好的跳转。
三、线程池状态
RUNNING:一切正常
SHUTDOWN:正在执行的任务接着执行,拒绝接受新任务,阻塞队列中的任务接着执行
STOP:STOP状态拒绝接收新任务,正在执行的强制中断,阻塞队列中的任务做为shoudownNow()的结果返回
四、线程池工作原理
- 第一步:会判断核心线程数是否已满,如果没有就执行;如果满了就执行第二步。
- 第二步:判断阻塞队列是否已满。如果没有就加入阻塞队列;如果满了就执行第三步。
- 第三步:判断最大线程数是否已满。如果没有就创建线程来执行;如果满了就执行拒绝策略。
五、线程池拒绝策略
JDK内置的几种拒绝策略:
- AbortPolicy(默认):直接抛异常
RejectedExecutionException
。 - CallerRunsPolicy:将任务交给调用者执行。main线程调用就交给main线程执行。
- DiscardOldestPolicy:抛弃队列中等待最久的任务。
- DIscardPolicy:丢弃任务。
AbortPolicy
:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 直接抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy
:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
交给调用者执行。DiscardOldestPolicy
:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
将队列头元素抛弃。DiscardPolicy
:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
啥也不做,就是抛弃了新的任务。
内置的几种拒绝策略都继承了RejectedExecutionHandler
接口,按照这种逻辑,就可以对拒绝策略有自己的扩展。
public class MyRejectedPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒绝执行...");
}
}
六、线程池源码
在ThreadPoolExecutor的属性定义中频繁使用位运算来表示线程池状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
1、execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* // 如果不能放入队列,我们会尝试创建新的线程,如果失败,会执行拒绝策略
*/
// 返回包含线程数及线程池状态的integer类型数值
int c = ctl.get();
// 如果工作线程数小于核心线程数,则创建线程任务并执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果创建失败,防止外部已经在线程池中加入新任务,重新获取一下
c = ctl.get();
}
// 只有线程池处于RUNNING状态,在执行后半句:加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池不是RUNNING状态,将刚才加入队列的任务移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果之前的线程已被消费完,新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 核心池和队列都已满,尝试创建一个新线程
else if (!addWorker(command, false))
// 如果addWorker失败,则执行拒绝策略s
reject(command);
}
拒绝策略的执行有两个条件:1、线程池状态为非RUNNING状态。2、等待队列已满。
2、addWorker
/**
*
*/
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
// 检查是否可以将woker添加到线程池。如果可以,将调整woker的数量,可能的话,会创建一个新的worker
// 并执行@{param firstTask}作为它的第一个任务。
// 如果线程池是stop状态或者准备结束,方法返回false。线程工厂创建线程失败也返回false
// 如果线程创建失败,要么由于线程工厂返回null,要么报错(典型的oom),我们会回滚
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* true:在新增工作线程时,需要判断当前RUNNING状态的线程是否少于corePoolSize
* false:在新增工作线程时,需要判断当前RUNNING状态的线程是否少于maximumPoolSize
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())) // TODO 没看懂
return false;
for (;;) {
// 检查worker数量,如果超过最大允许线程数则返回false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 调整worker数量(将当前活动线程数+1)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 开始创建工作线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 在进行ThreadPoolExecutor的敏感操作时,
// 都需要持有主锁,避免在添加和启动线程时被干扰
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 在线程池为RUNNING或SHUTDOWN且firstTask为空时
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 整个线程池在运行期间的最大并发任务个数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 线程启动失败,将刚才添加的数量减回去
addWorkerFailed(w);
}
return workerStarted;
}
七、生产中的线程池
生产中一般通过ThreadPoolExecutor
来创建线程池,所以线程数的设置就相当考究。
根据业务的类型可以分为:
- CPU密集型
- IO密集型
当然,跟服务器的硬件配置更是密切相关。
CPU密集型指任务需要大量运算,没有阻塞,CPU一直跑着。CPU密集型一般分配尽可能少的线程数量。
一般公式:CPU核数+1
IO密集型由于不是一直执行任务,应该配置多的线程:比如CPU核数 * 2
但是据小道消息,业内也有使用分配线程数:
CPU核数 / (1-阻塞系数);阻塞系数在0.8~0.9之间。
按照这个公式,一个8核CPU最大可分配的线程数就是:
8 / (1 - 0.9) = 80
个线程数。
八、异步编排
1、创建异步对象
2、计算完成时回调方法
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:
- whenComplete:是执行当前任务的线程继续执行whenComplete任务
- whenCompleteAsync:把whenCompleteAsync这个任务继续提交给线程池执行
总结:方法以aync结尾的,都会把任务提交给线程池执行,不以async结尾的使用当前线程继续执行。
public static void useSupplyAsyncStartAThreadAndComplete() throws ExecutionException, InterruptedException {
System.out.println("线程开始...");
CompletableFuture.supplyAsync(() -> {
int i = 10 / 0;
return i;
}, executor).whenComplete((res, exception) -> {
System.out.println("执行结果是 -> " + res + " 执行异常是 -> " + exception);
}).exceptionally((e) -> {
return 20; // 这个结果会在线程执行异常时覆盖掉supplyAsync中的返回值
});
}
3、handle方法
与complete一样,可以处理线程执行结果(包括异常),也有返回值。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
public static void useSupplyAsyncStartAThreadAndHandle() throws ExecutionException, InterruptedException {
System.out.println("线程开始...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor).handle((res, e) -> {
if (res != null) {
return res;
}
if (e != null) {
System.out.println(e);
return 11;
}
return 0;
});
System.out.println("最终结果 -> " + future.get());
}
4、线程串行化方法
// 串行化测试demo
public static void threadSerial(){
/**
* 1、thenRun、thenRunAsync 不会接收上一个线程执行结果作为参数、不会有返回
* 2、.thenAcceptAsync/.thenAccept(res -> {
* }): 可以接收上一个线程执行结果作为参数,但不会有返回值
* 3、thenApply/.thenApplyAsync(res -> {
* return 1;
* }); : 可以接收上一个线程执行结果作为参数,而且会有返回值
*/
CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor).thenApplyAsync(res -> {
return 1;
});
}
5、两任务组合——都要完成
// 多任务组合测试demo【都完成】
/**
* runAfterBoth/runAfterBothAsync: future1和future2执行结束后执行新线程中的,不能接收future2和future1的执行结果作为参数。也不能有返回值
*/
public static void multiTaskThread_runAfterBoth(){
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
future1.runAfterBoth(future2, () -> {
try {
System.out.println(future1.get());
System.out.println(future2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("world");
});
}
// 多任务组合测试demo【都完成】
/**
* thenAcceptBoth/thenAcceptBothAsync: future1和future2执行结束后执行新线程中的,可以接收future2和future1的执行结果作为参数。但不能有返回值
*/
public static void multiTaskThread_thenAcceptBothAsync(){
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
future1.thenAcceptBothAsync(future2, (res, res1) -> {
System.out.println(res + " -> " + res1 + " -> world");
});
}
// 多任务组合测试demo【都完成】
/**
* thenCombine/thenCombineAsync: future1和future2执行结束后执行新线程中的,可以接收future2和future1的执行结果作为参数。也可以有返回值
*/
public static void multiTaskThread_thenCombineAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
CompletableFuture<String> result = future1.thenCombineAsync(future2, (res, res1) -> {
return res + " -> " + res1 + " -> world";
}, executor);
System.out.println(result.get());
}
6、两任务组合——一个完成
// 多任务组合测试demo【一个完成】
/**
* runAfterEitherAsync/runAfterEither:其中组合执行线程中一个线程执行结束就执行后面的线程。不能接收参数,也没有返回值
*/
public static void multiTaskThread_runAfterEitherAsync(){
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
future1.runAfterEitherAsync(future2, () -> {
System.out.println("world");
});
}
// 多任务组合测试demo【一个完成】
/**
* acceptEitherAsync/acceptEither:其中组合执行线程中一个线程执行结束就执行后面的线程【这几个线程的返回值类型必须相同】。能接收参数,但是没有返回值
*/
public static void multiTaskThread_acceptEitherAsync(){
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
future1.acceptEitherAsync(future2, (res) -> {
System.out.println("world");
});
}
// 多任务组合测试demo【一个完成】
/**
* applyToEitherAsync/applyToEither: 其中组合执行线程中一个线程执行结束就执行后面的线程【这几个线程的返回值类型必须相同】。能接收参数,也有返回值
*/
public static void multiTaskThread_applyToEitherAsync(){
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
future1.applyToEitherAsync(future2, (res) -> {
return "world";
});
}
7、多任务组合
// 多任务组合 【等待所有组合的线程执行结束】
public static void multiTaskThread_allOf() throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2);
future.get(); // 等待future1和future2执行结束
/**
* 注意:CompletableFuture.allOf返回的future.get并不能获取到future1或者 future2的执行结果
* 要获取future1或者future2的执行结果还是需要调用各自的get方法
*/
System.out.println(future1.get());
System.out.println(future2.get());
}
// 多任务组合 【等待所有组合的线程中一个执行结束】
public static void multiTaskThread_anyOf() throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2);
future.get(); // 等待future1和future2执行结束
/**
* 注意:CompletableFuture.anyOf返回的future.get只能获取成功线程的返回值
*/
System.out.println(future1.get());
System.out.println(future2.get());
}