内容如下:
- 使用wait/notify实现线程通信。
- 生产者/消费者模式的实现。
- 方法join的使用。
- ThreadLocal类的使用。
3.1 等待/通知机制
3.1.1 等待通知机制的实现
方法wait:
使得当前执行代码的线程进行等待,该方法会将当前线程置于预置执行队列中,并且在wait()所在的代码处停止执行,直到接收到通知或者中断。在调用wait()之前,线程必须获取该对象的对象级锁,也就是说只能在同步方法或者代码块中使用wait()方法。执行wait方法后释放锁。
方法notify:
也要在同步方法或者代码块中执行,在调用前,线程也必须获取到该对象的对象级锁,用来通知那些可能等待该对象的对象锁的其他线程(如果有多个等待,随机挑选一个),对其发出通知,并使它获得当前锁对象。要注意的是只有当notify代码执行完毕,退出同步代码块或者方法的时候才会释放锁,wait等待的线程也不会马上得到锁。
package demo03;
public class Test02 extends Thread{
private Object lock;
public Test02 (Object lock){
this.lock=lock;
}
@Override
public void run() {
synchronized (lock){
System.out.println("刚进入同步方法,开始等待");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("得到通知继续执行");
}
}
}
package demo03;
public class Test01 extends Thread{
private Object lock;
public Test01 (Object lock){
this.lock=lock;
}
@Override
public void run() {
synchronized (lock){
System.out.println("刚进入同步方法,开始等待");
lock.notify();
System.out.println("得到通知继续执行");
}
}
}
package demo03;
public class Run {
public static void main(String[] args) {
Object lock = new Object();
Test02 test02 = new Test02(lock);
test02.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Test01 test01 = new Test01(lock);
test01.start();
}
}
结果为:
刚进入同步方法,开始等待
刚进入同步方法,开始等待
得到通知继续执行
得到通知继续执行
3.2 生产者消费者模式的实现
package demo05;
/**
* 生产者
*/
public class P {
private String lock;
public P(String lock){
this.lock = lock;
}
public void setValue() throws InterruptedException {
synchronized (lock){
if(!ValueObject.value.equals("")){
//当有值的时候 生产者就处于等待状态并释放锁(这时候等待消费者唤醒)
lock.wait();
}
//当消费者消费完毕后生产者就重新生产数据并唤醒消费者去消费
String value=System.currentTimeMillis()+"_"+System.nanoTime();
System.out.println("set的值="+value);
ValueObject.value=value;
lock.notify();
}
}
}
package demo05;
/**
* 消费者
*/
public class C {
private String lock;
public C(String lock){
this.lock=lock;
}
public void getValue() throws InterruptedException {
synchronized (lock){
if(ValueObject.value.equals("")){
//当检测到数据被消费掉后 消费者就会处于等待状态并释放锁(这时候等待生产者唤醒)
lock.wait();
}
//当检测到有生产到数据就会去消费掉然后去唤醒生产者去生产
System.out.println("get的值是"+ValueObject.value);
ValueObject.value="";
lock.notify();
}
}
}
package demo05;
/**
* 生产者线程
*/
public class ThreadP extends Thread {
private P p;
public ThreadP(P p){
this.p = p;
}
@Override
public void run() {
while (true){
try {
p.setValue();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package demo05;
public class ThreadC extends Thread{
private C c;
public ThreadC(C c){
this.c = c;
}
@Override
public void run() {
while (true){
try {
c.getValue();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package demo05;
public class Run {
public static void main(String[] args) {
String lock = new String("");
P p = new P(lock);
C c = new C(lock);
ThreadP threadP = new ThreadP(p);
ThreadC threadC = new ThreadC(c);
threadP.start();
threadC.start();
}
}
3.3 通过管道进行线程间通信:字节流
管道流是一种特殊的流,用在于不同的线程直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据。通过管道流实现线程之间的通信,无需再借助临时文件。
package demo07;
import java.io.PipedOutputStream;
public class WriteData {
public void writeMethod(PipedOutputStream out){
try {
System.out.println("write: ");
String outData = "java多线程核心技术";
out.write(outData.getBytes());
System.out.println("write finish");
out.close();
}catch (Exception ex){
ex.printStackTrace();
}
}
}
package demo07;
import java.io.PipedInputStream;
public class ReadData {
public void readMethod(PipedInputStream in){
try {
System.out.println("read :");
byte[] bytes = new byte[1024];
int readLen = in.read(bytes);
while (readLen!=-1){
String newData = new String(bytes,0,readLen);
System.out.print(newData);
readLen = in.read(bytes);
}
System.out.println();
in.close();
}catch (Exception ex){
ex.printStackTrace();
}
}
}
package demo07;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.PipedOutputStream;
public class ThreadWrite extends Thread{
private WriteData writeData;
private PipedOutputStream outputStream;
public ThreadWrite(WriteData writeData, PipedOutputStream out){
this.writeData = writeData;
this.outputStream = out;
}
@Override
public void run() {
writeData.writeMethod(outputStream);
}
}
package demo07;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class ThreadRead extends Thread{
private ReadData readData;
private PipedInputStream inputStream;
public ThreadRead(ReadData readData, PipedInputStream inputStream){
this.readData = readData;
this.inputStream = inputStream;
}
@Override
public void run() {
readData.readMethod(inputStream);
}
}
package demo07;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
//用这种方式使得两个stream之间进行数据传输
outputStream.connect(inputStream);
ThreadRead threadRead = new ThreadRead(readData,inputStream);
threadRead.start();
Thread.sleep(2000);
ThreadWrite threadWrite = new ThreadWrite(writeData,outputStream);
threadWrite.start();
}catch (Exception ex){
ex.printStackTrace();
}
}
}
3.4 通过管道进行线程间通信:字符流
package demo9;
import java.io.PipedOutputStream;
import java.io.PipedWriter;
public class WriteData {
public void writeMethod(PipedWriter out){
try {
System.out.println("write: ");
String outData = "java多线程核心技术";
out.write(outData);
System.out.println("write finish");
out.close();
}catch (Exception ex){
ex.printStackTrace();
}
}
}
package demo9;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PipedReader;
public class ReadData {
public void readMethod(PipedReader in){
try {
System.out.println("read :");
char[] chars = new char[2];
int readLen = in.read(chars);
StringBuffer newData = new StringBuffer();
while (readLen!=-1){
String str = new String(chars,0,readLen);
newData.append(str);
readLen = in.read(chars);
}
System.out.println(newData);
in.close();
}catch (Exception ex){
ex.printStackTrace();
}
}
}
package demo9;
import java.io.PipedOutputStream;
import java.io.PipedWriter;
public class ThreadWrite extends Thread{
private WriteData writeData;
private PipedWriter pipedWriter;
public ThreadWrite(WriteData writeData, PipedWriter out){
this.writeData = writeData;
this.pipedWriter = out;
}
@Override
public void run() {
writeData.writeMethod(pipedWriter);
}
}
package demo9;
import java.io.PipedInputStream;
import java.io.PipedReader;
public class ThreadRead extends Thread{
private ReadData readData;
private PipedReader pipedReader;
public ThreadRead(ReadData readData,PipedReader pipedReader){
this.readData = readData;
this.pipedReader = pipedReader;
}
@Override
public void run() {
readData.readMethod(pipedReader);
}
}
package demo9;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PipedReader;
import java.io.PipedWriter;
public class Run {
public static void main(String[] args) {
try {
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedReader reader = new PipedReader();
PipedWriter pipedWriter = new PipedWriter();
//用这种方式使得两个stream之间进行数据传输
pipedWriter.connect(reader);
ThreadRead threadRead = new ThreadRead(readData,reader);
threadRead.start();
Thread.sleep(2000);
ThreadWrite threadWrite = new ThreadWrite(writeData,pipedWriter);
threadWrite.start();
}catch (Exception ex){
ex.printStackTrace();
}
}
}
3.2 方法join的使用
join的作用:等待线程对象销毁。
它会使所属线程对象X正常执行run()方法中的任务,而使得当前线程Z进入无限期的阻塞中,等待线程X销毁掉后再继续执行Z线程后面的代码。join()方法具有使得线程排队运行的作用,类似于同步效果。
join和synchronized的区别:
join在内部使用wait()方法进行等待,而synchronized关键字使用的是对象监视器原理进行同步。
3.2.1 方法join(long)和sleep(long)的区别
方法join(long)是在内部使用wait(long) 方法实现的,所以join(long)有释放锁的特点。
而sleep(long)方法不释放锁。
3.3 ThreadLocal的使用
ThreadLocal解决的是变量在不同的线程间的隔离性,也就是不同线程拥有自己的值,不同线程的值是可以放入ThreadLocal类中保存的。
3.3.1 验证线程变量的隔离性。
package demo12;
public class Tools {
public static ThreadLocal threadLocal = new ThreadLocal();
}
package demo12;
public class ThreadA extends Thread {
@Override
public void run() {
for (int i = 0; i <100 ; i++) {
Tools.threadLocal.set("ThreadA"+(i+1));
System.out.println("ThreadA 得到value"+Tools.threadLocal.get());
}
}
}
package demo12;
public class ThreadB extends Thread {
@Override
public void run() {
for (int i = 0; i <100 ; i++) {
Tools.threadLocal.set("ThreadB"+(i+1));
System.out.println("ThreadB 得到value"+Tools.threadLocal.get());
}
}
}
package demo12;
public class Run {
public static void main(String[] args) {
try {
ThreadA threadA = new ThreadA();
ThreadB threadB = new ThreadB();
threadA.start();
threadB.start();
for (int i = 0; i <100 ; i++) {
Tools.threadLocal.set("Main"+(i+1));
System.out.println("Main get value ="+Tools.threadLocal.get());
}
}catch (Exception ex){
ex.printStackTrace();
}
}
}
可以看到尽管三个线程都往一个变量中放值,但是每个线程还是能取出自己的值,相互之间隔离。
3.3.2 解决get()返回null的问题
自定义类继承ThreadLocal 类,重写initialValue方法。
3.4 InheritableThreadLocal的使用
3.4.1 值继承
使用InheritableThreadLocal可以在子线程中取得父线程继承下来的值。
package demo13;
import java.util.Date;
public class MyInheritableThreadLocal extends InheritableThreadLocal{
@Override
protected Object initialValue() {
long time = System.currentTimeMillis();
return time;
}
}
package demo13;
public class Tools {
public static MyInheritableThreadLocal my = new MyInheritableThreadLocal();
}
package demo13;
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("在ThreadA线程中取值="+Tools.my.get());
Thread.sleep(100);
}
}catch (Exception ex){
}
}
}
package demo13;
public class Run {
public static void main(String[] args) {
try {
for (int i = 0; i < 10; i++) {
System.out.println("在ThreadMain线程中取值="+Tools.my.get());
Thread.sleep(100);
}
Thread.sleep(5000);
ThreadA threadA = new ThreadA();
threadA.start();
}catch (Exception ex){
}
}
}
3.4.2 值继承再修改
我们在继承父线程的值的同时,还可以对值做进一步修改。
package demo13;
import java.util.Date;
public class MyInheritableThreadLocal extends InheritableThreadLocal{
@Override
protected Object initialValue() {
long time = System.currentTimeMillis();
return time;
}
@Override
protected Object childValue(Object parentValue) {
return parentValue+"我在子线程加的~";
}
}
但是要注意:
如果主线程将InheritableThreadLocal中的值做二次修改的话,子线程取得还是原来的旧值。