0、ReentrantReadwriteLock读写锁
- 顾名思义:读可以被多线程同时读,写的时候只能有一个线程去写。
- 独占锁(写锁) :一次只能被一个线程占有
- 共享锁(读锁):多个线程可以同时占有ReadwriteLock
- 读-读可以共存!
- 读-写不能共存!
- 写-写不能共存!
package threadMain;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @ProjectName: www-ES-Api
* @Package: threadMain
* @ClassName: ReentrantReadwriteLockTest
* @Author: 125827
* @Description: ReentrantReadwriteLock
* @Date: 1/11/2022 11:21 PM
* @Version: 1.0
*/
public class ReentrantReadwriteLockTest {
public static void main(String[] args) {
MyCacheLock cache = new MyCacheLock();
for (int i = 0; i < 15; i++) {
final int temp = i;
new Thread(()->{
cache.put(temp+"线程","线程"+temp+"值");
},String.valueOf(i)+"thread").start();
}
for (int i = 0; i < 15; i++) {
final int temp = i;
new Thread(()->{
cache.get(temp+"线程");
},String.valueOf(i)+"thread20").start();
}
}
}
/** 自定义缓存 */
class MyCacheLock {
// 原子操作
private volatile Map<String,Object> map = new ConcurrentHashMap<>();
// 读写锁:更细粒度的控制
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 读
public void get(String key) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + ":在读");
Object o = map.get(key);
}catch (Exception e){
e.printStackTrace();
}finally{
lock.readLock().unlock();
}
}
//写
public void put(String key, Object obj) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+":在写");
Object put = map.put(key, obj);
System.out.println(Thread.currentThread().getName()+":写完");
}catch (Exception e){
e.printStackTrace();
}finally{
lock.writeLock().unlock();
}
}
}
/** 自定义缓存 */
class MyCache {
private volatile Map<String,Object> map = new ConcurrentHashMap<>();
//读
public void get(String key) {
System.out.println(Thread.currentThread().getName()+":在读");
Object o = map.get(key);
}
//写
public void put(String key, Object obj) {
System.out.println(Thread.currentThread().getName()+":在写");
Object put = map.put(key, obj);
System.out.println(Thread.currentThread().getName()+":写完");
}
}
1、ArrayList安全性问题:
- 并发下ArrayList是不安全的,Synchronized, 会报java.util.ConcurrentModificationException ;
- 解决方案:
-
- *1、List List = new Vector<>();
-
- *2、List List = Collections.synchronizedList(new ArrayList<>());
-
- *3、List List = new CopyOnWriteArrayList<>();
-
- CopyOnWrite 写入时复制 COw 计算机程序设计领域的一种优化策略;
-
- 多个线程调用的时候,List,读取的时候,固定的,写入(覆盖)
-
- 在写入的时候避免覆盖,造成数据问题!读写分离
-
- CopyonwriteArrayList比 vector 好在哪里? 效率高
package unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @ProjectName: www-ES-Api
* @Package: unsafe
* @ClassName: ListTest
* @Author: 125827
* @Description: 读写锁
* @Date: 1/11/2022 4:43 PM
* @Version: 1.0
*/
public class ListTest {
public static void main(String[] args) {
/*List<String> list1 = new Vector<>();
List<Object> list2 =Collections.synchronizedList(new ArrayList<>());*/
List<Object> list = new CopyOnWriteArrayList<>();
for(int i= 0 ; i < 10 ; i++ ){
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)+"-thread").start();
System.out.println(Thread.currentThread().getName());
}
//System.exit(0);
}
}
2、HashSet安全问题
package unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @ProjectName: www-ES-Api
* @Package: unsafe
* @ClassName: SetList
* @Author: 125827
* @Description: 集合
* 同理可证 : concurrentModificationException
* //1、Set<String> hashSet = Collections.synchronizedSet(new HashSet<>());;
* //2、CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
* @Date: 1/11/2022 7:23 PM
* @Version: 1.0
*/
public class SetList {
public static void main(String[] args) {
// Set<String> hashSet = new HashSet<>();
// Set<String> hashSet = Collections.synchronizedSet(new HashSet<>());
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
for(int i= 0 ; i < 30 ; i++ ){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
System.out.println(Thread.currentThread().getName());
},String.valueOf(i)+"-thread").start();
}
}
}
hashSet 底层是什么?
public HashSet() {
map = new HashMap<>();
}
// add set本质就是map key是无法重复的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
3、HashMap安全问题
package unsafe;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ProjectName: ES-Api
* @Package: unsafe
* @ClassName: MapTest
* @Author: 125827
* @Description: Map
* *java.util.ConcurrentModificationException
* @Date: 1/11/2022 8:39 PM
* @Version: 1.0
*/
public class MapTest {
public static void main(String[] args) {
// Map<String, String> hashMap = new HashMap<>();
// Map<String, String> map = Collections.synchronizedMap(hashMap);
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
for(int i= 0 ; i < 30 ; i++ ){
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
// System.out.println(Thread.currentThread().getName());
},String.valueOf(i)+"-thread").start();
}
}
}
4、Callable
类似于Runnable,它们都是为其实例可能由另一个线程执行的类设计的。然而,A Runnable不返回结果,也不能抛出被检查查的异常。
- 1、有缓存
- 2、结果可能需要等待,会阻塞!|
package unsafe;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @ProjectName: www-ES-Api
* @Package: unsafe
* @ClassName: CallableTest
* @Author: 125827
* @Description: callable
* @Date: 1/11/2022 9:13 PM
* @Version: 1.0
*/
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new MyThread()).start();
MyCallableThread myCallableThread = new MyCallableThread();
FutureTask futureTask = new FutureTask(myCallableThread);
new Thread(futureTask,"A").start();
String o = (String) futureTask.get(); //获取Callable的返回结果
System.out.println(o);
}
}
//class MyThread implements Runnable {
// @Override
// public void run() {
//
// }
//}
class MyCallableThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("call()");
return "12345";
}
}
5、线程辅助类
-
CountDowmLatch 减法计数器
允许一个或多个线程等待直到在其他程中执行的一组操作究成的同步辅助。 AcountDownLatch用给定的计数初始化。await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程释放,并且任何后续的awit调用立即返回。这是一个一次性的现象、–计数无法重置。如果您需要重置计数的版本,请考虑使用CyclicBarrier 。 A CountDownLatch是一种通用的同步工具,可用于多种用途。一个CountDownLatch为一个计数的CcountDownLatch用作一个简单的开/关锁存器,或者门:所有调用await在门口等待,直到被调用countDown()的线程打开。一个CountoownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。countDownLatch一个有用的属性是,它不要求调用countDown时刻程等待计数到达零之前继势,它只是阻止任何线程通过await,直到所有线程可以通过。 示例用法:这是一组类,其中一组工作线程使用两个倒计时锁存器: ·第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继纳前进; ·第二个是完成信号,分许司机等到所有的工作人员完成。
package unsafe; import java.util.concurrent.CountDownLatch; /** * @ProjectName: ES-Api * @Package: unsafe * @ClassName: CountDownLatchDemo * @Author: 125827 * @Description: 减法计数器 * @Date: 1/11/2022 10:24 PM * @Version: 1.0 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch count = new CountDownLatch(20); for (int i = 0; i < 20; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"Go out"); count.countDown(); },String.valueOf(i)).start(); } count.await(); System.out.println("close Door"+ count.getCount()); } }
-
CyclicBarrier 加法计数器
package unsafe;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @ProjectName: www-ES-Api
* @Package: unsafe
* @ClassName: CyclicBarrier
* @Author: 125827
* @Description: CyclicBarrier
* @Date: 1/11/2022 10:38 PM
* @Version: 1.0
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(8, () -> {
System.out.println("十八罗汉");
});
for (int i = 0; i < 10 ; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集:"+temp);
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i) ).start();
}
}
}
- Semaphore信号量
public class Semaphore
一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用央际的许可证对象; Semaphore只保留可用数量的计数,并和相应地执行。信号量通常用于限制线程数,而不是访问某些〈物理威逻辑〉资源。例如,这是一个使用信号量来控制对一个项目池的访问的类:
class Pool ( private static final int NAX_AVAILABLE m 100; private final semaphore available m new Semaphore(wX_AVAI
在获得项目之前,每个线程必须从信号量获取许可证,以确保某个项目可用。当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个钱程获取该项目。请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。
信号量被初始化为一个,并且被使用,使得它只有至多一个允许可用,可以用作互斥锁。这通常被称为二诺制信号量,因为它只有两个状态t一个许可证可用,或零个许可证可用。当以这种方式使用时寸,二进制信号量具有属性〈与许多Lock实现不同》,“锁”可。除所耆。之外的线程释放(国为信号量没有所有权概念〉。这在某些专门的上下文中是有用的,例如死锁怏复。
package unsafe;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @ProjectName: www-ES-Api
* @Package: unsafe
* @ClassName: SemaphoreTest
* @Author: 125827
* @Description: Semaphore
* @Date: 1/11/2022 11:04 PM
* @Version: 1.0
*/
public class SemaphoreTest {
public static void main(String[] args) {
// 资源:线程数据量
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
semaphore.acquire(); //得到资源
System.out.println(Thread.currentThread().getName()+"抢占一个资源");
TimeUnit.SECONDS.sleep(2); //等一秒
System.out.println(Thread.currentThread().getName()+"释放一个资源");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
semaphore.release(); //释放资源
}
},String.valueOf(i)+"T").start();
}
}
}
原理:
- semaphore.acquire();获得,假设如果已经满了,等待,等待被释放为止!
- semaphore.release();释放,会将当前的信号量释放+1,然后唤醒等待的线程!
- 作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!