7. Callable
- 可以有返回值
- 可以抛出异常
- 方法不同,run() / call()
而runnable没有返回值,只能开辟,不能接收。
futuretask是runnable是实现类,它可以调用callable
由于mythread无法直接联系到Thread类,所以说它需要有个适配类FutureTask。而new出来的futureTask本质上就是一个runnable。
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>( Callable )).start();
new Thread().start(); // 怎么启动Callable
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread); // 适配类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start(); // 结果会被缓存,效率高
Integer o = (Integer) futureTask.get(); //get会的callable返回接口
//这个get 方法可能会产生阻塞!把他放到最后
// 或者使用异步通信来处理!
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() {
System.out.println("call()"); // 会打印几个call?一个
// 耗时的操作
return 1024;
}
}
Callable<Integer>:泛型的参数等于方法的返回值
细节:
1、有缓存
2、结果可能需要等待,会阻塞!
8. 常用的辅助类
(1) CountDownLatch(减法计数器)
一种同步辅助程序(辅助工具类),允许一个或多个线程等待在其它线程中执行的一组操作完成。使用给定的计数初始化CountDownLatch。由于调用了countDown()方法,await方法阻塞直到当前计数为零,之后释放所有等待线程,并立即返回await的任何后续调用。这是一个一次性现象——计数不能重置。如果需要重置计数的版本,请考虑使用CyclicBarrier。倒计时锁存器是一种通用的同步工具,可用于多种目的。使用计数1初始化的倒计时锁存器用作简单的开/关锁存器或门:调用倒计时()的线程打开它之前,调用它的所有线程都在门处等待。初始化为N的倒计时锁存器可用于使一个线程等待N个线程完成某个操作或某个操作已完成N次。倒计时锁存器的一个有用特性是,它不要求调用倒计时的线程在继续之前等待计数达到零,它只是防止任何线程在所有线程都可以通过之前继续通过等待。
// 计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" Go out");
countDownLatch.countDown(); // 数量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); // 等待计数器归零,然后再向下执行
System.out.println("Close Door");
}
}
latch:门闩
原理:
countDownLatch.countDown(); // 数量-1
countDownLatch.await(); // 等待计数器归零,然后再向下执行
每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续 执行!
(2) CyclicBarrier(加法计数器)
一种同步辅助程序,允许一组线程相互等待到达一个公共的屏障点。CyclicBarrier在涉及固定大小的线程方的程序中非常有用,这些线程方有时必须相互等待。这个屏障被称为循环屏障,因为它可以在等待的线程被释放后重新使用。CyclicBarrier支持可选的Runnable命令,该命令在参与方中的最后一个线程到达后,但在释放任何线程之前,每个屏障点运行一次。此屏障操作有助于在任何参与方继续之前更新共享状态。
//加法计数器
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
// 召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(8,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <=7 ; i++) {
final int temp = i;
// lambda能操作到 i 吗
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
try {
cyclicBarrier.await(); // 等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
(3) Semaphore(信号量,流量控制)
计数信号量。从概念上讲,信号量维护一组许可。 如果需要,每个acquire()都会阻塞,直到有许可证可用,然后获取它。 每个release()添加一个许可,可能会释放一个阻塞的收单机构。 但是,并没有使用实际的许可对象;信号量只是保持一个可用数量的计数,并相应地进行操作。信号量通常用于限制可以访问某些(物理或逻辑)资源的线程数。例如,下面是一个类,它使用信号量来控制对项池的访问
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();//得到停车位
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);//停两秒钟
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
},String.valueOf(i)).start();
}
}
}
原理:相当于是个通行证
semaphore.acquire();获得,假设如果已经满了,等待,等待被释放为止!
semaphore.release();释放,会将当前的信号释放+1,然后唤醒等待的线程!
作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!
9. 读写锁
ReadWriteLock维护一对关联锁,一个用于只读操作,一个用于写入。只要没有写入程序,读锁可以由多个读线程同时保持。写锁是独占的。
/**
* 独占锁(写锁) 一次只能被一个线程占有
* 共享锁(读锁) 多个线程可以同时占有
* ReadWriteLock
* 读-读 可以共存!
* 读-写 不能共存!
* 写-写 不能共存!
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
// 写入
for (int i = 1; i <= 5 ; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
// 读取
for (int i = 1; i <= 5 ; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
// 加锁的
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
// 读写锁: 更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock lock = new ReentrantLock();
// 存,写入的时候,只希望同时只有一个线程写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
// 取,读,所有人都可以读!
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
/**
* 自定义缓存
*/
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
// 存,写
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}
// 取,读
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
}
}
10. 阻塞队列
BlockingQueue是和list,set同级别的接口,父类是collection
一种队列,它还支持在检索元素时等待队列变为非空,在存储元素时等待队列中的空间变为可用的操作。BlockingQueue方法有四种形式,其处理操作的方式不同,不能立即满足,但可能在将来某个时候满足:一种方法抛出异常,另一种方法返回特殊值(根据操作的不同,可以为null或false),第三个在操作成功之前无限期阻塞当前线程,第四个在放弃之前仅阻塞给定的最大时间限制。
什么情况下,我们会使用阻塞队列:多线程并发处理,线程池
学会使用队列: 四组API
public class Test {
public static void main(String[] args) throws InterruptedException {
test4();
}
/**
* 抛出异常
*/
public static void test1(){
// 3是队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));//返回bool值
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// IllegalStateException: Queue full 队列满了抛出异常!
// System.out.println(blockingQueue.add("d"));
System.out.println("=-===========");
System.out.println(blockingQueue.element()); // 查看队首元素是谁
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// java.util.NoSuchElementException 空了抛出异常!谁先进谁先出
// System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.peek());//检测队首元素
// System.out.println(blockingQueue.offer("d")); // false 不抛出异常!
System.out.println("============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll()); // null 不抛出异常!
}
/**
* 等待,阻塞(一直阻塞)
*/
public static void test3() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// 一直阻塞
blockingQueue.put("a");//没有返回值
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d"); // 队列没有位置了,一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take()); // 没有这个元素,一直阻塞
}
/**
* 等待,阻塞(等待超时)
offer和poll重载的方法
*/
public static void test4() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
// blockingQueue.offer("d",2,TimeUnit.SECONDS); // 等待超过2秒就退出
System.out.println("===============");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
blockingQueue.poll(2,TimeUnit.SECONDS); // 等待超过2秒就退出
}
}
SynchronousQueue 同步队列(属于阻塞队列,父子关系):
没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!最多只能放一个元素。put、take
/**
* 同步队列
* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}