目录
一、死锁
死锁的四个条件
死锁产生的4个必要条件
互斥条件:一个资源每次只能被一个进程使用,即在一段时间内某 资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
请求与保持条件:进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源 已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
不可剥夺条件:进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能 由获得该资源的进程自己来释放(只能是主动释放)。
循环等待条件: 若干进程间形成首尾相接循环等待资源的关系
死锁的简单例子
1.顺序死锁
public class SimpleDeadLock implements Runnable {
private boolean flag;
MyLock mylock;
public SimpleDeadLock(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
// TODO Auto-generated method stub
if (flag) {
// 同步中嵌套同步
// 一个线程执行if里面的,一个线程执行else里面的
synchronized (mylock.locka) {//顺序死锁
System.out.println("if...locka");
synchronized (mylock.lockb) {
System.out.println("if...lockb");
}
}
} else {
synchronized (MyLock.lockb) {
System.out.println("else...lockb");
synchronized (MyLock.locka) {
System.out.println("else...locka");
}
}
}
}
public static void main(String[] args) {
new Thread(new SimpleDeadLock(true)).start();
MyLock myLock = new MyLock();
new Thread(new SimpleDeadLock(false)).start();
}
}
// 定义一个类来创建两把锁
class MyLock {
static Object locka = new Object();
static Object lockb = new Object();
}
//获取加锁对象的顺序不相同,上一个加锁的对象并没有解锁,发生了顺序死锁
//经典例子是银行转账的账户加锁死锁,有一种解决方案是使用下面的方法保证加锁顺序
//int fromHashValue = System.identityHashCode( from );依靠内存地址来确定加锁的顺序
//int toHashValue = System.identityHashCode( to );
2.饥饿死锁
public class ThreadDeadLock {
ExecutorService exec = Executors.newSingleThreadExecutor();//单例模式线程
RenderTask renderTask = new RenderTask();
public void start(){
exec.submit( renderTask );
}
public void work(){
Runtime.getRuntime().addShutdownHook( new Thread(){
public void run(){
try{
// LoggerService.this.stop(); //注册钩子停止日志的服务
Thread.currentThread().join();
}catch(InterruptedException e){
}
}
} );
}
class RenderTask implements Callable{
@Override
public Object call() throws Exception {
Future<String> header,footer;
header = (Future<String>) exec.submit( new LoadFileTask("header.html") );
footer = (Future<String>) exec.submit( new LoadFileTask("footer.html") );
// String page = renderBody();
return "head:"+header.get()+" ,foot"+footer.get();
}
}
class LoadFileTask implements Runnable{
private String html;
public LoadFileTask(String path){
this.html= path;
}
@Override
public void run() {
System.out.println( html );
}
}
public static void main(String args[]){
ThreadDeadLock t = new ThreadDeadLock();
}
}
//饥饿死锁,单例方式运行线程,想要获取到两次运行的结果
二、同步
1、内置锁(synchronized)
public class GuardTest implements Runnable {
@GuardedBy( "this" ) private BigInteger lastNumber;
@GuardedBy( "this" ) private BigInteger[] lastFactors;
private static int count =1;
public void run() {
System.out.println( "线程开始"+ Thread.currentThread().getName() );
synchronized (this) {
Date date = new Date();
try {
System.out.println( Thread.currentThread().getName() +"休眠" );
Thread.sleep( 1000 );
System.out.println( Thread.currentThread().getName() + date.toString()+"线程:");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]){
GuardTest number1 = new GuardTest();
GuardTest number2 = new GuardTest();
new Thread( number1,"测试1" ).start();
// new Thread( number1,"测试2" ).start();
new Thread( number2,"测试3" ).start();
}
//synchronized锁住的是对象中的代码段。多个对象时并不是同意代码段,所以多对象间无效
}
2、volatile关键字
volatile关键字是比较弱的同步关键字。将变量声明为volatile。编译器将会知道这个变量是共享的,在拥有着对象件是可见的,不会被缓存或者寄存在其他处理器不可见的地方,多以volatile总是会返回最新的结果值。但是被修饰的对象在多线程质检并非总是安全的,后面的java.until.concurrent 同步器中的AQS管理状态使用这个关键字。
public class VolatileTest implements Runnable{
public volatile int count = 0;
@Override
public void run() {
for(int i=0;i<100;i++){
count++;
}
}
public static void main(String args[]) throws InterruptedException {
VolatileTest volatileTest = new VolatileTest();
for(int i=0;i<100;i++){
Thread t = new Thread( volatileTest );
t.start();
//如果加上t.join(),当前线程执行完才会执行下一线程,得的期望值
}
Thread.yield();//暂停当前正在执行的线程,并执行其他线程。
System.out.println( volatileTest.count );
}
}
被volatile修饰的变量在拥有着对象件虽然是可见的,但是在对象的修改过程中是 获取旧值 -> 修改->写入 。当个多个线程同时进行这些操作的时候,并不能保证操作的原子性。
3.ThreadLocal
ThreadLocal类很多地方叫做线程本地变量,也有些地方叫做线程本地存储。因为ThreadLocal在每个线程中对该变量会创建一个副本,即每个线程内部都会有一个该变量,且在线程内部任何地方都可以使用,线程之间互不影响,。
public class ThreadLocalTry {
ThreadLocal<Long> longLocal = new ThreadLocal<Long>();
ThreadLocal<String> stringLocal = new ThreadLocal<String>();
public void set() {
longLocal.set(Thread.currentThread().getId());
stringLocal.set(Thread.currentThread().getName());
}
public long getLong() {
return longLocal.get();
}
public String getString() {
return stringLocal.get();
}
public static void main(String[] args) throws InterruptedException {
final ThreadLocalTry ThreadLocalTry = new ThreadLocalTry();
ThreadLocalTry.set();
System.out.println(ThreadLocalTry.getLong());
System.out.println(ThreadLocalTry.getString());
Thread thread1 = new Thread(){
public void run() {
ThreadLocalTry.set();//创建的副本在线程中创建的副本
System.out.println(ThreadLocalTry.getLong());
System.out.println(ThreadLocalTry.getString());
};
};
thread1.start();
thread1.join();
System.out.println(ThreadLocalTry.getLong());
System.out.println(ThreadLocalTry.getString());
}
}
//出处:http://www.cnblogs.com/dolphin0520/
ThreadLocal更多用于数据库连接和事务的管理
public class ThreadLocalTest {
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
@Override
protected Connection initialValue() {//初始化
Connection conn = null;
try {
conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/weixy","root","123456" );
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
};
public static Connection getConnection() {
return connectionHolder.get();
//进入get()方法, Thread t = Thread.currentThread();ThreadLocalMap map = getMap(t);
// ThreadLocalMap.Entry e = map.getEntry(this);//这里传入的是this,不是当前线程
}
public static void setConnection(Connection conn) {
connectionHolder.set( conn );
}
//使用Map数据结构来将thread和conn经行匹配
//synorized是以时间换取同步。效率低
//final除了类似C++中的CONST,还能确保初始化过程的安全性
//private int num;
//public construct(int num){
// this.num=num;
//}
}
监听器模式
public class PrivateLockTest {
private final Object lock = new Object();
@GuardedBy( "lock" ) private String weight;
void mrthod(){
synchronized (lock){//通过对对象私有属性的加锁,lock获取后才能访问weight
weight+="w";
}
}
}
4.不安全的发布
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList( new ArrayList<E>( ) );//加锁的List??
//如果不包含就添加进列表中
public boolean putIfAbsent(E x){
synchronized(list) {
boolean absent = !list.contains( x );
if (absent) {
list.add( x );
}
return absent;
}
}
}
//list是public的发布的,属性不安全,可以被修改
三.同步容器
1.信号量
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore semaphore;//信号量
//构造函数
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet( new HashSet<T>( ) );
this.semaphore = new Semaphore( bound );//限制线程同时进入的数量。
}
public boolean add(T t) throws InterruptedException {
semaphore.acquire();//获取一个线程的许可,当占用多个线程时,添加参数number
boolean wasAdd = false;
try {
wasAdd = set.add( t );
return wasAdd;
}finally {
if(!wasAdd){
semaphore.release();//释放一个许可
}
}
}
public boolean remove(T t) throws InterruptedException {
boolean wasRemove =false;
try {
wasRemove = set.remove( t );
return wasRemove;
}finally {
semaphore.release();
}
}
}
使用信号量限制线程数
public class SemaphoreDemo {
private static final Semaphore semaphore=new Semaphore(3);
private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
private static class InformationThread extends Thread{
private final String name;
private final int age;
public InformationThread(String name,int age)
{
this.name=name;
this.age=age;
}
public void run()
{
try
{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"岁当前时间为:"+System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(name+"要准备释放许可证了,当前时间为:"+System.currentTimeMillis());
semaphore.release();
System.out.println("当前可使用的许可数为:"+semaphore.availablePermits());
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args)
{
String[] name= {"李明","王五","张杰","王强","赵二","李四","张三"};
int[] age= {26,27,33,45,19,23,41};
for(int i=0;i<7;i++)
{
Thread j=new InformationThread(name[i],age[i]);
threadPool.execute(j);
}
}
}
2.栅栏
public class CellularAutomata {
private final CyclicBarrier barrier;
private final Board mainBoard;
private final Worker[] workers;
public CellularAutomata(final Board mainBoard) {
this.mainBoard = mainBoard;
int count = Runtime.getRuntime().availableProcessors();//返回虚拟据的可用内存
this.barrier = new CyclicBarrier( count,new Runnable() {
@Override
public void run() {
mainBoard.commitNewValue();
}
} );
this.workers =new Worker[count];
for(int i=0;i<count;i++){
workers[i] = new Worker(mainBoard.getSubBoard(count,1));
}
}
}
//栅栏测试。闭锁与栅栏不同,
// 闭锁阻塞主线程直到满足条件之后,状态变为一直开锁//
// 栅栏是当满足条件后,所有线程才会开始执行
3.闭锁
public class HarnessTest {
public long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch( 1 );//当里面的数等于0的时候,开门
final CountDownLatch endGate = new CountDownLatch( nThreads );
for(int i=0;i<nThreads;i++){
Thread t = new Thread(){
public void run(){
try {
startGate.await();//主线程会阻塞,直到其他线程完成,负责初始化的闭锁。
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
endGate.countDown();//数量减一
}
}
};
t.start();
}
long start = System.nanoTime();//获取虚拟机运行时常
startGate.countDown();//主线程开启
endGate.await();//子线程阻塞
long end = System.nanoTime();
return end-start;
}
public static void main(String args[]) throws InterruptedException {
HarnessTest h = new HarnessTest();
Runnable task = new Runnable() {
@Override
public void run() {
try {
System.out.println( "线程沉睡" );
Thread.sleep( 1000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
System.out.println(h.timeTasks(10, task ));
}
}
//闭锁使用的CountDownLatch不可复用,CyclicBarrier可以复用
4.生产者-消费者模式
public class ConsumTest {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
public static void main(String[] args) {
ConsumTest test = new ConsumTest();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void consume() throws InterruptedException {
while(true){
synchronized (queue) {
while(queue.size() == 0){
try {
System.out.println("队列空,等待数据");
Thread.sleep( 1000 );
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.poll(); //每次移走队首元素
queue.notify();
System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
Thread.sleep( 1000 );
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void produce() throws InterruptedException {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
try {
System.out.println("队列满,等待有空余空间");
Thread.sleep( 1000 );
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.offer(1); //每次插入一个元素
queue.notify();
Thread.sleep( 1000 );
System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
}
}
}
}
}
四.线程池框架
public interface ExecutorService extends Executor {
void shutDown();
List<Runnable> shutDownNow();
boolean isShowdown();
boolean isTerminated();
boolean awaitTermination(Long timeOut,TimeUnit unit)throws InterruptedException;
}
public class ExecutorCompleteService {
public static void main(String args[]) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool( 10 );//创建包含10个线程的线程池
CompletionService completionService = new ExecutorCompletionService( executor );
for(int i=0;i<10;i++){
final int result =i;
completionService.submit( new Callable() {
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName()+"开始沉睡");
Thread.sleep( new Random( ).nextInt(3000) );
System.out.println(Thread.currentThread().getName()+"睡完");
return result;
}
} );
}
// Thread.sleep( 5000 );
executor.shutdown();
System.out.println( "最终结果为:"+completionService.take().get() );
//结果是不确定,多线程的结果提交结束不确定
}
}
类库提供了灵活的线程池和默认的配置
1.1固定大小的线程池
public class ThreadTest {
public static void main(String args[]) throws InterruptedException,ExecutionException {
System.out.println( "----程序开始运行----" );
Date date1 = new Date();
int taskSize = 5;
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool( taskSize );
//创建固定数量的线程池
//newCachedThreadPool 创建
//创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
// 创建多个有返回值的任务
List<Future> list = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) {
Callable c = new MyCallable( i + " " );
// 执行任务并获取Future对象
Future f = pool.submit( c );//调用对象的call方法
// System.out.println(">>>" + f.get().toString());
list.add( f );
}
}
static class MyCallable implements Callable<Object> {
private String taskNum;
MyCallable(String taskNum) {
this.taskNum = taskNum;
}
public Object call() throws Exception {
System.out.println(">>>" + taskNum + "任务启动");
Date dateTmp1 = new Date();
Thread.sleep(1000);
Date dateTmp2 = new Date();
long time = dateTmp2.getTime() - dateTmp1.getTime();
System.out.println(">>>" + taskNum + "任务终止,花费时间"+time+"ms");
return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】";
}
}
}
1.2缓存线程池
创建一个可以缓存的线程池,如果线程的规模超出了当前的处理需求,将回收空闲的线程。需求增加超出处理规模,则添加新的线程,理论上规模不存在限制
public class ThreadPoolTest {
public static void main(String args[]) throws InterruptedException {
ExecutorService threadPool = Executors.newCachedThreadPool();//缓存线程池
for(int i=0;i<3;i++){
TimeUnit.SECONDS.sleep( 1 );
threadPool.execute( new Runnable() {
@Override
public void run() {
System.out.println( Thread.currentThread().getName() );
}
});
}
}
}
//打印结果是 pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
newSingleThreadExecutor和newScheduledThreadPool
单例模式的日志,会依照串行的方式来执行任务
public class LoggerService {
private final ExecutorService exec = Executors.newSingleThreadExecutor();//单例模式线程池
private long timeOut = 1;
private TimeUnit unit =TimeUnit.SECONDS;
private Writer writer;
public void setWriter(Writer writer){
this.writer = writer;
}
public void start(){
}
public void shutdown() throws IOException {
try {
exec.shutdown();
exec.awaitTermination( timeOut,unit );
} catch (InterruptedException e) {
e.printStackTrace();
writer.close();
}
}
public void sleep() throws InterruptedException {
Thread.sleep( 1000 );
}
public void log(final String msg){
exec.execute( new Runnable() {
@Override
public void run() {
System.out.println( msg );//打印日志
try {
writer.write( msg );
} catch (IOException e) {
e.printStackTrace();
}
}
} );
}
public static void main(String args[]) throws IOException, InterruptedException {
Writer writer = new FileWriter( "C:\\Users\\david\\Desktop\\test.txt");
LoggerService LOGER = new LoggerService();
LOGER.setWriter( writer );
LOGER.log( "test1" );
LOGER.sleep();
LOGER.log( "test2" );
LOGER.shutdown();
}
}
public static void main(String sgrs[]) {
ScheduledExecutorService executors = Executors.newScheduledThreadPool( 5 );
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println( "Heart Beat......." );
}
};
executors.scheduleAtFixedRate( task,5,3,TimeUnit.SECONDS );
//定时完成任务,创建多线程
}
}//模拟心跳机制的线程池
2.取消与关闭
public interface Cancleable<T> extends Callable<T> {
void cancle();
RunnableFuture<T> newTask();
}关闭接口
public abstract class SocketUsingTask<T> implements Cancleable<T>{
@GuardedBy( "this" )private Socket socket;
protected synchronized void setSocket(Socket s) {
this.socket = s;
}
public synchronized void cancle(){
try {
if(socket!=null){
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public RunnableFuture<T> newTask(){
return new FutureTask<T>(this){
public boolean cancle( boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancle(); //调用SocketUsingTask的关闭套接字
}finally {
return super.cancel( mayInterruptIfRunning );
}
}
};
}
}
毒丸对象,设置毒丸的临界线,当毒丸到达一定数量的时候,中断任务
public class IndexingService {
private static final File POSITION = new File( "" );//空路劲
private final IndexerThread productor = new IndexerThread();//生产者线程
private final CrawlerThread consumer = new CrawlerThread();//消费者线程
private BlockingQueue<File> queue ;
private FileFilter fileFilter;
private File root = new File("C:\\Users\\david\\Desktop\\test.txt");
public void start(){
productor.start();
consumer.start();
}
public void stop(){
productor.stop();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
//生产者
private class IndexerThread extends Thread{
@Override
public void run() {
try {
craw( root );//??
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
while(true){
try {
queue.put( new File("C:\\Users\\david\\Desktop\\test.txt") );
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void craw(File root)throws InterruptedException{
System.out.println( "生产者线程" );//还干嘛呢
}
}
//消费者线程
private class CrawlerThread extends Thread{
@Override
public void run() {
try {
while (true) {
File file = null;
if(!queue.isEmpty()) {
file = queue.take();
}
if(file == POSITION){//生产者为空
break;
}else{
// indexFile(file);//找不到,干嘛的
}
}
}catch (InterruptedException e){
System.out.println( "xxxx" );
}
}
}
//方法内部创建exec,生命周期为程序执行完成
public boolean checkMail(Set<String> hosts,final long timeout,final TimeUnit unit) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();//缓存线程池
final AtomicBoolean hasNextMail = new AtomicBoolean( false );//原子性布尔值
try {
for (final String host : hosts)
exec.execute( new Runnable() {
@Override
public void run() {
if (checkMail( host )) {//任务处理方法
hasNextMail.set( true );
}
}
} );
}finally {
exec.shutdown();//尝试取消正在执行的任务,返回已提交的任务,但是无法分清任务是否完成
exec.awaitTermination( timeout,unit );
}
return hasNextMail.get();
}
private boolean checkMail(String host) {
// dosomething()
return true;
}
public static void main(String args[]) throws InterruptedException {
IndexingService index = new IndexingService();
index.start();
index.stop();
index.awaitTermination();
}
}
使用关键字volatile取消
public class PrimeGenerator implements Runnable{
@GuardedBy("this")private List<BigInteger> primes = new ArrayList<BigInteger>( );
private volatile boolean cancelled;
@Override
public void run() {
BigInteger p = BigInteger.ONE;
while(!cancelled){
p=p.nextProbablePrime();//产生 BigInteger 的素数
synchronized (this){
primes.add( p );
}
}
}
public void cancle(){
cancelled = true;
}
public synchronized List<BigInteger> get(){
return new ArrayList<BigInteger>( primes );
}
List<BigInteger> aSeconfOfPermer() throws InterruptedException {
PrimeGenerator generator =new PrimeGenerator();
new Thread(generator).start();
try {
TimeUnit.SECONDS.sleep( 1 );
} finally {
generator.cancle();
}
return generator.get();
}
public static void main(String args[]) throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
List<BigInteger> list = generator.aSeconfOfPermer();
int i=0;
}
}
捕获LoggerService中的shutdown方法不追终未完成的任务,添加跟踪未完成任务的方法
public class TrackingExector extends AbstractExecutorService{
//重写方法
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void execute(Runnable command) {
}
private final ExecutorService exec ;
private final Set<Runnable> taskCancledOnshutdown = Collections.synchronizedSet( new HashSet<Runnable>( ) );//容器不能修改,但是存储的对象可改
public TrackingExector(ExecutorService exec) {//construct
this.exec = exec;
}
public List<Runnable> getCancledTask(){
if(!exec.isTerminated()){//所有任务提交后返回TRUE
throw new IllegalStateException( );
}
return new ArrayList<Runnable>( taskCancledOnshutdown );
}
/**
* 捕获线程取消时提交未完成的任务
* @param task
*/
public void exectu(final Runnable task){
exec.submit( new Runnable() {
@Override
public void run() {
try {
task.run();
}finally {
if(isShutdown()&&Thread.currentThread().interrupted()){
taskCancledOnshutdown.add( task );//关闭线程时添加提交的任务到未完成列表
}
}
}
} );
}
}
//为捕获的异常写入日志
public class UncatchExe implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t,Throwable e) {
Logger logger = Logger.getAnonymousLogger();//返回一个新的日志
logger.log( Level.SEVERE,"Thread terminated catch"+Thread.currentThread().getName(),e );
//将异常写入异常中
Thread.setDefaultUncaughtExceptionHandler( new UncatchExe());
}
public static void main(String args[]){
Thread.setDefaultUncaughtExceptionHandler( new UncatchExe() );//线程设置为捕获异常处理方式
Thread thread = new Thread( new Task() );
thread.start();
ExecutorService exec = Executors.newCachedThreadPool();
Thread t1 = new Thread( new Task() );
exec.submit( t1 );
exec.shutdown();
//submit返回额错误也认为是返回的结果,不会进入UncaughtExceptionHandler ,execute
}
}
class Task implements Runnable{
Throwable thrown;
@Override
public void run() {
try{
System.out.println( 3/1 );
System.out.println( 3/0 );
System.out.println( 3/2 );
}catch (Exception e){
thrown = e;
}finally {
dealExec( this,thrown );
}
}
public void dealExec(Runnable t,Throwable e){
System.out.println( e.getMessage() );//主动捕获异常
}
}
class UncatchExec implements Thread.UncaughtExceptionHandler{
@Override
public void uncaughtException(Thread t,Throwable e) {//未捕获异常处理 与try{}主动捕获结合
System.out.println( "EXECEPTION ="+e.getMessage() );
}
}
五.串行与并行
并行查找推箱子路径方法
public class ConcurrentPuzzleSolve<P,M> {
private final Puzzle<P,M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P,Boolean> seen;
final ValueLatch<Node<P,M>> soulution = new ValueLatch<Node<P, M>>();
public ConcurrentPuzzleSolve(Puzzle<P, M> puzzle,ExecutorService exec,ConcurrentMap<P, Boolean> seen) {
this.puzzle = puzzle;
this.exec = exec;
this.seen = seen;
}
public List<M> solution() throws InterruptedException {
try {
P pos = puzzle.initialPosition();
exec.execute( newTask(pos,null,null) );//查找答案
List<M> ret = (List<M>) soulution.getValue();//闭锁条件,当没有结果的时候,一直等待
return ret==null?null:ret;
}finally {
exec.shutdown();
}
}
public Runnable newTask(P p,M m,Node<P,M> node){
return new SolverTask(p,m,node);
}
class SolverTask extends Node<P,M> implements Runnable{
public SolverTask(P position,M move,Node<P, M> prev) {
super( position,move,prev );
}
@Override
public void run() {
if(soulution.isSet()||seen.putIfAbsent(position, true )!=null){//这里的position是SolverTask的内部变量
return;
}
if(puzzle.isGoal( position )){
soulution.setValue( this );
}else {
for(M m : puzzle.legalMoves( position )){
exec.execute( newTask( puzzle.move( position,m ),m,prev ) );//查询所有的移动可能位置
}
}
}
}
}
串行查找路径方法
public class SequentialPuzzleSolver<P,M> {
private final Puzzle<P,M> puzzle;
private final Set<P> seen = new HashSet<P>( );
public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
}
public List<M> solve(){
P pos = puzzle.initialPosition();
return search(new Node<P,M>(pos,null,null));
}
//返回路劲结果,递归方法
public List<M> search(Node<P,M> node){
if(!seen.contains( node.position )){
seen.add( node.position );
if(puzzle.isGoal( node.position )){
return node.asMoveList();
}
for(M move : puzzle.legalMoves( node.position )){
P pos = puzzle.move( node.position,move );//获取更新的位置
Node<P,M> child = new Node(pos,move,node);//move里面的值怎么存和加呢?是legalmoves获取所有可能的下一步移动结果
List<M> result = search( child );//查出列出所有的可能结果。。。。又是暴力破解
if(result!=null){
return result;
}
}
}
return null;
}
}
public class SemaphoreBoundExector {
private final Executor exec;
private final Semaphore semaphore;
public SemaphoreBoundExector(Executor exec , int bound){ //construct
this.exec = exec;
this.semaphore = new Semaphore( bound );//设置信号量
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();//get a new Thread
try{
exec.execute( new Runnable() {
@Override
public void run() {
try {
command.run();
}finally {
semaphore.release();//realse a Thread
}
}
} );
}catch (RejectedExecutionException e){
semaphore.release();
}
}
}
//信号量在创建线程的时候,由信号池中获取线程,在执行完了之后在释放。生产者-消费者模式模式控制线程
//使用信号量(semaphore)来控制任务的提交速率
携带结果的闭锁
public class ValueLatch<T> {
@GuardedBy( "this" )private T value = null;
private final CountDownLatch count = new CountDownLatch( 1 );//闭锁--数量小于0时变为开锁
public boolean isSet(){
return (count.getCount()==0);
}
public synchronized void setValue(T newValue ){
if(!isSet()){
this.value = newValue;
count.countDown();//内部值小于等于0 -- 完成闭锁需要条件变为开锁,发出信号 countDown()//发出信号 await()等待线程完成--阻塞
}
}
public T getValue() throws InterruptedException {
count.await();//判断值是否已经set(),否则将一直阻塞, 即判断内部变量count是否为0
synchronized (this){
return value;
}
}
}
//携带结果的闭锁
加锁顺序死锁的一种解决方案
public class ThreadTest {
private static final Object tieLock = new Object();
private void transferMoney(final Account from,final Account to,final double money) throws Exception {
class Helper {
public void transfer() throws Exception {
if (from.getMoney() < money) {
throw new Exception( "too poor" );
} else {
from.debt( money );//+
to.crete( money );//-
}
}
}
//获取hash值
int fromHashValue = System.identityHashCode( from );
int toHashValue = System.identityHashCode( to );
//如果反过来调用,会出现加锁顺序相反,如果HASH相同,则公用一种锁(相同对象?)
if (fromHashValue < toHashValue) {
synchronized (from) {
synchronized (to) {
synchronized (tieLock) {
new Helper().transfer();
}
}
}
} else if (fromHashValue > toHashValue) {
synchronized (to) {
synchronized (from) {
synchronized (tieLock) {
new Helper().transfer();
}
}
}
} else {
synchronized (tieLock) {
synchronized (from) {
synchronized (to) {
new Helper().transfer();
}
}
}
}
}
}
六.缓存容器
基于信号量的有界缓存,单个信号量可以实现互斥锁。
public class BoundedBuffer<E> {
private final Semaphore avaliableSpaces ,avaliableItems;
@GuardedBy( "this" ) private final E[] items;
@GuardedBy( "this" ) private int putPosition = 0,takePosition = 0 ;
public BoundedBuffer(int capacity) {
avaliableItems =new Semaphore( 0 );//??初始化为0
avaliableSpaces = new Semaphore( capacity );
items = (E[]) new Object[capacity];
}
public boolean isEmpty(){
return avaliableItems.availablePermits() == 0;//Returns the current number of permits available in this semaphore.
}
public boolean isFull(){
return avaliableSpaces.availablePermits() == 0;
}
public void put(E x) throws InterruptedException {
avaliableSpaces.acquire();
doInsert(x);
avaliableItems.release();//信号量加一
}
public synchronized void doInsert(E x){
int i = putPosition;
items[i] = x;
putPosition = (++i==items.length)?0:i; // 真笨,看错了
}
public E take() throws InterruptedException {
avaliableItems.acquire();
E item = doExact();
avaliableSpaces.release();
return item ;
}
public synchronized E doExact(){
int i = takePosition;
E item = items[i];
items[i] = null;
takePosition = (++i == items.length)?0:i;
return item;
}
}
重入锁的公平锁和非公平锁(ReentrantLock)
public class FairAndNoLock implements Runnable{
private ReentrantLock fairLock = new ReentrantLock( true );//非公平锁就会执行很多次一个线程才会去执行另一个线程
/**
* 非公平锁调用,tryAcquire()不相同,非公平锁使用CAS尝试获取锁,而公平锁判断前方节点是否存在
* url:https://blog.csdn.net/weixin_39312465/article/details/83746460
* CAS: https://blog.csdn.net/v123411739/article/details/79561458
*/
@Override
public void run() {
while(true){
try {
fairLock.lock();//获取这个类的锁
System.out.println( Thread.currentThread().getName()+"获取到锁" );
Thread.sleep( 100 );
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
fairLock.unlock();
}
}
}
public static void main(String args[]){
FairAndNoLock fairAndNoLock = new FairAndNoLock();
Thread t1 = new Thread( fairAndNoLock );
Thread t2 = new Thread( fairAndNoLock );
t1.start();
t2.start();
}
}
重入锁的中断异常处理
public class KillDeadLock implements Runnable {
private static ReentrantLock lock1 = new ReentrantLock();
private static ReentrantLock lock2 = new ReentrantLock();
int lock;
public KillDeadLock(int lock) {
this.lock = lock;
}
@Override
public void run() {
try {
if (lock == 1) {
try {
lock1.lockInterruptibly(); //与lock的区别是,lock interrupt也会继续获取锁
Thread.sleep( 3000 );//获取锁之后才会响应中断,lockInterruptibly优先考虑响应中断
lock2.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println( "1中断" );
// e.printStackTrace();
}
} else {
try {
lock2.lockInterruptibly();
Thread.sleep( 3000 );
lock1.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println( "2中断" );
// e.printStackTrace();
}
}
}finally {
if(lock1.isHeldByCurrentThread()) {//如果获取到锁或其他资源
lock1.unlock();
}
if(lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println( Thread.currentThread().getName()+"退出啦" );
}
}
public static void main(String[] args) throws InterruptedException {
KillDeadLock deadLock1 = new KillDeadLock(1);
KillDeadLock deadLock2 = new KillDeadLock(2);
Thread t1 = new Thread(deadLock1,"fir");
Thread t2 = new Thread(deadLock2,"sec");
t1.start();t2.start();
Thread.sleep(1000);
t2.interrupt(); //
}
}
读写锁
读写锁、包装MAP --处理策略类似于copyOnWrite,写入锁只能有一个,读锁可以有多个
在公平锁中,等待时间长的线程获得锁,如果该线程获取的是读锁,另一个线程请求写锁时,其他的线程都不可以获取读取锁直到写锁完释放
非公平锁,线程的顺序是不确定的,写锁可以降为读锁,读锁不可以升级为写锁。且写锁只能有一份。
public class ReadWriteLockMap<K,V> {
private final Map<K,V> map;
private final ReadWriteLock lock =new ReentrantReadWriteLock( );
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public ReadWriteLockMap(Map<K, V> map) {
this.map = map;
}
public V put(K k,V v){
readLock.lock();
try {
return map.put( k,v );
}finally {
readLock.unlock();
}
}
public V take(K k){
writeLock.lock();
try{
return map.get( k );
}finally {
writeLock.unlock();
}
}
public static void main(String args[]){
ReadWriteLockMap<String,String> readAndWriteMap = new ReadWriteLockMap<String, String>(new HashMap<String, String>());
readAndWriteMap.put( "testk1","testv1" );
readAndWriteMap.put( "testk2","testv2" );
System.out.println( readAndWriteMap.take( "testk1" ) );
}
}
锁分段
public class StripeMap {
private final static int N_LOCK = 16;
private final Object[] locks;
private final Node[] nodes;
public StripeMap(int numBuckets) {//init
locks = new Object[numBuckets];
nodes = new Node[numBuckets];
}
public static class Node{
Object value;
Node next;
Object key;
};
private final int hash(Object key){
return Math.abs( key.hashCode() % nodes.length );//hash值与节点长度的余数
}
public Object getObject(Object key){
int hash = hash( key );
synchronized (locks[hash % N_LOCK]){//加锁哈希对应的对象
for(Node n = nodes[hash];n!=null;n=n.next){
if(n.key.equals( key )){
return n.value;
}
}
return null;
}
}
public void clear(){
for(int i=0;i<locks.length;i++){
synchronized (locks[i % N_LOCK]){//只加锁操作的对象
locks[i] = null ;
}
}
}
}
//锁分段方法,减小锁之间的竞争状态,不是加锁整个对象,加锁其中的对应对象,减小锁竞争
有界缓存的生产者和消费者
public class TakeerTest {
private static final ExecutorService pool = Executors.newCachedThreadPool();//缓存线程池
private final AtomicInteger putSum = new AtomicInteger( 0 );
private final AtomicInteger takSum = new AtomicInteger( 0 );
private final BoundedBuffer<Integer> bb;
private final CyclicBarrier barrier; //栅栏,满足条件后线程才会全部启动,可以复用。。。。。 blog :https://www.cnblogs.com/dolphin0520/p/3920397.html
private final int tTrial,pTrial;
/**
* 添加时间功能
*/
private BarrierTimer timer;
class BarrierTimer implements Runnable{
private Long startTime , endTime;
private boolean started;
@Override
public void run() {
long time = System.nanoTime();
if(!started){
started = true;
startTime = time;
}else{
endTime = time;
}
}
public synchronized void clear(){
started = false;
}
public synchronized Long getTime(){
return endTime - startTime;
}
}
public TakeerTest(int capacity,int tTiral,int pTrial) {//init
bb = new BoundedBuffer<Integer>( capacity );
this.tTrial = tTiral;
this.pTrial = pTrial;
barrier = new CyclicBarrier( 2*pTrial+1,timer );
}
void test() throws BrokenBarrierException {
try {
timer.clear();
for (int i = 0; i < pTrial; i++) {
pool.execute( new Prosumer() );
pool.execute( new Consumer() );
}
barrier.await();//如何不满足通过栏栅的条件,线程阻塞在这里,其他线程满足条件会所有线程启动
barrier.await();//第二次复用,当任务执行完
long time = timer.getTime()/((long)pTrial*(long)tTrial);
}catch (InterruptedException e){
e.printStackTrace();
}
}
class Prosumer implements Runnable{
@Override
public void run() {
int seed = (int) (this.hashCode()+System.nanoTime());
int sum = 0;
try {
barrier.await();
for(int i=tTrial;i>0;i--){
bb.put( seed );
sum+=seed;
}
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
@Override
public void run() {
try {
barrier.await();
for(int i= pTrial; i>0 ;i--){
bb.take();
}
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
有限时间内获取锁
public class TryLockTest implements Runnable{
private ReentrantLock reentrantLock = new ReentrantLock( );
@Override
public void run() {
try {
if(reentrantLock.tryLock( 1,TimeUnit.SECONDS )) {
Thread.sleep( 2000 );
}else {
System.err.println( Thread.currentThread().getName()+"获取锁失败" );
}
}catch (InterruptedException e){
if(reentrantLock.isHeldByCurrentThread()) reentrantLock.unlock();
}
}
public static void main(String args[]) throws InterruptedException {
TryLockTest tryLockTest = new TryLockTest();
Thread t1 = new Thread( tryLockTest ,"one" );
Thread t2 = new Thread( tryLockTest ,"two");
t1.start();
t2.start();
}
}
//等待锁时间内获取锁,如果时间内没有获取到锁,就打印出线程获取锁失败
条件队列(Condition )
public class LockCondition implements Runnable {
private static ReentrantLock lock = new ReentrantLock( true );
private static Condition condition = lock.newCondition();
@Override
public void run() {
try{
lock.lock();
System.out.println( "准备开始线程" );
condition.await();
System.out.println( "开始线程咯" );
}catch (InterruptedException e){
}
}
public static void main(String args[]) throws InterruptedException {
LockCondition lockCondition = new LockCondition();
Thread t = new Thread( lockCondition );
t.start();
Thread.sleep( 1000 );
System.out.println( "一秒以后" );
lock.lock();//除了加锁,->还能获取到锁
condition.signal();//使用signal 需要先获取到锁
lock.unlock();
/**
* mutex + condition 加锁??
*/
}
}
AQS
还有很多知识点,总结的不够详细下次修改添加
public class OneShotLatch {
private final Sync sync = new Sync();
private class Sync extends AbstractQueuedSynchronizer{
@Override
protected int tryAcquireShared(int arg) {
return (getState()==1)?1:-1;//判断返回的是否为开锁状态
}
@Override
protected boolean tryReleaseShared(int arg) {
setState( 1 );
return true ;
}
}
}
CAS(CompareAndSwap)
还有很多知识点,总结的不够详细下次修改添加,
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::isMP(); //判断是否是多处理器
_asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)//判断是否LOCK前缀
cmpxchg dword ptr [edx], ecx
}
}
参考出处:https://www.cnblogs.com/javalyy/p/8882172.html
CAS有一个特别有名的问题ABA,可以通过java中提供的AtomicStampedReference/AtomicMarkableReference来处理。