《Java核心技术卷一》第十二章

第12章 并发

多任务(multitasking),这是操作系统的一种能力,看起来可以在同一时刻运行多个程序。并发执行的进程数并不受限于 CPU 数。操作系统会为每个进程分配 CPU 时间片,给人并行处理的感觉。

多线程程序在更低一层扩展了多任务的概念:单个程序看起来在同时完成多个任务。每个任务在一个线程(thread)中执行,线程是控制线程的简称。如果一个程序可以同时运行多个线程,则称这个程序是多线程程序(multithreaded)

**多进程(process)**与多线程本质的区别在于:每个进程都拥有自己的一整套变量,而线程共享数据。不过,共享变量使线程之间的通信比进程之间的通信更高效、更容易。此外,在某些操作系统中,与进程相比较,线程更“轻量级”,创建、撤销单个线程比启动新进程的开销要小得多。

12.1 什么是线程

首先来看一个使用了两个线程的简单程序。这个程序可以在银行账户之间完成资金转账。我们使用了一个 Bank 类,它可以存储给定数量的账户的余额。transfer 方法将一定金额从一个账户转移到另一个账户。具体实现见程序清单 12-2。

在第一个线程中,我们将钱从账户 0 转移到账户 1。第二个线程将钱从账户 2 转移到账户 3。

下面是在一个单独的线程中运行一个任务的简单过程:

  1. 将执行这个任务的代码放在一个类的 run 方法中,这个类要实现 Runnable 接口。

Runnable 接口非常简单,只有一个方法:

1
2
3
public interface Runnable {
void run();
}

由于 Runnable 是一个函数式接口,可以用一个 lambda 表达式创建一个实例:

1
2
3
Runnable r = () -> {
// task code
};
  1. 从这个 Runnable 构造一个 Thread 对象:
1
var t = new Thread(r);
  1. 启动线程:
1
t.start();

为了创建单独的线程来完成转账,我们只需要把转账代码放在一个 Runnablerun 方法中,然后启动一个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
Runnable r = () -> {
try {
for (int i = 0; i < STEPS; i++) {
double amount = MAX_AMOUNT * Math.random();
bank.transfer(0, 1, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
} catch (InterruptedException e) {
// handle
}
};
var t = new Thread(r);
t.start();

在一个 for 循环中(对于给定的步数 STEPS),这个线程会转账一个随机金额,然后休眠随机的延迟时间。

我们要捕获 sleep 方法有可能抛出的 InterruptedException 异常。一般来说,中断用来请求终止一个线程。相应地,出现 InterruptedException 时,run 方法会退出。

这个程序还会启动第二个线程,它从账户 2 向账户 3 转账。运行这个程序时,可以得到类似这样的输出:

1
2
3
4
5
6
7
8
9
10
Thread[Thread-1,5,main] 606.77 from 2 to 3 Total Balance: 400000.00
Thread[Thread-0,5,main] 98.99 from 0 to 1 Total Balance: 400000.00
Thread[Thread-1,5,main] 476.78 from 2 to 3 Total Balance: 400000.00
Thread[Thread-0,5,main] 653.64 from 0 to 1 Total Balance: 400000.00
Thread[Thread-1,5,main] 807.14 from 2 to 3 Total Balance: 400000.00
Thread[Thread-0,5,main] 481.49 from 0 to 1 Total Balance: 400000.00
Thread[Thread-0,5,main] 203.73 from 0 to 1 Total Balance: 400000.00
Thread[Thread-1,5,main] 111.76 from 2 to 3 Total Balance: 400000.00
Thread[Thread-1,5,main] 794.88 from 2 to 3 Total Balance: 400000.00
...

可以看到,两个线程的输出是交错的,这说明它们在并发运行。实际上,两个输出行交错显示时,输出有时会有些混乱。

你要了解的就是这些!现在你已经知道了如何并发地运行任务。

这个程序的完整代码见程序清单 12-1。

注释:还可以通过建立 Thread 类的一个子类来定义线程,如下所示:

1
2
3
4
5
class MyThread extends Thread {
public void run() {
// task code
}
}

然后可以构造这个子类的一个对象,并调用它的 start 方法。不过,现在不再推荐这种方法。应当把要并行运行的任务与运行机制解耦合。如果有多个任务,为每个任务分别创建一个单独的线程开销太大。实际上,可以使用一个线程池,参见 12.6.2 节的介绍。

警告:不要调用 Thread 类或 Runnable 对象的 run 方法。**直接调用 run 方法只会在同一个线程中执行这个任务,而没有启动新的线程。**实际上,应当调用 Thread.start 方法,这会创建一个新线程来执行 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
程序清单 12-1  threads/ThreadTest.java

1 package threads;
2
3 /**
4 * @version 1.30 2004-08-01
5 * @author Cay Horstmann
6 */
7 public class ThreadTest {
8 public static final int DELAY = 10;
9 public static final int STEPS = 100;
10 public static final double MAX_AMOUNT = 1000;
11
12 public static void main(String[] args) {
13 var bank = new Bank(4, 10000);
14
15 Runnable task1 = () -> {
16 try {
17 for (int i = 0; i < STEPS; i++) {
18 double amount = MAX_AMOUNT * Math.random();
19 bank.transfer(0, 1, amount);
20 Thread.sleep((int) (DELAY * Math.random()));
21 }
22 } catch (InterruptedException e) {
23 // do nothing
24 }
25 };
26
27 Runnable task2 = () -> {
28 try {
29 for (int i = 0; i < STEPS; i++) {
30 double amount = MAX_AMOUNT * Math.random();
31 bank.transfer(2, 3, amount);
32 Thread.sleep((int) (DELAY * Math.random()));
33 }
34 } catch (InterruptedException e) {
35 // do nothing
36 }
37 };
38
39 new Thread(task1).start();
40 new Thread(task2).start();
41 }
42 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
程序清单 12-2  threads/Bank.java

1 package threads;
2
3 import java.util.*;
4
5 /**
6 * A bank with a number of bank accounts.
7 */
8 public class Bank {
9 private final double[] accounts;
10
11 /**
12 * Constructs the bank.
13 * @param n the number of accounts
14 * @param initialBalance the initial balance for each account
15 */
16 public Bank(int n, double initialBalance) {
17 accounts = new double[n];
18 Arrays.fill(accounts, initialBalance);
19 }
20
21 /**
22 * Transfers money from one account to another.
23 * @param from the account to transfer from
24 * @param to the account to transfer to
25 * @param amount the amount to transfer
26 */
27 public void transfer(int from, int to, double amount) {
28 if (accounts[from] < amount) return;
29 System.out.print(Thread.currentThread());
30 accounts[from] -= amount;
31 System.out.printf(" %10.2f from %d to %d", amount, from, to);
32 accounts[to] += amount;
33 System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
34 }
35
36 /**
37 * Gets the sum of all account balances.
38 * @return the total balance
39 */
40 public double getTotalBalance() {
41 double sum = 0;
42 for (double a : accounts)
43 sum += a;
44 return sum;
45 }
46
47 /**
48 * Gets the number of accounts in the bank.
49 * @return the number of accounts
50 */
51 public int size() {
52 return accounts.length;
53 }
54 }

12.2 线程状态

线程可以有如下 6 种状态:

  • New(新建)
  • Runnable(可运行)
  • Blocked(阻塞)
  • Waiting(等待)
  • Timed waiting(计时等待)
  • Terminated(终止)

要确定一个线程的当前状态,只需要调用 getState() 方法。

12.2.1 新建线程

当用 new 操作符创建一个新线程时,如 new Thread(r),这个线程还没有开始运行。这意味着它的状态是新建(New)。当一个线程处于新建状态时,程序还没有开始运行线程中的代码。线程可以运行之前还有一些基础工作要做。

12.2.2 可运行线程

一旦调用 start() 方法,线程就处于可运行(Runnable)状态一个可运行的线程可能正在运行,也可能没有运行——要由操作系统为线程提供具体的运行时间。(不过,Java 规范没有将“正在运行”作为一个单独的状态;一个正在运行的线程仍然处于可运行状态。)

**一旦一个线程开始运行,它不一定始终保持运行。**事实上,运行中的线程有时需要暂停,让其他线程有机会运行。线程调度的细节依赖于操作系统提供的服务。抢占式调度系统给每一个可运行线程一个时间片来执行任务。当时间片用完时,操作系统会剥夺该线程的运行权,并给另一个线程一个机会来运行(见图 12-2)。当选择下一个线程时,操作系统会考虑线程的优先级(priority)。

所有现代桌面和服务器操作系统都使用抢占式调度。但是,像手机这样的小型设备可能使用协作式调度。在这样的设备中,一个线程只有在调用 yield 方法或者被阻塞或等待时才失去控制权。

在有多个处理器的机器上,每个处理器可以运行一个线程,而且可以有多个线程并行运行。当然,如果线程数多于处理器的数目,调度器还是需要分配时间片。

一定要记住,在任何给定时刻,一个可运行的线程可能正在运行也可能没有运行(正是出于该原因,这个状态称为“可运行”而不是“正在运行”)。

[!TIP]

API java.lang.Thread 1.0

1
static void yield()

使当前正在执行的线程向另一个线程交出运行权。注意这是一个静态方法。

12.2.3 阻塞和等待线程

**当线程处于阻塞或等待状态时,它暂时是不活动的。它不执行任何代码,并且消耗最少的资源。**要由线程调度器重新激活这个线程。具体细节取决于它是怎样到达非活动状态的。

  • 当一个线程试图获取一个内部的对象锁(而不是 java.util.concurrent 库中的 Lock),而这个锁目前被其他线程占有,该线程就会被阻塞当所有其他线程都释放了这个锁,并且线程调度器允许该线程持有这个锁时,它将变成非阻塞状态。

  • **当线程等待另一个线程通知调度器出现某个条件时,这个线程会进入等待状态。**调用 Object.wait 方法或 Thread.join 方法,或者是等待 java.util.concurrent 库中的 LockCondition 时,就会出现这种情况。实际上,阻塞状态与等待状态并没有太大区别。

  • 有几个方法有超时参数,调用这些方法会让线程进入**计时等待(timed waiting)**状态。**这一状态将一直保持到超时期满或者接收到适当的通知。**带有超时参数的方法有 Thread.sleep 和计时版的 Object.waitThread.joinLock.tryLock 以及 Condition.await

图 12-1 展示了线程可能的状态以及从一个状态到另一个状态可能的转换。当一个线程阻塞或等待时(或者终止时),可以调度另一个线程运行。当一个线程被重新激活(例如,因为超时期满或成功地获得了一个锁),调度器检查它是否具有比当前运行线程更高的优先级。如果是这样,调度器会剥夺某个当前运行线程的运行权,选择运行一个新线程。

12.2.4 终止线程

线程会由于以下两个原因之一而终止:

  • 由于 run 方法正常退出,线程自然终止。
  • 因为一个没有捕获的异常终止了 run 方法,使线程意外终止。

image-20251206102733689

具体来说,可以调用线程的 stop 方法杀死一个线程。该方法抛出一个 ThreadDeath 错误对象,这会杀死线程。不过,stop 方法已经废弃,不要在你自己的代码中调用这个方法。

[!TIP]

API java.lang.Thread 1.0

void join()

​ 等待指定的线程终止。
void join(long millis)
​ 等待指定的线程终止或者等待经过指定的毫秒数。
Thread.State getState() 5
​ 得到这个线程的状态;取值为 NEW、RUNNABL、BLOCKED、WAITING、TIMED_WAITING 或 TERMINATED。
void stop()
​ 停止该线程。这个方法已经废弃。
void suspend()
​ 暂停这个线程的执行。这个方法已经废弃,将来会删除。
void resume()
​ 恢复线程。这个方法只能在调用 suspend() 之后使用。这个方法已经废弃,将来会删除。

12.3 线程属性

下面几节将讨论线程的各种属性,包括中断的状态、守护线程、未捕获异常的处理器以及不应使用的一些遗留特性。

12.3.1 中断线程

当一个线程的 run 方法返回时(执行了方法体中最后一条语句后,执行 return 语句返回),或者如果出现方法中未捕获的异常,这个线程将终止。在 Java 的早期版本中,还有一个 stop 方法,其他线程可以调用这个方法来终止一个线程。但是,这个方法现在已经废弃

除了已经废弃的 stop 方法,没有办法强制一个线程终止。不过,interrupt 方法可以用来请求终止一个线程。

当对一个线程调用 interrupt 方法时,就会设置线程的中断状态(interrupted status)。这是每个线程都有的一个 boolean 标志。各个线程都应该不时地检查这个标志,以判断线程是否被中断。

要确定是否设置了中断状态,首先调用静态方法 Thread.currentThread() 获得当前线程,然后调用 isInterrupted() 方法:

1
2
while (!Thread.currentThread().isInterrupted() && more work to do)
do more work

但是,如果线程被阻塞,就无法检查中断状态。这里就要引入 InterruptedException 异常。在一个被 sleepwait 调用阻塞的线程上调用 interrupt 方法时,那个阻塞调用(即 sleepwait 调用)将被一个 InterruptedException 异常中断。(有一些阻塞 I/O 调用不能被中断,对此应该考虑选择可中断的调用。)

Java 语言并没有要求中断的线程应当终止。中断一个线程只是要引起它的注意。被中断的线程可以决定如何响应中断。某些线程非常重要,所以应该处理这个异常,然后再继续执行。但是,更普遍的情况是,线程只希望将中断解释为一个终止请求。这种线程的 run 方法有如下形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Runnable r = () -> {
try {
while (!Thread.currentThread().isInterrupted() && more work to do) {
// do more work
}
}
catch (InterruptedException e) {
// thread was interrupted during sleep or wait
}
finally {
// cleanup, if required
}
// exiting the run method terminates the thread

};

如果在每次工作迭代之后都调用 sleep 方法(或者其他可中断方法),isInterrupted 检查既没有必要也没有用处。如果设置了中断状态,此时倘若调用 sleep 方法,它不会休眠。实际上,它会清除中断状态( !)并抛出 InterruptedException。因此,如果你的循环调用了 sleep,不要检查中断状态,而应当捕获 InterruptedException 异常,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
Runnable r = () -> {
try {
while (more work to do) {
// do more work
Thread.sleep(delay);
}
} catch (InterruptedException e) {
// thread was interrupted during sleep
} finally {
// cleanup, if required
}
// exiting the run method terminates the thread
};

注释:有两个非常类似的方法,interruptedisInterruptedinterrupted 方法是一个静态方法,它检查当前线程是否被中断。而且,调用 interrupted 方法会清除该线程的中断状态。 另一方面,isInterrupted 方法是一个实例方法,可以用来检查是否有线程被中断。调用这个方法不会改变中断状态。
你可能会发现以前发布的大量代码在底层抑制了 InterruptedException 异常,如下所示:

1
2
3
4
5
6
7
8
9
void mySubTask() {
...
try {
sleep(delay);
} catch (InterruptedException e) {
// don't ignore!
}
...
}

不要这样做!如果想不出在 catch 子句中可以做什么有意义的工作,仍然有两个合理的选择:

  • catch 子句中调用 Thread.currentThread().interrupt() 来设置中断状态。这样一来,调用者就可以检测中断状态。
1
2
3
4
5
6
7
8
9
void mySubTask() {
...
try {
sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
...
}
  • 或者,更好的选择是,用 throws InterruptedException 标记你的方法,并去掉 try 语句块。这样一来,调用者(或者最终的 run 方法)就可以捕获这个异常。
1
2
3
4
5
void mySubTask() throws InterruptedException {
...
sleep(delay);
...
}

[!TIP]

API java.lang.Thread 1.0

  • void interrupt()

    向线程发送中断请求。线程的中断状态将被设置为 true。如果当前该线程被一个 sleep 调用阻塞,则抛出一个 InterruptedException 异常。

  • static boolean interrupted()

    测试当前线程(即正在执行这个指令的线程)是否被中断。注意,这是一个静态方法。这个调用有一个副作用——它会将当前线程的中断状态重置为 false

  • boolean isInterrupted()

    测试一个线程是否被中断。与静态 interrupted 方法不同,这个调用不改变线程的中断状态。

  • static Thread currentThread()

    返回表示当前正在执行的线程的 Thread 对象。

12.3.2 守护线程

可以通过调用

1
t.setDaemon(true);

将一个线程转换为守护线程(daemon thread)。守护线程并没有什么魔力,它的唯一用途是为其他线程提供服务。计时器线程就是一个例子,它定时地向其他线程发送“计时器嘀嗒”信号,另外清空过时缓存项的线程也是守护线程。只剩下守护线程时,虚拟机就会退出。因为如果只剩下守护线程,就没必要继续运行程序了。

[!TIP]

API java.lang.Thread 1.0

void setDaemon(boolean isDaemon)

标记该线程为守护线程或用户线程。这一方法必须在线程启动之前调用

12.3.3 线程名

默认情况下,线程有容易记的名字,如 Thread-2。可以用 setName 方法为线程设置任何名字:

1
2
var t = new Thread(runnable);
t.setName("Web crawler");

这在线程转储时可能很有用。

12.3.4 未捕获异常的处理器

线程的 run 方法不能抛出任何检查型异常,但是,非检查型异常可能会导致线程终止。在这种情况下,线程会死亡。

不过,对于可以传播的异常,并没有任何 catch 子句。实际上,在线程死亡之前,异常会传递到一个用于处理未捕获异常的处理器。

这个处理器必须属于一个实现了 Thread.UncaughtExceptionHandler 接口的类。这个接口只有一个方法:

1
void uncaughtException(Thread t, Throwable e)

可以用 setUncaughtExceptionHandler 方法为任何线程安装一个处理器。也可以用 Thread 类的静态方法 setDefaultUncaughtExceptionHandler 为所有线程安装一个默认的处理器。替代处理器可以使用日志 API 将未捕获异常的报告发送到一个日志文件。

如果没有安装默认处理器,默认处理器则为 null。但是,如果没有为单个线程安装处理器,那么处理器就是该线程的 ThreadGroup 对象。

注释:**线程组是可以一起管理的线程的集合。**默认情况下,你创建的所有线程都属于同一个线程组,不过也可以建立其他线程组。由于现在引入了更好的特性来处理线程集合,所以建议不要在你自己的程序中使用线程组。

ThreadGroup 类实现了 Thread.UncaughtExceptionHandler 接口。它的 uncaughtException 方法执行以下操作:

  1. 如果该线程组有父线程组,那么调用父线程组的 uncaughtException 方法。
  2. 否则,如果 Thread.getDefaultUncaughtExceptionHandler() 方法返回一个非 null 的处理器,则调用该处理器。
  3. 否则,如果 ThrowableThreadDeath 的一个实例,什么都不做。
  4. 否则,将线程的名字以及 Throwable 的栈轨迹输出到 System.err

你在程序中肯定看到过许多这样的栈轨迹。

[!TIP]

API java.lang.Thread 1.0

  • static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 5

  • static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() 5

    设置或获得未捕获异常的默认处理器。

  • void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 5

  • Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() 5

    设置或获得未捕获异常的处理器。如果没有安装处理器,则将线程组对象作为处理器。

API java.lang.Thread.UncaughtExceptionHandler 5

  • void uncaughtException(Thread t, Throwable e)

    当线程因一个未捕获异常而终止时,要记录一个定制报告。

API java.lang.ThreadGroup 1.0

  • void uncaughtException(Thread t, Throwable e)

    如果有父线程组,调用父线程组的这个方法;或者,如果有默认处理器,就调用 Thread 类的默认处理器;否则,将栈轨迹打印到标准错误流(不过,如果 e 是一个 ThreadDeath 对象,则会抑制栈轨迹。ThreadDeath 对象由已经废弃的 stop 方法生成)。

12.3.5 线程优先级

在 Java 程序设计语言中,每一个线程有一个优先级。默认情况下,一个线程会继承构造它的那个线程的优先级。可以用 setPriority 方法提高或降低任何一个线程的优先级。可以将优先级设置为 MIN_PRIORITY(在 Thread 类中定义为 1)与 MAX_PRIORITY(定义为 10)之间的任何值。NORM_PRIORITY 定义为 5。

每当线程调度器有机会选择新线程时,它首先选择有较高优先级的线程。但是,线程优先级高度依赖于系统。当虚拟机依赖于主机平台的线程实现时,Java 线程的优先级会映射到主机平台的优先级,平台的线程优先级可能有更多级别,也可能更少。

例如,Windows 有 7 个优先级别。Java 的一些优先级会映射到相同的操作系统优先级。

在面向 Linux 的 Oracle JVM 中,会完全忽略线程优先级,即所有线程都有相同的优先级。

在没有使用操作系统线程的 Java 早期版本中,线程优先级可能很有用。不过现在不要使用线程优先级了。

[!TIP]

API java.lang.Thread 1.0

  • void setPriority(int newPriority)

    设置这个线程的优先级。优先级必须在 Thread.MIN_PRIORITYThread.MAX_PRIORITY 之间。一般使用 Thread.NORM_PRIORITY 优先级。

  • static int MIN_PRIORITY

    这是 Thread 可以有的最小优先级。最小优先级的值为 1。

  • static int NORM_PRIORITY

    这是 Thread 的默认优先级。默认优先级为 5。

  • static int MAX_PRIORITY

    这是 Thread 可以有的最大优先级。最大优先级的值为 10。

12.4 同步

在大多数实际的多线程应用中,两个或两个以上的线程需要共享存取相同的数据。如果两个线程存取同一个对象,并且每个线程分别调用了一个修改该对象状态的方法,会发生什么呢?可以想见,这两个线程会相互覆盖。取决于线程访问数据的次序,可能会导致对象被破坏。这种情况通常称为竞态条件(race condition)

12.4.1 竞态条件的一个例子

为了避免多线程破坏共享数据,必须学习如何同步存取(synchronize the access)。在本节中,你会看到如果没有使用同步会发生什么。

在下面的测试程序中,还是考虑我们模拟的银行。与 12.1 节中的例子不同,我们要随机地选择从哪个源账户转账到哪个目标账户。由于这会产生问题,所以下面再来仔细查看 Banktransfer 方法的代码。

1
2
3
4
5
6
7
8
public void transfer(int from, int to, double amount) {
// CAUTION: unsafe when called from multiple threads
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}

下面是 Runnable 实例的代码。run 方法不断地从一个给定银行账户取钱。在每次迭代中,run 方法选择一个随机的目标账户和一个随机金额,调用 bank 对象的 transfer 方法,然后休眠。

1
2
3
4
5
6
7
8
9
10
11
12
Runnable r = () -> {
try {
while (true) {
int toAccount = (int) (bank.size() * Math.random());
double amount = MAX_AMOUNT * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
} catch (InterruptedException e) {
// 被中断即退出
}
};

这个模拟程序运行时,我们不清楚在某一时刻某个银行账户中有多少钱,但是我们知道所有账户的总金额应该保持不变,因为我们所做的只是把钱从一个账户转移到另一个账户。每一次交易结束时,transfer 方法会重新计算总金额并打印出来。

这个程序永远不会结束。只能按下组合键 Ctrl+C 来终止这个程序。

下面是典型的输出:

1
2
3
4
5
6
7
8
9
10
11
12
Thread[Thread-11,5,main]  588.48 from 11 to 44 Total Balance: 100000.00
Thread[Thread-12,5,main] 976.11 from 12 to 22 Total Balance: 100000.00
Thread[Thread-14,5,main] 521.51 from 14 to 22 Total Balance: 100000.00
Thread[Thread-13,5,main] 359.89 from 13 to 81 Total Balance: 100000.00
Thread[Thread-36,5,main] 401.71 from 36 to 73 Total Balance: 99291.06
Thread[Thread-35,5,main] 691.46 from 35 to 77 Total Balance: 99291.06
Thread[Thread-37,5,main] 78.64 from 37 to 3 Total Balance: 99291.06
Thread[Thread-34,5,main] 197.11 from 34 to 69 Total Balance: 99291.06
Thread[Thread-36,5,main] 85.96 from 36 to 4 Total Balance: -99291.06
Thread[Thread- 4,5,main] Thread[Thread-33,5,main]
7.31 from 31 to 32 Total Balance: 99979.24
627.50 from 4 to 5 Total Balance: 99979.24

可以看到,这里出现了错误。对于最初的几次交易,银行余额保持在 $100 000,这是正确的,因为共 100 个账户,每个账户 $1000。不过,经过一段时间后,余额有细微的变化。运行这个程序的时候,可能很快就能发现出错了,有时则可能需要很长的时间才能发现余额不对。这种情况很影响人们的信任,你可能不希望将辛苦挣来的钱存进这样一个银行。

看你能不能找出程序清单 12-3 和程序清单 12-2 中 Bank 类的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
程序清单 12-3  unsynch/UnsynchBankTest.java

1 package unsynch;
2
3 /**
4 * This program shows data corruption when multiple threads access a data structure.
5 * @version 1.32 2018-04-10
6 * @author Cay Horstmann
7 */
8 public class UnsynchBankTest {
9 public static final int NACCOUNTS = 100;
10 public static final double INITIAL_BALANCE = 1000;
11 public static final double MAX_AMOUNT = 1000;
12 public static final int DELAY = 10;
13
14 public static void main(String[] args) {
15 var bank = new Bank(NACCOUNTS, INITIAL_BALANCE);
16
17 for (int i = 0; i < NACCOUNTS; i++) {
18 int fromAccount = i;
19 Runnable r = () -> {
20 try {
21 while (true) {
22 int toAccount = (int) (bank.size() * Math.random());
23 double amount = MAX_AMOUNT * Math.random();
24 bank.transfer(fromAccount, toAccount, amount);
25 Thread.sleep((int) (DELAY * Math.random()));
26 }
27 } catch (InterruptedException e) {
28 // 被中断即退出
29 }
30 };
31 var t = new Thread(r);
32 t.start();
33 }
34 }
35 }

12.4.2 竞态条件详解

12.4.1 节中运行了一个程序,其中有多个线程更新银行账户余额。一段时间之后,不知不觉地出现了错误,可能有些钱会丢失,也可能凭空有钱进账。当两个线程试图同时更新同一个账户时,就会出现这个问题。假设两个线程同时执行指令:

1
accounts[to] += amount;

问题在于这不是原子操作。这个指令可能如下处理:

  1. accounts[to] 加载到寄存器。
  2. 增加 amount
  3. 将结果写回 accounts[to]

现在,假定第 1 个线程执行步骤 1 和步骤 2,然后,它的运行权被抢占。再假设第 2 个线程被唤醒,更新 account 数组中的同一个元素。然后,第 1 个线程被唤醒并完成其第 3 步。这个动作会抹去第 2 个线程所做的修改。这样一来,总金额就不再正确了(见图 12-2)。

image-20251206155701809

我们的测试程序可以检测到这种破坏。(当然,如果线程在完成测试时被中断,尽管概率很小,不过确实有可能出现误报!)

注释:实际上可以查看执行这个类中每一个语句的虚拟机字节码。运行以下命令

1
javap -c -v Bank

Bank.class 文件进行反编译。例如,以下代码行

1
accounts[to] += amount;

会转换为下面的字节码:

1
2
3
4
5
6
7
8
aload_0
getfield #2; // Field accounts:[D
iload_2
dup2
daload
dload 3
dadd
dastore

这些代码的含义无关紧要。重要的是这个自增命令是由多条指令组成的,执行这些指令的线程有可能在任何一条指令上被中断。

出现这种破坏的可能性有多大呢?在一个有多个内核的现代处理器上,出问题的风险相当高。我们将交错执行打印语句和更新余额的语句,以提高在单核处理器上观察到这种问题的概率。

如果删除打印语句,出问题的风险会降低,因为每个线程在再次休眠之前所做的工作很少,调度器不太可能在线程的计算过程中间抢占它的运行权。但是,产生破坏的风险并没有完全消失。如果在负载很重的机器上运行大量线程,那么,即使删除了打印语句,程序依然会出错。这种错误可能几分钟、几小时或几天后才出现。坦白地说,对程序员而言,最糟糕的事情莫过于这种不定期地出现错误。

真正的问题是 transfer 方法可能会在执行到中间时被中断。如果能够确保线程失去控制权之前方法已经运行完成,那么银行账户对象的状态就不会被破坏。

12.4.3 锁对象

有两种机制可防止并发访问同一个代码块。Java 语言为此提供了一个 synchronized 关键字,另外 Java 5 引入了 ReentrantLock 类。synchronized 关键字会自动提供一个锁以及相关的“条件”,对于大多数需要显式锁的情况,这种机制功能很强大,也很便利。不过,我们相信在分别了解锁和条件的内容之后,能更容易地理解 synchronized 关键字。java.util.concurrent 框架为这些基础机制提供了单独的类。

ReentrantLock 保护代码块的基本结构如下:

1
2
3
4
5
6
myLock.lock(); // a ReentrantLock object
try {
// critical section
} finally {
myLock.unlock(); // make sure the Lock is unlocked even if an exception is thrown
}

这个结构确保任何时刻只有一个线程进入临界区。一旦一个线程锁定了锁对象,任何其他线程都无法通过 lock 语句。当其他线程调用 lock 时,它们会暂停,直到第一个线程释放这个锁对象。

警告要把 unlock 操作包在 finally 子句中,这一点至关重要。如果临界区中的代码抛出一个异常,必须释放锁。否则,其他线程将永远阻塞。

注释使用锁时,就不能使用 try-with-resources 语句。首先,解锁方法名不是 close。不过,即使将它重命名(例如,重命名为 close),try-with-resources 语句也无法正常工作。它的首部希望声明一个新变量。但是如果使用一个锁,你可能想使用由多个线程共享的同一个变量,而不是使用一个新变量。

下面使用一个锁来保护 Bank 类的 transfer 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Bank {
private final Lock bankLock = new ReentrantLock();

public void transfer(int from, int to, int amount) {
bankLock.lock();
try {
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
} finally {
bankLock.unlock();
}
}
}

假设一个线程调用了 transfer,但是在执行结束前被抢占。再假设第二个线程也调用了 transfer,由于第二个线程不能获得锁,将在调用 lock 方法时被阻塞。它会暂停,必须等待第一个线程执行完 transfer 方法。当第一个线程释放锁时,第二个线程才能开始运行(见图 12-3)。

尝试一下。把加锁代码增加到 transfer 方法并再次运行程序。这个程序可以一直运行下去,银行余额绝对不会有错误。

注意每个 Bank 对象都有自己的 ReentrantLock 对象。如果两个线程试图访问同一个 Bank 对象,那么锁可以用来保证串行化访问。不过,**如果两个线程访问不同的 Bank 对象,每个线程会得到不同的锁对象,两个线程都不会阻塞。**本该如此,因为线程在处理不同的 Bank 实例时,线程之间不会相互影响。

这个锁称为重入(reentrant)锁,因为线程可以反复获得已拥有的锁。锁有一个持有计数(hold count)来跟踪对 lock 方法的嵌套调用。线程每一次调用 lock 后都要调用 unlock 来释放锁。由于这个特性,由一个锁保护的代码可以调用另一个同样使用这个锁的方法。

例如,transfer 方法调用 getTotalBalance 方法,这也会锁定 bankLock 对象,此时 bankLock 对象的持有计数为 2。当 getTotalBalance 方法退出时,持有计数变回 1。当 transfer 方法退出的时候,持有计数变为 0,线程释放锁。

image-20251206160211477

通常我们可能希望保护会更新或检查共享对象的代码块,从而能确信当前操作执行完之后其他线程才能使用同一个对象。

警告:要注意确保不能由于抛出异常而绕过临界区中的代码。如果在临界区代码结束之前抛出了异常,finally 子句将释放锁,但是对象可能处于被破坏的状态。

[!TIP]

API java.util.concurrent.locks.Lock 5

  • void lock()

    获得这个锁;如果锁当前被另一个线程占有,则阻塞。

  • void unlock()

    释放这个锁。

API java.util.concurrent.locks.ReentrantLock 5

  • ReentrantLock()

    构造一个重入锁,可以用来保护一个临界区。

    • ReentrantLock(boolean fair)

      构造一个采用公平策略的锁。一个公平锁倾向于等待时间最长的线程。不过,这种公平保证可能严重影响性能。所以,默认情况下,不要求锁是公平的。

警告:听起来公平锁很不错,但是公平锁要比常规锁慢得多。只有当你确实了解自己要做什么,而且对于你要解决的问题,有一个特定的理由确实要考虑公平性时,才应使用公平锁。即使使用公平锁,也不能保证线程调度器是公平的。如果线程调度器选择忽略一个已经为锁等待很长时间的线程,它就没有机会得到锁的公平处理。

12.4.4 条件对象

通常,线程进入临界区后却发现只有满足了某个条件之后它才能执行。可以使用一个条件对象(condition object)管理那些已经获得了一个锁却不能有效工作的线程。在这一节中,我们会介绍 Java 库中条件对象的实现。由于历史原因,条件对象经常被称为条件变量(conditional variable)

现在来优化银行的模拟程序。如果一个账户没有足够的资金用于转账,则我们不希望从这样的账户转出资金。注意不能使用类似下面的代码:

1
2
if (bank.getBalance(from) >= amount)
bank.transfer(from, to, amount);

在成功地通过这个测试之后,但在调用 transfer 方法之前,当前线程完全有可能被中断:

1
2
3
if (bank.getBalance(from) >= amount)
// thread might be deactivated at this point
bank.transfer(from, to, amount);

在这个线程再次运行时,账户余额可能已经低于提款金额。必须确保在检查余额与转账动作之间没有其他线程修改余额。为此,可以使用一个锁来保护这个测试和转账动作:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void transfer(int from, int to, int amount) {
bankLock.lock();
try {
while (accounts[from] < amount){
// wait
...
}
// transfer funds
...
} finally {
bankLock.unlock();
}
}

现在,当账户中没有足够的资金时,我们会做什么呢?我们会等待,直到另一个线程增加了该账户的资金。但是,这个线程刚刚获得了对 bankLock 的独占访问权,因此别的线程没有存款的机会。这里就要引入条件对象。

**一个锁对象可以有一个或多个关联的条件对象。**可以用 newCondition 方法获得一个条件对象。习惯上会给每个条件对象一个合适的名字来反映它表示的条件。例如,在这里我们建立了一个条件对象来表示“资金充足”条件:

1
2
3
4
5
6
7
class Bank {
private Condition sufficientFunds;

public Bank() {
sufficientFunds = bankLock.newCondition();
}
}

如果 transfer 方法发现资金不足,它会调用:

1
sufficientFunds.await();

当前线程现在暂停,并放弃锁。这允许另一个线程执行,我们希望它能增加账户余额。

等待获得锁的线程和调用了 await 方法的线程存在本质上的不同。一旦一个线程调用了 await 方法,它就进入这个条件的等待集(wait set)。**当锁可用时,该线程并不会变为可运行状态。实际上,它仍保持非活动状态,**直到另一个线程在同一条件上调用 signalAll() 方法。

当另一个线程完成转账时,它应该调用:

1
sufficientFunds.signalAll();

**这个调用会重新激活等待这个条件的所有线程。当这些线程从等待集中移出时,它们再次变为可运行状态,调度器最终将它们再次激活。**同时,它们会尝试重新进入该对象。一旦锁可用,它们中的某个线程将从 await 调用返回,得到这个锁,并从之前暂停的地方继续执行。

此时,线程应当再次测试条件。不能保证现在一定满足条件——signalAll() 方法仅仅是通知等待的线程:现在有可能满足条件,有必要再次检查条件

注释:通常,await 调用应该放在如下形式的一个循环中:

1
2
while (!(OK to proceed))
condition.await();

最终需要有某个其他线程调用 signalAll() 方法,这一点至关重要。当一个线程调用 await 时,它没有办法自行重新激活。它寄希望于其他线程。如果没有其他线程来重新激活这个等待的线程,它就再也不能运行了。这将导致令人不快的死锁(deadlock) 现象。如果所有其他线程都被阻塞,最后一个活动线程调用了 await 方法但没有先解除另外某个线程的阻塞,现在这个线程也会阻塞。此时没有线程可以解除其他线程的阻塞状态,程序会永远挂起。

应该什么时候调用 signalAll() 呢?从经验上讲,只要一个对象的状态有变化,而且可能有利于正在等待的线程,就可以调用 signalAll()。例如,当一个账户余额发生改变时,就应该再给等待的线程一个机会来检查余额。在这个例子中,完成转账时,我们就会调用 signalAll() 方法:

1
2
3
4
5
6
7
8
9
10
11
public void transfer(int from, int to, int amount) {
bankLock.lock();
try {
while (accounts[from] < amount)
sufficientFunds.await();
// transfer funds
sufficientFunds.signalAll();
} finally {
bankLock.unlock();
}
}

注意signalAll() 调用不会立即激活一个等待的线程。它只是解除等待线程的阻塞,使这些线程可以在当前线程释放锁之后竞争访问对象。

另一个方法 signal() 只是随机选择等待集中的一个线程,并解除这个线程的阻塞状态。这比解除所有线程的阻塞更高效,但也存在危险。如果随机选择的线程发现自己仍然不能运行,它就会再次阻塞。如果没有其他线程再次调用 signal(),系统就会进入死锁。

警告只有当线程拥有一个条件的锁时,它才能在这个条件上调用 await()signalAll()signal() 方法。

如果运行程序清单 12-4 中的程序,你会注意到不再有任何错误。总余额永远是 $100000。任何账户都不会出现负的余额(同样地,还是需要按下组合键 Ctrl+C 来终止程序)。你可能还会注意到,这个程序运行起来要慢一些——这是为实现同步机制所涉及的额外工作付出的代价。

实际上,正确使用条件很有挑战性。开始实现你自己的条件对象之前,应该考虑使用 12.5 节中描述的某个结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
程序清单 12-4  synch/Bank.java

1 package synch;
2
3 import java.util.*;
4 import java.util.concurrent.locks.*;
5
6 /**
7 * A bank with a number of bank accounts that uses locks for serializing access.
8 */
9 public class Bank {
10 private final double[] accounts;
11 private Lock bankLock;
12 private Condition sufficientFunds;
13
14 /**
15 * Constructs the bank.
16 * @param n the number of accounts
17 * @param initialBalance the initial balance for each account
18 */
19 public Bank(int n, double initialBalance) {
20 accounts = new double[n];
21 Arrays.fill(accounts, initialBalance);
22 bankLock = new ReentrantLock();
23 sufficientFunds = bankLock.newCondition();
24 }
25
26 /**
27 * Transfers money from one account to another.
28 * @param from the account to transfer from
29 * @param to the account to transfer to
30 * @param amount the amount to transfer
31 */
32 public void transfer(int from, int to, double amount) throws InterruptedException {
33 bankLock.lock();
34 try {
35 while (accounts[from] < amount)
36 sufficientFunds.await();
37 System.out.print(Thread.currentThread());
38 accounts[from] -= amount;
39 System.out.printf(" %10.2f from %d to %d", amount, from, to);
40 accounts[to] += amount;
41 System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
42 sufficientFunds.signalAll();
43 } finally {
44 bankLock.unlock();
45 }
46 }
47
48 /**
49 * Gets the sum of all account balances.
50 * @return the total balance
51 */
52 public double getTotalBalance() {
53 bankLock.lock();
54 try {
55 double sum = 0;
56 for (double a : accounts)
57 sum += a;
58 return sum;
59 } finally {
60 bankLock.unlock();
61 }
62 }
63
64 /**
65 * Gets the number of accounts in the bank.
66 * @return the number of accounts
67 */
68 public int size() {
69 return accounts.length;
70 }
71 }

[!TIP]

API java.util.concurrent.locks.Lock 5

  • Condition newCondition()

    返回一个与这个锁相关联的条件对象。

API java.util.concurrent.locks.Condition 5

  • void await()

    将该线程放在这个条件的等待集中。

  • void signalAll()

    解除该条件等待集中所有线程的阻塞状态。

  • void signal()

    从该条件的等待集中随机选择一个线程,解除其阻塞状态。

12.4.5 synchronized 关键字

在前面的小节中,我们已经了解了如何使用 LockCondition 对象。在进一步深入之前,先对锁和条件的要点做一个总结:

  • 锁用来保护代码段,一次只允许一个线程执行被保护的代码。
  • 锁可以管理试图进入被保护代码段的线程。
  • 一个锁可以有一个或多个关联的条件对象。
  • 每个条件对象管理那些已经进入被保护代码段但还不能运行的线程。

LockCondition 接口允许程序员充分控制锁定。不过,大多数情况下,你并不需要那样控制,完全可以使用 Java 语言内置的一种机制。从 1.0 版开始,Java 中的每个对象都有一个内部锁(intrinsic lock)。如果一个方法声明时有 synchronized 关键字,那么对象的锁将保护整个方法。也就是说,要调用这个方法,线程必须获得内部对象锁。

换句话说,

1
2
3
public synchronized void method() {
method body
}

等价于

1
2
3
4
5
6
7
8
public void method() {
this.intrinsicLock.lock();
try {
method body
} finally {
this.intrinsicLock.unlock();
}
}

例如,可以简单地将 Bank 类的 transfer 方法声明为 synchronized,而不必使用一个显式的锁。

内部对象锁只有一个关联条件。wait 方法将一个线程增加到等待集中,notifyAll / notify 方法可以解除等待线程的阻塞。换句话说,调用 waitnotifyAll 等价于:

1
2
intrinsicCondition.await();
intrinsicCondition.signalAll();

注释waitnotifyAll 以及 notify 方法是 Object 类的 final 方法。Condition 方法必须命名为 awaitsignalAllsignal,从而不会与那些方法发生冲突。

例如,可以用 Java 如下实现 Bank 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Bank {
private double[] accounts;

public synchronized void transfer(int from, int to, int amount)
throws InterruptedException {
while (accounts[from] < amount)
wait(); // wait on intrinsic object lock's single condition
accounts[from] -= amount;
accounts[to] += amount;
notifyAll(); // notify all threads waiting on the condition
}

public synchronized double getTotalBalance() {
// ...
}
}

可以看到,使用 synchronized 关键字可以得到更为简洁的代码。当然,要理解这个代码,你必须知道每个对象都有一个内部锁,并且这个锁有一个内部条件。这个锁会管理试图进入 synchronized 方法的线程,这个条件会管理调用了 wait 的线程。

将静态方法声明为同步也是合法的。如果调用这样一个方法,它会获得关联类对象的内部锁。例如,如果 Bank 类有一个静态同步方法,调用这个方法时,会锁定 Bank.class 对象的锁。因此,没有其他线程可以调用 Bank 类的这个方法或任何其他同步静态方法。

内部锁和条件存在一些限制,包括:

  • 不能中断一个正在尝试获得锁的线程。
  • 不能指定尝试获得锁的超时时间。
  • 每个锁只有一个条件,这很低效。

在代码中应该使用哪一种做法呢?LockCondition 对象还是同步方法?下面是我们的一些建议:

  • 最好既不使用 Lock/Condition 也不使用 synchronized 关键字。在许多情况下,可以使用 java.util.concurrent 包中的某种机制,它会为你处理所有的锁定。
  • 如果 synchronized 关键字适合你的程序,那么尽量使用这种做法,这样可以减少编写的代码量,还能减少出错。程序清单 12-5 给出了用同步方法实现的银行示例。
  • 如果特别需要 Lock/Condition 结构提供的额外能力,则使用 Lock/Condition

[!TIP]

API java.lang.Object 1.0

  • void notifyAll()

    解除在这个对象上调用 wait 方法的那些线程的阻塞状态。该方法只能在同步方法或同步块中调用。如果当前线程不是对象锁的所有者,该方法会抛出一个 IllegalMonitorStateException 异常。

  • void notify()

    随机选择一个在这个对象上调用 wait 方法的线程,解除其阻塞状态。该方法只能在一个同步方法或同步块中调用。如果当前线程不是对象锁的所有者,该方法会抛出一个 IllegalMonitorStateException 异常。

  • void wait()

    导致一个线程进入等待状态,直到它得到通知。该方法只能在一个同步方法或同步块中调用。如果当前线程不是对象锁的所有者,该方法会抛出一个 IllegalMonitorStateException 异常。

  • void wait(long millis)

  • void wait(long millis, int nanos)

    导致一个线程进入等待状态,直到它得到通知或者经过了指定的时间。这些方法只能在一个同步方法或同步块中调用。如果当前线程不是对象锁的所有者,这些方法会抛出 IllegalMonitorStateException 异常。纳秒数不能超过 1 000 000。

12.4.6 同步块

正如前面讨论的,每个 Java 对象都有一个锁。线程可以通过调用同步方法获得这个锁。还有另一种机制可以获得这个锁:即进入一个同步块(synchronized block)。当线程进入有如下形式的一个块时:

1
2
3
synchronized (obj) {  // this is the syntax for a synchronized block
critical section
}

它会获得 obj 的锁。

有时我们会看到一些“专用”(ad hoc)锁,例如:

1
2
3
4
5
6
7
8
9
10
11
12
public class Bank {
private double[] accounts;
private final Object lock = new Object();

public void transfer(int from, int to, int amount) {
synchronized (lock) { // an ad-hoc lock
accounts[from] -= amount;
accounts[to] += amount;
System.out.println(...);
}
}
}

在这里,创建 lock 对象只是为了使用每个 Java 对象拥有的锁。

警告:使用同步块时,要注意锁对象。例如,下面的代码是有问题的:

1
2
3
private final String lock = "LOCK";
synchronized (lock) { ... } // Don't lock on string literal!
...

如果这个代码在同一个程序中出现两次,锁将是同一个对象,因为字符串字面量会共享。这可能导致死锁。

另外,要避免使用基本类型包装器作为锁:

1
private final Integer lock = new Integer(42); // Don't lock on wrappers

构造器调用 new Integer(42) 已经废弃,而且你也不希望维护程序的程序员将这个调用改为 Integer.valueOf(42)。如果将同一个魔法数使用两次,这会意外地共享锁。

如果需要修改一个静态字段,会从特定的类上获得锁,而不是从 getClass() 返回的值上获得:

1
2
synchronized (MyClass.class) { staticCounter++; } // ok
synchronized (getClass()) { staticCounter++; } // Don't

如果从一个子类调用包含这个代码的方法,getClass() 会返回一个不同的 Class 对象!这就不再能保证互斥!

一般来讲,如果必须使用同步块,一定要了解你的锁对象!必须对所有受保护的访问路径使用相同的锁,而且别人不能使用你的锁。

有时程序员使用一个对象的锁来实现额外的原子操作,这种做法称为客户端锁定(client-side locking)。例如,考虑 Vector 类,这是一个列表,它的方法是同步的。现在,假设我们将银行余额存储在一个 Vector<Double> 中。下面是 transfer 方法的一个原生实现:

1
2
3
4
5
public void transfer(Vector<Double> accounts, int from, int to, int amount) { // ERROR
accounts.set(from, accounts.get(from) - amount);
accounts.set(to, accounts.get(to) + amount);
System.out.println(...);
}

Vector 类的 getset 方法是同步的,但是,这对于我们并没有什么帮助。一个线程完全有可能在 transfer 方法中执行完第一个 get 调用之后被抢占。然后另一个线程可能会在相同的位置存储一个不同的值。不过,我们可以截获这个锁:

1
2
3
4
5
6
7
public void transfer(Vector<Double> accounts, int from, int to, int amount) {
synchronized (accounts) {
accounts.set(from, accounts.get(from) - amount);
accounts.set(to, accounts.get(to) + amount);
System.out.println(...);
}
}

这个方法是可行的,但是完全依赖于这样一个事实:Vector 类会对自己的所有更改器方法使用内部锁。不过,确实如此吗?Vector 类的文档没有给出这样的承诺。你必须仔细研究源代码,而且还得希望将来的版本不会引入非同步的更改器方法。可以看到,客户端锁定是非常脆弱的,通常不建议使用。

注释:Java 虚拟机对同步方法提供了内置支持。不过,同步块会编译为很长的字节码序列来管理内部锁。

12.4.7 监视器概念

锁和条件是实现线程同步的强大工具,但是,严格地讲,它们不是面向对象的。多年来,研究人员在努力寻找方法,希望不要求程序员考虑显式锁就可以保证多线程的安全性。最成功的解决方案之一是监视器(monitor),这一概念最早是由 Per Brinch Hansen 和 Tony Hoare 在 20 世纪 70 年代提出的。用 Java 的术语来讲,监视器有如下属性:

  • 监视器是只包含私有字段的类。
  • 监视器类的每个对象有一个关联的锁。
  • 所有方法由这个锁锁定。换句话说,如果客户端调用 obj.method(),那么在方法调用开始时会自动获得 obj 对象的锁,并在方法返回时自动释放这个锁。因为所有的字段是私有的,这样的安排可以确保一个线程处理字段时,没有其他线程能够访问这些字段。
  • 锁可以有任意多个关联的条件。

监视器的早期版本只有单一的条件,使用一种很优雅的语法:可以简单地调用 await accounts[from] >= amount,而不使用任何显式的条件变量。不过,研究表明,盲目地重新测试条件是很低效的。可以利用显式的条件变量解决这一问题,每一个条件变量管理单独的一组线程。

Java 设计者以不太严格的方式调整了监视器概念:Java 中的每一个对象都有一个内部锁和一个内部条件。如果一个方法用 synchronized 关键字声明,那么,它表现得就像是一个监视器方法。可以通过调用 wait / notifyAll / notify 来访问条件变量。

不过,Java 对象在以下 3 个重要方面不同于监视器,这削弱了线程安全性:

  • 字段不要求是 private
  • 方法不要求是 synchronized
  • 内部锁对客户是可用的。

12.4.8 volatile 字段

有时,如果只是为了读写一两个实例字段而使用同步,所带来的开销好像有些不合算。毕竟,怎么可能出错呢?遗憾的是,由于使用现代的处理器与编译器,出错的可能性很大。

  • 有多处理器的计算机能够暂时在寄存器或本地内存缓存中保存内存值。其结果是,运行在不同处理器上的线程可能看到同一个内存位置有不同的值。

  • 编译器可能改变指令执行的顺序以得到最大的吞吐量。编译器不会选择可能改变代码语义的顺序,但是编译器有一个假定,认为内存值只在代码中有显式的修改指令时才会改变。不过,内存值有可能被另一个线程改变!

如果你使用锁来保护可能被多个线程访问的代码,那么不存在这些问题。编译器必须遵守锁的要求,为此要在必要的时候刷新输出本地缓存,而且不能不适当地重排指令顺序。详细的解释见 JSR 133 的 Java 内存模型和线程规范(参见 http://www.jcp.org/en/jsr/detail?id=133)。该规范的大部分内容都很复杂而且技术性很强,不过这个文档中还包含很多解释得很清楚的例子。Brian Goetz 写了一个更易懂的概述文章(www.ibm.com/developerworks/library/j-jtp02244)。

注释:Brian Goetz 创造了以下“同步格言”:“如果写一个变量,而这个变量接下来可能会被另一个线程读取,或者,如果读一个变量,而这个变量可能已经被另一个线程写入值,那么必须使用同步。”

volatile 关键字为实例字段的同步访问提供了一种免锁机制。如果声明一个字段为 volatile,那么编译器和虚拟机就会考虑到该字段可能被另一个线程并发更新。

例如,假设一个对象有一个 boolean 标记 done,它的值由一个线程设置,而由另一个线程查询。如同我们讨论过的,你可以使用锁:

1
2
3
private boolean done;
public synchronized boolean isDone() { return done; }
public synchronized void setDone() { done = true; }

或许使用内部对象锁不是个好主意。如果另一个线程已经对该对象加锁,isDone()setDone() 方法可能会阻塞。如果这是个问题,可以只为这个变量使用一个单独的锁。但是,这会很麻烦。

在这种情况下,将字段声明为 volatile 就很合适:

1
2
3
private volatile boolean done;
public boolean isDone() { return done; }
public void setDone() { done = true; }

编译器会插入适当的代码,以确保如果一个线程中对 done 变量做了修改,这个修改对读取这个变量的所有其他线程都可见。

警告:volatile 变量不能提供原子性。例如,方法

1
public void flipDone() { done = !done; } // not atomic

不能确保将字段中的值取反。无法保证读取、取反和写入不被中断。

12.4.9 final 变量

除非使用锁或 volatile 修饰符,否则无法从多个线程安全地读取一个字段。

还有一种情况可以安全地访问一个共享字段,即这个字段声明为 final 时。考虑以下声明:

1
final var accounts = new HashMap<String, Double>();

其他线程会在构造器完成构造之后才看到这个 accounts 变量。如果不使用 final,就不能保证其他线程看到的是 accounts 更新后的值,它们可能都只是看到 null,而不是新构造的 HashMap。当然,映射的操作并不是线程安全的。如果有多个线程更改和读取这个映射,仍然需要进行同步。

12.4.10 原子性

假设对共享变量除了赋值之外并不做其他操作,那么可以将这些共享变量声明为 volatilejava.util.concurrent.atomic 包中有很多类使用了很高效的机器级指令来保证其他操作的原子性(而没有使用锁)。例如,AtomicInteger 类提供了方法 incrementAndGetdecrementAndGet,它们分别以原子方式对一个整数完成自增或自减操作。例如,可以安全地生成一个数值序列,如下所示:

1
2
3
public static AtomicLong nextNumber = new AtomicLong();
// in some thread...
long id = nextNumber.incrementAndGet();

incrementAndGet 方法以原子方式将 AtomicLong 自增,并返回自增后的值。也就是说,获得值、增 1、设置值和生成新值的操作不会被中断。可以保证即使是多个线程并发地访问同一个实例,也会计算并返回正确的值。

有很多方法可以以原子方式设置和增减值,不过,如果希望完成更复杂的更新,就必须使用 compareAndSet 方法。例如,假设希望跟踪不同线程观察的最大值。下面的代码是不可行的:

1
2
3
public static AtomicLong largest = new AtomicLong();
// in some thread...
largest.set(Math.max(largest.get(), observed)); // ERROR--race condition!

这个更新不是原子的。实际上,可以提供一个 lambda 表达式更新变量,它会为你完成更新。对于这个例子,我们可以调用:

1
largest.updateAndGet(x -> Math.max(x, observed));

1
largest.accumulateAndGet(observed, Math::max);

accumulateAndGet 方法利用一个二元操作符来合并原子值和所提供的参数。还有 getAndUpdategetAndAccumulate 方法可以返回原值。

注释:类 AtomicIntegerAtomicIntegerArrayAtomicIntegerFieldUpdaterAtomicLongArrayAtomicLongFieldUpdaterAtomicReferenceAtomicReferenceArrayAtomicReferenceFieldUpdater 也提供了这些方法。

如果有大量线程要访问相同的原子值,性能会大幅下降,因为乐观更新需要太多次重试。LongAdderLongAccumulator 类解决了这个问题。LongAdder 包括多个变量(加数),其总和为当前值。可以有多个线程更新不同的加数,线程数增加时会自动提供新的加数。通常情况下,只有当所有工作都完成之后才需要总和的值,对于这种情况,这种方法会很高效。性能会有显著的提升。

如果预期可能存在大量竞争,只需要使用 LongAdder 而不是 AtomicLong。方法名稍有区别。要调用 increment 让一个计数器自增,或者调用 add 来增加一个量,另外调用 sum 来获取总和。

1
2
3
4
5
6
7
8
9
var adder = new LongAdder();
for (...) {
pool.submit(() -> {
while (...) {
if (...) adder.increment();
}
});
}
long total = adder.sum();

注释:当然,increment 方法不返回原值。这样做会消除将求和分解到多个加数所带来的性能提升。

LongAccumulator 将这种思想推广到任意的累加操作。在构造器中,可以提供这个操作以及它的零元素。要加入新的值,可以调用 accumulate。调用 get 来获得当前值。下面的代码可以得到与 LongAdder 同样的效果:

1
2
3
var adder = new LongAccumulator(Long::sum, 0);
// in some thread...
adder.accumulate(value);

在内部,这个累加器包含变量 a₁, a₂, …, aₙ。每个变量初始化为零元素(这个例子中零元素为 0)。

调用 accumulate 并提供值 v 时,其中一个变量会以原子方式更新为 aᵢ = aᵢ op v,这里 op 是中缀形式的累加操作。在我们这个例子中,调用 accumulate 会对某个 i 计算 aᵢ = aᵢ + v。

get 的结果是 a₁ op a₂ op … op aₙ。在我们的例子中,这就是累加器的总和:a₁ + a₂ + … + aₙ。

如果选择一个不同的操作,可以计算最小值或最大值。一般来说,这个操作必须满足结合律和交换律。这说明,最终结果不能依赖于以什么顺序结合这些中间值。

另外 DoubleAdderDoubleAccumulator 做法也相同,只不过处理的是 double 值。

12.4.11 死锁

锁和条件不能解决多线程中可能出现的所有问题。考虑下面的情况:

1.账户 1:$200

  1. 账户 2:$300
  2. 线程 1:从账户 1 转 $300 到账户 2
  3. 线程 2:从账户 2 转 $400 到账户 1

如图 12-4 所示,线程 1 和线程 2 显然都被阻塞。因为账户 1 以及账户 2 中的余额都不足以进行转账,两个线程都无法继续执行。

image-20251208215838358

有可能因为每一个线程都在等待更多的钱款存入而导致所有线程都被阻塞。这样的状态称为死锁(deadlock)。

在这个程序里,死锁不会发生,原因很简单。每一次转账金额至多 $1000。因为总共有 100 个账户,而且所有账户的总金额是 $100000,在任意时刻,至少有一个账户的余额高于 $1000。所以,从该账户转账的线程可以继续运行。

但是,如果修改线程的 run 方法,把每次转账至多 $1000 的限制去掉,很快就会发生死锁。试试看。将 NACCOUNTS 设置为 10。使用 MAX_AMOUNT = 2 * INITIAL_BALANCE 构造各个转账线程。然后运行该程序。程序运行一段时间后就会挂起。

提示:当程序挂起时,按下组合键 Ctrl+\,将得到一个线程转储,这会列出所有线程。每一个线程有一个栈轨迹,告诉你线程当前在哪里阻塞。如第 7 章所述,可以运行 jconsole 并查看线程(Threads)面板(见图 12-5)。

image-20251208221248927

还有一种做法会导致死锁:让第 1 个线程负责向第 1 个账户存钱,而不是从第 1 个账户取钱。这样一来,有可能所有线程都集中到一个账户上,每一个线程都试图从这个账户中取出大于该账户余额的钱。试试看。在 SynchBankTest 程序中,来看 TransferRunnable 类的 run 方法。在 transfer 调用中,交换 fromAccounttoAccount。运行程序,会看到它几乎会立即死锁。

还有一种很容易导致死锁的情况:在 SynchBankTest 程序中,将 signalAll 方法改为 signal 方法,会发现程序最终会挂起。(同样,将 NACCOUNTS 设为 10 可以更快地看到这个结果。)signalAll 方法会通知所有等待增加资金的线程,与此不同,signal 方法只解除一个线程的阻塞。如果该线程不能继续运行,所有的线程都会阻塞。考虑下面的场景,这就可能发生死锁:

  1. 账户 1:$1990

  2. 所有其他账户:分别有 $990

  3. 线程 1:从账户 1 转 $995 到账户 2

  4. 所有其他线程:从它们的账户转 $995 到另一个账户

显然,除了线程 1,所有的线程都被阻塞,因为它们的账户中没有足够的金额。

线程 1 继续执行,现在情况如下:

  1. 账户 1:$995
  2. 账户 2:$1985
  3. 所有其他账户:分别有 $990

然后,线程 1 调用 signal 方法。signal 方法随机选择一个线程将它解除阻塞。假定它选择了线程 3。该线程被唤醒,发现在它的账户中没有足够的金额,它再次调用 await。但是,线程 1 仍在运行,将随机地产生一个新的交易,例如:

  1. 线程 1:从账户 1 转 $997 到账户 2

现在,线程 1 也调用 await,所有的线程都被阻塞。系统死锁。

这里的罪魁祸首是 signal 调用。它只为一个线程解除阻塞,而且,它很可能选择一个根本不能继续运行的线程(在我们的例子中,线程 2 必须从账户 2 中取钱)。

遗憾的是,Java 程序设计语言中没有提供任何特性可以避免或打破这些死锁。你必须仔细设计程序,确保不会出现死锁。

12.4.12 为什么废弃 stopsuspend 方法

最初的 Java 版本定义了一个 stop 方法来终止一个线程,另外还有一个 suspend 方法来阻塞一个线程直至另一个线程调用 resumestopsuspend 方法有一些共同点:它们都试图控制一个给定线程的行为,而没有线程的互操作。

stopsuspendresume 方法已经被废弃。stop 方法天生就不安全,经验证明,suspend 方法经常会导致死锁。在本节中,你将看到这些方法为什么有问题,以及怎样避免这些问题。

首先来看看 stop 方法,该方法会终止所有未完成的方法,包括 run 方法。一个线程终止时,它会立即释放被它锁定的所有对象的锁。这会导致对象处于不一致的状态。例如,假设一个 TransferRunnable 在从一个账户向另一个账户转账的过程中被终止,钱已经取出,但还没有存入目标账户,现在银行对象就被破坏了。因为锁已经释放,其他未终止的线程也可以观察到这种破坏。

当一个线程想要终止另一个线程时,它无法知道什么时候调用 stop 方法是安全的,而什么时候会导致对象被破坏。因此,这个方法已经被废弃。希望停止一个线程的时候应该中断该线程,然后被中断的线程可以在安全的时候终止。

注释:一些作者声称 stop 方法被废弃是因为它会导致对象被一个已停止的线程永久锁定。但是,这一说法是错误的。从技术上讲,停止的线程会抛出 ThreadDeath 异常,从而退出它调用的所有同步方法。因此,这个线程会释放它持有的内部对象锁。

接下来看看 suspend 方法有什么问题。与 stop 不同,suspend 不会破坏对象。但是,如果用 suspend 挂起一个持有锁的线程,那么,在这个线程恢复运行之前这个锁是不可用的。如果调用 suspend 方法的线程试图获得同一个锁,程序就会死锁:被挂起的线程等着被恢复,而将其挂起的线程等待获得锁。

在图形用户界面中经常出现这种情况。假设我们有一个图形化的银行模拟程序。Pause 按钮用来挂起转账线程,还有一个 Resume 按钮用来恢复线程。

1
2
3
4
5
6
7
8
9
pauseButton.addActionListener(event -> {
for (int i = 0; i < threads.length; i++)
threads[i].suspend(); // don't do this
});

resumeButton.addActionListener(event -> {
for (int i = 0; i < threads.length; i++)
threads[i].resume();
});

假设有一个 paintComponent 方法,它通过调用 getBalances 方法获得一个余额数组,从而绘制每个账户的一个图表。

就像在 12.7.3 节将要看到的,按钮动作和重绘动作都在同一个线程中,即事件分派线程(event dispatch thread)。考虑下面的情况:

  1. 某个转账线程获得 bank 对象的锁。
  2. 用户点击 Pause 按钮。
  3. 所有转账线程被挂起;其中之一仍然持有 bank 对象的锁。
  4. 因为某种原因,需要重新绘制账户图表。
  5. paintComponent 方法调用 getBalances 方法。
  6. 该方法试图获得 bank 对象的锁。

现在程序会被冻结。事件分派线程不能继续运行,因为锁由一个挂起的线程持有。因此,用户不能点击 Resume 按钮,这些线程永远无法恢复。

如果想安全地挂起线程,可以引入一个变量 suspendRequested,并在 run 方法的某个安全的地方测试这个变量,安全的地方是指在这里该线程没有锁定其他线程需要的对象。当该线程发现 suspendRequested 变量已经设置,就要继续等待,直到再次可用。

12.4.13 按需初始化

有时候,对于某些数据结构,你可能希望第一次需要它时才进行初始化,而且你希望确保这种初始化只发生一次。与其设计你自己的机制,不如利用这样一个事实:虚拟机会在第一次使用类时执行一个静态初始化器,而且只执行一次。虚拟机利用一个锁来确保这一点,所以你不需要自己编程实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class OnDemandData {
// private constructor to ensure only one object is constructed
private OnDemandData() {}

public static OnDemandData getInstance() {
return Holder.INSTANCE;
}

// only initialized on first use, i.e. in the first call to getInstance
private static class Holder {
// VM guarantees that this happens at most once
static final OnDemandData INSTANCE = new OnDemandData();
}
}

警告:要采用这种用法,必须确保构造器不会抛出任何异常。虚拟机不会做第二次尝试来初始化 Holder 类。

12.4.14 线程局部变量

前面几节中,我们讨论了在线程间共享变量的风险。有时可能要避免共享变量,使用 ThreadLocal 辅助类为各个线程提供各自的实例。例如,SimpleDateFormat 类不是线程安全的。假设有一个静态变量:

1
public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");

如果两个线程都执行以下操作:

1
String dateStamp = dateFormat.format(new Date());

结果可能很混乱,因为 dateFormat 使用的内部数据结构可能会被并发访问所破坏。当然可以使用同步,但这样开销很大;或者也可以在需要时构造一个局部 SimpleDateFormat 对象,不过这也很浪费。

要为每个线程构造一个实例,可以使用以下代码:

1
2
public static final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

要访问具体的格式化方法,可以调用:

1
String dateStamp = dateFormat.get().format(new Date());

在一个给定线程中首次调用 get() 时,会调用构造器中的 lambda 表达式。在此之后,get() 方法会返回属于当前线程的那个实例。

在多个线程中生成随机数也存在类似的问题。java.util.Random 类是线程安全的,但是如果多个线程需要等待一个共享的随机数生成器,这会很低效。

可以使用 ThreadLocal 辅助类为各个线程提供一个单独的生成器,不过 Java 7 还另外提供了一个便利类。只需要调用以下方法:

1
int random = ThreadLocalRandom.current().nextInt(upperBound);

ThreadLocalRandom.current() 调用会返回特定于当前线程的一个随机数生成器实例。

线程局部变量有时用于向协作完成某个任务的所有方法提供对象,而不必在调用者之间传递这个对象。例如,假设你想共享一个数据库连接。声明以下变量:

1
2
public static final ThreadLocal<Connection> connection =
ThreadLocal.withInitial(() -> null);

任务开始时,为这个线程初始化这个连接:

1
connection.set(connect(url, username, password));

任务调用某些方法,所有方法都在同一个线程中,最终其中一个方法需要这个连接:

1
var result = connection.get().executeQuery(query);

需要说明,同一个调用可以出现在多个线程中。每个线程会得到它自己的连接对象。

警告:在前面的例子中,至关重要的一点是:只有一个任务使用线程。如果使用一个线程池执行任务,你可能不希望向共享相同线程的其他任务提供你的数据库连接。

[!TIP]

API java.lang.ThreadLocal 1.2

  • T get()

    得到这个线程的当前值。如果是首次调用 get,会调用 initialize 来得到这个值。

  • void set(T t)

    为这个线程设置一个新值。

  • void remove()

    删除对应这个线程的值。

  • static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) 8

    创建一个线程局部变量,其初始值通过调用给定的提供者(supplier)生成。

API java.util.concurrent.ThreadLocalRandom 7

  • static ThreadLocalRandom current()

    返回特定于当前线程的 Random 类的一个实例。

12.5 线程安全的集合

如果多个线程要并发地修改一个数据结构,例如散列表,那么很容易破坏这个数据结构(有关散列表的详细信息见第 9 章)。例如,一个线程可能开始向表中插入一个新元素。假设在调整散列表各个桶之间的链接关系的过程中,这个线程的控制权被抢占。如果另一个线程开始遍历同一个散列表,可能会使用无效的链接并造成混乱,有可能抛出异常或者陷入无限循环。

可以通过提供锁来保护共享的数据结构,但是通常更容易的做法是选择线程安全的实现。在下面各小节中,将讨论 Java 类库提供的另外一些线程安全的集合。

12.5.1 阻塞队列

很多线程问题可以使用一个或多个队列以优雅而安全的方式来解决。生产者线程向队列插入元素,消费者线程则获取元素。使用队列,可以安全地从一个线程向另一个线程传递数据。例如,考虑银行转账程序,转账线程可以将转账指令对象插入一个队列,而不是直接访问银行对象。另一个线程从队列中取出指令并完成转账。只有这个线程可以访问银行对象的内部。因此不需要同步。(当然,线程安全的队列类的实现者必须考虑锁和条件,但那是他们的问题,而不是你要考虑的问题。)

当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blocking queue)将导致线程阻塞。在协调多个线程的工作时,阻塞队列是一个有用的工具。工作线程可以周期性地将中间结果存储在阻塞队列中。其他工作线程移除中间结果,并进一步修改。队列会自动地平衡负载。如果第一组线程运行得比第二组慢,第二组在等待结果时会阻塞。如果第一组线程运行得更快,队列会填满,直到第二组赶上来。表 12-1 给出了阻塞队列的方法。

image-20251208222125796

阻塞队列方法分为以下 3 类,它们的区别在于当队列满或空时它们完成的动作。如果使用队列作为线程管理工具,要用到 puttake 方法。试图向满队列添加元素或者想从空队列得到队头元素时,addremoveelement 操作会抛出异常。当然,在一个多线程程序中,队列可能会在任何时候变空或变满,因此,你可能更想使用 offerpollpeek 方法。如果不能完成任务,这些方法只是返回一个错误提示而不会抛出异常。

注释pollpeek 方法返回 null 来指示失败。因此,向这些队列中插入 null 值是非法的。

还有带有超时时间的 offer 方法和 poll 方法。例如,下面的调用:

1
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);

尝试在 100 毫秒时间内在队尾插入一个元素。如果成功返回 true;否则,如果超时,则返回 false。类似地,下面的调用:

1
Object head = q.poll(100, TimeUnit.MILLISECONDS);

尝试在 100 毫秒时间内移除队头元素;如果成功返回队头元素,否则,如果超时,则返回 null

如果队列满,则 put 方法阻塞;如果队列空,则 take 方法阻塞。它们与不带超时参数的 offerpoll 方法等效。

java.util.concurrent 包提供了阻塞队列的几个变体。默认情况下,LinkedBlockingQueue 的容量没有上界,但是,也可以选择指定一个最大容量。LinkedBlockingDeque 是一个双端队列。ArrayBlockingQueue 在构造时需要指定容量,另外可以有一个可选的参数来指定是否需要公平性。若指定了公平性,那么等待了最长时间的线程会优先得到处理。与以往一样,公平性会降低性能,应当在确实非常需要时才使用公平性参数。

PriorityBlockingQueue 是一个优先队列,而不是先进先出队列。元素按照它们的优先级顺序移除。这个队列没有容量上限,但是,如果队列是空的,获取元素的操作会阻塞。(有关优先队列的详细内容参见第 9 章。)

DelayQueue 包含实现了 Delayed 接口的对象:

1
2
3
interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

getDelay 方法返回对象的剩余延迟。负值表示延迟已经结束。元素只有在延迟结束的情况下才能从 DelayQueue 移除。还需要实现 compareTo 方法。DelayQueue 使用这个方法对元素排序。

Java 7 增加了一个 TransferQueue 接口,允许生产者线程等待,直到消费者准备就绪可以接收元素。如果生产者调用:

1
q.transfer(item);

这个调用会阻塞,直到另一个线程将元素删除。LinkedTransferQueue 类实现了这个接口。

程序清单 12-6 中的程序展示了如何使用阻塞队列来控制一组线程。程序在一个目录及其所有子目录下搜索所有文件,打印出包含指定关键字的行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
程序清单 12-6  blockingQueue/BlockingQueueTest.java

1 package blockingQueue;
2
3 import java.io.*;
4 import java.nio.charset.*;
5 import java.nio.file.*;
6 import java.util.*;
7 import java.util.concurrent.*;
8 import java.util.stream.*;
9
10 /**
11 * @version 1.03 2018-03-17
12 * @author Cay Horstmann
13 */
14 public class BlockingQueueTest {
15 private static final int FILE_QUEUE_SIZE = 10;
16 private static final int SEARCH_THREADS = 100;
17 private static final Path DUMMY = Path.of("");
18
19 private static BlockingQueue<Path> queue =
20 new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
21
22 public static void main(String[] args) {
23 try (var in = new Scanner(System.in)) {
24 System.out.print("Enter base directory (e.g. /opt/jdk-11-src): ");
25 String directory = in.nextLine();
26 System.out.print("Enter keyword (e.g. volatile): ");
27 String keyword = in.nextLine();
28
29 Runnable enumerator = () -> {
30 try {
31 enumerate(Path.of(directory));
32 queue.put(DUMMY); // 结束标记
33 } catch (IOException e) {
34 e.printStackTrace();
35 } catch (InterruptedException e) {
36 // 被中断则退出
37 }
38 };
39 new Thread(enumerator).start();
40
41 for (int i = 1; i <= SEARCH_THREADS; i++) {
42 Runnable searcher = () -> {
43 try {
44 boolean done = false;
45 while (!done) {
46 Path file = queue.take();
47 if (file == DUMMY) { // 遇到结束标记
48 queue.put(file); // 放回,让其他搜索线程也结束
49 done = true;
50 } else {
51 search(file, keyword);
52 }
53 }
54 } catch (IOException e) {
55 e.printStackTrace();
56 } catch (InterruptedException e) {
57 // 被中断则退出
58 }
59 };
60 new Thread(searcher).start();
61 }
62 }
63 }
64
65 /**
66 * 递归枚举目录及其子目录中的所有文件。
67 * @param directory 起始目录
68 */
69 public static void enumerate(Path directory)
70 throws IOException, InterruptedException
71 {
72 try (Stream<Path> children = Files.list(directory)) {
73 for (Path child : children.toList()) {
74 if (Files.isDirectory(child)) {
75 enumerate(child);
76 } else {
77 queue.put(child);
78 }
79 }
80 }
81 }
82
83 /**
84 * 在文件中搜索给定关键字并打印所有匹配行。
85 * @param file 待搜索文件
86 * @param keyword 关键字
87 */
88 public static void search(Path file, String keyword) throws IOException {
89 try (var in = new Scanner(file, StandardCharsets.UTF_8)) {
90 int lineNumber = 0;
91 while (in.hasNextLine()) {
92 lineNumber++;
93 String line = in.nextLine();
94 if (line.contains(keyword)) {
95 System.out.printf("%s:%d:%s%n", file, lineNumber, line);
96 }
96 }
97 }
98 }
99 }

生产者线程遍历所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有上限的话,很快就会包含文件系统中的所有文件。

我们同时启动了大量搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印包含指定关键字的所有行,然后取出下一个文件。我们使用了一个小技巧,从而在没有更多工作时终止这个应用。为了发出完成信号,枚举线程会在队列中放置一个虚拟对象(这就像在行李传送带上放一个标着“last bag”的虚拟行李箱)。当搜索线程取到这个虚拟对象时,将其放回并终止。

注意,这里不需要显式的线程同步。在这个应用中,我们使用了队列数据结构作为一种同步机制。

[!TIP]

API java.util.concurrent.ArrayBlockingQueue 5

  • ArrayBlockingQueue(int capacity)

  • ArrayBlockingQueue(int capacity, boolean fair)

    用指定的容量和公平性设置构造一个阻塞队列。队列实现为一个循环数组。

API java.util.concurrent.LinkedBlockingQueue 5

API java.util.concurrent.LinkedBlockingDeque 6

  • LinkedBlockingQueue()

  • LinkedBlockingDeque()

    构造一个无上限的阻塞队列或双向队列,实现为一个链表。

  • LinkedBlockingQueue(int capacity)

  • LinkedBlockingDeque(int capacity)

    根据指定容量构建一个有上限的阻塞队列或双向队列,实现为一个链表。

API java.util.concurrent.DelayQueue 5

  • DelayQueue()

    构造一个包含 Delayed 元素的无上限阻塞队列。只有那些延迟结束的元素可以从队列中移除。

API java.util.concurrent.Delayed 5

  • long getDelay(TimeUnit unit)

    得到该对象的延迟,用给定的时间单位度量。

API java.util.concurrent.PriorityBlockingQueue 5

  • PriorityBlockingQueue()

  • PriorityBlockingQueue(int initialCapacity)

  • PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)

    构造一个无上限阻塞优先队列,实现为一个堆。优先队列的默认初始容量为 11。如果没有指定比较器,则元素必须实现 Comparable 接口。

    API java.util.concurrent.BlockingQueue 5

    • void put(E element)

      添加元素,在必要时阻塞。

    • E take()

      移除并返回队头元素,必要时阻塞。

    • boolean offer(E element, long time, TimeUnit unit)

      添加给定的元素,如果成功返回 true,必要时阻塞,直至元素已经添加或者时间已到。

    • E poll(long time, TimeUnit unit)

      移除并返回队头元素,必要时阻塞,直至元素可用或时间已到。失败时返回 null

    API java.util.concurrent.BlockingDeque 6

    • void putFirst(E element)

    • void putLast(E element)

      添加元素,必要时阻塞。

    • E takeFirst()

    • E takeLast()

      移除并返回队头或队尾元素,必要时阻塞。

    • boolean offerFirst(E element, long time, TimeUnit unit)

    • boolean offerLast(E element, long time, TimeUnit unit)

      添加给定的元素,成功时返回 true,必要时阻塞,直至元素已经添加或时间已到。

    • E pollFirst(long time, TimeUnit unit)

    • E pollLast(long time, TimeUnit unit)

      移除并返回队头或队尾元素,必要时阻塞,直至元素可用或时间已到。失败时返回 null

    API java.util.concurrent.TransferQueue 7

    • void transfer(E element)

    • boolean tryTransfer(E element, long time, TimeUnit unit)

      传输一个值,或者尝试在给定的超时时间内传输这个值,这个调用将阻塞,直到另一个线程将元素删除。第二个方法会在调用成功时返回 true

12.5.2 高效的映射、集和队列

java.util.concurrent 包提供了映射、有序集和队列的高效实现:ConcurrentHashMapConcurrentSkipListMapConcurrentSkipListSetConcurrentLinkedQueue

这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分尽可能减少竞争。

与大多数集合不同,这些类的 size 方法不一定在常量时间内完成操作。确定这些集合的当前大小通常需要遍历。

注释:有些应用使用庞大的并发散列映射,这些映射太过庞大,以至于无法用 size 方法得到它的大小,因为这个方法只能返回 int。如果一个映射包含超过 20 亿个条目,该如何处理?mappingCount 方法可以把大小作为 long 返回。

集合返回弱一致性(weakly consistent)的迭代器。这意味着迭代器不一定能反映出它们构造之后所做的全部更改,但是,它们不会将同一个值返回两次,也不会抛出 ConcurrentModificationException 异常。

注释:与之形成对照的是,对于 java.util 包中的集合,如果集合在迭代器构造之后发生改变,集合的迭代器将抛出一个 ConcurrentModificationException 异常。

并发散列映射可以高效地支持大量阅读器线程和有限的书写器线程。

注释:散列映射将有相同散列码的所有条目放在同一个“桶”中。有些应用使用的散列函数不太好,以至于所有条目最后都放在很少的桶中,这会使性能严重恶化。即使是通常还算合理的散列函数,如 String 类的散列函数,也可能存在问题。例如,攻击者可以制造大量能得出相同散列值的字符串,让程序速度减慢。在较新的 Java 版本中,并发散列映射将桶组织为树,而不是列表,键类型实现 Comparable,从而可以保证 O(log n) 的性能。

[!TIP]

API java.util.concurrent.ConcurrentLinkedQueue 5

  • ConcurrentLinkedQueue<E>()

    构造一个可以由多个线程安全访问的无上限非阻塞的队列。

API java.util.concurrent.ConcurrentSkipListSet 6

  • ConcurrentSkipListSet<E>()

  • ConcurrentSkipListSet<E>(Comparator<? super E> comp)

    构造一个可以由多个线程安全访问的有序集。第一个构造器要求元素实现 Comparable 接口。

API java.util.concurrent.ConcurrentHashMap<K,V> 5

API java.util.concurrent.ConcurrentSkipListMap<K,V> 6

  • ConcurrentHashMap<K,V>()

  • ConcurrentHashMap<K,V>(int initialCapacity)

  • ConcurrentHashMap<K,V>(int initialCapacity, float loadFactor, int concurrencyLevel)

    构造一个可以由多个线程安全访问的散列映射。默认的初始容量为 16。如果每个桶的平均负载超过装填因子,表的大小会重新调整。装填因子默认值为 0.75。并发级别是估计的并发书写器线程数。

    • ConcurrentSkipListMap<K,V>()

    • ConcurrentSkipListMap<K,V>(Comparator<? super K> comp)

      构造一个可以由多个线程安全访问的有序映射。第一个构造器要求键实现 Comparable 接口。

12.5.3 映射条目的原子更新

ConcurrentHashMap 原来的版本只有为数不多的方法可以实现原子更新,这使得编程有些麻烦。假设我们希望统计观察到某些特性的频度。作为一个简单的例子,假设多个线程会遇到单词,我们想统计它们的频率。

可以使用 ConcurrentHashMap<String, Long> 吗?考虑让计数自增的代码。显然,下面的代码不是线程安全的:

1
2
3
Long oldValue = map.get(word);
Long newValue = oldValue == null ? 1 : oldValue + 1;
map.put(word, newValue); // ERROR—might not replace oldValue

可能会有另一个线程在同时更新同一个计数。

注释:有些程序员很奇怪为什么原本线程安全的数据结构会允许非线程安全的操作。有两种完全不同的情况。如果多个线程修改一个普通的 HashMap,它们可能会破坏内部结构(一个链表数组)。有些链接可能丢失,或者甚至会构成环,使得这个数据结构不再可用。对于 ConcurrentHashMap 绝对不会发生这种情况。在上面的例子中,getput 代码永远不会破坏数据结构。不过,由于操作序列不是原子的,所以结果不可预知。

在老版本的 Java 中,必须使用 replace 操作,它会以原子方式用一个新值替换原值,前提是之前没有其他线程把原值替换为其他值。必须一直这么做,直到替换成功:

1
2
3
4
do {
oldValue = map.get(word);
newValue = oldValue == null ? 1 : oldValue + 1;
} while (!map.replace(word, oldValue, newValue));

或者,可以使用一个 ConcurrentHashMap<String, AtomicLong>,以及以下更新代码:

1
2
map.putIfAbsent(word, new AtomicLong());
map.get(word).incrementAndGet();

很遗憾,这会为每个自增构造一个新的 AtomicLong,而不管是否需要。

如今,Java API 提供了一些新方法,可以更方便地完成原子更新。调用 compute 方法时可以提供一个键和一个计算新值的函数。这个函数接收键和相关联的值(如果没有值,则为 null),它会计算新值。例如,可以如下更新一个整数计数器映射:


《Java核心技术卷一》第十二章
http://example.com/2025/12/06/《Java核心技术卷一》第十二章/
作者
Under1ines
发布于
2025年12月6日
许可协议