package cn.dengbin97.concurrency;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
import cn.dengbin97.concurrency.annoations.NotThreadSafe;
@NotThreadSafe
public class ConcurrencyTest {
//请求总数
public static int clientTotal = 5000;
//并发数
public static int threadTotal = 200;
public static int count = 0;
private static Logger logger = Logger.getLogger(ConcurrencyTest.class);
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
//信号量
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i = 0; i < clientTotal; ++i) {
es.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
logger.error("异常");
}
countDownLatch.countDown();
}
});
}
countDownLatch.await();
es.shutdown();
logger.info(count);
}
private static void add() {
count++;
}
}
//应该输出5000,但结果却不正确
线程安全性
当多个线程访问某个类时,不管运行时环境采用何种调度方式,或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为
原子性:原子性是指一个操作是不可中断的,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉
可见性:一个线程对主内存的修改可以及时的被其他线程观察到
有序性:由于指令重排序的存在,代码执行杂乱无序,需要保证执行顺序与代码顺序一致
package cn.dengbin97.concurrency.example.count;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import cn.dengbin97.concurrency.annoations.NotThreadSafe;
import cn.dengbin97.concurrency.annoations.ThreadSafe;
@NotThreadSafe
@ThreadSafe
public class CountExample2 {
//请求总数
public static int clientTotal = 5000;
//并发数
public static int threadTotal = 200;
//此处把原来的int换成AtomicInteger
public static AtomicInteger count = new AtomicInteger(0);
private static Logger logger = Logger.getLogger(CountExample2.class);
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
//信号量
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i = 0; i < clientTotal; ++i) {
es.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
logger.error("异常");
}
countDownLatch.countDown();
}
});
}
countDownLatch.await();
es.shutdown();
logger.info(count);
}
private static void add() {
count.incrementAndGet();
}
}
//可以看到,在把int换成AtomicInteger 之后,便输出了正确的结果
//看下incrementAndGet()这个方法,是通过这个对count进行增加
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//getAndAddInt()
//采用cas进行实现
//比较当前值(也叫预期值)与内存中实际值是否相同,若相同则进行修改,若不相同则说明被更改过,重新获取值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
LongAdder和AtomicLong
AtomicLong采用不断循环来更改值,但是若竞争十分激烈,那么可能效率就不太高
LongAdder
把他进行分散,每个线程去更新自己的cell,最后将基础值和这些cell的值进行求和,得出最后值
首先和AtomicLong一样,都会先采用cas方式更新值
在初次cas方式失败的情况下(通常证明多个线程同时想更新这个值),尝试将这个值分隔成多个cell(sum的时候求和就好),让这些竞争的线程只管更新自己所属的cell(因为在rehash之前,每个线程中存储的hashcode不会变,所以每次都应该会找到同一个cell),这样就将竞争压力分散了
AtomicReference
public class AtomicExample4 {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) throws InterruptedException {
//当值等于0的时候赋值为2
count.compareAndSet(0, 2);
//当值等于0的时候赋值为1
count.compareAndSet(0, 1);
//当值等于1的时候赋值为3
count.compareAndSet(1, 3);
//当值等于2的时候赋值为4
count.compareAndSet(2, 4);
//当值等于3的时候赋值为5
count.compareAndSet(3, 5);
}
}
AtomicIntegerFieldUpdater
public class AtomicExample5 {
public volatile int count = 100;
//必须要求这个字段为volatile修饰,并且布恩那个为static
private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
private static Logger logger = Logger.getLogger(AtomicExample5.class);
public static void main(String[] args) throws InterruptedException {
AtomicExample5 example5 = new AtomicExample5();
//当设置的字段值为100时更新成120,更新成功返回true
if(updater.compareAndSet(example5, 100, 120)) {
logger.info("更新成功-1");
}
//此时值为120,就不会更新了
if(updater.compareAndSet(example5, 100, 120)) {
logger.info("更新成功-2");
}
}
}
AtomicStampedReference
使用方法和前几个类一致,主要是用来解决ABA问题
ABA问题:在进行更新时,其他线程对该值进行2次更新,改成B又改成A,这时你与预期值比较是符合的,但是实际上值已经发生了改变
这个类是采用的版本戳来进行比较,不仅比较值是否相同, 还要比较版本戳是否相同确保值未被更改
AtomicLongArray
一个安全的long型数组,可以通过下标对指定项的值进行更新
AtomicBoolean
下面代码只输出了一次,并且结果为true
保证了在多线程环境下,只被执行一次
public class AtomicExample6 {
private static AtomicBoolean isHappend = new AtomicBoolean(false);
private static Logger logger = Logger.getLogger(AtomicExample1.class);
// 请求总数
public static int clientTotal = 5000;
// 并发数
public static int threadTotal = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
//信号量
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i = 0; i < clientTotal; ++i) {
es.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
test();
semaphore.release();
} catch (InterruptedException e) {
logger.error("异常");
}
countDownLatch.countDown();
}
});
}
countDownLatch.await();
es.shutdown();
logger.info(isHappend.get());
}
public static void test() {
if(isHappend.compareAndSet(false, true)) {
logger.info("execute");
}
}
}
//21:44:59.553 [pool-1-thread-1] INFO cn.dengbin97.concurrency.example.atomic.AtomicExample1 - execute
//21:44:59.553 [main] INFO cn.dengbin97.concurrency.example.atomic.AtomicExample1 - true
synchronized
synchronized不可被继承,
父类方法加上该关键字,子类必须自己加上,否则无效,因为synchronized不是方法声明一部分
public void test1() {
//修饰代码块
synchronized (this) {
for(int i = 0; i < 10; ++i) {
logger.info(i + "-Test1");
}
}
}
//修饰一个方法
public synchronized void test2() {
for(int i = 0; i < 10; ++i) {
logger.info(i + "-Test2");
}
}
// 修饰静态方法
public synchronized static void test3() {
for (int i = 0; i < 10; ++i) {
logger.info(i + "-Test1");
}
}
// 修饰一个类,其实就是把锁的对象写成类对象
public void test4() {
synchronized (SynchronizedExample2.class) {
for (int i = 0; i < 10; ++i) {
logger.info(i + "-Test2");
}
}
}
//对于文章开头的计数案例,也可以使用synchronized实现
//给添加的方法加上syncronized关键字,对类加锁即可
private synchronized static void add() {
++count;
}
jvm关于syncronized的两条规定:
1.线程解锁前,必须把共享变量的最新值刷新到主内存
2.线程加锁时,将情况工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(加锁与解锁是同一把锁)
volatile的可见性
1.对volatile变量进行写操作时,将本地内存中的共享变量值刷新到主内存
2.对volatile变量读操作时,从内存中读取共享变量
内存屏障(memory barrier) 是一个CPU指令。基本上,它是这样一条指令: a) 确保一些特定操作执行的顺序; b) 影响一些数据的可见性(可能是某些指令执行后的结果)。编译器和CPU可以在保证输出结果一样的情况下对指令重排序,使性能得到优化。插入一个内存屏障, 相当于告诉CPU和编译器先于这个命令的必须先执行,后于这个命令的必须后执行。内存屏障另一个作用是强制更新一次不同CPU的缓存。例如,一个写屏障会 把这个屏障前写入的数据刷新到缓存,这样任何试图读取该数据的线程将得到最新值,而不用考虑到底是被哪个cpu核心或者哪颗CPU执行的。
volatile有序性
LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。
在每个volatile写操作的前面插入一个StoreStore屏障。
在每个volatile写操作的后面插入一个StoreLoad屏障。
在每个volatile读操作的后面插入一个LoadLoad屏障。
在每个volatile读操作的后面插入一个LoadStore屏障。
volatile不能保证原子性
例如上面的例子,他的操作分为3部
1.取值
2.加1
3.写回主存
2个线程同时取值,获取的值一样,最后赋值,就损失了一次加1操作
发布对象与对象逸出
发布对象:使一个对象能够被当前范围之外的代码所使用、
public class UnsafePublish {
//此处是states被发布,通过共有get方法,使外部能够访问并使用
private int[] states = {1,2,3,4,5};
public int[] getStates() {
return states;
}
public static void main(String[] args) {
UnsafePublish up = new UnsafePublish();
int[] states2 = up.getStates();
}
}
对象逸出:一种错误的发布,不“希望”发布的对象却被发布了,
例如当一个对象还没有构造完成时,就使他被其他线程可见
public class Escape {
private int tihsCanBeEscape = 0;
public Escape() {
new InnerClass();
}
private class InnerClass{
public InnerClass() {
//此处的this则逸出,因为此时对象不一定初始化完毕
System.out.println(Escape.this.tihsCanBeEscape);
}
}
}
不可变对象
对象创建以后其状态就不能修改
对象所有域都是final类型
对象是正确创建的(在对象创建期间,this引用没有逸出)
final
修饰类:不能被继承
修饰方法:1.锁定方法 不能被继承类修改;2.效率
修饰变量:基本数据类型的值无法更改,引用数据类型无法指向另一个对象
public class ImmutableExample1 {
private final static Integer a = 1;
private final static String b = "2";
private final static Map<Integer, Integer> map = new HashMap<>();
static {
map.put(1, 1);
map.put(2, 2);
map.put(3, 3);
}
public static void main(String[] args) {
//下面三行都错误的,无法通过编译
a = 2;
b = "3";
map = new HashMap<>();
//但是下面这行不受影响
map.put(4, 4);
}
public void test(final int a) {
//这一行也是错误的
a = 1;
}
}
将list等集合变成不可变
List<Integer> list = new ArrayList<>();
//通过collections工具类进行操作,更改后的list无法进行插入等操作
List<Integer> newList = Collections.unmodifiableList(list);
newList.add(1);
//抛出异常
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at cn.dengbin97.concurrency.example.immutable.ImmutableExample1.main(ImmutableExample1.java:27)
//另外一种,直接使用这个包下的类
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
public class ImmutableExample2 {
private final static ImmutableList list = ImmutableList.of(1, 2,3,4,5);
private final static ImmutableSet set = ImmutableSet.copyOf(list);
//map中第一个值为key,第二个为value,以此类推
private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1,1,2,2);
public static void main(String[] args) {
//下面三行都会抛出异常
list.add(1);
set.add(1);
map.put(1, 1);
}
}
线程封闭
对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。
ThreadLocal
//写过类似下面的代码吗,可以把用户放到threadlocal,在后面的controller,service,dao都可以使用
public class RequestHolder {
private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();
public static void add(Long id) {
requestHolder.set(id);
}
public static Long get() {
return requestHolder.get();
}
public static void remove() {
requestHolder.remove();
}
}
//ThreadLocal的get方法
public T get() {
//获取当前线程
Thread t = Thread.currentThread();
//通过当前线程获得对应的map
ThreadLocalMap map = getMap(t);
if (map != null) {
//通过threadlocal从之前获得的map里面获取值,这里的this是threadlocal,不是当前线程
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null)
return (T)e.value;
}
return setInitialValue();
}
CopyOnWriteArrayList
在插入,删除,赋值时会把原数组拷贝一份进行操作,操作完后把新数组赋值给list中的数组
适用读多写少的操作,因为写需要拷贝数组,耗费资源
//下面代码最终输出的list的大小为5000
public class CopyOnWriteTest {
// 请求总数
public static int clientTotal = 5000;
// 并发数
public static int threadTotal = 200;
public static List<Integer> list = new CopyOnWriteArrayList<Integer>();
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
// 信号量
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; ++i) {
es.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
}
countDownLatch.countDown();
}
});
}
countDownLatch.await();
es.shutdown();
System.out.println(list.size());
}
private static void add() {
list.add(1);
}
}
//CopyOnWriteArrayList的add方法
//删除方法与之类似,也拷贝数组进行操作
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//加锁,避免多线程下创建多个数组
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//获取新的数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
//增加元素
newElements[len] = e;
//把新的数组赋值给list
setArray(newElements);
return true;
} finally {
//解锁
lock.unlock();
}
}