Java 在启动时,会创建一个进程,进程中包含一个主线程。主线程会执行 main
方法,我们可以在 main
方法中创建多个线程。
并发基础
本节可能涉及到少量后文的内容,但我们不得不在这里就用到它们。你可以先跳过那些地方,等到后文再回来查看。
乐观锁与悲观锁
悲观锁
悲观锁总是假设最坏的情况,认为数据很可能会被其他线程修改,因此在操作数据之前,会先加锁。也就是说,共享资源始终只能被一个线程访问。
例如 synchronized
关键字就是一种悲观锁。
public class Counter {
private int count = 0;
public synchronized void add() {
count++;
}
}
这里,当一个线程执行 add
方法时,会锁住整个对象,其他线程无法执行 add
方法。只有当当前线程执行完毕后,其他线程才能执行。
悲观锁的缺点是效率低下。因为只有一个线程能够访问共享资源,其他线程只能等待。当有大量线程需要访问共享资源时,会因为阻塞造成上下文切换,降低性能。
此外,悲观锁还有可能造成死锁。例如线程 A 锁住了资源 1,等待资源 2;线程 B 锁住了资源 2,等待资源 1。这时,线程 A 和线程 B 会一直等待下去。
乐观锁
乐观锁则恰恰相反,它认为数据很可能不会被修改,因此在操作数据之前,不会加锁。当在操作后,会使用算法查看操作期间数据是否被其它线程修改过,如果是,就会进行回滚操作。
例如,java.util.concurrent.atomic
包下的类就是一种乐观锁。
public class Counter {
private AtomicInteger count = new AtomicInteger(0);
public void add() {
count.incrementAndGet();
}
}
这里,incrementAndGet
方法是原子性的,不需要加锁。它使用了 CAS
指令,当线程执行 incrementAndGet
方法时,会比较内存中的值和期望值,如果相等,就会将新值写入内存。如果不相等,就会重新读取内存中的值,再次比较。
CAS(Compare And Swap)指令是一种乐观锁的实现。它是一种原子性的操作,可以保证线程安全。
CAS 指令有三个操作数:内存位置、期望值和新值。当内存位置的值等于期望值时,将新值写入内存位置。否则,不做任何操作。
CAS 存在三个问题:
-
ABA 问题
ABA 问题是指,线程 1 读取了内存位置的值 A,然后线程 2 将 A 改为 B,再改回 A。这时,线程 1 会认为内存位置的值没有变化,但实际上已经变化了。
为了解决 ABA 问题,可以使用版本号。每次修改内存位置的值时,都会增加版本号。
-
循环时间长开销大
CAS 指令是一个自旋操作,它会不断尝试修改内存位置的值。如果一直有冲突,会造成 CPU 的浪费。
-
只能保证一个共享变量的原子操作
CAS 指令只能保证一个共享变量的原子操作。如果有多个共享变量,就需要加锁。
JDK 1.5 提供了
AtomicReference
类,可以保证引用类型的原子操作。这部分解决了第三个问题。
乐观锁的优点是效率高,因为不需要加锁。然而,这一切的前提是数据很可能不会被修改。如果数据经常被修改,那么会有大量的回滚重试操作,降低性能。
总的来说,悲观锁适合写操作多的场景,乐观锁适合读操作多的场景。
JMM
内存模型
CPU 由于内存访问速度跟不上 CPU 的运行速度,所以会有多级缓存。而每个 CPU 内核都有自己的缓存,这就会导致多个 CPU 内核之间的数据可能不一致。在多核 CPU 同时操作共享数据时,就会出现数据不一致的问题。
CPU 使用了一些技术来解决这个问题,例如 MESI 协议。这里的 MESI 是四个状态的缩写:
-
M
(Modified):数据被修改,只存在于当前 CPU 缓存中。 -
E
(Exclusive):数据只存在于当前 CPU 缓存中,没有被修改。 -
S
(Shared):数据存在于多个 CPU 缓存中,没有被修改。 -
I
(Invalid):数据无效。
当一个 CPU 写入数据时,会将数据状态设置为 M
,并通知其他 CPU 缓存中的数据无效。当一个 CPU 读取数据时,会先检查数据状态,如果是 M
或 E
,说明数据是最新的,可以直接读取;如果是 S
或者 I
,说明数据无效,需要从内存中读取。
操作系统堆底层硬件进行了抽象和封装,然而,这个抽象出来的 CPU 和内存之间也存在数据不一致的问题。我们把在特定的操作协议下,对特定内存或高速缓存进行读写访问的过程进行抽象,得到的就是内存模型了。
对于编程语言来讲,它可以直接使用操作系统的内存模型,例如 C 语言。但是,对于 Java 来讲,它是跨平台的,不同的操作系统有不同的内存模型。为了保证 Java 的跨平台性,Java 定义了自己的内存模型供程序员使用。我们可以说,Java 的内存模型是抽象上的抽象————抽象到家了。
Java 内存模型规定所有的变量都存储在主内存中,每条线程都有自己的工作内存。线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递都需要通过主内存来完成。
这里你就发现了,Java 只能操作自己的工作内存,需要一定的机制来保证各线程的工作内存及主内存的数据一致性。
JMM 定义了 8 种原子操作,来完成这一任务。包括:
-
lock
:作用于主内存的变量,把一个变量标识为一条线程独占的状态 -
unlock
:作用于主内存的变量,把一个变量标识为可被其他线程独占的状态 -
read
:作用于主内存的变量,把一个变量的值从主内存传输到线程的工作内存中 -
load
:作用于工作内存的变量,把read
操作从主内存传输到工作内存中的变量放入工作内存的变量副本中 -
use
:作用于工作内存的变量,把工作内存中的变量传输到 JVM 的执行引擎中 -
assign
:作用于工作内存的变量,把一个从 JVM 执行引擎中接收到的值赋给工作内存中的变量 -
store
:作用于工作内存的变量,把工作内存中的变量的值传输到主内存中 -
write
:作用于主内存的变量,把store
操作从工作内存传输到主内存中的变量中
然而,为了程序运行的效率,编译器可能对指令进行了重排。因此,JMM 还定义了一些规则,来保证指令重排不会影响程序的正确性:
-
read
和load
、store
和write
是成对出现的,不允许单独出现,但中间可能插入其他操作 - 不允许丢弃
assign
操作,工作内存中的变量改变后必须同步到主内存中 - 不允许在没有
assign
的情况下进行store
操作 - 新变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化的变量。也就是说,对一个变量进行
use
、store
操作之前,必须执行过assign
和load
操作 - 一个变量同一时间只能被一个线程
lock
,但一个线程可以执行多次lock
操作,多次lock
操作后,必顨执行相同次数的unlock
操作才能解锁 - 如果一个变量被
lock
操作锁定,那么会清空工作内存中此变量的值。此后,在使用前必须重新执行load
或assign
操作初始化变量的值 - 如果一个变量没有被
lock
,或是被其它线程lock
,那么不允许对此变量执行unlock
操作 - 对一个变量执行
unlock
操作之前,必须先把此变量同步到主内存中(执行store
和write
操作)
Happens-Before
这些操作和规则实在太麻烦了,但好在,JMM 还定义了一个更简单的等价规则:Happens-Before。
Happens-Before 规则是 Java 内存模型中定义的两个操作之间的偏序关系。如果操作 A Happens-Before 操作 B,那么 A 的执行结果对 B 是可见的。
Happens-Before 规则包括以下几种情况:
- 程序顺序规则:一个线程内,按照程序顺序,前面的操作 Happens-Before 后面的操作
-
锁定规则:一个
unlock
操作 Happens-Before 后面对同一个锁的 lock 操作 -
volatile
变量规则:对一个volatile
变量的写操作 Happens-Before 后面对这个变量的读操作 -
线程启动规则:
Thread
对象的start
方法 Happens-Before 线程的run
方法 - 线程终止规则:线程的所有操作 Happens-Before 线程的终止
-
线程中断规则:对线程的
interrupt
方法的调用 Happens-Before 被中断线程的检测到中断 -
对象终结规则:一个对象的初始化完成(构造函数执行结束) Happens-Before 它的
finalize
方法的开始 - 传递性:如果 A Happens-Before B,B Happens-Before C,那么 A Happens-Before C
我们来看一个例子就明白了:
点击举个栗子
private int value = 0;
public void setValue(int value){
this.value = value;
}
public int getValue(){
return value;
}
如果线程 A 在时间上先调用了 setValue(1)
,然后线程 B 在时间上后调用了 getValue
,那么:
- 程序顺序规则:两个操作在不同线程中,不适用
- 锁定规则:两个操作没有锁,不适用
-
volatile
变量规则:value
是普通变量,不适用 - 线程启动规则:两个操作在不同线程中,不适用
- 线程终止规则:两个操作在不同线程中,不适用
- 线程中断规则:两个操作在不同线程中,不适用
- 对象终结规则:
value
是普通变量,不适用 - 传递性:两个操作之间没有其他操作,不适用
因此,两个操作之间没有 Happens-Before 关系,线程 B 可能读取到 0,也可能读取到 1!
修复有两种方法,一是使用 synchronized
关键字:
private int value = 0;
public synchronized void setValue(int value){
this.value = value;
}
public synchronized int getValue(){
return value;
}
此时,由于 synchronized
关键字为对象加锁,根据锁定规则,setValue
和 getValue
之间有 Happens-Before 关系,线程 B 读取到的值一定是 1。
另一种方法是使用 volatile
关键字:
private volatile int value = 0;
public void setValue(int value){
this.value = value;
}
public int getValue(){
return value;
}
此时,根据 volatile
变量规则,setValue
和 getValue
之间有 Happens-Before 关系,线程 B 读取到的值一定是 1。
由此,我们发现,时间上的先后顺序并不代表操作之间的先后关系。只有在满足 Happens-Before 规则的情况下,操作之间才有确定的先后关系。
对象内存布局
Java 对象的内存布局分为三部分:
- 对象头
- 实例数据
- 对齐填充
对象头包括两部分信息:
-
存储自身运行时的数据
JMM 使用一个标志位来表示对象的状态,而该状态则决定了这部分会存储什么数据:
状态 标志位 存储内容 无锁 01
对象的哈希码
对象的 GC 分代年龄轻量级锁 00
指向锁记录的指针 重量级锁 10
指向重量级锁的指针 GC 标记 11
空 偏向锁 01
偏向线程 ID
偏向时间戳
对象的 GC 分代年龄这里的各种锁我们会在后文详细介绍。
-
指向类的指针
指向对象的类元数据,可以确定对象的类型。
实例数据是对象真正存储的数据,包括成员变量。
对齐填充是为了保证对象的起始地址是 8 字节的整数倍,提高访问效率。
synchronized
关键字
自旋锁
当有多个线程竞争锁时,没抢到的线程可以执行忙循环,不断尝试获取锁。这种锁叫做自旋锁。
自旋锁避免了线程的上下文切换,提高了效率。但是,如果一直没抢到锁,自旋锁会消耗大量的 CPU 时间。因此,Java 使用了自适应的自旋锁。
自旋锁会自旋一段时间后停止自旋,将线程挂起。这个时间由前一次在同一个锁上的自旋时间及锁的拥有者状态决定。如果某个被锁的对象刚刚就有线程获得了锁,JVM 会认为获得锁的概率较大,让它多自旋一会儿;相反,如果锁被长时间占用,一个线程也没抢到,JVM 会认为获得锁的概率较小,让它少自旋一会儿。
程序运行时间越长,对锁的情况越了解,自旋锁的效率就越高。
轻量级锁
如果一个锁一直没有竞争,那么可以使用轻量级锁。轻量级锁是一种乐观锁,它认为锁一直没有竞争,不需要加锁。它的执行过程如下:
- 当线程 A 获取锁时,JVM 在线程 A 的栈帧中分配一块空间,用来存储锁记录
- JVM 使用 CAS 指令将对象头中的锁信息替换为线程 A 的锁记录地址
- 如果 CAS 操作成功,线程 A 获取锁成功
由于有 CAS 操作,因此轻量级锁仅在几乎没有竞争的情况下才好用,否则,速度会比重量级锁更慢。
偏向锁
偏向锁会偏向于第一个获得它的线程。如果在接下来的执行过程中,该锁没有被其他线程获取,则持有偏向锁的线程将永远不需要再进行同步。这样就节省了大量有关锁申请的操作,从而提高了程序性能。
偏向锁的执行过程如下:
- 当锁第一次被线程 A 获取时,JVM 会将锁的标记设置为偏向锁,并使用 CAS 指令将对象头中的线程 ID 替换为线程 A 的 ID
- 此后,线程 A 再次获取锁时,不需要再进行同步操作,直接获取锁成功
- 如果有其他线程 B 获取锁,JVM 会撤销偏向锁,将锁的标记设置为轻量级锁
偏向锁可以提高带有同步但无竞争的程序性能,但是,如果有竞争,偏向锁会降低性能。
synchronized
流程
当一个线程执行 synchronized
方法时,会执行以下流程:
- 尝试使用偏向锁的方式获取锁,如果获取到了,就表示获取锁成功
- 如果获取不到偏向锁,将锁升级为轻量级锁,然后使用自适应自旋的方式获取锁
- 如果获取不到轻量级锁,将锁升级为重量级锁,线程进入阻塞状态,等待锁释放
volatile
关键字
volatile
关键字可以保证变量的可见性,并且禁止指令重排。但是,volatile
关键字并不能保证原子性。
例如下面的代码:
public class RaceTestVolatile {
private static volatile int race = 0;
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[20];
for (int i = 0; i < 20; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
race++;
}
});
threads[i].start();
}
for (int i = 0; i < 20; i++) {
threads[i].join();
}
System.out.println("最终 race 的值: " + race);
}
}
这段代码的目的是让 20 个线程分别对 race
变量进行 10000 次自增操作。由于 race
变量是 volatile
的,因此可以保证可见性,所有线程都能看到最新的值。但是,由于 race++
操作并不是原子性的,因此最终的结果并不是 200000,比如我这里的结果是 64299。
这是因为,race++
操作实际上是三个操作:读取 race
的值、自增、写回 race
的值。如果两个线程同时读取了 race
的值,然后同时自增,最后写回,那么就会出现问题。
要想让 volatile
变量不发生这种情况,需要满足两个条件:
- 变量的写操作不依赖于当前值,例如
race = i
就没有这个问题 - 变量不需要与其他变量共同参与不变约束,也就是说不能
if (a && b)
这样
volatile
能做的事情有两点:
-
禁止指令重排
也就是说,
volatile
变量的写操作必须在读操作之后,不能重排。这用到了内存屏障指令。 -
保证可见性
也就是说,一个线程对
volatile
变量的修改,对其他线程是可见的。放到 JMM 里讲,就是每次使用变量的时候,都会从主内存中读取,而不是从线程的工作内存中读取。
线程操作
线程的创建
创建线程有以下几种方式:
-
继承
Thread
类。public class MyThread extends Thread { @Override public void run() { System.out.println("New Thread"); Thread.sleep(1000); } } Thread thread = new MyThread(); thread.start();
-
实现
Runnable
接口。public class MyRunnable implements Runnable { @Override public void run() { System.out.println("New Thread"); Thread.sleep(1000); } } Thread thread = new Thread(new MyRunnable()); thread.start();
当然,你也可以使用
Lambda
表达式来简化代码。Thread thread = new Thread(() -> { System.out.println("New Thread"); Thread.sleep(1000); }); thread.start();
一个线程可以调用 Thread.currentThread()
方法获取当前线程对象,调用 getName()
方法获取线程名。
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName());
});
线程调用时,可以向其中传入参数。如果有大量线程要传入相同的参数,可以使用 ThreadLocal
类。该类会为每个线程创建一个独立的变量副本,互不干扰。
ThreadLocal<String> threadLocal = ThreadLocal<>();
try {
threadLocal.set("Hello!");
Thread thread = new Thread(() -> {
System.out.println(threadLocal.get());
});
thread.start();
} finally {
threadLocal.remove();
}
需要注意的是,ThreadLocal
实例一定要在 finally
代码块中调用 remove
方法,以避免内存泄漏。我们可以封装 AutoCloseable
接口来简化代码。
public class ThreadLocalVariable<T> implements AutoCloseable {
private ThreadLocal<T> threadLocal = new ThreadLocal<>();
public void set(T value) {
threadLocal.set(value);
}
public T get() {
return threadLocal.get();
}
@Override
public void close() {
threadLocal.remove();
}
}
try (ThreadLocalVariable<String> threadLocal = new ThreadLocalVariable<>()) {
threadLocal.set("Hello!");
Thread thread = new Thread(() -> {
System.out.println(threadLocal.get());
});
thread.start();
}
线程状态
线程有以下几种状态:
-
NEW
:新建状态,线程已经创建。 -
RUNNABLE
:运行状态,线程正在执行。 -
BLOCKED
:阻塞状态,线程被阻塞,等待锁。 -
WAITING
:等待状态,线程等待其他线程的通知。 -
TIMED_WAITING
:超时等待状态,线程等待一段时间。例如正在执行sleep
。 -
TERMINATED
:终止状态,线程执行完毕。
每个线程对象只能调用一次 start
方法,然后线程就会从 NEW
进入 RUNNABLE
状态。
进入 TERMINATED
状态有三种方式:
- 线程执行完毕。
- 线程抛出未捕获的异常。
- 线程调用
stop
方法。
如果主线程希望在某处阻塞,等待某个线程执行完毕后再执行后续代码,可以使用 join
方法。
Thread thread = new Thread(() -> {
System.out.println("New Thread");
Thread.sleep(1000);
});
thread.start();
thread.join();
如果主线程希望停止某个线程,可以使用 interrupt
方法。
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("New Thread");
}
});
thread.start();
thread.interrupt();
需要注意的是,interrupt
方法只是设置了线程的中断标志,线程并不会立即停止。如果线程正在执行 sleep
、wait
、join
等方法,会抛出 InterruptedException
异常。
我们前面提到,JVM 在运行时,会创建一个主线程。当主线程执行完毕后,JVM 会退出。如果主线程创建了一些子线程,那么要等待所有子线程执行完毕后,JVM 才会退出。
但是,有时候我们希望主线程执行完毕后,JVM 就退出,而留下一个或多个子线程继续执行。这时,我们可以将子线程设置为守护线程。
Thread thread = new Thread(() -> {
while (true) {
System.out.println("Daemon Thread");
}
});
thread.setDaemon(true);
thread.start();
需要注意的是,守护线程不能持有任何需要关闭的资源,因为它会在 JVM 退出时立即终止。
锁
对于线程键共享的变量,应当使用 volatile
关键字修饰,以保证线程之间的可见性。这是因为,线程之间的数据是存储在各自的工作内存中的,而不是存储在主内存中。volatile
关键字可以保证线程之间的数据同步。
public class MyRunnable implements Runnable {
private volatile boolean flag = true;
@Override
public void run() {
while (flag) {
System.out.println("New Thread");
}
}
public void stop() {
flag = false;
}
}
对于线程间的互斥访问,可以使用 synchronized
关键字。该关键字可以修饰方法,也可以修饰代码块。它相当于为方法或代码块加了一个锁,只有获取到锁的线程才能执行。
class Counter {
public static final Object lock = new Object();
private int count = 0;
}
public class MyRunnable implements Runnable {
@Override
public void run() {
synchronized (Counter.lock) {
for (int i = 0; i < 1000000; i++) {
Counter.count++;
}
}
}
}
Thread thread1 = new Thread(new MyRunnable());
Thread thread2 = new Thread(new MyRunnable());
thread1.start();
thread2.start();
syncronized
关键字是重量级锁,同时在获取时会阻塞线程持续等待。java.util.concurrent
包下的 Lock
接口提供了更多的锁实现,比如 ReentrantLock
、ReadWriteLock
、StampedLock
等。
-
ReentrantLock
:例如,我们有一个计数器:
class Counter { private int count = 0; public synchronized void add() { count++; } }
使用
ReentrantLock
可以这样实现:class Counter { private int count = 0; private Lock lock = new ReentrantLock(); public void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
它可以尝试获取锁:
if (lock.tryLock(1, TimeUnit.SECONDS)) { try { count++; } finally { lock.unlock(); } } else { // do something }
当等待了一秒钟后,还没有获取到锁,就会返回
false
。这时程序可以做一些其它事情,而不是一直等待。 -
ReadWriteLock
:读写锁允许多个线程同时读取数据,但只允许一个线程写入数据。这样可以提高读取数据的效率。
class Counter { private int count = 0; private ReadWriteLock lock = new ReentrantReadWriteLock(); private Lock readLock = lock.readLock(); private Lock writeLock = lock.writeLock(); public int get() { readLock.lock(); try { return count; } finally { readLock.unlock(); } } public void add() { writeLock.lock(); try { count++; } finally { writeLock.unlock(); } } }
-
StampedLock
:读写锁的一个问题是,当有线程正在读取数据时,另一个线程不能写入数据。
StampedLock
提供了乐观读锁,允许多个线程同时读取数据,但只有一个线程可以写入数据,读写可以同时进行。class Counter { private int count = 0; private StampedLock lock = new StampedLock(); public int get() { long stamp = lock.tryOptimisticRead(); int c = count; if (!lock.validate(stamp)) { stamp = lock.readLock(); try { c = count; } finally { lock.unlockRead(stamp); } } return c; } public void add() { long stamp = lock.writeLock(); try { count++; } finally { lock.unlockWrite(stamp); } } }
tryOptimisticRead
方法会尝试获取一个乐观读锁,返回一个标记。如果在读取数据时,数据没有被修改,那么可以直接返回。如果数据被修改,那么需要获取一个读锁。validate
方法用来验证标记,如果返回true
,则数据没有被修改,可以直接返回。如果返回false
,则需要获取一个读锁。需要注意的是,乐观锁的使用条件是在读取过程中大概率不会发生数据修改。如果数据经常被修改,那么乐观锁的效率会很低。
同时,
StampedLock
是一种不可重入锁,也就是说,一个线程在获取过写锁后,不能再次获取写锁。否则会造成死锁。
原子操作
Java 中有些操作是原子性的,具体包括:
- 基本数据类型赋值(除
long
和double
外,不过 JVM 通常会让它们也是原子性的)。 - 引用类型赋值。
对于不可变对象,对它们的操作也不需要加锁。
Java 中提供了 java.util.concurrent.atomic
包,用来实现更多的原子操作。它主要包括:
-
AtomicBoolean
:原子性的布尔类型。 -
AtomicInteger
:原子性的整型。 -
AtomicLong
:原子性的长整型。 -
AtomicReference
:原子性的引用类型。
例如,对于 AtomicInteger
类型,他有一些原子性的方法:
-
int get()
:获取值。 -
int incrementAndGet()
:自增并获取值。 -
int addAndGet(int delta)
:增加并获取值。 -
int compareAndSet(int expect, int update)
:比较并设置值。
它的实现没有使用锁,而是使用了 CAS
(Compare And Swap)指令。CAS
指令是一种乐观锁,它会比较内存中的值和期望值,如果相等,就会将新值写入内存。如果不相等,就会重新读取内存中的值,再次比较。
信号量
信号量是一种更高级的同步工具,它可以控制同时访问某个资源的线程数量。
class AccessControl {
private Semaphore semaphore = new Semaphore(10);
public void access() throws InterruptedException {
semaphore.acquire();
try {
// do something
} finally {
semaphore.release();
}
}
}
acquire
方法会尝试获取一个许可,如果没有许可,线程会阻塞。release
方法会释放一个许可。
Semaphore
还有一个 tryAcquire
方法:
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
// do something
} finally {
semaphore.release();
}
} else {
// do something
}
线程同步
例如我们有一个任务队列,多个线程可以向队列中添加任务,也可以从队列中取出任务。这时,我们需要保证线程之间的同步。
class TaskQueue {
private Queue<String> queue = new LinkedList<>();
public synchronized void add(String task) {
queue.add(task);
}
public synchronized String get() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
然而,这个方法是错误的。如果队列为空,而一个线程在执行 get
方法,那么它会一直等待到有人向队列中添加任务。然而,此时他却锁住了这个对象,其他线程根本无法执行 add
方法。这就需要使用线程同步。
Java 中有两种方式可以实现线程同步:
-
wait
和notify
方法。wait
方法会让当前线程等待,直到另一个线程调用notify
方法唤醒它。wait
在等待时会释放锁,这让其他线程执行add
方法成为了可能。class TaskQueue { private Queue<String> queue = new LinkedList<>(); public synchronized void add(String task) { queue.add(task); notifyAll(); } public synchronized String get() throws InterruptedException { while (queue.isEmpty()) { wait(); } return queue.remove(); } }
需要注意的是,
wait
被写在了while
循环中。这是因为,当多个线程同时被唤醒后,依然只有一个线程能够获取到锁,其他线程需要再次检查条件。 -
Lock
和Condition
接口。对于非
synchronized
的代码,我们可以使用Lock
和Condition
接口。class TaskQueue { private Queue<String> queue = new LinkedList<>(); private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void add(String task) { lock.lock(); try { queue.add(task); condition.signalAll(); } finally { lock.unlock(); } } public String get() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { condition.await(); } return queue.remove(); } finally { lock.unlock(); } } }
可以看到,
signal
方法对应notify
方法,await
方法对应wait
方法。condition.await
同样可以尝试获取锁:if (condition.await(1, TimeUnit.SECONDS)) { return queue.remove(); } else { // do something }
我们前面是在一个 Queue
上实现了线程同步。类似的操作非常常见,Java 中提供了一个 BlockingQueue
接口,它提供了线程安全的队列操作。
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
Thread thread1 = new Thread(() -> {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread thread2 = new Thread(() -> {
try {
queue.put("Task");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
在这段代码中,take
方法会阻塞线程,直到队列中有任务。
类似的,Java 中的集合类基本都有线程安全的实现:
接口 | 非线程安全 | 线程安全 | 行为 |
---|---|---|---|
List | ArrayList | CopyOnWriteArrayList | 写入时复制;当写入时,会复制一个新的数组,写入完成后,再将新数组赋值给旧数组;读取时,读取旧数组 |
Set | HashSet | CopyOnWriteArraySet | 写入时复制;当写入时,会复制一个新的集合,写入完成后,再将新集合赋值给旧集合;读取时,读取旧集合 |
Map | HashMap | ConcurrentHashMap | 分段锁;当写入时,只锁住一个段;读取时,不需要锁 |
Queue | LinkedList | ArrayBlockingQueue | 阻塞队列;当队列为空时,take 方法会阻塞;当队列满时,put 方法会阻塞 |
Deque | LinkedList | LinkedBlockingDeque | 阻塞双端队列;当队列为空时,takeFirst 和 takeLast 方法会阻塞;当队列满时,putFirst 和 putLast 方法会阻塞 |
线程池
线程池是一种线程复用的机制,它可以减少线程的创建和销毁次数,提高性能。
Java 标准库中提供了 java.util.concurrent
包,其中的 ExecutorService
接口可以用来创建线程池。
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000000; i++) {
executor.submit(() -> {
System.out.println("New Thread");
Thread.sleep(1000);
});
}
Executors
类提供了一些静态方法,可以创建不同类型的线程池:
-
newFixedThreadPool
:固定大小的线程池。线程池中的线程数量是固定的,当线程池中的线程都在执行任务时,新的任务会被放入队列中。只有之前的任务执行完毕后,线程池出现空闲线程时,才会执行队列中的任务。
-
newCachedThreadPool
:缓存线程池,线程数量不固定。线程池会根据任务的数量动态调整线程的数量。如果线程池中的线程空闲时间超过 60 秒,就会被回收。
-
newScheduledThreadPool
:定时任务的线程池。线程池可以执行定时任务和周期性任务。例如让线程池在提交任务后延迟 3 秒执行,然后每隔 1 秒执行一次。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); executor.scheduleAtFixedRate(() -> { System.out.println("Execute"); }, 3, 1, TimeUnit.SECONDS);
对于有些任务,它本身非常大,但可以自动拆解成多个小任务,然后再合并结果。这时,我们可以使用 ForkJoinPool
。例如,我们有一个任务,需要计算 1 到 100 的和。
class MyTask extends RecursiveTask<Long> {
private int start;
private int end;
public MyTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start < 10) {
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
int mid = (start + end) / 2;
MyTask task1 = new MyTask(start, mid);
MyTask task2 = new MyTask(mid + 1, end);
task1.fork();
task2.fork();
return task1.join() + task2.join();
}
}
}
ForkJoinPool pool = new ForkJoinPool();
MyTask task = new MyTask(1, 100);
Long res = pool.invoke(task);
Future
Future
接口可以用来获取异步任务的结果。
ExecutorService executor = Executors.newFixedThreadPool(10);
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Hello";
});
String res = future.get();
需要注意的是,get
方法是阻塞的,如果异步任务没有执行完毕,它会一直等待。
为了解决这个问题,Future
接口提供了 isDone
方法,可以判断异步任务是否执行完毕。
if (future.isDone()) {
String res = future.get();
}
Future
接口还提供了 cancel
方法,可以取消异步任务。
future.cancel(true);
但这依然不够好。CompletableFuture
可以传入回调函数,当异步任务执行完毕后,会自动调用回调函数。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Hello";
});
future.thenAccept(res -> { // 执行成功
System.out.println(res);
});
future.exceptionally(e -> { // 执行失败
e.printStackTrace();
return null;
});
如果没有返回值,可以使用 thenRun
方法。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello");
Thread.sleep(1000);
});
future.thenRun(() -> {
System.out.println("World");
});
CompletableFuture
还提供了一些方法,可以将多个异步任务组合在一起。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
Thread.sleep(1000);
return "Hello";
})
.thenCompose(res -> CompletableFuture.supplyAsync(() -> {
return res + " World";
}));
System.out.println(future.get());
thenCompose
方法可以将两个异步任务组合在一起,第一个任务的结果作为第二个任务的输入。
CompletableFuture
还提供了 thenCombine
方法,可以将两个异步任务的结果组合在一起。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
Thread.sleep(1000);
return "Hello";
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "World";
}), (res1, res2) -> {
return res1 + " " + res2;
});
System.out.println(future.get());
到这里,已经非常像 JavaScript 中的 Promise
了。
虚拟线程
Java21 引入了虚拟线程,它是一种轻量级的线程,可以更好地利用系统资源。
在 IO 密集型的应用中,通常会创建大量的线程,这会导致线程的上下文切换,降低性能。虚拟线程可以更好地利用系统资源,减少线程的创建和销毁。
Thread vt = Thread.startVirtualThread(() -> {
System.out.println("Hello");
Thread.sleep(1000);
});
Comments