Java 并发编程

发布时间 2023-10-11 17:47:34作者: LARRY1024

乐观锁和悲观锁

悲观锁

悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。也就是说,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。

Java 中的 synchronized 和 ReentrantLock 等独占锁,就是悲观锁思想的实现。

示例1:

public void performSynchronisedTask() {
    synchronized (this) {
        // 需要同步的操作
    }
}

示例2:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockTest {
    private Lock lock = new ReentrantLock();

    public void write() {
        lock.lock();
        try {
            // 需要同步的操作
        } finally {
            lock.unlock();
        }
    }
}

高并发的场景下,激烈的锁竞争会造成线程阻塞,大量阻塞线程会导致系统的上下文切换,增加系统的性能开销。并且,悲观锁还可能会存在死锁问题,影响代码的正常运行。

乐观锁

乐观锁总是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源是否被其它线程修改了。

乐观锁一般会使用版本号机制或 CAS 算法实现。

Java 中的 java.util.concurrent.atomic 包下面的原子变量类,比如:AtomicInteger、LongAdder,就是使用了乐观锁的一种实现方式 CAS 实现的。

示例:

// LongAdder 在高并发场景下会比 AtomicInteger 和 AtomicLong 的性能更好
// 代价就是会消耗更多的内存空间(空间换时间)
LongAdder sum = new LongAdder();
sum.increment();

高并发的场景下,乐观锁相比悲观锁来说,不存在锁竞争造成线程阻塞,也不会有死锁的问题,在性能上往往会更胜一筹。但是,如果冲突频繁发生(写占比非常多的情况),会频繁失败和重试,这样同样会非常影响性能,导致 CPU 飙升。

不过,大量失败重试的问题也是可以解决的,像我们前面提到的 LongAdder 以空间换时间的方式就解决了这个问题。

乐观锁的实现

版本号机制

一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会加一。当线程 A 要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值为当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功。

CAS 算法

CAS(Compare And Swap, 比较与交换)的思想用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值和要更新的变量值进行比较,两值相等才会进行更新。CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。

CAS 涉及到三个操作数:

  • V:要更新的变量值(Var)

  • E:预期值(Expected)

  • N:拟写入的新值(New)

当且仅当变量 V 的值等于 E 时,CAS 通过原子方式用新值 N 来更新 V 的值。如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新。

Java 语言并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的(JNI 调用)。因此, CAS 的具体实现和操作系统以及 CPU 都有关系。

Unsafe 类提供了 compareAndSwapObject、compareAndSwapInt、compareAndSwapLong 方法来实现的对 Object、int、long 类型的 CAS 操作。

乐观锁的问题

ABA 问题

如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有被修改过。这个问题被称为 CAS 操作的 "ABA"问题。

ABA 问题的解决思路是:在变量前面追加上版本号或者时间戳

JDK 1.5 以后的 AtomicStampedReference 类就是用来解决 ABA 问题的,其中的 compareAndSet() 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

// expectedReference 表示旧的值,expectedStamp 表示旧的版本号,newReference 表示新的值,newStamp 表示新的版本号
boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)

循环时间长开销大

CAS 经常会用到自旋操作来进行重试,也就是不成功就一直循环执行直到成功。如果长时间不成功,会给 CPU 带来非常大的执行开销。

如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用:

  • 可以延迟流水线执行指令,使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。

  • 可以避免在退出循环的时候因内存顺序冲而引起 CPU 流水线被清空,从而提高 CPU 的执行效率。

只能保证一个共享变量的原子操作

CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。

但是从 JDK 1.5 开始,提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作,所以,我们可以使用锁或者利用 AtomicReference 类,把多个共享变量合并成一个共享变量来操作。

对比

一般情况下:

  • 悲观锁通常多用于写比较多的情况(多写场景,竞争激烈),这样可以避免频繁失败和重试影响性能,悲观锁的开销是固定的。

  • 乐观锁通常多用于写比较少的情况(多读场景,竞争较少),这样可以避免频繁加锁影响性能。

synchronized 关键字

synchronized 是 Java 中的一个关键字,翻译成中文是同步的意思,主要解决的是多个线程之间访问资源的同步性,可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。

使用方法

修饰实例方法

给当前对象实例加锁,进入同步代码前要获得 当前对象实例的锁 。

synchronized void method() {
    ...
}

修饰静态方法

给当前类加锁,会作用于类的所有对象实例 ,进入同步代码前要获得 当前 class 的锁。

synchronized static void method() {
    ...
}

因为静态成员不属于任何一个实例对象,归整个类所有,不依赖于类的特定实例,被类的所有实例共享。

修饰代码块

对括号里指定的对象/类加锁:

  • synchronized(object):表示进入同步代码库前要获得 给定对象 的锁。

  • synchronized(类.class):表示进入同步代码前要获得 给定 Class 的锁

synchronized(this) {
    ...
}

synchronized 底层原理

synchronized 同步语句块

synchronized 同步语句块的实现,使用的是两条指令:monitorenter 指令 和 monitorexit 指令。

在 Java 虚拟机(HotSpot)中,Monitor 是基于 C++ 实现的,由 ObjectMonitor 实现的,每个对象中都内置了一个 ObjectMonitor 对象。

另外,wait/notify 等方法也依赖于 monitor 对象,这就是为什么只有在同步的块或者方法中才能调用 wait/notify 等方法,否则会抛出 java.lang.IllegalMonitorStateException 的异常的原因。

monitorenter 指令

monitorenter 指令:指向同步代码块的开始位置。

当执行 monitorenter 指令时,线程试图获取锁也就是获取 对象监视器 monitor 的持有权,即线程会尝试获取对象的锁,如果锁的计数器为 0,则表示锁可以被获取,获取后将锁计数器加 1。

image

monitorexit 指令

monitorexit 指令:则指明同步代码块的结束位置。

只有对象锁的的拥有者线程才可以执行 monitorexit 指令,来释放锁。在执行 monitorexit 指令后,将锁计数器设为 0,表明锁被释放,其他线程可以尝试获取锁。

image

synchronized 修饰方法

synchronized 修饰的方法会使用 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。

JVM 通过该 ACC_SYNCHRONIZED 访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用:

  • 如果是实例方法,JVM 会尝试获取 实例对象 的锁。

  • 如果是静态方法,JVM 会尝试获取 当前 class 的锁。

ReentrantLock

ReentrantLock 是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。

ReentrantLock 锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock 锁,可以被单个线程多次获取。

ReentrantLock 分为“公平锁”和“非公平锁”,它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock 在同一个时间点只能被一个线程获取,当某线程获取到“锁”时,其它线程就必须等待。

ReentraantLock 是通过一个 FIFO 的等待队列来管理获取该锁所有线程的:

  • 在“公平锁”的机制下,线程依次排队获取锁;

  • 而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

公平锁和非公平锁

  • 公平锁 : 锁被释放之后,先申请的线程先得到锁。性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁。

  • 非公平锁:锁被释放之后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。性能更好,但可能会导致某些线程永远无法获取到锁。

synchronized 和 ReentrantLock 有什么区别?

  • 两者都是可重入锁。

可重入锁 也叫递归锁,指的是线程可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果是不可重入锁的话,就会造成死锁。

JDK 提供的所有现成的 Lock 实现类,包括 synchronized 关键字锁都是可重入的。

  • 锁的实现

    • synchronized 是依赖于 JVM 实现的。

    • ReentrantLock 是 JDK 层面实现的。

      也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成。

相比 synchronized,ReentrantLock 增加了一些高级功能。主要来说主要有三点:

  • 等待可中断 : ReentrantLock 提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。

  • 可实现公平锁 : ReentrantLock 可以指定是公平锁还是非公平锁;而 synchronized 只能是非公平锁。

    所谓的公平锁就是先等待的线程先获得锁。ReentrantLock 默认情况是非公平的,可以通过 ReentrantLock 类的 ReentrantLock(boolean fair) 构造方法来指定是否是公平的。

  • 可实现选择性通知(锁可以绑定多个条件):

    • synchronized 关键字与 wait() 和 notify()/notifyAll() 方法相结合可以实现等待/通知机制。

    • ReentrantLock 类也可以实现等待/通知机制,但是需要借助于 Condition 接口与 newCondition() 方法。

Condition

Condition是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能,也就是在一个 Lock 对象中可以创建多个 Condition 实例(即对象监视器),线程对象可以注册在指定的 Condition 中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。在使用 notify()/notifyAll() 方法进行通知时,被通知的线程是由 JVM 选择的,用 ReentrantLock 类结合Condition实例可以实现“选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。

而 synchronized 关键字就相当于整个 Lock 对象中只有一个 Condition 实例,所有的线程都注册在它一个身上。如果执行 notifyAll() 方法的话就会通知所有处于等待状态的线程,这样会造成很大的效率问题。而 Condition 实例的 signalAll() 方法,只会唤醒注册在该 Condition 实例中的所有等待线程。

可中断锁和不可中断锁有什么区别?

  • 可中断锁:获取锁的过程中可以被中断,不需要一直等到获取锁之后 才能进行其他逻辑处理。

    ReentrantLock 就属于是可中断锁。

  • 不可中断锁:一旦线程申请了锁,就只能等到拿到锁以后才能进行其他的逻辑处理。

    synchronized 就属于是不可中断锁。

ThreadLocal

ThreadLocal是一个将在多线程中为每一个线程创建单独的变量副本的类; 当使用ThreadLocal来维护变量时, ThreadLocal会为每个线程创建单独的变量副本, 避免因多线程操作共享变量而导致的数据不一致的情况。

ThreadLocal 被提到应用最多的就是 session 管理和数据库链接管理。

示例:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
 
public class ConnectionManager {
 
    private static final ThreadLocal<Connection> dbConnectionLocal = ThreadLocal.withInitial(() -> {
        try {
            return DriverManager.getConnection("https://xxx", "user", "passwd");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    });

    public Connection getConnection() {
        return dbConnectionLocal.get();
    }
}

ThreadLocal 实现线程隔离的原理

从源码中看到 ThreadLocalMap 其实就是一个简单的 Map 结构,底层是数组,有初始化大小,也有扩容阈值大小,数组的元素是 Entry,Entry 的 key 就是 ThreadLocal 的引用,value 是 ThreadLocal 的值。其中,ThreadLocalMap 解决 hash 冲突的方式采用的是线性探测法,如果发生冲突会继续寻找下一个空的位置。

源码:

public class ThreadLocal<T> {

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value); // key 为当前 ThreadLocal 对象的引用
        } else {
            createMap(t, value);
        }
        if (this instanceof TerminatingThreadLocal) {
            TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
        }
        return value;
    }

    public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null) {
             m.remove(this);
         }
     }
}

每个 Thread 中都具备一个 ThreadLocalMap,而 ThreadLocalMap 可以存储以 ThreadLocal 为 key ,Object 对象为 value 的键值对。

image

因此,如果我们在同一个线程中声明了两个 ThreadLocal 对象,Thread 内部都是使用仅有的那个 ThreadLocalMap 存放数据的,ThreadLocalMap 的 key 就是 ThreadLocal 对象,value 就是 ThreadLocal 对象调用 set 方法设置的值。

ThreadLocal 内存泄漏场景

内存泄漏的示例:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadLocalDemo {
    static class LocalVariable {
        private final Long[] a = new Long[1024 * 1024];
    }
    final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES,
            new LinkedBlockingQueue<>());
    final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        Thread.sleep(5000 * 4);
        for (int i = 0; i < 50; ++i) {
            poolExecutor.execute(() -> {
                localVariable.set(new LocalVariable());
                System.out.println("use local variable" + localVariable.get());
                localVariable.remove();
            });
        }
        System.out.println("pool execute over");
    }
}

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。

这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()、get()、remove() 方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal方法后最好手动调用 remove() 方法

因此,在使用完 ThreadLocal 变量后,需要我们手动 remove 掉,避免 ThreadLocalMap 中 Entry 一直保持对 value 的强引用,导致 value 不能被回收。

线程池

线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。

使用线程池的好处:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池创建方式

线程池创建的方式:

  • ThreadPoolExecutor 创建(推荐)

    使用线程池的好处是:减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

  • 通过 Executor 框架的工具类 Executors 创建

    我们可以创建多种类型的 ThreadPoolExecutor:

    • FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

    • SingleThreadExecutor: 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

    • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。初始大小为0。当有新任务提交时,如果当前线程池中没有线程可用,它会创建一个新的线程来处理该任务。如果在一段时间内(默认为60秒)没有新任务提交,核心线程会超时并被销毁,从而缩小线程池的大小。

    • ScheduledThreadPool:该方法返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

使用 Executors 创建线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。

  • ScheduledThreadPool 和 SingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

通过 ThreadPoolExecutor 创建线程池

ThreadPoolExecutor 的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去。

  • maximumPoolSize:线程池中的最大线程数量,这个参数会根据 workQueue 任务队列的类型,决定线程池会开辟的最大线程数量。

    任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

  • keepAliveTime:当线程池中空闲线程数量超过 corePoolSize 时,多余的线程会在多长时间内被销毁。

    线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,多余的空闲线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁,线程池回收线程时,会对核心线程和非核心线程一视同仁,直到线程池中线程的数量等于 corePoolSize ,回收过程才会停止。

  • unit:keepAliveTime 的单位

  • workQueue:工作队列,在任务执行之前用于保存任务的队列。

    可以选择的工作队列:ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、LinkedTransferQueue、PriorityBlockingQueue、SynchronousQueue

  • threadFactory:线程工厂,用于创建线程,一般用默认即可。

  • handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;

    可以选择的拒绝策略:ThreadPoolExecutor.AbortPolicy、ThreadPoolExecutor.CallerRunsPolicy、ThreadPoolExecutor.DiscardOldestPolicy、ThreadPoolExecutor.DiscardPolicy

线程池中各个参数的相互关系:

image

线程池的阻塞队列

不同的线程池需要对应的阻塞队列,这里,我们可以结合内置线程池来分析:

  • LinkedBlockingQueue:无界队列,应用:FixedThreadPool 和 SingleThreadExector。

    LinkedBlockingQueue 容量为 Integer.MAX_VALUE,由于队列永远不会被放满,因此,FixedThreadPool 最多只能创建核心线程数的线程。

  • SynchronousQueue:同步队列,应用:CachedThreadPool。

    SynchronousQueue 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。

    因此,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。

  • DelayedWorkQueue:延迟阻塞队列,应用:ScheduledThreadPool 和 SingleThreadScheduledExecutor 。

    DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的

    DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

线程池的拒绝策略

如果当前同时运行的线程数量,达到最大线程数量并且队列也已经被放满了任务时,会触发线程池的拒绝策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。

  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。

    因此,这种策略会降低对于新任务提交速度,影响程序的整体性能。如果应用程序可以承受此延迟,并且你要求任何一个任务请求都要被执行的话,可以选择这个策略。

  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。

  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

线程池处理任务的流程

线程池处理任务的流程,如下图所示:

image

  • 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务;

    • 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行;

    • 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。

  • 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么,当前任务会被拒绝,饱和策略会调用 RejectedExecutionHandler.rejectedExecution() 方法。

如何给线程池命名

给线程池里的线程命名通常有下面两种方式:

ThreadFactoryBuilder

示例:

import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat(threadNamePrefix + "-%d")
        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

实现一个 ThreadFactory

示例:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

如何设定线程池的大小

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务:N + 1

    CPU 密集型任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数) + 1。比 CPU 核心数多出来的一个线程,是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

  • I/O 密集型任务:2N

    I/O 密集型任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。


参考: