测试 ThreadPoolExecutor 中遇到的近似死锁问题

今天在做一个简单的小test时,发现了ThreadPoolExecutor的一个问题,先列出代码:主要功能是往一个链接中插入、删除数据

链表的节点:

public class BoundedNode {
	 
    public Object value;
    public BoundedNode next;
    public BoundedNode(Object x){
         value=x;
         next=null;
    }
}

增加、删除节点的操作:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue {
  	  ReentrantLock enqlock,deqlock;
	  Condition notempty,notfull;
	  AtomicInteger size;
	  BoundedNode head,tail;//队列头哨兵和尾哨兵
	  int capacity;
	
	  public BoundedQueue(int capacity) throws InterruptedException{
	      this.capacity=capacity;
	      head=new BoundedNode(null);
	      tail=head;
	      size=new AtomicInteger(0);
	      enqlock=new ReentrantLock();
	      deqlock=new ReentrantLock();
	      notfull=enqlock.newCondition();
	      notempty=deqlock.newCondition();
	  }
	
	  public void enq(Object x) throws InterruptedException{
	      boolean weakdeq=false;
	      //入队者首先获取入队锁
	      enqlock.lock();
	
	      try{
	          //判断队列是否为满,通过循环判断,结合上面的加锁,因此此方法也称为自旋//加锁,优势效率较高,缺点造成CPU消耗较大
	          while(size.get()==capacity){
	           
		          //如果队列满,则在“不满”条件上等待,直到队列不满的条件发生,等待时会//暂时释放入队锁
		          notfull.await();
		       }
	
		       //如果“不满”条件满足,则构建新的队列元素,并将新的队列元素挂接到队列//尾部
		       BoundedNode e=new BoundedNode(x);
		       tail.next=tail=e;
		       
		       System.out.println("add:"+size);
		
		       //获取元素入队前队列容量,并在获取后将入队前队列容量增加1
		       if(size.getAndIncrement()==0){
		       //如果入队前队列容量等于0,则说明有出队线程正在等待出队条件notempty
		       //发生,因此要将相关标志置为true
		           weakdeq=true;
		       }
	    }finally{
	       //入队者释放入队锁
	       enqlock.unlock();
	    }
	
	    //判断出队等待标志
	    if(weakdeq){
	        //入队线程获取出队锁
	        deqlock.lock();
	
	        try{
	        //触发出队条件,即队列“不空”条件,使等待出队的线程能够继续执行
	          notempty.signalAll();
	       }finally{
	          //入队线程释放出队锁
	          deqlock.unlock();
	       }
	    }
	}
	
	public Object deq() throws InterruptedException{
	
	    Object result=null;
	    boolean weakenq=false;
	    //出队者首先获取出队锁
	    deqlock.lock();
	    try{
	
	        //判断队列是否为空,通过循环判断,结合上面的加锁,因此此方法也称为自旋//加锁,优势效率较高,缺点造成CPU消耗较大
	        while(size.get()==0){
	            //如果队列空,则在“不空”条件上等待,直到队列不空的条件发生,等待时会//暂时释放出队锁
	            notempty.await();
	       }
	
	      //如果“不空”条件满足,则通过队列头部哨兵获取首节点,并获取队列元素值
	
	       result=head.next.value;
	       head=head.next;
	       //获取元素出队前队列容量,并在获取后将出队前队列容量减少1
	       System.out.println("delete:"+size);
	       if(size.getAndDecrement()==capacity){
	            //如果出队前队列容量等于队列限额,则说明有入队线程正在等待入队条件//notfull发生,因此要将相关标志置为true
	            weakenq=true;
	       }
	    }finally{
	        //出队者释放出队锁
	        deqlock.unlock();
	    }
	
	    //判断入队等待标志
	    if(weakenq){
	        //出队线程获取入队锁
	        enqlock.lock();
	       try{
	            //触发入队条件,即队列“不满”条件,使等待入队的线程能够继续执行
	            notfull.signalAll();
	       }finally{
	            //出队线程释放入队锁
	            enqlock.unlock();
	       }
	    }
	
	    return result;
	}
}

两个线程,一个增加、一个删除

import java.util.concurrent.Callable;

public class ThreadAdd implements Callable<String>{
	
	public BoundedQueue boundedQueue;
	
	public ThreadAdd(BoundedQueue boundedQueue){
		this.boundedQueue = boundedQueue;
	}

	@Override
	public String call() {
		try {
			boundedQueue.enq("x");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "";
	}

}

import java.util.concurrent.Callable;

public class ThreadDelete implements Callable<String>{

	public BoundedQueue boundedQueue;
	
	public ThreadDelete(BoundedQueue boundedQueue){
		this.boundedQueue = boundedQueue;
	}

	@Override
	public String call() {
		
		try {
			boundedQueue.deq();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "";
		
	}
}

测试案例:

package pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class mainTest {
	
	public static void main(String[] args) throws InterruptedException{

		BoundedQueue boundedQueue = new BoundedQueue(10);
		ThreadPoolExecutor x = new ThreadPoolExecutor(1,1,1000,TimeUnit.MILLISECONDS,   
				 			new LinkedBlockingQueue<Runnable>());
		List<Future<String>> list = new ArrayList<Future<String>>();
		
		for(int i=0;i<3;i++){
			Future<String> n = x.submit(new ThreadAdd(boundedQueue));  //增加
			Future<String> m = x.submit(new ThreadDelete(boundedQueue));  //删除
			Future<String> l = x.submit(new ThreadDelete(boundedQueue));  //删除
			
			list.add(n);
			list.add(m);
			list.add(l);
		}
		
		Thread.sleep(3000);
		
		for(Future<String> future : list){
			System.out.println(future.isDone());
		}
		
		x.shutdown();
	}
}

运行结果:

add:0
delete:1
true
true
false
false
false
false
false
false
false
发现只有前面两个线程操作成功,其它的都陷入等待状态;

分析原因发现可能是线程池设置过小有关,调整如下:

ThreadPoolExecutor x = new ThreadPoolExecutor(10,10,1000,TimeUnit.MILLISECONDS,   
				 	 new LinkedBlockingQueue<Runnable>());
此时开了10个线程的线程池,按理来说足够用了,运行下:

add:0
delete:1
add:0
delete:1
add:0
delete:1
true
true
true
true
true
false
true
false
false
发现还是有三个线程未能操作成功,分析原因可知,是由于队列为空,删除操作的线程陷入了等待的状态。


总体分析原因:

1、线程池大小为1时,由于删除操作过多,导致某次删除线程A的操作挂起,等待能删的时候继续执行;A一直等某个新增线程操作后才能继续删除

     而由于线程池大小设计为1,剩余的线程都被放入队列进行等待A执行完后继续执行,但是A被挂起,一直不能执行,导致整个操作都相当于死锁了~~~

2、线程池大小为10时,删除的操作比新增的操作多,就会产生删除线程一直等待这个问题;


发布了142 篇原创文章 · 获赞 345 · 访问量 45万+

猜你喜欢

转载自blog.csdn.net/zhengchao1991/article/details/73135731