通信-对象的等待集wait set
1.等待集
java中的每个对象:等待集
1.1 wait()、notify()、notifyAll()
wait()
的作用是让当前线程进入等待状态,同时,wait()也会让当前线程释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)。notify()
和notifyAll()
的作用,则是唤醒当前对象上的等待线程;notify()是唤醒单个线程,而notifyAll()是唤醒所有的线程。wait(long timeout)
让当前线程处于“等待(阻塞)状态”,“直到其他线程调用此对象的notify()方法或 notifyAll() 方法,或者超过指定的时间量”,当前线程被唤醒(进入“就绪状态”)。
1.2
1.2.1wait()方法
作用:使线程停止运行。
解释:
- wait() 方法是 Object 类的方法,作用是使当前执行代码的线程进行等待,并将当前线程置入"预执行队列"中,在 wait() 所在的代码处停止执行,直到接收通知或被中断为止。
- wait() 方法只能在同步方法中或同步块中调用。如果调用 wait() 时,没有持有适当的锁,会抛出异常。
- wait() 方法执行后,当前线程释放锁,线程与其他线程竞争重新获取锁。
1.2.2notify()方法
作用:使停止的线程继续运行。
解释:
- 方法 notify() 也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知 notify ,并使它们重新获取该对象的对象锁。如果有多个线程等待,则有线程规划器随机挑选出一个呈 wait 状态的线程。
- 在 notify() 方法后,当前线程不会马上释放该对象锁,要等到执行 notify() 方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
注意:wait,notify 必须使用在 synchronized 同步方法或者代码块内。
程序测试:
import java.util.Scanner;
public class WaitDemo {
private static Object object = new Object();
public static class A extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(i);
}
// 等待 B 线程启动,并完成某个条件
synchronized (object) {
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 100; i < 110; i++) {
System.out.println(i);
}
}
}
public static void main(String[] args) {
Thread a = new A();
a.start();
Scanner scanner = new Scanner(System.in);
System.out.println("我不输入,A 线程就绝对不会动");
scanner.nextLine();
synchronized (object) {
object.notify();
}
}
}
程序解释:
synchronized (object) {
object.wait();
}
- 先 释放锁(object持有的锁)
- 把线程放到 object 对象的等待集中
- 把线程状态变为 WATING
synchronized (object) {
object.notify();
}
被唤醒后
- 把A线程从等待集中取出
- 把A线程的状态置为 RUNNABLE (此时Main线程还在CPU上且持有锁)
- A线程尝试重新抢 object 锁
1.2.3notifyAll()方法
作用:一次唤醒所有的等待线程。
注意
- wait会使线程状态发生改变 。
RUNNABLE——》WAITING
WAITING——》RUNNABLE - 等的是对象的等待集上。
解释:wait()
是 Object 的一个方法,所以等在该对象的等待集上。 - 使用的时候必需加锁,等在哪个对象上,就对哪个对象加锁。
wait()
执行成功时会释放锁,被唤醒时重新请求加锁。 - 哪个线程调用wait,哪个线程进入等待集。
notify()
只唤醒等待集中的一个,但不保证是哪一个。
程序测试:
public class NWaitDemo {
private static Object o = new Object();
private static int n = 0;
private static class Sub extends Thread {
Sub() {
super("n--");
}
@Override
public void run() {
while (true) {
synchronized (o) {
if (n == 0) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
n--;
System.out.println(getName() + ":" + n);
o.notify();
}
}
}
}
private static class Add extends Thread {
Add() {
super("n++");
}
@Override
public void run() {
while (true) {
synchronized (o) {
if (n == 10) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
n++;
System.out.println(getName() + ":" + n);
o.notify();
}
}
}
}
public static void main(String[] args) {
Thread a = new Add();
Thread b = new Sub();
a.start();
b.start();
}
}
//保证结果是在0——10之间,但不保证先从10-0再从0-10,也有可能结果是1 0 1 0……
2.练习—按序打印
题目链接:按序打印
2.1普通解法:
public class OneTwoThree {
/* //优化版
private static class Foo {
private int n = 0;
public void one() {
synchronized (this) {
if (n == 0)
n = 1;
System.out.println("one");
}
Thread.yield();
//通过调用yield释放CPU
//可以适当增加性能
}
public void two() {
synchronized (this) {
if (n != 1)
n = 2;
System.out.println("two");
}
Thread.yield();
}
public void three() {
synchronized (this) {
if (n != 2)
n = 0;
System.out.println("three");
}
Thread.yield();
}
}
*/
private static class Foo {
private int n = 0;
public void one() {
synchronized (this) {
if (n != 0) {
return;
}
n = 1;
System.out.println("one");
}
}
public void two() {
synchronized (this) {//同一个对象
if (n != 1) {
return;
}
n = 2;
System.out.println("two");
}
}
public void three() {
synchronized (this) {
if (n != 2) {
return;
}
n = 0;
System.out.println("three");
}
}
}
private static Foo foo = new Foo();
private static class OneThread extends Thread {
@Override
public void run() {
while (true) {
foo.one();
}
}
}
private static class TwoThread extends Thread {
@Override
public void run() {
while (true) {
foo.two();
}
}
}
private static class ThreeThread extends Thread {
@Override
public void run() {
while (true) {
foo.three();
}
}
}
public static void main(String[] args) {
Thread one = new OneThread();
Thread two = new TwoThread();
Thread three = new ThreeThread();
one.start();
two.start();
three.start();
}
}
2.2利用wait():
public class OneTwoThree {
private static class Foo {
private volatile int n = 0;
public void one() throws InterruptedException {
if (n != 0) {
synchronized (this) {
wait();
}
return;
}
// 两句话的次序不能颠倒
System.out.println("one");
n = 1;
synchronized (this) {
notifyAll();
}
}
public void two() throws InterruptedException {
if (n != 1) {
synchronized (this) {
wait();
}
return;
}
System.out.println("two");
n = 2;
synchronized (this) {
notifyAll();
}
}
public void three() throws InterruptedException {
if (n != 2) {
synchronized (this) {
wait();
}
return;
}
System.out.println("three");
n = 0;
synchronized (this) {
notifyAll();
}
}
}
private static Foo foo = new Foo();
private static class OneThread extends Thread {
@Override
public void run() {
while (true) {
try {
foo.one();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class TwoThread extends Thread {
@Override
public void run() {
while (true) {
try {
foo.two();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class ThreeThread extends Thread {
@Override
public void run() {
while (true) {
try {
foo.three();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Thread one = new OneThread();
Thread two = new TwoThread();
Thread three = new ThreeThread();
one.start();
two.start();
three.start();
}
}
2.3小结:四种解法
// 虽然 n 不为 0,但不让出 CPU,导致 n 无法及时变化
private int n = 0;
public void oneVersion1() {
synchronized (this) {
if (n != 0) {
return;
}
System.out.println("one");
n = 1;
}
//Thread.yield();//释放CPU 增加性能
}
// 减少了锁的开销
private volatile int n = 0;
public void oneVersion2() {
if (n != 0) {
return;
}
System.out.println("one");
n = 1;
}
//减少了锁的开销 条件不满足时及时释放CPU,但是还参与CPU的争夺
private volatile int n = 0;
public void oneVersion3() {
if (n != 0) {
Thread.yield();//释放CPU 增加性能
return;
}
System.out.println("one");
n = 1;
}
// 条件不满足,让出 CPU,并且不再参加 CPU 的争夺
private int n = 0;
public void oneVersion4() throws InterruptedException {
if (n != 0) {
synchronized (this) {
wait();
}
return;
}
// 两句话的次序不能颠倒 因为线程的调度任意时刻都有可能发生
System.out.println("one");
n = 1;
synchronized (this) {
notifyAll();
}
}
小结:
- v1 和 v2 会导致 CPU 空转,性能较差
- V3 条件不满足时会释放 CPU (但还是会抢CPU)
- V4 条件不满足,让出 CPU,并且不再参加 CPU 的争夺
3.sleep VS wait
-
理论上:wait 和 sleep 是没有可比性的,
wait()
用于线程之间的通信的,sleep()
使线程阻塞一段时间。 -
相同点:都可以让线程放弃执行一段时间。
-
对比:
- wait 之前需要请求锁,而 wait 执行时会先释放锁,等被唤醒时再重新请求锁。这个锁是 wait 对像上的 monitor lock 。
- sleep 是无视锁的存在的,即之前请求的锁不会释放,没有锁也不会请求。
- wait 是 Object 的方法。
- sleep 是 Thread 的静态方法。
4.生产者消费者模式
生产者消费者模式:通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
4.1模拟流水线
- 问题原因:使用notify()——调度出问题
eg:两个生产者、一个消费者
- 消费者一定唤醒生产者
- 生产者不一定唤醒消费者
- 解决方法:使用notifyAll
public class MyQueue {
private int[] array = new int[10];
private int size = 0;
private int front = 0;
private int rear = 0;
public synchronized void put(int message) throws InterruptedException {
if (size == array.length) {
wait();
}
array[rear] = message;
rear = (rear + 1) % array.length;
size++;
notifyAll();
}
public synchronized int take1() throws InterruptedException {
if (size == 0) {
wait();
}
int message = array[front];
front = (front + 1) % array.length;
size--;
// 消费者必须唤醒一个生产者,但如果只是调用 notify
// 不保证唤醒的是生产者
notifyAll();
return message;
}
private static MyQueue queue = new MyQueue();
private static class Producer extends Thread {
@Override
public void run() {
synchronized (this) {
}
for (int i = 0; i < 100; i++) {
try {
queue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Customer extends Thread {
@Override
public void run() {
while (true) {
int message = 0;
try {
message = queue.take1();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(message);
}
}
}
public static void main(String[] args) {
Thread t1 = new Producer();
Thread t2 = new Producer();
Thread t3 = new Customer();
t1.start();
t2.start();
t3.start();
}
}
4.2优化
size==0情况比较少,用二次判断减少加锁代码块
public int take2() throws InterruptedException {
if (size == 0) {
do {
synchronized (this) {
if (size == 0) {
wait();
// 假如是一个消费者把我唤醒,我应该怎么办
}
}
} while (size == 0);
}
int message;
synchronized (this) {
message = array[front];
front = (front + 1) % array.length;
size--;
notifyAll();
}
return message;
}