一、Java中的线程同步
在多线程环境中,多个线程可能会尝试修改同一资源。不正确管理线程当然会导致一致性问题。从线程的生命周期来讲,有很多方法可以控制这个生命周期。这里关注wait()和notify()。
Object.wait() 挂起一个线程
Object.notify() 唤醒线程
1、wait()方法
调用wait()会强制当前线程等待,直到其他线程调用同一对象上的notify()或notifyAll() 。为此,当前线程必须拥有对象的monitor。根据Javadocs,这可以通过以下方式发生:
(1)当我们为给定对象执行同步实例方法时
(2)当我们在给定对象上执行了同步块的主体时
(3)通过为Class类型的对象执行同步的静态方法
应该注意,一次只有一个活动线程可以拥有一个对象的监视器。
(1)wait()
wait()方法使当前线程无限期地等待,直到另一个线程为此对象调用notify ( )或notifyAll()。
(2)wait(long timeout)
使用这种方法,我们可以指定一个超时时间,之后线程将被自动唤醒。 可以使用 notify() 或 notifyAll() 在达到超时之前唤醒线程。
请注意,调用 wait(0) 与调用 wait() 相同。
(3)wait(long timeout, int nanos)
这是另一个提供相同功能的方法。可以提供更高的精度。
2、notify() and notifyAll()
我们使用 notify() 方法来唤醒等待访问该对象的监视器的线程。
有两种通知等待线程的方法。
(1)notify()
对于在此对象的监视器上等待的所有线程(通过使用任何一种 wait() 方法),方法 notify() 通知它们中的任何一个任意唤醒。 确切地选择唤醒哪个线程是不确定的,并且取决于实现。
由于 notify() 会唤醒单个随机线程,因此我们可以使用它来实现线程正在执行类似任务的互斥锁定。 但在大多数情况下,实现 notifyAll() 会更可行。
(2)notifyAll()
这个方法只是唤醒所有在这个对象的监视器上等待的线程。被唤醒的线程将以通常的方式完成,就像任何其他线程一样。
3、发送者-接收者同步问题
(1)Data 类
现在我们了解了基础知识,让我们看一个简单的 Sender-Receiver 应用程序,它将使用 wait() 和 notify() 方法来设置它们之间的同步:
Sender应该向Receiver发送一个数据包。
在发送方完成发送之前,接收方无法处理数据包。
同样,发送方不应尝试发送另一个数据包,除非接收方已经处理了前一个数据包。
让我们首先创建一个 Data 类,该类包含将从 Sender 发送到 Receiver 的数据包。 我们将使用 wait() 和 notifyAll() 来设置它们之间的同步:
public class Data {
private String packet;
// True if receiver should wait
// False if sender should wait
private boolean transfer = true;
public synchronized String receive() {
while (transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
transfer = true;
String returnPacket = packet;
notifyAll();
return returnPacket;
}
public synchronized void send(String packet) {
while (!transfer) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
transfer = false;
this.packet = packet;
notifyAll();
}
}
(1)packet变量表示正在通过网络传输的数据。
(2)transfer变量,发送者和接收者将使用它进行同步。
如果此变量为true,则Receiver应等待Sender发送消息。
如果为false,则Sender应等待Receiver接收消息。
(3)Sender使用send()方法向Receiver发送数据
如果transfer为false,我们将通过在该线程上调用wait()来等待。
但是当它为true时,我们切换状态,设置我们的消息,并调用notifyAll()以唤醒其他线程以指定发生了重大事件,并且它们可以检查它们是否可以继续执行。
(4)Receiver将使用receive()方法
如果传输被Sender设置为false,它才会继续,否则我们将在这个线程上调用wait() 。
当条件满足时,我们切换状态,通知所有等待线程唤醒,并返回接收到的数据包。
(2)Sender类
public class Sender implements Runnable {
private Data data;
public Sender(Data data) {
this.data = data;
}
public void run() {
String packets[] = {
"First packet",
"Second packet",
"Third packet",
"Fourth packet",
"End"
};
for (String packet : packets) {
data.send(packet);
// Thread.sleep() to mimic heavy server-side processing
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}
创建一些随机数据包,这些数据包将在packet[]数组中通过网络发送。
对于每个数据包,调用 send()进行发送。
然后随机间隔调用Thread.sleep()来模拟密集的服务器端处理。
(3)Receiver类
循环中调用load.receive(),直到我们得到最后一个数据包。
public class Receiver implements Runnable {
private Data load;
public Receiver(Data load) {
this.load = load;
}
public void run() {
for(String receivedMessage = load.receive();
!"End".equals(receivedMessage);
receivedMessage = load.receive()) {
System.out.println(receivedMessage);
// ...
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Log.error("Thread interrupted", e);
}
}
}
}
(4)调用并输出
public static void main(String[] args) {
Data data = new Data();
Thread sender = new Thread(new Sender(data));
Thread receiver = new Thread(new Receiver(data));
sender.start();
receiver.start();
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
将会收到以下输出
First packet
Second packet
Third packet
Fourth packet
二、suspend()与resume()
暂停线程意味着此线程还可以恢复运行,在Java多线程中可以使用suspend()方法暂停线程,使用resume()方法来恢复线程的执行。
但是这两个方法均被标识为作废过期,所以想要实现对线程进行暂停与恢复的处理时可使用wait()、notify()或notifyAll()方法。
三、使用LockSupport类
suspend()与resume()方法是过期作废的,若想实现同样的功能,也可以使用JDK并发包里提供的LockSupport类作为替代,效果是一样的。
1、LockSupport常用源码
使用park和unpark进行线程的阻塞和唤醒操作,park和unpark底层是借助系统层(C语言)方法pthread_mutex和pthread_cond来实现的,通过pthread_cond_wait函数可以对一个线程进行阻塞操作,在这之前,必须先获取pthread_mutex,通过pthread_cond_signal函数对一个线程进行唤醒操作。
// LockSupport
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// blocker在什么对象上进行的阻塞操作
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
// 超时阻塞
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
2、demo实例1
创建一个再run方法内,调用park方法的线程。
package com.algorithm.demo.thread;
import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo extends Thread{
@Override
public void run()
{
System.out.println("begin" + System.currentTimeMillis());
LockSupport.park();
System.out.println("end" + System.currentTimeMillis());
}
}
实例化线程,主线程休眠然后调用unpark方法。
@Test
void test_locksupport()
{
LockSupportDemo demo = new LockSupportDemo();
demo.start();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(demo);
}
输出如下
begin1647607496657
end1647607497661
park()方法的作用是将线程暂停,unpark()方法的作用是恢复线程的运行。
3、demo实例2
如果先执行unpark()再执行park()方法,则park()方法不会呈暂停的效果。
package com.algorithm.demo.thread;
import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo extends Thread{
@Override
public void run()
{
System.out.println("begin" + System.currentTimeMillis());
LockSupport.park();
System.out.println("end" + System.currentTimeMillis());
}
}
实例化线程,主线程调用unpark方法。
@Test
void test_locksupport()
{
LockSupportDemo demo = new LockSupportDemo();
demo.start();
LockSupport.unpark(demo);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出如下
begin1647607590322
end1647607590322
表明如果先执行unpark()再执行park()方法,则park()方法没有起到暂停的效果。