简介
如果让你设计一个生产者和消费者,当队列里面没有数据的时候,消费者暂停;当队列里面数据满了的时候,生产者暂停。而暂停的这个动作怎么来做呢?
- 写一个sleep函数;但是这个会让程序的效率低下。
- 使用自旋的方式;这个方式可能会导致cpu飙升,影响系统的性能。
这里就要使用线程之间的互相通讯来解决,条件变量和信号量。
条件变量
条件变量有两种:
- 内置的条件变量(其实就是Object类中提供的等待和通知方法);
- JUC中带的条件变量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Object {
//线程调用此函数之后,便进入WAITING状态。有两种情况可以导致函数返回。
//1)其他线程调用notify()或notifyAll()唤醒此线程;
//2)线程被中断,此时,wait()函数会抛出InterruptedException。
public final void wait() throws InterruptedException { ... }
//线程调用此函数之后,便进入WAITING状态。有3种情况可以导致函数返回。
//1)其他线程调用notify()或notifyAll()唤醒此线程;
//2)线程被中断,此时,wait()函数会抛出InterruptedException;
//3)等待时间超过了预设的超时时间:timeout毫秒+nanos纳秒
public final void wait(long timeout, int nanos)
throws InterruptedException { ...}
//跟上一个函数的唯一区别在于超时时间。此函数的超时时间只能精确到毫秒,不能精确到纳秒
public final native void wait(long timeout) throws InterruptedException;
//唤醒一个调用了同一个对象上的wait()函数的线程
public final native void notify();
//唤醒所有调用了同一个对象上的wait()函数的线程
public final native void notifyAll();
}
内置条件变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private List<String> list = new ArrayList<>();
private int count = 0;
public static void main(String[] args) {
AtomicDemo demo=new AtomicDemo();
demo.enqueue("hello");
String dequeue = demo.dequeue();
System.out.println(dequeue);
}
public void enqueue(String elem) {
synchronized (this) { // 1)加锁
list.add(count, elem);
count++; // 2)更新状态变量
this.notify(); // 3)通知
} // 4)解锁
}
public String dequeue() {
synchronized (this) { // 1)加锁
while (count <= 0) { // 2)检查状态变量是否满足条件,请注意这里必须使用while的条件来判断,因为可能被错误唤醒
try {
this.wait(); // 3)等待并释放锁;4)被唤醒之后重新竞争获取锁
} catch (InterruptedException e) {
return null;
}
}
// 以下为业务逻辑
count--;
return list.get(count);
} // 5)解锁
}
JUC条件变量使用方法
用法和内置的条件变量用法差不多
- synchonized -> lock
- notify -> signal
- wait -> await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class QueueCondJUC {
private List<String> list = new ArrayList<>();
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void enqueue(String elem) {
lock.lock(); // 1)加锁
try {
list.add(count, elem);
count++; // 2)更新状态变量
condition.signal(); // 3)通知
} finally {
lock.unlock(); // 4) 解锁
}
}
public String dequeue() {
lock.lock(); // 1)加锁
try {
while (count <= 0) { // 2)检查状态变量是否满足条件
try {
condition.await(); // 3)等待并释放锁;4)被唤醒之后重新竞争获取锁
} catch (InterruptedException e) {
return null;
}
}
// 以下为业务逻辑
count--;
return list.dequeue(count);
} finally {
lock.unlock(); // 5)解锁
}
}
}
ReentrantLock的队列用来存储等待锁的线程;
ConditionObject的队列用来存储调用了await的线程队列。
如果调用了signal,就是把调用了await里面的线程给放到ReentrantLock的队列当中。
信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class AtomicDemo {
public static void main(String[] args) {
// 创建一个初始许可数量为 3 的 Semaphore
Semaphore semaphore = new Semaphore(3);
// 创建并启动多个线程
for (int i = 1; i <= 5; i++) {
new Thread(new Worker(i, semaphore)).start();
}
}
static class Worker implements Runnable {
private int id;
private Semaphore semaphore;
public Worker(int id, Semaphore semaphore) {
this.id = id;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
// 尝试获取许可
semaphore.acquire();
System.out.println("Worker " + id + " 获得资源,开始工作...");
// 模拟工作
Thread.sleep(2000);
System.out.println("Worker " + id + " 完成工作,释放资源");
// 释放许可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}