线程池就是提前创建若干个线程,如果有任务需要处理,线程池里的线程就会处理任务,处理完之后线程并不会被销毁,而是等待下一个任务。由于创建和销毁线程都是消耗系统资源的,所以当你想要频繁的创建和销毁线程的时候就可以考虑使用线程池来提升系统的性能。
java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题。同时创建过多的线程也可能引发资源耗尽的风险,这个时候引入线程池比较合理,方便线程任务的管理。java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等。
线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
线程池的好处如下:
1.降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2.可有效的控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
3.提供定时执行、定期执行、单线程、并发数控制等功能。
1)线程提交到线程池
2)判断核心线程池是否已经达到设定的数量,如果没有达到,则直接创建线程执行任务
3)如果达到了,则放在队列中,等待执行
4)如果队列已经满了,则判断线程的数量是否已经达到设定的最大值,如果达到了,则直接执行拒绝策略
5)如果没有达到,则创建线程执行任务。
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
Executor就是一个线程池框架,Executor 位于java.util.concurrent.Executors ,提供了用于创建工作线程的线程池的工厂方法。它包含一组用于有效管理工作线程的组件。Executor API 通过 Executors 将任务的执行与要执行的实际任务解耦。
Executor 接口对象能执行我们的线程任务;
Executors 工具类的不同方法按照我们的需求创建了不同的线程池,来满足业务的需求。
ExecutorService 接口继承了Executor接口并进行了扩展,提供了更多的方法,我们能够获得任务执行的状态并且可以获取任务的返回值。
1.newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
2.newFixedThreadPoo
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3.newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
4.newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
1)newFixedThreadPool和newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool: 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
3)线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让程序员更加明确线程池的运行规则,规避资源耗尽的风险。
ThreadPoolExecutor是线程池的核心实现类,在JDK1.5引入,位于java.util.concurrent包。
通过下面的demo来了解ThreadPoolExecutor创建线程的过程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 测试ThreadPoolExecutor对线程的执行顺序
**/
public class ThreadPoolSerialTest {
public static void main(String[] args) {
//核心线程数
int corePoolSize = 3;
//最大线程数
int maximumPoolSize = 6;
//超过 corePoolSize 线程数量的线程最大空闲时间
long keepAliveTime = 2;
//以秒为时间单位
TimeUnit unit = TimeUnit.SECONDS;
//创建工作队列,用于存放提交的等待执行任务
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try {
//创建线程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循环提交任务
for (int i = 0; i < 8; i++) {
//提交任务的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//线程打印输出
System.out.println("大家好,我是线程:" + index);
try {
//模拟线程执行时间,10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
当一个新任务被提交时:
1. 当前活跃线程数<corePoolSize,则创建一个新线程执行新任务;
2. 当前活跃线程数>corePoolSize,且队列(workQueue)未满时,则将新任务放入队列中;
3. 当前活跃线程数>corePoolSize,且队列(workQueue)已满,且当前活跃线程数<maximumPoolSize,则继续创建一个新线程执行新任务;
4. 当前活跃线程数>corePoolSize,且队列(workQueue)已满,且当前活跃线程数=maximumPoolSize,则执行拒绝策略(handler);
当任务执行完成后:
1. 超出corePoolSize的空闲线程,在等待新任务时,如果超出了keepAliveTime,则线程会被销毁;
2. 如果allowCoreThreadTimeOut被设置为true,那么corePoolSize以内的空闲线程,如果超出了keepAliveTime,则同样会被销毁。
corePoolSize就是线程池中的核心线程数量,这几个核心线程在没有用的时候,也不会被回收
maximumPoolSize就是线程池中可以容纳的最大线程的数量
keepAliveTime就是线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清 除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间
util就是计算这个时间的一个单位。
workQueue就是等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出)。
threadFactory就是创建线程的线程工厂。
handler是一种拒绝策略,我们可以在任务满了之后,拒绝执行某些任务。
当线程充满了ThreadPool的有界队列时,饱和策略开始起作用。饱和策略可以理解为队列饱和后,处理后续无法入队的任务的策略。ThreadPoolExecutor可以通过调用setRejectedExecutionHandler来修改饱和策略。
当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
DiscardOleddestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
除了JDK默认提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现RejectedExecutionHandler接口即可。
1.execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
2.submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
线程组ThreadGroup对象中的stop,resume,suspend会导致安全问题,主要是死锁问题,已经被官方废弃,多以价值已经大不如以前。
线程组ThreadGroup不是线程安全的,在使用过程中不能及时获取安全的信息。
shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表;
shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略;
isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true;
线程池将线程和任务进行解耦,线程是线程,任务是任务,摆脱了之前通过 Thread 创建线程时的一个线程必须对应一个任务的限制。在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中不停的检查是否有任务需要被执行,如果有则直接执行,也就是调用任务中的 run 方法,将 run 方法当成一个普通的方法执行,通过这种方式将只使用固定的线程就将所有任务的 run 方法串联起来。
1.ArrayBlockingQueue是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2.LinkedBlockingQueue一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
3.SynchronousQueue 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
4.PriorityBlockingQueue 一个具有优先级的无限阻塞队列。
首先是利用好SpringBoot的自动装配功能,配置好线程池的一些基本参数。
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
/*
* 线程池名前缀
*/
private static final String threadNamePrefix = "Api-Async-";
/**
* bean的名称, 默认为首字母小写的方法名
* @return
*/
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
*/
/*
* 核心线程数(默认线程数)
*/
executor.setCorePoolSize(corePoolSize);
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//缓冲队列数
executor.setQueueCapacity(queueCapacity);
//允许线程空闲时间(单位是秒)
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
//用来设置线程池关闭时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//线程池对拒绝任务的处理策略,CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化
executor.initialize();
return executor;
}
}
配置好线程池的基本参数时候,我们就可以使用线程池了, 只要在一个限定域为public的方法头部加上@Async注解即可。
@Async
public void createOrder() {
System.out.println("执行任务");
}
1)分析任务的特性
任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
任务的优先级:高、中、低。
任务的执行时间:长、中、短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
2)具体策略
[1]CPU 密集型任务配置尽可能小的线程,如配置N(CPU核心数)+1个线程的线程池。
[2]IO 密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2*N(CPU核心数)。
[3]混合型任务如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务。只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率;如果这两个任务执行时间相差太大,则没必要进行分解。
[4]优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理,它可以让优先级高的任务先得到执行。但是,如果一直有高优先级的任务加入到阻塞队列中,那么低优先级的任务可能永远不能执行。
[5]执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
[6]依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,线程数应该设置得较大,这样才能更好的利用 CPU。
[7]建议使用有界队列,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千。使用无界队列,线程池的队列就会越来越大,有可能会撑满内存,导致整个系统不可用。