Java高并发程序设计/七周七并发
一 概念
- 同步
调动者–>发送消息–>调动者等待反馈–>调动者继续做其他事情 - 异步
调动者–>发送消息–>调动者做其他事情–>调动者接收到消息反馈 - 可见性
共享变量的修改在不同的线程中立即感知 - 并发(concurrency)、并行(parallelism)
并发:多个任务交替执行,多个任务之间还是串行的
并行:真正的同时执行 - 进程和线程
进程:
数据集上的一次运算活动,系统资源分配和调度的基本单位.
包含多个线程(>=1)
线程:
轻量级的线程,程序执行的最小单位,多个线程共享同一线程中资源
二、线程基础
1-基本线程
线程状态
New、Runnable、Terminated、Blocked、Waiting、Timed_Waiting
New–>start—>Runnable—>Terminated
Runnable<—–同步synchronized—–>Blocked.
Runnable—–wait等待—–> Waiting(Timed_Waiting)
Runnable<—-notify通知—– Waiting(Timed_Waiting)
public enum State {
/**
* 尚未启动的线程.
*/
NEW,
/**
* runnable线程的状态,但可能在等待系统资源
*/
RUNNABLE,
/**
* 在等待监控锁lock的线程的状态,等待一个monitor lock来进入同步块或方法,
* 或者重入synchronized block/method
*/
BLOCKED,
/**
* waiting thread的状态
* 当调用如下方法并未超时时thread处于waiting state:
* Object.wait
* Thread.join
* LockSupport.park
* 处于waiting的线程等待另外的线程调用特定的方法
* 如
* (1)一个线程在一个对象上调用了Object.wait(),等待另外一个线程调用
* Object.notify或者Object.notifyAll()
* (2)一个线程调用了Thread.join(),等待另外一个线程terminate结束
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}
新建线程
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("实现Runnable,以构造参数形式传入");
}
}
public class ThreadTest {
public static void main(String[] args) {
//继承Thread重写run
Thread thread1 = new Thread(){
@Override
public void run() {
System.out.println("匿名内部类重写run方法");
}
};
thread1.start();
//当有Runnable时调用start会调用Runnable的start
Thread thread2 = new Thread(new MyRunnable());
thread2.start();
}
}
2-stop
thread(实例).stop()停止线程,但也会释放资源,可能会造成资源破坏
3-中断
清楚中断
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
判断是否中断
public boolean isInterrupted() {
return isInterrupted(false);
}
判断并清楚中断
private native boolean isInterrupted(boolean ClearInterrupted);
public class Interrupt {
public static void main(String[] args) {
Thread thread = new Thread(){
@Override
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("当前线程被中断");//A
}
try {//sleep过程中被中断会有InterruptedException异常
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Thread.sleep 被中断");
//sleep过程中被中断会有InterruptedException异常,会清楚中断标志
//重新设置中断,让A处可以响应
Thread.currentThread().interrupt();
}
}
}
};
thread.start();
thread.interrupt();
}
}
4-线程协同(wait、notify)
wait和notify是Object对象中的实例方法
重点:
获取监听器–>wait–>释放监听器–>等待被notify–>重获监听器–>继续
wait和sleep区别,同样是线程进入等待
- wait会被唤醒,wait执行完会释放资源,被唤醒后重新获取资源后才继续
- sleep不会释放资源
/**
*
* 随机唤醒一个等待对象object 监听器的线程,被唤醒线程不会立即执行,而是先获得这个对象的锁,然后继续执行
* 只有拥有对象监听器的线程才能调用这个方法,线程成为对象监听器拥有者的方法:
* By executing a synchronized instance method of that object.
* By executing the body of a synchronized statement that synchronizes on the object.
* For objects of type Class by executing a synchronized static method of that class.
*
*/
public final native void notify();
/**
* 同notify,只是唤醒对象等待队列中所有线程,被唤醒的线程同样不会立即执行,必须等待获取到对象的监听器
*/
public final native void notifyAll();
/**
* 使得线程进入等待队列,到另外一个线程调用notify或者notifyAll,相当于wait(0)
* 该线程必须是这个对象监听器的拥有者,当调用wait后,释放监听器,等待另外一个线程通知,然后再尝试获取到对象监听后继续
* 通常的使用模式为
* <pre>
* synchronized (obj) {
* while (<condition does not hold>)
* obj.wait();
* ... // Perform action appropriate to condition
* }
*/
public final void wait() throws InterruptedException {
wait(0);
}
另外时间型等待,详见JDK说明
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException {
}
demo
public class WaitAndnotify {
static Object obj = new Object();
public static class WaitThread extends Thread {
@Override
public void run() {
synchronized (obj) {//获取监听器
System.out.println(System.currentTimeMillis() + ": WaitThread获取监听器");
try {
System.out.println(System.currentTimeMillis() + ": WaitThread执行完wait,释放监听器");
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ": 等待后,尝试充新获得到了监听器,继续执行");
}
}
}
public static class NotifyThread extends Thread {
@Override
public void run() {
synchronized (obj) {//获取监听器
System.out.println(System.currentTimeMillis() + ": NotifyThread获取监听器");
System.out.println(System.currentTimeMillis() + ": NotifyThread通知");
obj.notify();//通知
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ": NotifyThread释放监听器");
}
}
}
public static void main(String[] args) {
Thread waitThread = new WaitThread();
Thread notifyThread = new NotifyThread();
waitThread.start();
notifyThread.start();
}
//输出
//1528299707504: WaitThread获取监听器
//1528299707504: WaitThread执行完wait,释放监听器
//1528299707504: NotifyThread获取监听器
//1528299707504: NotifyThread通知
//1528299709510: NotifyThread释放监听器
//1528299709510: 等待后,尝试充新获得到了监听器,继续执行
}
5-join
调用者,等待该线程die
yeild是礼让cpu,暂时退出资源和操作系统的中yeild类似
public final synchronized void join(long millis)
throws InterruptedException {
}
public final synchronized void join(long millis, int nanos)
throws InterruptedException {
}
public final void join() throws InterruptedException {
join(0);
}
demo,mian等待joinThreadA执行完毕
class JoinThread extends Thread {
public JoinThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(this.getName() + ": " + i);
}
}
}
public class Join {
public static void main(String[] args) throws InterruptedException {
JoinThread joinThreadA = new JoinThread("A");
JoinThread joinThreadB = new JoinThread("B");
joinThreadA.start();
joinThreadA.join();
joinThreadB.run();
}
// A: 0
// A: 1
// A: 2
// A: 3
// A: 4
// A: 5
// A: 6
// A: 7
// A: 8
// A: 9
// B: 0
// B: 1
// B: 2
// B: 3
// B: 4
// B: 5
// B: 6
// B: 7
// B: 8
// B: 9
}
6-join
优先级,和系统的任务优先级类似,高优先级优先获取资源,thread中有10个等级的优先级,数值越大等级越高
/**
* The minimum priority that a thread can have.
*/
public final static int MIN_PRIORITY = 1;
/**
* The default priority that is assigned to a thread.
*/
public final static int NORM_PRIORITY = 5;
/**
* The maximum priority that a thread can have.
*/
public final static int MAX_PRIORITY = 10;
demo在存在资源竞争下高优先级,获取资源的可能性大
class PriorityThread extends Thread {
public PriorityThread(String name) {
super(name);
}
int cnt = 0;
@Override
public void run() {
while (true) {
synchronized (PriorityTest.class) {//产生资源竞争
cnt++;
if (cnt > 10000000) {
System.out.println(Thread.currentThread().getName() + " completed");
break;
}
}
}
}
}
public class PriorityTest {
public static void main(String[] args) {
Thread higherPriorityThread = new PriorityThread("HigherPriorityThrea");
Thread lowerPriorityThread = new PriorityThread("LowerPriorityThrea");
higherPriorityThread.setPriority(Thread.MAX_PRIORITY);//设置高优先级
lowerPriorityThread.setPriority(Thread.MIN_PRIORITY);//设置低优先级
lowerPriorityThread.start();
higherPriorityThread.start();
}
}
7-synchronized
synchronized实现线程间的同步协作,对同步的代码加锁,保证原子重入性
使用:
- 指定对象构造同步代码块:对给定对象加锁
- 作为方法的修饰:相当于对当前对象实例加锁
- 作为静态方法修饰:相当于对当前类加锁
1-指定对象:对给定对象加锁
class SynchronizedThread1 extends Thread {
Object instance;
static int cnt = 0;
public SynchronizedThread1(Object instance, String name) {
super(name);
this.instance = instance;
}
private void increse() {
cnt++;
}
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
//构造同步代码块,保证同步
synchronized (instance) { //构造同步代码块,对对象加锁
increse();
}
//--不能保证同步,cnt计数<20000
//increse();
}
}
public static void main(String[] args) throws InterruptedException {
Object instance = new Object();
Thread thread1 = new SynchronizedThread1(instance, "thread1");
Thread thread2 = new SynchronizedThread1(instance, "thread2");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(cnt);
}
}
2-作用与静态方法和方法
public class SynchronizedThread2 implements Runnable {
static int cnt = 0;
//对方法加锁==对实例加锁
private synchronized void increse() {
cnt++;
}
// //对方法加锁==对类加锁
// private static synchronized void increse() {
// cnt++;
// }
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
increse();
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedThread2 instance = new SynchronizedThread2();
Thread thread1 = new Thread(instance, "thread1");
Thread thread2 = new Thread(instance, "thread2");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(cnt);
//instance3和instance4是不同实例
//修正需要private static synchronized void increse() 对静态方法== 对类加锁
// SynchronizedThread2 instance3 = new SynchronizedThread2();
// SynchronizedThread2 instance4 = new SynchronizedThread2();
// Thread thread3 = new Thread(instance3, "thread3");
// Thread thread4 = new Thread(instance4, "thread4");
// thread3.start();
// thread4.start();
// thread3.join();
// thread4.join();
// System.out.println(cnt);
}
}
6-线程中的非线程安全容器
ArrayList是线程非安全的多线程中会导致扩容时的不一致性,抛出类似
java.lang.ArrayIndexOutOfBoundException异常
HashMap
也是非线程安全的,用ConcurrentHashMap
TODO 这地方补充下HashMap和ConcurrentHashMap的实现Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
深入分析hashmap
Java容器(四):HashMap(Java 7)的实现原理
三 并发包
同步
方法一:synchronized 同步块,方法 结合Object.wait()和Object.notify()
方法二:synchronized 的“升级版” 重入锁ReentrantLock
synchronized等待锁,获得锁执行
ReentrantLock在此基础上,增加中断机制,中断后放弃后取消对锁的请求
重入锁ReentrantLock
- 一般应用
和synchronized很相似,但需要开发者成对的加锁和释放锁
reentrantLock.lock();
try {
//临界区
} finally {
reentrantLock.unlock();
}
- 支持中断响应
ReentrantLock reentrantLock = new ReentrantLock();
try {
reentrantLock.lockInterruptibly();
//临界区
} catch (InterruptedException e) {
e.printStackTrace();
reentrantLock.unlock();
}
- 限时锁获取(避免死锁的一种方法)
在设定时间内尝试获取锁,获取锁后返回true
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
ReentrantLock reentrantLock = new ReentrantLock();
try {
if (reentrantLock.tryLock(10, TimeUnit.SECONDS)) {
//临界区
}
} catch (InterruptedException e) {
e.printStackTrace();
if (reentrantLock.isHeldByCurrentThread()) {
reentrantLock.unlock();
}
}
相似,tryLock(),尝试获取锁,若此时锁被占用,直接返回flase
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
- 公平锁
lock在请求等待队列,随机获取一个等待对象,同时根据系统调度的高效性倾向,线程倾向再次获取已持有的锁,这样锁是不公平的,ReentrantLock 很简练的实现了公平锁. 当然,要维护一个有序队列,效率要低
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
重入锁ReentrantLock的Condition
和synchronized和object.wait()和object.notify()类似
ReentrantLock的newCondition方法返回一个和ReentrantLock绑定的Condition,配合使用使得ReentrantLock有资源释放等待通知和通知执行的功能
public Condition newCondition() {
return sync.newCondition();
}
public interface Condition {
//使得当前线程进入等待,自动释放锁,直到被signalled或者interrupted,通知后先re-acquire the lock associated with this condition然后返回继续执行
void await() throws InterruptedException;
//相似不可中断,具体见JDK文档
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* Wakes up one waiting thread.
*/
void signal();
/**
* Wakes up all waiting threads
*/
void signalAll();
}
demo片段
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
try {
if (reentrantLock.tryLock(10, TimeUnit.SECONDS)) {
condition.await();//等待通知
//临界区
}
} catch (InterruptedException e) {
e.printStackTrace();
if (reentrantLock.isHeldByCurrentThread()) {
reentrantLock.unlock();
}
}