温习线可重入锁、阻塞队列、线程池的原理

直接看代码注释,很好理解的!

  • Lock
package com.wen.java.concurrent.mylock;

/*
 * 可重入锁原理
 */
public class Lock {

    //被锁标识
    boolean isLocked = false;
    //获得当前锁的线程
    Thread lockedBy = null;
    //被锁的次数
    int lockedCount = 0;

    public synchronized void lock() throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        //避免虚假唤醒,只有当当前线程获得锁或者该锁没有被获取,while才不会被执行
        while(isLocked && lockedBy!= callingThread) {
            wait();
        }
        isLocked = true;
        lockedCount++;
        lockedBy = callingThread;
    }

    public synchronized void unlock() {
        //判断可重入的次数,每锁一次,加1,每解锁一次,减1
        if(Thread.currentThread() == this.lockedBy) {
            lockedCount--;
        }
        //该锁已经没有被获取了
        if(lockedCount == 0){
            isLocked = false;
            notify();
        }
    }

}
  • BlockingQueue
package com.wen.java.concurrent.mylock;

import java.util.LinkedList;
import java.util.List;

/*
 * 阻塞队列:
 * 队列是空时,从队列中获取元素的操作会被阻塞
 * 队列是满时,往队列里添加元素的操作会被阻塞
 */
public class BlockingQueue {

    //基于链表的list
    private List queue = new LinkedList();
    //元素个数的限制
    private int limit = 10;

    public BlockingQueue(int limit) {
        this.limit = limit;
    }

    //添加元素
    public synchronized void enqueue(Object item) throws InterruptedException {

        while(this.queue.size() == this.limit) {
            wait();
        }

        if(this.queue.size() == 0) {
            notifyAll();
        }

        this.queue.add(item);
    }

    //删除元素
    public synchronized Object dequeue() throws InterruptedException {

        while(this.queue.size() == 0) {
            wait();
        }

        if(this.queue.size() == this.limit) {
            notifyAll();
        }

        return this.queue.remove(0);
    }

}
  • ThreadPool
package com.wen.java.concurrent.mylock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/*
 * 线程池原理:ThreadPool暴露接口,但是具体任务的执行是由子线程PoolThread完成的
 */
public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThread> threads = new ArrayList<PoolThread>();
    private boolean isStopped = false;

    /*//初始化的时候封装一个阻塞队列并激活所有任务
    public ThreadPool(int noOfThreads, int maxNoOfTasks) {
         taskQueue = new BlockingQueue(maxNoOfTasks);
        for(int i = 0; i < noOfThreads; i++) {
            threads.add(new PoolThread(taskQueue));
        }
        for(PoolThread thread : threads){
            thread.start();
        }
    }*/

    //防止在stop()之后再调用execute()
    public synchronized void execute(Runnable task) {
        if(this.isStopped) throw new IllegalStateException("ThreadPool is stopped");
    }

    public synchronized void stop() {
        this.isStopped = true;
        for(PoolThread thread : threads){
            thread.stop();
        }
    }

}

//执行子线程
class PoolThread extends Thread{

    private BlockingQueue<Runnable> taskQueue = null;
    private boolean isStopped = false;

    public PoolThread(BlockingQueue<Runnable> queue) {
        taskQueue = queue;
    }

    public void run() {
        while(!isStopped) {
            try{
                //从阻塞队列中取出任务执行
                Runnable runnable = taskQueue.take();
                runnable.run();
            }catch(Exception e){

            }
        }
    }

    public synchronized void toStop() {
        isStopped = true;
        //interrupt()确保阻塞在 taskQueue.dequeue() 里的 wait() 调用的线程能够跳出 wait() 调用
        this.interrupt();
    }

    public synchronized boolean isStopped() {
        return isStopped;
    }
}

猜你喜欢

转载自blog.csdn.net/change_on/article/details/79808806