条件变量和信号量

条件变量和信号量

Posted by bulingfeng on August 2, 2024

简介

如果让你设计一个生产者和消费者,当队列里面没有数据的时候,消费者暂停;当队列里面数据满了的时候,生产者暂停。而暂停的这个动作怎么来做呢?

  • 写一个sleep函数;但是这个会让程序的效率低下。
  • 使用自旋的方式;这个方式可能会导致cpu飙升,影响系统的性能。

这里就要使用线程之间的互相通讯来解决,条件变量和信号量。

条件变量

条件变量有两种:

  1. 内置的条件变量(其实就是Object类中提供的等待和通知方法);
  2. JUC中带的条件变量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Object {
    //线程调用此函数之后,便进入WAITING状态。有两种情况可以导致函数返回。
    //1)其他线程调用notify()或notifyAll()唤醒此线程;
    //2)线程被中断,此时,wait()函数会抛出InterruptedException。 
    public final void wait() throws InterruptedException { ... }

    //线程调用此函数之后,便进入WAITING状态。有3种情况可以导致函数返回。
    //1)其他线程调用notify()或notifyAll()唤醒此线程;
    //2)线程被中断,此时,wait()函数会抛出InterruptedException;
    //3)等待时间超过了预设的超时时间:timeout毫秒+nanos纳秒
    public final void wait(long timeout, int nanos)
        throws InterruptedException { ...}

    //跟上一个函数的唯一区别在于超时时间。此函数的超时时间只能精确到毫秒,不能精确到纳秒
    public final native void wait(long timeout) throws InterruptedException;

    //唤醒一个调用了同一个对象上的wait()函数的线程
    public final native void notify();

    //唤醒所有调用了同一个对象上的wait()函数的线程
    public final native void notifyAll();
}

内置条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private List<String> list = new ArrayList<>();
  private int count = 0;
  public static void main(String[] args) {
      AtomicDemo demo=new AtomicDemo();
      demo.enqueue("hello");
      String dequeue = demo.dequeue();
      System.out.println(dequeue);
  }

  public void enqueue(String elem) {
      synchronized (this) { // 1)加锁
          list.add(count, elem);
          count++; // 2)更新状态变量
          this.notify(); // 3)通知
      } // 4)解锁
  }

  public String dequeue() {
      synchronized (this) { // 1)加锁
          while (count <= 0) { // 2)检查状态变量是否满足条件,请注意这里必须使用while的条件来判断,因为可能被错误唤醒
              try {
                  this.wait(); // 3)等待并释放锁;4)被唤醒之后重新竞争获取锁
              } catch (InterruptedException e) {
                  return null;
              }
          }
          // 以下为业务逻辑
          count--;
          return list.get(count);
      } // 5)解锁
  }

JUC条件变量使用方法

用法和内置的条件变量用法差不多

  • synchonized -> lock
  • notify -> signal
  • wait -> await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class QueueCondJUC {
  private List<String> list = new ArrayList<>();
  private int count = 0;
  
  private Lock lock = new ReentrantLock();
  private Condition condition = lock.newCondition();

  public void enqueue(String elem) {
    lock.lock(); // 1)加锁
    try {
      list.add(count, elem);
      count++; // 2)更新状态变量
      condition.signal(); // 3)通知
    } finally {
      lock.unlock(); // 4) 解锁
    }
  }

  public String dequeue() {
    lock.lock(); // 1)加锁
    try {
      while (count <= 0) { // 2)检查状态变量是否满足条件
        try {
          condition.await(); // 3)等待并释放锁;4)被唤醒之后重新竞争获取锁
        } catch (InterruptedException e) {
          return null;
        }
      }
      // 以下为业务逻辑
      count--;
      return list.dequeue(count);
    } finally {
      lock.unlock(); // 5)解锁
    }
  }
}

ReentrantLock的队列用来存储等待锁的线程;

ConditionObject的队列用来存储调用了await的线程队列。

如果调用了signal,就是把调用了await里面的线程给放到ReentrantLock的队列当中。

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class AtomicDemo {
    public static void main(String[] args) {
        // 创建一个初始许可数量为 3 的 Semaphore
        Semaphore semaphore = new Semaphore(3);

        // 创建并启动多个线程
        for (int i = 1; i <= 5; i++) {
            new Thread(new Worker(i, semaphore)).start();
        }
    }

    static class Worker implements Runnable {
        private int id;
        private Semaphore semaphore;

        public Worker(int id, Semaphore semaphore) {
            this.id = id;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                // 尝试获取许可
                semaphore.acquire();
                System.out.println("Worker " + id + " 获得资源,开始工作...");

                // 模拟工作
                Thread.sleep(2000);

                System.out.println("Worker " + id + " 完成工作,释放资源");

                // 释放许可
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}