前言
这篇主要来记录一下并发中的基本类型AtomicInteger、以及四种线程池的解析。
基本类型
AtomicInteger是用来解决多线程并发情况下的变量安全问题。原理上使用了CAS和自旋的方式。但是AtomicInteger解决不了ABA问题。
1 | AtomicInteger atomicInteger = new AtomicInteger(5); |
AtomicStampedReference中有一个Stamp作为版本号,用来解决ABA问题。Demo:1
2
3
4
5int x = 1;
int stamp = 0 ;
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(x,stamp);
System.out.println(atomicStampedReference.compareAndSet(1,2,0,stamp+1));
System.out.println(atomicStampedReference.getReference()+","+atomicStampedReference.getStamp());
结果:
true
2,1
线程池
主要的参数:corePoolSize 、maximumPoolSize 、keepAliveTime 、unit 、workQueue 。
corePoolSize:核心线程数。当前线程数 < corePoolSize ,新进来的任务会再开一个线程。
workQueue :缓存队列。当前线程数 >= corePoolSize,任务先会放入缓存队列中。(在后面详细讲)
maximumPoolSize :最大线程数。当前线程数 >= corePoolSize && 缓存队列满 ,会再创建线程。直到当前线程数 == maximumPoolSize 。
keepAliveTime :存活时间。当前线程数 > 核心线程数 ,会监控多出的线程,空闲时间超过keepAliveTime就会销毁。
unit :这个是keepAliveTime的单位,可以为秒、毫秒等等。
handler: 线程池对拒绝任务的处理策略(在后文讲具体的策略)
四种线程池主要是针对构造ThreadPoolExecutor的时候,对上述参数的设定不同而使得它们的应用场景不同。
newCachedThreadPool:
1 | public static ExecutorService newCachedThreadPool() { |
注意这个核心线程数为0,意味着当有新任务进来的时候,会直接放入到缓存队列中SynchronousQueue,而SynchronousQueue是没有容量的,插入操作put必须等待消费者的移除操作take,所以会马上创建一个线程,这里的最大线程数默认为Integer.MAX_VALUE,所以可以无限创建,但是每一个线程有一个60s的存活时间,所以这个线程池为缓存线程池。
newScheduledThreadPool:
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
DelayedWorkQueue会根据时间的顺序进行排序,周期性的执行任务。
newFixedThreadPool:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
传入的nThreads作为核心线程数和最大线程数的值。注意这里的线程存活时间为0ms,而且核心数等于最大线程数,意味着不会有多于核心数的线程会被创建,那么就不存在回收创建的线程,多出来的任务就在阻塞队列中等到。所以为固定大小的线程池。
newSingleThreadExecutor:
1 | public static ExecutorService newSingleThreadExecutor() { |
算是一种特殊的newFixedThreadPool线程池。
2019.02.24更新
线程池的四种拒绝策略
AbortPolicy:直接抛出异常,默认策略
CallerRunsPolicy:用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃阻塞队列最靠前的任务,执行当前的任务。
DiscardPolicy:直接丢弃任务
这四种策略是独立无关的,是对任务拒绝处理的四中表现形式。最简单的方式就是直接丢弃任务。但是却有两种方式,到底是该丢弃哪一个任务,比如可以丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy)。丢弃最旧任务也不是简单的丢弃最旧的任务,而是有一些额外的处理。
除了丢弃任务还可以直接抛出一个异常(RejectedExecutionException),这是比较简单的方式。抛出异常的方式(AbortPolicy)尽管实现方式比较简单,但是由于抛出一个RuntimeException,因此会中断调用者的处理过程。除了抛出异常以外还可以不进入线程池执行,在这种方式(CallerRunsPolicy)中任务将由调用者线程去执行。
线程池的三种阻塞队列
1 | BlockingQueue<Runnable> workQueue = null; |
SynchronousQueue:这是一个很有意思的阻塞队列,其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入 操作。因此此队列内部其 实没有任何一个元素,或者说容量是0,严格说并不是一种容器。由于队列没有容量,因此不能调用peek操作,因为只有移除元素时才有元素。
LinkedBlockingQueue:LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
ArrayBlockingQueue:ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。