Semaphore详解及代码示例

1.什么是Semaphore?
Semaphore是JDK提供的一个同步工具,它通过维护若干个许可证来控制线程对共享资源的访问。 如果许可证剩余数量大于零时,线程则允许访问该共享资源;如果许可证剩余数量为零时,则拒绝线程访问该共享资源。 Semaphore所维护的许可证数量就是允许访问共享资源的最大线程数量。 所以,线程想要访问共享资源必须从Semaphore中获取到许可证。

2.Semaphore有哪些常用方法?
有acquire方法和release方法。 当调用acquire方法时线程就会被阻塞,直到Semaphore中可以获得到许可证为止,然后线程再获取这个许可证。 当调用release方法时将向Semaphore中添加一个许可证,如果有线程因为获取许可证被阻塞时,它将获取到许可证并被释放;如果没有获取许可证的线程, Semaphore只是记录许可证的可用数量。

3.Semaphore应用场景举例
张三、李四和王五和赵六4个人一起去饭店吃饭,不过在特殊时期洗手很重要,饭前洗手也是必须的,可是饭店只有2个洗手池,洗手池就是不能被同时使用的公共资源,这种场景就可以用到Semaphore。

4.demo示例

创建顾客类:

package onemore.study.semaphore;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class Customer implements Runnable {
    private Semaphore washbasin;
    private String name;

    public Customer(Semaphore washbasin, String name) {
        this.washbasin = washbasin;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
            Random random = new Random();

            washbasin.acquire();
            System.out.println(
            	sdf.format(new Date()) + " " + name + " 开始洗手...");
            Thread.sleep((long) (random.nextDouble() * 5000) + 2000);
            System.out.println(
            	sdf.format(new Date()) + " " + name + " 洗手完毕!");
            washbasin.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

然后,写一个测试类模拟一下他们洗手的过程:

package onemore.study.semaphore;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class SemaphoreTester {
    public static void main(String[] args) throws InterruptedException {
        //饭店里只用两个洗手池,所以初始化许可证的总数为2。
        Semaphore washbasin = new Semaphore(2);

        List<Thread> threads = new ArrayList<>(3);
        threads.add(new Thread(new Customer(washbasin, "张三")));
        threads.add(new Thread(new Customer(washbasin, "李四")));
        threads.add(new Thread(new Customer(washbasin, "王五")));
        threads.add(new Thread(new Customer(washbasin, "赵六")));
        for (Thread thread : threads) {
            thread.start();
            Thread.sleep(50);
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }
}

运行以后的结果应该是这样的:

06:51:54.416 李四 开始洗手...
06:51:54.416 张三 开始洗手...
06:51:57.251 张三 洗手完毕!
06:51:57.251 王五 开始洗手...
06:51:59.418 李四 洗手完毕!
06:51:59.418 赵六 开始洗手...
06:52:02.496 王五 洗手完毕!
06:52:06.162 赵六 洗手完毕!

可以看到,当已经有两个人在洗手的时候,其他人就被阻塞,直到有人洗手完毕才是开始洗手。

5.Semaphore的内部原理
Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。Semaphore在构造时,需要传入许可证的数量,它最后传递给了AQS的state值。线程在调用acquire方法获取许可证时,如果Semaphore中许可证的数量大于0,许可证的数量就减1,线程继续运行,当线程运行结束调用release方法时释放许可证时,许可证的数量就加1。如果获取许可证时,Semaphore中许可证的数量为0,则获取失败,线程进入AQS的等待队列中,等待被其它释放许可证的线程唤醒。

6.Semaphore的公平锁与非公平锁,以及实现原理
在上面的代码中。这4个人会按照线程启动的顺序洗手吗?答案是否定的,有可能赵六比王五先洗手,原因就在于使用的是下面这个构造函数:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

这个类不保证线程获得许可证的顺序,调用acquire方法的线程可以在一直等待的线程之前获得一个许可证。那么如何保证线程的顺序呢?可以使用Semaphore的另一个构造函数:

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

调用时传入true,就可以保证线程的执行顺序了:

Semaphore washbasin = new Semaphore(2, true);

在NonfairSync中,acquire方法核心源码是:

final int nonfairTryAcquireShared(int acquires) {
	//acquires参数默认为1,表示尝试获取1个许可证。
    for (;;) {
        int available = getState();
        //remaining是剩余的许可数数量。
        int remaining = available - acquires;
        //剩余的许可数数量小于0时,
        //当前线程进入AQS中的doAcquireSharedInterruptibly方法
        //等待可用许可证并挂起,直到被唤醒。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

release方法核心源码是:

protected final boolean tryReleaseShared(int releases) {
	//releases参数默认为1,表示尝试释放1个许可证。
    for (;;) {
        int current = getState();
        //next是如果许可证释放成功,可用许可证的数量。
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //如果许可证释放成功,
        //当前线程进入到AQS的doReleaseShared方法,
        //唤醒队列中等待许可的线程。
        if (compareAndSetState(current, next))
            return true;
    }
}

当一个线程A调用acquire方法时,会直接尝试获取许可证,而不管同一时刻阻塞队列中是否有线程也在等待许可证,如果恰好有线程C调用release方法释放许可证,并唤醒阻塞队列中第一个等待的线程B,此时线程A和线程B是共同竞争可用许可证,不公平性就体现在:线程A没任何等待就和线程B一起竞争许可证了。

而在FairSync中,acquire方法核心源码是:

protected int tryAcquireShared(int acquires) {
	//acquires参数默认为1,表示尝试获取1个许可证。
    for (;;) {
    	//检查阻塞队列中是否有等待的线程
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        //remaining是剩余的许可数数量。
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

和非公平策略相比,FairSync中多一个对阻塞队列是否有等待的线程的检查,如果没有,就可以参与许可证的竞争;如果有,线程直接被插入到阻塞队列尾节点并挂起,等待被唤醒。

猜你喜欢

转载自blog.csdn.net/huyaowei789/article/details/106690603