线程通信
为什么线程之间要通信
每一个线程开始运行时,都有自己的独立空间(栈帧),按照既定的代码一步步执行,直到结束。如果线程都是相互孤立的去执行,那么它的的价值就很低了,如果多个线程能够相互协作来完成工作,这将会带来巨大的价值。
线程通信的方式:等待/通知机制
首先先理解两个概念锁池,等待池
锁池: 存放所有争抢锁的线程,当锁被释放时,锁池中的线程都可以争抢锁
等待池:存放等待其他线程唤醒自己的线程(唤醒的概念我们后面会说到)
Object类中的等待/通知方法
wait() :当前线程置为Waiting状态,此时线程阻塞,并且释放锁,等待当前对象的notify()或者notifyAll()方法唤醒自己,继续执行线程。此时线程放入等待池中。
wait(long time):与wait()同样,线程阻塞,并且释放锁,但是此时会将线程置为Time_Waiting 状态,并且当时间结束时,自动唤醒自己继续执行线程(与Thread类中的sleep有点像,但并不一样)。(time的时间单位时毫秒)
wait(long timeout, int nanos):这个方法与wait()方法同样。可以设置更长的等待时间,等待时间为timeout+nanos,第二个参数的单位时纳秒。
notify() :唤醒等待池中任意一个线程(object1.notify(),只唤醒因为object1.wait()的线程),把它放入到锁池中。(具体是唤醒哪一个线程是JVM去调度)
notifyAll() : 唤醒等待池所有的线程,object1.notfiyAll()只能唤醒所有因为object1.wait()阻塞的线程)。
具体实现:
实现wait()的调用
public static void main(String[] args) {
Object lock = new Object();
new Thread("A"){
@Override
public void run() {
synchronized (lock) {
try {
System.out.println(this.getName() + " 获取锁");
System.out.println(this.getName() + "此时置为Waiting状态");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
结果:
实现notify()和 notifyAll
public static void main(String[] args) {
Object lock = new Object();
for(int i = 0 ;i < 4;i++) {
new Thread("第" + i+"号线程 ") {
@Override
public void run() {
synchronized (lock) {
try {
System.out.println(this.getName() + " 获取锁");
System.out.println(this.getName() + "此时置为Waiting状态");
lock.wait();
System.out.println(this.getName() + "此时被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread("notifyTest") {
@Override
public void run() {
synchronized (lock) {
System.out.println(this.getName() + " 获取锁");
System.out.println("唤醒随机一个线程");
lock.notify();
// System.out.println("唤醒对应的所有线程");
// lock.notifyAll();
}
}
}.start();
}
notify()的结果:
notifyAll()的结果:
Condition接口中的方法
condition也可以与Object一样,将线程置为Waiting状态,让对应的方法去唤醒
conidtion的await()对应Objcet的wait();
condition的signal()对应Object的notify();
condition的signalAll()对应Object的notifyAll();
condition是ReentrantLock 重入锁类下实现的,一个ReentrantLock可以对一个多个condition。
Demo:
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition lock = reentrantLock.newCondition();
for(int i = 0 ;i < 4;i++) {
new Thread("第" + i+"号线程 ") {
@Override
public void run() {
synchronized (lock) {
try {
System.out.println(this.getName() + " 获取锁");
System.out.println(this.getName() + "此时置为Waiting状态");
lock.await();
System.out.println(this.getName() + "此时被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread("notifyTest") {
@Override
public void run() {
synchronized (lock) {
System.out.println(this.getName() + " 获取锁");
// System.out.println("唤醒随机一个线程");
lock.signal();
System.out.println("唤醒对应的所有线程");
lock.signalAll();
}
}
}.start();
}
wait()和sleep()方法的区别与联系
1:wait()方法和sleep()都可以将线程阻塞,并且造成中断。
2:wait()方法是Object方法,sleep()是Thread内部方法;wait()方法底层在调用之前必须获得对应对象的monitor,所以wait()方法必须在一个同步代码块中,而sleep()不需要,它可以在线程的任意位置调用。
3:wait()方法如果不给时间参数,会默认将线程置为Waiting,并且只能等待对象的notify()或者notifyAll()唤醒,而sleep()会将线程置为Time_Waiting状态,当时间超时会自动返回继续执行。
4:sleep()方法并不会释放锁的所有权,但是wait()方法会释放锁的所有权。
notify() 和notifyAll()的区别与联系
1:notify()只是唤醒一个正在wait()当前对象锁的线程,而notifyAll则是唤醒所有。并且nofity()是本地方法,具体唤醒哪一个线程由虚拟机控制;如果有多个线程等待,则线程规划器任意挑选出其中一个wait()状态的线程来发出通知。
2:调用notify()和notifyAll()并不会立马释放对锁的所有权,当同步代码块结束时,才会释放。
3:notify()和notifyAll()把等待池中的线程唤醒,转移到锁池中去。
生产者于消费者模型的实现
生产者与消费者的模型:
生产者和消费者是典型的同步模型。
生产者和消费者之间会有一个容器,也就是所谓的缓冲区,
同一时刻只有一个线程在操作Buffer。
只有当Buffer为不为空时,消费者才能从Buffer中获取元素
只有当Buffer满时,生产者才会停止生产等待消费。
根据这种模型我们会发现生产者必须要在buffer满时阻塞,而消费者必须在buffer为空时阻塞。
所以必须要通过线程通信来实现
使用Object类的方法实现
class Buffertest<E>{
private LinkedList<E> buffer = new LinkedList<E>() ;
private static final int defaultMax = 10;
private int max ;
Buffertest(){
this(defaultMax);
}
Buffertest(int max){
this.max = max;
}
public void put(E value){
synchronized(buffer) {
while(buffer.size() >= max) {
try {
System.out.println("data is full!!!");
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buffer.addLast(value);
System.out.println(Thread.currentThread().getName() + " put value " +value);
buffer.notifyAll();
}
}
public void take(){
synchronized (buffer) {
while (buffer.isEmpty()) {
try {
System.out.println("data is empty !!!");
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
E e = buffer.removeFirst();
System.out.println(Thread.currentThread().getName() + " take value " + e);
buffer.notifyAll();
}
}
}
class test{
public static void main(String[] args) {
Buffertest buffertest = new Buffertest();
new Thread("productor"){
@Override
public void run() {
while(true){
buffertest.put((int)(1+ Math.random()* 100));
}
}
}.start();
new Thread("consumer"){
@Override
public void run() {
while(true){
buffertest.take();
}
}
}.start();
}
}
使用Condition接口中的方法来实现
class Buffertest1<E>{
private LinkedList<E> buffer = new LinkedList<>();
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
private static final int defauleMax = 10;
int max ;
Buffertest1(){
this(defauleMax);
}
Buffertest1(int max){
this.max = max;
}
public void put(E value ){
lock.lock();
try {
while(buffer.size() >= max){
System.out.println("data is full!!!");
condition.await();
}
buffer.addLast(value);
System.out.println(Thread.currentThread().getName() + " put value " + value);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void take(){
lock.lock();
try {
while(buffer.isEmpty()){
System.out.println(Thread.currentThread().getName() + " data is empty");
condition.await();
}
E e = buffer.pollFirst();
System.out.println(Thread.currentThread().getName() + " get value " +e);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
在这里我们讲解以下内部的一些代码,首先为什么要唤醒所有线程,而不是唤醒一个线程,当然如果只有两个线程一个消费者一个消费者,那么一个线程阻塞,另一个必定不阻塞,唤醒的一定是阻塞的,肯定没有问题。但是有没有可能最后把所有生产者都阻塞了,才去消费呢?是有这种可能的,所以如果只唤醒一个线程,将它从等待池放入锁池,那么性能上有一定的概率会降低,所以但是我们把所有线程都唤醒,那么这种概率就会变小。因为每一次争抢锁时,如果消费者和生产者数量相差不多,这种把消费者或者生产者全部阻塞的概率会变的非常小。
其次,为什么判断是一个循环,而不是if?
因为,if语句只执行一次,如果生产者阻塞,然后被唤醒,但是此时size是满的,理论上是不不能放数据的,但是在被唤醒的生产者线程中,size满还是不满已经判断过了,他会直接执行put,所以有可能会放入的数据量大于了max。消费者同样,有可能拿取的数据最后让size < 0。所以必须循环判断。