多线程并发 之 基本类型与线程池

前言

这篇主要来记录一下并发中的基本类型AtomicInteger、以及四种线程池的解析。

基本类型

AtomicInteger是用来解决多线程并发情况下的变量安全问题。原理上使用了CAS和自旋的方式。但是AtomicInteger解决不了ABA问题。

1
2
3
AtomicInteger atomicInteger = new AtomicInteger(5);
atomicInteger.addAndGet(1);
System.out.println(atomicInteger);

AtomicStampedReference中有一个Stamp作为版本号,用来解决ABA问题。Demo:

1
2
3
4
5
int x = 1;
int stamp = 0 ;
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(x,stamp);
System.out.println(atomicStampedReference.compareAndSet(1,2,0,stamp+1));
System.out.println(atomicStampedReference.getReference()+","+atomicStampedReference.getStamp());

结果:

true
2,1

线程池

主要的参数:corePoolSize 、maximumPoolSize 、keepAliveTime 、unit 、workQueue 。

corePoolSize:核心线程数。当前线程数 < corePoolSize ,新进来的任务会再开一个线程。

workQueue :缓存队列。当前线程数 >= corePoolSize,任务先会放入缓存队列中。(在后面详细讲)

maximumPoolSize :最大线程数。当前线程数 >= corePoolSize && 缓存队列满 ,会再创建线程。直到当前线程数 == maximumPoolSize 。

keepAliveTime :存活时间。当前线程数 > 核心线程数 ,会监控多出的线程,空闲时间超过keepAliveTime就会销毁。

unit :这个是keepAliveTime的单位,可以为秒、毫秒等等。

handler: 线程池对拒绝任务的处理策略(在后文讲具体的策略)

四种线程池主要是针对构造ThreadPoolExecutor的时候,对上述参数的设定不同而使得它们的应用场景不同。

newCachedThreadPool:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

注意这个核心线程数为0,意味着当有新任务进来的时候,会直接放入到缓存队列中SynchronousQueue,而SynchronousQueue是没有容量的,插入操作put必须等待消费者的移除操作take,所以会马上创建一个线程,这里的最大线程数默认为Integer.MAX_VALUE,所以可以无限创建,但是每一个线程有一个60s的存活时间,所以这个线程池为缓存线程池。

newScheduledThreadPool:

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

DelayedWorkQueue会根据时间的顺序进行排序,周期性的执行任务。

newFixedThreadPool:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

传入的nThreads作为核心线程数和最大线程数的值。注意这里的线程存活时间为0ms,而且核心数等于最大线程数,意味着不会有多于核心数的线程会被创建,那么就不存在回收创建的线程,多出来的任务就在阻塞队列中等到。所以为固定大小的线程池。

newSingleThreadExecutor:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

算是一种特殊的newFixedThreadPool线程池。

2019.02.24更新

线程池的四种拒绝策略

AbortPolicy:直接抛出异常,默认策略

CallerRunsPolicy:用调用者所在的线程来执行任务

DiscardOldestPolicy:丢弃阻塞队列最靠前的任务,执行当前的任务。

DiscardPolicy:直接丢弃任务

这四种策略是独立无关的,是对任务拒绝处理的四中表现形式。最简单的方式就是直接丢弃任务。但是却有两种方式,到底是该丢弃哪一个任务,比如可以丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy)。丢弃最旧任务也不是简单的丢弃最旧的任务,而是有一些额外的处理。

除了丢弃任务还可以直接抛出一个异常(RejectedExecutionException),这是比较简单的方式。抛出异常的方式(AbortPolicy)尽管实现方式比较简单,但是由于抛出一个RuntimeException,因此会中断调用者的处理过程。除了抛出异常以外还可以不进入线程池执行,在这种方式(CallerRunsPolicy)中任务将由调用者线程去执行。

线程池的三种阻塞队列

1
2
3
4
BlockingQueue<Runnable> workQueue = null;
workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出队列,有界
workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出队列,无界
workQueue = new SynchronousQueue<>();//无缓冲的等

SynchronousQueue:这是一个很有意思的阻塞队列,其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入 操作。因此此队列内部其 实没有任何一个元素,或者说容量是0,严格说并不是一种容器。由于队列没有容量,因此不能调用peek操作,因为只有移除元素时才有元素。

LinkedBlockingQueue:LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

ArrayBlockingQueue:ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。