线程池

java / 2023-01-04
0 1,030

一、Executors

通过Executors的静态工厂方法可以创建三个线程池的包装对象: ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors有5个核心方法:

Executors.newWorkStealingPool(); // JDK8引入,创建持有足够线程的线程池支持给定的并行度,
并通过使用多个队列减少竞争,此方法把CPU数量设置为默认的并行度。
Executors.newCachedThreadPool(); // 高度可伸缩线程池,最大线程可达Integet.MAX_VALUE。
keepLiveTime默认60s,工作线程处于空闲状态,则回收工作线程
Executors.newFixedThreadPool(5); // 创建固定线程数量的线程池。核心线程数也是最大线程数,
不存在空闲线程,keepLiveTime等于0.
Executors.newScheduledThreadPool(); // 支持定时及周期性任务执行。不会瘦工作线程
Executors.newSingleThreadExecutor(); //创建一个单线程线程池

image.png
但是阿里巴巴的java开发手册规定,不允许手动创建线程,必须使用线程池;同时线程池不能用Executors来获取,必须通过ThreadPoolExecutor

二、线程池7大参数

点进上面这几个创建线程池的方法中可以发现,其背后都是创建了一个名叫ThreadPoolExecutor的对象,该对象的创建依赖7个参数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:指定线程池的核心线程数。
  • maximumPoolSize:指定线程池的最大线程数。与核心线程数的区别就是核心线程数是一直活跃的,最大线程数包括核心线程数和一部分不活跃的线程。。当ThreadPoolExecutor的allowCoreThreadTimeOut变量设置为true时,核心线程超时后也会被回收。
  • unit:最大空闲时间单位。
  • workQueue:阻塞队列。用于存放积压的任务。
  • threadFactory:创建新线程的工厂。
  • handler:拒绝策略。

线程工厂和拒绝策略应结合实际业务,返回相应的提示或友好的跳转。

三、线程池状态

RUNNING:一切正常
SHUTDOWN:正在执行的任务接着执行,拒绝接受新任务,阻塞队列中的任务接着执行
STOP:STOP状态拒绝接收新任务,正在执行的强制中断,阻塞队列中的任务做为shoudownNow()的结果返回

四、线程池工作原理

image.png

  • 第一步:会判断核心线程数是否已满,如果没有就执行;如果满了就执行第二步。
  • 第二步:判断阻塞队列是否已满。如果没有就加入阻塞队列;如果满了就执行第三步。
  • 第三步:判断最大线程数是否已满。如果没有就创建线程来执行;如果满了就执行拒绝策略。

五、线程池拒绝策略

JDK内置的几种拒绝策略:

  • AbortPolicy(默认):直接抛异常RejectedExecutionException
  • CallerRunsPolicy:将任务交给调用者执行。main线程调用就交给main线程执行。
  • DiscardOldestPolicy:抛弃队列中等待最久的任务。
  • DIscardPolicy:丢弃任务。

AbortPolicy:

public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      // 直接抛出异常
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

CallerRunsPolicy:

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

交给调用者执行。
DiscardOldestPolicy:

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

将队列头元素抛弃。
DiscardPolicy:

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

啥也不做,就是抛弃了新的任务。
内置的几种拒绝策略都继承了RejectedExecutionHandler接口,按照这种逻辑,就可以对拒绝策略有自己的扩展。

public class MyRejectedPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("拒绝执行...");
    }
}

六、线程池源码

在ThreadPoolExecutor的属性定义中频繁使用位运算来表示线程池状态。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

1、execute
public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	/*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
		 * // 如果不能放入队列,我们会尝试创建新的线程,如果失败,会执行拒绝策略
         */
	// 返回包含线程数及线程池状态的integer类型数值
	int c = ctl.get();
	// 如果工作线程数小于核心线程数,则创建线程任务并执行
	if (workerCountOf(c) < corePoolSize) {
		if (addWorker(command, true))
			return;
		// 如果创建失败,防止外部已经在线程池中加入新任务,重新获取一下
		c = ctl.get();
	}
	
	// 只有线程池处于RUNNING状态,在执行后半句:加入队列
	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
		// 如果线程池不是RUNNING状态,将刚才加入队列的任务移除
		if (! isRunning(recheck) && remove(command))
			reject(command);
		// 如果之前的线程已被消费完,新建一个线程
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	// 核心池和队列都已满,尝试创建一个新线程
	else if (!addWorker(command, false))
		// 如果addWorker失败,则执行拒绝策略s
		reject(command);
}

拒绝策略的执行有两个条件:1、线程池状态为非RUNNING状态。2、等待队列已满。

2、addWorker
/**
* 
*/
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
// 检查是否可以将woker添加到线程池。如果可以,将调整woker的数量,可能的话,会创建一个新的worker
// 并执行@{param firstTask}作为它的第一个任务。
// 如果线程池是stop状态或者准备结束,方法返回false。线程工厂创建线程失败也返回false
// 如果线程创建失败,要么由于线程工厂返回null,要么报错(典型的oom),我们会回滚
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * true:在新增工作线程时,需要判断当前RUNNING状态的线程是否少于corePoolSize
 * false:在新增工作线程时,需要判断当前RUNNING状态的线程是否少于maximumPoolSize
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty())) // TODO 没看懂
            return false;

        for (;;) {
            // 检查worker数量,如果超过最大允许线程数则返回false
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 调整worker数量(将当前活动线程数+1)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 开始创建工作线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 在进行ThreadPoolExecutor的敏感操作时,
            // 都需要持有主锁,避免在添加和启动线程时被干扰
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 在线程池为RUNNING或SHUTDOWN且firstTask为空时
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 整个线程池在运行期间的最大并发任务个数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 线程启动失败,将刚才添加的数量减回去
            addWorkerFailed(w);
    }
    return workerStarted;
}

七、生产中的线程池

生产中一般通过ThreadPoolExecutor来创建线程池,所以线程数的设置就相当考究。
根据业务的类型可以分为:

  • CPU密集型
  • IO密集型

当然,跟服务器的硬件配置更是密切相关。
CPU密集型指任务需要大量运算,没有阻塞,CPU一直跑着。CPU密集型一般分配尽可能少的线程数量。
一般公式:CPU核数+1
IO密集型由于不是一直执行任务,应该配置多的线程:比如CPU核数 * 2
但是据小道消息,业内也有使用分配线程数:
CPU核数 / (1-阻塞系数);阻塞系数在0.8~0.9之间。
按照这个公式,一个8核CPU最大可分配的线程数就是:
8 / (1 - 0.9) = 80
个线程数。


八、异步编排

1、创建异步对象

2、计算完成时回调方法

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:

  • whenComplete:是执行当前任务的线程继续执行whenComplete任务
  • whenCompleteAsync:把whenCompleteAsync这个任务继续提交给线程池执行

总结:方法以aync结尾的,都会把任务提交给线程池执行,不以async结尾的使用当前线程继续执行。

public static void useSupplyAsyncStartAThreadAndComplete() throws ExecutionException, InterruptedException {
    System.out.println("线程开始...");
    CompletableFuture.supplyAsync(() -> {
        int i = 10 / 0;
        return i;
    }, executor).whenComplete((res, exception) -> {
        System.out.println("执行结果是 -> " + res + "  执行异常是 -> " + exception);
    }).exceptionally((e) -> {
        return 20;  // 这个结果会在线程执行异常时覆盖掉supplyAsync中的返回值
    });
}

3、handle方法

与complete一样,可以处理线程执行结果(包括异常),也有返回值。

public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}
public static void useSupplyAsyncStartAThreadAndHandle() throws ExecutionException, InterruptedException {
    System.out.println("线程开始...");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        int i = 10 / 2;
        return i;
    }, executor).handle((res, e) -> {
        if (res != null) {
            return res;
        }
        if (e != null) {
            System.out.println(e);
            return 11;
        }
        return 0;
    });
    System.out.println("最终结果 -> " + future.get());
}

4、线程串行化方法
// 串行化测试demo
public static void threadSerial(){
    /**
        * 1、thenRun、thenRunAsync 不会接收上一个线程执行结果作为参数、不会有返回
        * 2、.thenAcceptAsync/.thenAccept(res -> {
        *      }): 可以接收上一个线程执行结果作为参数,但不会有返回值
        * 3、thenApply/.thenApplyAsync(res -> {
        *             return 1;
        *         }); : 可以接收上一个线程执行结果作为参数,而且会有返回值
        */
    CompletableFuture.supplyAsync(() -> {
        int i = 10 / 2;
        return i;
    }, executor).thenApplyAsync(res -> {
        return 1;
    });
}

5、两任务组合——都要完成
// 多任务组合测试demo【都完成】
    /**
     * runAfterBoth/runAfterBothAsync: future1和future2执行结束后执行新线程中的,不能接收future2和future1的执行结果作为参数。也不能有返回值
     */
    public static void multiTaskThread_runAfterBoth(){
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        future1.runAfterBoth(future2, () -> {
            try {
                System.out.println(future1.get());
                System.out.println(future2.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("world");
        });
    }

    // 多任务组合测试demo【都完成】
    /**
     * thenAcceptBoth/thenAcceptBothAsync: future1和future2执行结束后执行新线程中的,可以接收future2和future1的执行结果作为参数。但不能有返回值
     */
    public static void multiTaskThread_thenAcceptBothAsync(){
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        future1.thenAcceptBothAsync(future2, (res, res1) -> {
            System.out.println(res + " -> " + res1 + " -> world");
        });
    }

    // 多任务组合测试demo【都完成】
    /**
     * thenCombine/thenCombineAsync: future1和future2执行结束后执行新线程中的,可以接收future2和future1的执行结果作为参数。也可以有返回值
     */
    public static void multiTaskThread_thenCombineAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        CompletableFuture<String> result = future1.thenCombineAsync(future2, (res, res1) -> {
            return res + " -> " + res1 + " -> world";
        }, executor);
        System.out.println(result.get());
    }

6、两任务组合——一个完成

    // 多任务组合测试demo【一个完成】
    /**
     * runAfterEitherAsync/runAfterEither:其中组合执行线程中一个线程执行结束就执行后面的线程。不能接收参数,也没有返回值
     */
    public static void multiTaskThread_runAfterEitherAsync(){
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        future1.runAfterEitherAsync(future2, () -> {
            System.out.println("world");
        });
    }

    // 多任务组合测试demo【一个完成】
    /**
     * acceptEitherAsync/acceptEither:其中组合执行线程中一个线程执行结束就执行后面的线程【这几个线程的返回值类型必须相同】。能接收参数,但是没有返回值
     */
    public static void multiTaskThread_acceptEitherAsync(){
        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        future1.acceptEitherAsync(future2, (res) -> {
            System.out.println("world");
        });
    }

    // 多任务组合测试demo【一个完成】
    /**
     * applyToEitherAsync/applyToEither: 其中组合执行线程中一个线程执行结束就执行后面的线程【这几个线程的返回值类型必须相同】。能接收参数,也有返回值
     */
    public static void multiTaskThread_applyToEitherAsync(){
        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        future1.applyToEitherAsync(future2, (res) -> {
            return "world";
        });
    }

7、多任务组合
    // 多任务组合 【等待所有组合的线程执行结束】
    public static void multiTaskThread_allOf() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2);
        future.get(); // 等待future1和future2执行结束
        /**
         * 注意:CompletableFuture.allOf返回的future.get并不能获取到future1或者 future2的执行结果
         * 要获取future1或者future2的执行结果还是需要调用各自的get方法
         */
        System.out.println(future1.get());
        System.out.println(future2.get());
    }

    // 多任务组合 【等待所有组合的线程中一个执行结束】
    public static void multiTaskThread_anyOf() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 2;
            return i;
        }, executor);
        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        }, executor);

        CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2);
        future.get(); // 等待future1和future2执行结束
        /**
         * 注意:CompletableFuture.anyOf返回的future.get只能获取成功线程的返回值
         */
        System.out.println(future1.get());
        System.out.println(future2.get());
    }