Ch3nyang's blog collections_bookmark

post

person

about

category

category

local_offer

tag

rss_feed

rss

深入 Java | (2)
Java 并发编程

calendar_month 2024-01
archive 编程
tag java tag thread tag concurrent

There are 4 posts in series 深入 Java.

Java 在启动时,会创建一个进程,进程中包含一个主线程。主线程会执行 main 方法,我们可以在 main 方法中创建多个线程。

并发基础

本节可能涉及到少量后文的内容,但我们不得不在这里就用到它们。你可以先跳过那些地方,等到后文再回来查看。

乐观锁与悲观锁

悲观锁

悲观锁总是假设最坏的情况,认为数据很可能会被其他线程修改,因此在操作数据之前,会先加锁。也就是说,共享资源始终只能被一个线程访问。

例如 synchronized 关键字就是一种悲观锁。

public class Counter {
    private int count = 0;

    public synchronized void add() {
        count++;
    }
}

这里,当一个线程执行 add 方法时,会锁住整个对象,其他线程无法执行 add 方法。只有当当前线程执行完毕后,其他线程才能执行。

悲观锁的缺点是效率低下。因为只有一个线程能够访问共享资源,其他线程只能等待。当有大量线程需要访问共享资源时,会因为阻塞造成上下文切换,降低性能。

此外,悲观锁还有可能造成死锁。例如线程 A 锁住了资源 1,等待资源 2;线程 B 锁住了资源 2,等待资源 1。这时,线程 A 和线程 B 会一直等待下去。

乐观锁

乐观锁则恰恰相反,它认为数据很可能不会被修改,因此在操作数据之前,不会加锁。当在操作后,会使用算法查看操作期间数据是否被其它线程修改过,如果是,就会进行回滚操作。

例如,java.util.concurrent.atomic 包下的类就是一种乐观锁。

public class Counter {
    private AtomicInteger count = new AtomicInteger(0);

    public void add() {
        count.incrementAndGet();
    }
}

这里,incrementAndGet 方法是原子性的,不需要加锁。它使用了 CAS 指令,当线程执行 incrementAndGet 方法时,会比较内存中的值和期望值,如果相等,就会将新值写入内存。如果不相等,就会重新读取内存中的值,再次比较。

CAS(Compare And Swap)指令是一种乐观锁的实现。它是一种原子性的操作,可以保证线程安全。

CAS 指令有三个操作数:内存位置、期望值和新值。当内存位置的值等于期望值时,将新值写入内存位置。否则,不做任何操作。

CAS 存在三个问题:

  • ABA 问题

    ABA 问题是指,线程 1 读取了内存位置的值 A,然后线程 2 将 A 改为 B,再改回 A。这时,线程 1 会认为内存位置的值没有变化,但实际上已经变化了。

    为了解决 ABA 问题,可以使用版本号。每次修改内存位置的值时,都会增加版本号。

  • 循环时间长开销大

    CAS 指令是一个自旋操作,它会不断尝试修改内存位置的值。如果一直有冲突,会造成 CPU 的浪费。

  • 只能保证一个共享变量的原子操作

    CAS 指令只能保证一个共享变量的原子操作。如果有多个共享变量,就需要加锁。

    JDK 1.5 提供了 AtomicReference 类,可以保证引用类型的原子操作。这部分解决了第三个问题。

乐观锁的优点是效率高,因为不需要加锁。然而,这一切的前提是数据很可能不会被修改。如果数据经常被修改,那么会有大量的回滚重试操作,降低性能。

总的来说,悲观锁适合写操作多的场景,乐观锁适合读操作多的场景。

JMM

内存模型

CPU 由于内存访问速度跟不上 CPU 的运行速度,所以会有多级缓存。而每个 CPU 内核都有自己的缓存,这就会导致多个 CPU 内核之间的数据可能不一致。在多核 CPU 同时操作共享数据时,就会出现数据不一致的问题。

CPU 使用了一些技术来解决这个问题,例如 MESI 协议。这里的 MESI 是四个状态的缩写:

  • M(Modified):数据被修改,只存在于当前 CPU 缓存中。
  • E(Exclusive):数据只存在于当前 CPU 缓存中,没有被修改。
  • S(Shared):数据存在于多个 CPU 缓存中,没有被修改。
  • I(Invalid):数据无效。

当一个 CPU 写入数据时,会将数据状态设置为 M,并通知其他 CPU 缓存中的数据无效。当一个 CPU 读取数据时,会先检查数据状态,如果是 ME,说明数据是最新的,可以直接读取;如果是 S 或者 I,说明数据无效,需要从内存中读取。

操作系统堆底层硬件进行了抽象和封装,然而,这个抽象出来的 CPU 和内存之间也存在数据不一致的问题。我们把在特定的操作协议下,对特定内存或高速缓存进行读写访问的过程进行抽象,得到的就是内存模型了。

对于编程语言来讲,它可以直接使用操作系统的内存模型,例如 C 语言。但是,对于 Java 来讲,它是跨平台的,不同的操作系统有不同的内存模型。为了保证 Java 的跨平台性,Java 定义了自己的内存模型供程序员使用。我们可以说,Java 的内存模型是抽象上的抽象————抽象到家了。

Java 内存模型规定所有的变量都存储在主内存中,每条线程都有自己的工作内存。线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递都需要通过主内存来完成。

这里你就发现了,Java 只能操作自己的工作内存,需要一定的机制来保证各线程的工作内存及主内存的数据一致性。

JMM 定义了 8 种原子操作,来完成这一任务。包括:

  • lock:作用于主内存的变量,把一个变量标识为一条线程独占的状态
  • unlock:作用于主内存的变量,把一个变量标识为可被其他线程独占的状态
  • read:作用于主内存的变量,把一个变量的值从主内存传输到线程的工作内存中
  • load:作用于工作内存的变量,把 read 操作从主内存传输到工作内存中的变量放入工作内存的变量副本中
  • use:作用于工作内存的变量,把工作内存中的变量传输到 JVM 的执行引擎中
  • assign:作用于工作内存的变量,把一个从 JVM 执行引擎中接收到的值赋给工作内存中的变量
  • store:作用于工作内存的变量,把工作内存中的变量的值传输到主内存中
  • write:作用于主内存的变量,把 store 操作从工作内存传输到主内存中的变量中

然而,为了程序运行的效率,编译器可能对指令进行了重排。因此,JMM 还定义了一些规则,来保证指令重排不会影响程序的正确性:

  • readloadstorewrite 是成对出现的,不允许单独出现,但中间可能插入其他操作
  • 不允许丢弃 assign 操作,工作内存中的变量改变后必须同步到主内存中
  • 不允许在没有 assign 的情况下进行 store 操作
  • 新变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化的变量。也就是说,对一个变量进行 usestore 操作之前,必须执行过 assignload 操作
  • 一个变量同一时间只能被一个线程 lock,但一个线程可以执行多次 lock 操作,多次 lock 操作后,必顨执行相同次数的 unlock 操作才能解锁
  • 如果一个变量被 lock 操作锁定,那么会清空工作内存中此变量的值。此后,在使用前必须重新执行 loadassign 操作初始化变量的值
  • 如果一个变量没有被 lock,或是被其它线程 lock,那么不允许对此变量执行 unlock 操作
  • 对一个变量执行 unlock 操作之前,必须先把此变量同步到主内存中(执行 storewrite 操作)

Happens-Before

这些操作和规则实在太麻烦了,但好在,JMM 还定义了一个更简单的等价规则:Happens-Before。

Happens-Before 规则是 Java 内存模型中定义的两个操作之间的偏序关系。如果操作 A Happens-Before 操作 B,那么 A 的执行结果对 B 是可见的。

Happens-Before 规则包括以下几种情况:

  • 程序顺序规则:一个线程内,按照程序顺序,前面的操作 Happens-Before 后面的操作
  • 锁定规则:一个 unlock 操作 Happens-Before 后面对同一个锁的 lock 操作
  • volatile 变量规则:对一个 volatile 变量的写操作 Happens-Before 后面对这个变量的读操作
  • 线程启动规则Thread 对象的 start 方法 Happens-Before 线程的 run 方法
  • 线程终止规则:线程的所有操作 Happens-Before 线程的终止
  • 线程中断规则:对线程的 interrupt 方法的调用 Happens-Before 被中断线程的检测到中断
  • 对象终结规则:一个对象的初始化完成(构造函数执行结束) Happens-Before 它的 finalize 方法的开始
  • 传递性:如果 A Happens-Before B,B Happens-Before C,那么 A Happens-Before C

我们来看一个例子就明白了:

点击举个栗子
private int value = 0;

public void setValue(int value){
    this.value = value;
}

public int getValue(){
    return value;
}

如果线程 A 在时间上先调用了 setValue(1),然后线程 B 在时间上后调用了 getValue,那么:

  • 程序顺序规则:两个操作在不同线程中,不适用
  • 锁定规则:两个操作没有锁,不适用
  • volatile 变量规则:value 是普通变量,不适用
  • 线程启动规则:两个操作在不同线程中,不适用
  • 线程终止规则:两个操作在不同线程中,不适用
  • 线程中断规则:两个操作在不同线程中,不适用
  • 对象终结规则:value 是普通变量,不适用
  • 传递性:两个操作之间没有其他操作,不适用

因此,两个操作之间没有 Happens-Before 关系,线程 B 可能读取到 0,也可能读取到 1!

修复有两种方法,一是使用 synchronized 关键字:

private int value = 0;

public synchronized void setValue(int value){
    this.value = value;
}

public synchronized int getValue(){
    return value;
}

此时,由于 synchronized 关键字为对象加锁,根据锁定规则,setValuegetValue 之间有 Happens-Before 关系,线程 B 读取到的值一定是 1。

另一种方法是使用 volatile 关键字:

private volatile int value = 0;

public void setValue(int value){
    this.value = value;
}

public int getValue(){
    return value;
}

此时,根据 volatile 变量规则,setValuegetValue 之间有 Happens-Before 关系,线程 B 读取到的值一定是 1。

由此,我们发现,时间上的先后顺序并不代表操作之间的先后关系。只有在满足 Happens-Before 规则的情况下,操作之间才有确定的先后关系。

对象内存布局

Java 对象的内存布局分为三部分:

  • 对象头
  • 实例数据
  • 对齐填充

对象头包括两部分信息:

  • 存储自身运行时的数据

    JMM 使用一个标志位来表示对象的状态,而该状态则决定了这部分会存储什么数据:

    状态 标志位 存储内容
    无锁 01 对象的哈希码
    对象的 GC 分代年龄
    轻量级锁 00 指向锁记录的指针
    重量级锁 10 指向重量级锁的指针
    GC 标记 11
    偏向锁 01 偏向线程 ID
    偏向时间戳
    对象的 GC 分代年龄

    这里的各种锁我们会在后文详细介绍。

  • 指向类的指针

    指向对象的类元数据,可以确定对象的类型。

实例数据是对象真正存储的数据,包括成员变量。

对齐填充是为了保证对象的起始地址是 8 字节的整数倍,提高访问效率。

synchronized 关键字

自旋锁

当有多个线程竞争锁时,没抢到的线程可以执行忙循环,不断尝试获取锁。这种锁叫做自旋锁。

自旋锁避免了线程的上下文切换,提高了效率。但是,如果一直没抢到锁,自旋锁会消耗大量的 CPU 时间。因此,Java 使用了自适应的自旋锁。

自旋锁会自旋一段时间后停止自旋,将线程挂起。这个时间由前一次在同一个锁上的自旋时间及锁的拥有者状态决定。如果某个被锁的对象刚刚就有线程获得了锁,JVM 会认为获得锁的概率较大,让它多自旋一会儿;相反,如果锁被长时间占用,一个线程也没抢到,JVM 会认为获得锁的概率较小,让它少自旋一会儿。

程序运行时间越长,对锁的情况越了解,自旋锁的效率就越高。

轻量级锁

如果一个锁一直没有竞争,那么可以使用轻量级锁。轻量级锁是一种乐观锁,它认为锁一直没有竞争,不需要加锁。它的执行过程如下:

  • 当线程 A 获取锁时,JVM 在线程 A 的栈帧中分配一块空间,用来存储锁记录
  • JVM 使用 CAS 指令将对象头中的锁信息替换为线程 A 的锁记录地址
  • 如果 CAS 操作成功,线程 A 获取锁成功

由于有 CAS 操作,因此轻量级锁仅在几乎没有竞争的情况下才好用,否则,速度会比重量级锁更慢。

偏向锁

偏向锁会偏向于第一个获得它的线程。如果在接下来的执行过程中,该锁没有被其他线程获取,则持有偏向锁的线程将永远不需要再进行同步。这样就节省了大量有关锁申请的操作,从而提高了程序性能。

偏向锁的执行过程如下:

  • 当锁第一次被线程 A 获取时,JVM 会将锁的标记设置为偏向锁,并使用 CAS 指令将对象头中的线程 ID 替换为线程 A 的 ID
  • 此后,线程 A 再次获取锁时,不需要再进行同步操作,直接获取锁成功
  • 如果有其他线程 B 获取锁,JVM 会撤销偏向锁,将锁的标记设置为轻量级锁

偏向锁可以提高带有同步但无竞争的程序性能,但是,如果有竞争,偏向锁会降低性能。

synchronized 流程

当一个线程执行 synchronized 方法时,会执行以下流程:

  • 尝试使用偏向锁的方式获取锁,如果获取到了,就表示获取锁成功
  • 如果获取不到偏向锁,将锁升级为轻量级锁,然后使用自适应自旋的方式获取锁
  • 如果获取不到轻量级锁,将锁升级为重量级锁,线程进入阻塞状态,等待锁释放

volatile 关键字

volatile 关键字可以保证变量的可见性,并且禁止指令重排。但是,volatile 关键字并不能保证原子性。

例如下面的代码:

public class RaceTestVolatile {
    private static volatile int race = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[20];

        for (int i = 0; i < 20; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    race++;
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 20; i++) {
            threads[i].join();
        }

        System.out.println("最终 race 的值: " + race);
    }
}

这段代码的目的是让 20 个线程分别对 race 变量进行 10000 次自增操作。由于 race 变量是 volatile 的,因此可以保证可见性,所有线程都能看到最新的值。但是,由于 race++ 操作并不是原子性的,因此最终的结果并不是 200000,比如我这里的结果是 64299。

这是因为,race++ 操作实际上是三个操作:读取 race 的值、自增、写回 race 的值。如果两个线程同时读取了 race 的值,然后同时自增,最后写回,那么就会出现问题。

要想让 volatile 变量不发生这种情况,需要满足两个条件:

  • 变量的写操作不依赖于当前值,例如 race = i 就没有这个问题
  • 变量不需要与其他变量共同参与不变约束,也就是说不能 if (a && b) 这样

volatile 能做的事情有两点:

  • 禁止指令重排

    也就是说,volatile 变量的写操作必须在读操作之后,不能重排。这用到了内存屏障指令。

  • 保证可见性

    也就是说,一个线程对 volatile 变量的修改,对其他线程是可见的。放到 JMM 里讲,就是每次使用变量的时候,都会从主内存中读取,而不是从线程的工作内存中读取。

线程操作

线程的创建

创建线程有以下几种方式:

  • 继承 Thread 类。

    public class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("New Thread");
            Thread.sleep(1000);
        }
    }
    
    Thread thread = new MyThread();
    thread.start();
    
  • 实现 Runnable 接口。

    public class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("New Thread");
            Thread.sleep(1000);
        }
    }
    
    Thread thread = new Thread(new MyRunnable());
    thread.start();
    

    当然,你也可以使用 Lambda 表达式来简化代码。

    Thread thread = new Thread(() -> {
        System.out.println("New Thread");
        Thread.sleep(1000);
    });
    thread.start();
    

一个线程可以调用 Thread.currentThread() 方法获取当前线程对象,调用 getName() 方法获取线程名。

Thread thread = new Thread(() -> {
    System.out.println(Thread.currentThread().getName());
});

线程调用时,可以向其中传入参数。如果有大量线程要传入相同的参数,可以使用 ThreadLocal 类。该类会为每个线程创建一个独立的变量副本,互不干扰。

ThreadLocal<String> threadLocal = ThreadLocal<>();

try {
    threadLocal.set("Hello!");

    Thread thread = new Thread(() -> {
        System.out.println(threadLocal.get());
    });

    thread.start();
} finally {
    threadLocal.remove();
}

需要注意的是,ThreadLocal 实例一定要在 finally 代码块中调用 remove 方法,以避免内存泄漏。我们可以封装 AutoCloseable 接口来简化代码。

public class ThreadLocalVariable<T> implements AutoCloseable {
    private ThreadLocal<T> threadLocal = new ThreadLocal<>();

    public void set(T value) {
        threadLocal.set(value);
    }

    public T get() {
        return threadLocal.get();
    }

    @Override
    public void close() {
        threadLocal.remove();
    }
}

try (ThreadLocalVariable<String> threadLocal = new ThreadLocalVariable<>()) {
    threadLocal.set("Hello!");

    Thread thread = new Thread(() -> {
        System.out.println(threadLocal.get());
    });

    thread.start();
}

线程状态

线程有以下几种状态:

  • NEW:新建状态,线程已经创建。
  • RUNNABLE:运行状态,线程正在执行。
  • BLOCKED:阻塞状态,线程被阻塞,等待锁。
  • WAITING:等待状态,线程等待其他线程的通知。
  • TIMED_WAITING:超时等待状态,线程等待一段时间。例如正在执行 sleep
  • TERMINATED:终止状态,线程执行完毕。

每个线程对象只能调用一次 start 方法,然后线程就会从 NEW 进入 RUNNABLE 状态。

进入 TERMINATED 状态有三种方式:

  • 线程执行完毕。
  • 线程抛出未捕获的异常。
  • 线程调用 stop 方法。

如果主线程希望在某处阻塞,等待某个线程执行完毕后再执行后续代码,可以使用 join 方法。

Thread thread = new Thread(() -> {
    System.out.println("New Thread");
    Thread.sleep(1000);
});

thread.start();
thread.join();

如果主线程希望停止某个线程,可以使用 interrupt 方法。

Thread thread = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        System.out.println("New Thread");
    }
});

thread.start();
thread.interrupt();

需要注意的是,interrupt 方法只是设置了线程的中断标志,线程并不会立即停止。如果线程正在执行 sleepwaitjoin 等方法,会抛出 InterruptedException 异常。

我们前面提到,JVM 在运行时,会创建一个主线程。当主线程执行完毕后,JVM 会退出。如果主线程创建了一些子线程,那么要等待所有子线程执行完毕后,JVM 才会退出。

但是,有时候我们希望主线程执行完毕后,JVM 就退出,而留下一个或多个子线程继续执行。这时,我们可以将子线程设置为守护线程。

Thread thread = new Thread(() -> {
    while (true) {
        System.out.println("Daemon Thread");
    }
});

thread.setDaemon(true);
thread.start();

需要注意的是,守护线程不能持有任何需要关闭的资源,因为它会在 JVM 退出时立即终止。

对于线程键共享的变量,应当使用 volatile 关键字修饰,以保证线程之间的可见性。这是因为,线程之间的数据是存储在各自的工作内存中的,而不是存储在主内存中。volatile 关键字可以保证线程之间的数据同步。

public class MyRunnable implements Runnable {
    private volatile boolean flag = true;

    @Override
    public void run() {
        while (flag) {
            System.out.println("New Thread");
        }
    }

    public void stop() {
        flag = false;
    }
}

对于线程间的互斥访问,可以使用 synchronized 关键字。该关键字可以修饰方法,也可以修饰代码块。它相当于为方法或代码块加了一个锁,只有获取到锁的线程才能执行。

class Counter {
    public static final Object lock = new Object();
    private int count = 0;
}

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        synchronized (Counter.lock) {
            for (int i = 0; i < 1000000; i++) {
                Counter.count++;
            }
        }
    }
}

Thread thread1 = new Thread(new MyRunnable());
Thread thread2 = new Thread(new MyRunnable());
thread1.start();
thread2.start();

syncronized 关键字是重量级锁,同时在获取时会阻塞线程持续等待。java.util.concurrent 包下的 Lock 接口提供了更多的锁实现,比如 ReentrantLockReadWriteLockStampedLock 等。

  • ReentrantLock

    例如,我们有一个计数器:

    class Counter {
      private int count = 0;
        
      public synchronized void add() {
          count++;
      }
    }
    

    使用 ReentrantLock 可以这样实现:

    class Counter {
      private int count = 0;
      private Lock lock = new ReentrantLock();
        
      public void add() {
          lock.lock();
          try {
              count++;
          } finally {
              lock.unlock();
          }
      }
    }
    

    它可以尝试获取锁:

    if (lock.tryLock(1, TimeUnit.SECONDS)) {
        try {
            count++;
        } finally {
            lock.unlock();
        }
    } else {
        // do something
    }
    

    当等待了一秒钟后,还没有获取到锁,就会返回 false。这时程序可以做一些其它事情,而不是一直等待。

  • ReadWriteLock

    读写锁允许多个线程同时读取数据,但只允许一个线程写入数据。这样可以提高读取数据的效率。

    class Counter {
      private int count = 0;
      private ReadWriteLock lock = new ReentrantReadWriteLock();
      private Lock readLock = lock.readLock();
      private Lock writeLock = lock.writeLock();
        
      public int get() {
          readLock.lock();
          try {
              return count;
          } finally {
              readLock.unlock();
          }
      }
        
      public void add() {
          writeLock.lock();
          try {
              count++;
          } finally {
              writeLock.unlock();
          }
      }
    }
    
  • StampedLock

    读写锁的一个问题是,当有线程正在读取数据时,另一个线程不能写入数据。StampedLock 提供了乐观读锁,允许多个线程同时读取数据,但只有一个线程可以写入数据,读写可以同时进行。

    class Counter {
      private int count = 0;
      private StampedLock lock = new StampedLock();
        
      public int get() {
          long stamp = lock.tryOptimisticRead();
          int c = count;
          if (!lock.validate(stamp)) {
              stamp = lock.readLock();
              try {
                  c = count;
              } finally {
                  lock.unlockRead(stamp);
              }
          }
          return c;
      }
    
      public void add() {
          long stamp = lock.writeLock();
          try {
              count++;
          } finally {
              lock.unlockWrite(stamp);
          }
      }
    }
    

    tryOptimisticRead 方法会尝试获取一个乐观读锁,返回一个标记。如果在读取数据时,数据没有被修改,那么可以直接返回。如果数据被修改,那么需要获取一个读锁。validate 方法用来验证标记,如果返回 true,则数据没有被修改,可以直接返回。如果返回 false,则需要获取一个读锁。

    需要注意的是,乐观锁的使用条件是在读取过程中大概率不会发生数据修改。如果数据经常被修改,那么乐观锁的效率会很低。

    同时,StampedLock 是一种不可重入锁,也就是说,一个线程在获取过写锁后,不能再次获取写锁。否则会造成死锁。

原子操作

Java 中有些操作是原子性的,具体包括:

  • 基本数据类型赋值(除 longdouble 外,不过 JVM 通常会让它们也是原子性的)。
  • 引用类型赋值。

对于不可变对象,对它们的操作也不需要加锁。

Java 中提供了 java.util.concurrent.atomic 包,用来实现更多的原子操作。它主要包括:

  • AtomicBoolean:原子性的布尔类型。
  • AtomicInteger:原子性的整型。
  • AtomicLong:原子性的长整型。
  • AtomicReference:原子性的引用类型。

例如,对于 AtomicInteger 类型,他有一些原子性的方法:

  • int get():获取值。
  • int incrementAndGet():自增并获取值。
  • int addAndGet(int delta):增加并获取值。
  • int compareAndSet(int expect, int update):比较并设置值。

它的实现没有使用锁,而是使用了 CAS(Compare And Swap)指令。CAS 指令是一种乐观锁,它会比较内存中的值和期望值,如果相等,就会将新值写入内存。如果不相等,就会重新读取内存中的值,再次比较。

信号量

信号量是一种更高级的同步工具,它可以控制同时访问某个资源的线程数量。

class AccessControl {
    private Semaphore semaphore = new Semaphore(10);

    public void access() throws InterruptedException {
        semaphore.acquire();
        try {
            // do something
        } finally {
            semaphore.release();
        }
    }
}

acquire 方法会尝试获取一个许可,如果没有许可,线程会阻塞。release 方法会释放一个许可。

Semaphore 还有一个 tryAcquire 方法:

if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
    try {
        // do something
    } finally {
        semaphore.release();
    }
} else {
    // do something
}

线程同步

例如我们有一个任务队列,多个线程可以向队列中添加任务,也可以从队列中取出任务。这时,我们需要保证线程之间的同步。

class TaskQueue {
    private Queue<String> queue = new LinkedList<>();

    public synchronized void add(String task) {
        queue.add(task);
    }

    public synchronized String get() {
        while (queue.isEmpty()) {
        }
        return queue.remove();
    }
}

然而,这个方法是错误的。如果队列为空,而一个线程在执行 get 方法,那么它会一直等待到有人向队列中添加任务。然而,此时他却锁住了这个对象,其他线程根本无法执行 add 方法。这就需要使用线程同步。

Java 中有两种方式可以实现线程同步:

  • waitnotify 方法。

    wait 方法会让当前线程等待,直到另一个线程调用 notify 方法唤醒它。wait 在等待时会释放锁,这让其他线程执行 add 方法成为了可能。

    class TaskQueue {
        private Queue<String> queue = new LinkedList<>();
    
        public synchronized void add(String task) {
            queue.add(task);
            notifyAll();
        }
    
        public synchronized String get() throws InterruptedException {
            while (queue.isEmpty()) {
                wait();
            }
            return queue.remove();
        }
    }
    

    需要注意的是,wait 被写在了 while 循环中。这是因为,当多个线程同时被唤醒后,依然只有一个线程能够获取到锁,其他线程需要再次检查条件。

  • LockCondition 接口。

    对于非 synchronized 的代码,我们可以使用 LockCondition 接口。

    class TaskQueue {
        private Queue<String> queue = new LinkedList<>();
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        public void add(String task) {
            lock.lock();
            try {
                queue.add(task);
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public String get() throws InterruptedException {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    condition.await();
                }
                return queue.remove();
            } finally {
                lock.unlock();
            }
        }
    }
    

    可以看到,signal 方法对应 notify 方法,await 方法对应 wait 方法。

    condition.await 同样可以尝试获取锁:

    if (condition.await(1, TimeUnit.SECONDS)) {
        return queue.remove();
    } else {
        // do something
    }
    

我们前面是在一个 Queue 上实现了线程同步。类似的操作非常常见,Java 中提供了一个 BlockingQueue 接口,它提供了线程安全的队列操作。

BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

Thread thread1 = new Thread(() -> {
    try {
        queue.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

Thread thread2 = new Thread(() -> {
    try {
        queue.put("Task");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

thread1.start();
thread2.start();

在这段代码中,take 方法会阻塞线程,直到队列中有任务。

类似的,Java 中的集合类基本都有线程安全的实现:

接口 非线程安全 线程安全 行为
List ArrayList CopyOnWriteArrayList 写入时复制;当写入时,会复制一个新的数组,写入完成后,再将新数组赋值给旧数组;读取时,读取旧数组
Set HashSet CopyOnWriteArraySet 写入时复制;当写入时,会复制一个新的集合,写入完成后,再将新集合赋值给旧集合;读取时,读取旧集合
Map HashMap ConcurrentHashMap 分段锁;当写入时,只锁住一个段;读取时,不需要锁
Queue LinkedList ArrayBlockingQueue 阻塞队列;当队列为空时,take 方法会阻塞;当队列满时,put 方法会阻塞
Deque LinkedList LinkedBlockingDeque 阻塞双端队列;当队列为空时,takeFirsttakeLast 方法会阻塞;当队列满时,putFirstputLast 方法会阻塞

线程池

线程池是一种线程复用的机制,它可以减少线程的创建和销毁次数,提高性能。

Java 标准库中提供了 java.util.concurrent 包,其中的 ExecutorService 接口可以用来创建线程池。

ExecutorService executor = Executors.newFixedThreadPool(10);

for (int i = 0; i < 1000000; i++) {
    executor.submit(() -> {
        System.out.println("New Thread");
        Thread.sleep(1000);
    });
}

Executors 类提供了一些静态方法,可以创建不同类型的线程池:

  • newFixedThreadPool:固定大小的线程池。

    线程池中的线程数量是固定的,当线程池中的线程都在执行任务时,新的任务会被放入队列中。只有之前的任务执行完毕后,线程池出现空闲线程时,才会执行队列中的任务。

  • newCachedThreadPool:缓存线程池,线程数量不固定。

    线程池会根据任务的数量动态调整线程的数量。如果线程池中的线程空闲时间超过 60 秒,就会被回收。

  • newScheduledThreadPool:定时任务的线程池。

    线程池可以执行定时任务和周期性任务。例如让线程池在提交任务后延迟 3 秒执行,然后每隔 1 秒执行一次。

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    executor.scheduleAtFixedRate(() -> {
        System.out.println("Execute");
    }, 3, 1, TimeUnit.SECONDS);
    

对于有些任务,它本身非常大,但可以自动拆解成多个小任务,然后再合并结果。这时,我们可以使用 ForkJoinPool。例如,我们有一个任务,需要计算 1 到 100 的和。

class MyTask extends RecursiveTask<Long> {
    private int start;
    private int end;

    public MyTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start < 10) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            MyTask task1 = new MyTask(start, mid);
            MyTask task2 = new MyTask(mid + 1, end);
            task1.fork();
            task2.fork();
            return task1.join() + task2.join();
        }
    }
}

ForkJoinPool pool = new ForkJoinPool();
MyTask task = new MyTask(1, 100);
Long res = pool.invoke(task);

Future

Future 接口可以用来获取异步任务的结果。

ExecutorService executor = Executors.newFixedThreadPool(10);
Future<String> future = executor.submit(() -> {
    Thread.sleep(1000);
    return "Hello";
});

String res = future.get();

需要注意的是,get 方法是阻塞的,如果异步任务没有执行完毕,它会一直等待。

为了解决这个问题,Future 接口提供了 isDone 方法,可以判断异步任务是否执行完毕。

if (future.isDone()) {
    String res = future.get();
}

Future 接口还提供了 cancel 方法,可以取消异步任务。

future.cancel(true);

但这依然不够好。CompletableFuture 可以传入回调函数,当异步任务执行完毕后,会自动调用回调函数。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000);
    return "Hello";
});

future.thenAccept(res -> { // 执行成功
    System.out.println(res);
});

future.exceptionally(e -> { // 执行失败
    e.printStackTrace();
    return null;
});

如果没有返回值,可以使用 thenRun 方法。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Hello");
    Thread.sleep(1000);
});

future.thenRun(() -> {
    System.out.println("World");
});

CompletableFuture 还提供了一些方法,可以将多个异步任务组合在一起。

CompletableFuture<String> future = CompletableFuture
  .supplyAsync(() -> {
      Thread.sleep(1000);
      return "Hello";
  })
  .thenCompose(res -> CompletableFuture.supplyAsync(() -> {
      return res + " World";
  }));

System.out.println(future.get());

thenCompose 方法可以将两个异步任务组合在一起,第一个任务的结果作为第二个任务的输入。

CompletableFuture 还提供了 thenCombine 方法,可以将两个异步任务的结果组合在一起。

CompletableFuture<String> future = CompletableFuture
  .supplyAsync(() -> {
      Thread.sleep(1000);
      return "Hello";
  })
  .thenCombine(CompletableFuture.supplyAsync(() -> {
      Thread.sleep(1000);
      return "World";
  }), (res1, res2) -> {
      return res1 + " " + res2;
  });

System.out.println(future.get());

到这里,已经非常像 JavaScript 中的 Promise 了。

虚拟线程

Java21 引入了虚拟线程,它是一种轻量级的线程,可以更好地利用系统资源。

在 IO 密集型的应用中,通常会创建大量的线程,这会导致线程的上下文切换,降低性能。虚拟线程可以更好地利用系统资源,减少线程的创建和销毁。

Thread vt = Thread.startVirtualThread(() -> {
    System.out.println("Hello");
    Thread.sleep(1000);
});

Comments

Share This Post