特定的无锁场景问题(如无锁栈)解决:AtomicStampedReference

虽然无锁比较交换让代码看起来比直接加锁复杂一点点,但是无锁比较交换能提供更高的效率,没错,且不存在死锁问题,但是无锁操作不是所有场景下都是安全的。先用最简单直白的概括回顾下CAS,有三个参数V,E,N,只有当“预期值”E和对象V的值相等时,才把V的值更新为“更新值”V。可以看到,是否修改V的值,取决于V的值和E是否相等,如果不相等,那么程序就会重新读取V的值,直到V的值与E相等后,才做修改操作(当然也可以放弃等待),这样做看似没什么问题,但是在某些特定场景下,和实际应用中,是存在一些问题的,因为在多线程环境下,一个对象可能会被多个线程访问,修改,且修改的值是不确定的,所以无锁比较交换操作下可能出现的问题是以下这个情景。

使用AtomicReference的无锁比较交换,假设对象V的初始值为0,线程T1要对对象V做修改操作,首先取得V的值为0保存到变量tmp中后,在做修改操作V.compareAndSet(tmp, tmp+1);之前,线程T1被阻塞了(可能cpu调度或时间片问题),此时另外的线程T2和T3也对这个对象V做修改操作,T2线程把V修改为1,接着T3线程把V修改回0,最后当T1线程唤醒继续执行修改操作,发现tmp的值为0,仍然为预期值,接着就对对象V写入新值。

上述这个场景我们可以发现,T1线程做compareAndSet()方法时,拿到的tmp其实是个新值,只是这个新值与旧值相等了,所以T1线程“以为”对象V没有做过修改,符和预期值。虽然从结果上看,这样做好像没什么问题,对于T1线程来说,它的任务就是当对象V的值等于期望值tmp(也就是为0)时,把它修改为1嘛。在这样的场景下可能是没问题,例如,如果你要做累加或累减操作,这样无锁比较交换是不会出错的,哪怕过程中有其他线程做过修改,只要对象的值不符合期望值,当前线程都不会对对象的值做修改,而是一直重复读取对象V,直到与期望值相等,或是放弃,这种做法有一个问题,就是只看结果,而忽略了过程的改变。

在某些特定场景中,忽略过程是会出问题的,例如我们看一个多线程操作同一个栈的例子:

package atomicstampedreference;

import java.util.concurrent.atomic.AtomicReference;

public class Stack {
	private AtomicReference<Node> stackTop = new AtomicReference<>();
	
	//栈的存储结构
	static class Node {
		int value;
		Node next; //指向下一个结点
		
		//构造函数初始化
		public Node(int value) {
			this.value = value;
		}
	}
	
	//无锁CAS入栈
	public void push(Node newNode) {
		//无锁CAS操作,所以用一个死循环
		for(;;) {
			Node tmpTop = stackTop.get(); //首先获得栈顶结点
			newNode.next = tmpTop; //新栈顶结点的next指向旧的栈顶结点
			
			//更新新的栈顶结点
			if(stackTop.compareAndSet(tmpTop, newNode)) {
				return;
			}
		}
	}
	
	//无锁CAS出栈
	public Node pop() {
		//同样死循环
		for(;;) {
			Node tmpTop = stackTop.get(); //获得栈顶结点
			//特殊情况判断
			if(tmpTop == null) {
				System.out.println("错误!栈顶元素为空.");
				return null;
			}
			Node newTop = tmpTop.next; //保存栈中的新栈顶结点
			
			//更新栈顶结点
			if(stackTop.compareAndSet(tmpTop, newTop)) {
				tmpTop.next = null;
				return tmpTop;
			}
		}
	}
	
	//输出当前栈
	public void printStack() {
		
	}
}
package atomicstampedreference;
 
public class UnSafeStack {

	public static void main(String[] args) {
		Stack stack = new Stack(); //实例化栈
		Stack.Node node1 = new Stack.Node(1);
		Stack.Node node2= new Stack.Node(2);
		Stack.Node node3= new Stack.Node(3);
		//三个结点入栈
		stack.push(node3);
		stack.push(node2);
		stack.push(node1);
		
		Thread T1 = new Thread(new Runnable() {
			@Override
			public void run() {
				Stack.Node tmp = stack.pop();
			}
		});
		
		Thread T2 = new Thread(new Runnable() {
			@Override
			public void run() {
				Stack.Node tmp1 = stack.pop();
				Stack.Node tmp2 = stack.pop();
				stack.push(tmp1);
			}
		});
		
		T1.start();
		T2.start();
	}

}

我们往栈中压入三个结点,值分别为int型1,2,3。在线程T1中做出栈操作中,取得当前栈顶元素后,先让线程T1阻塞,模拟cpu调度。此时另一个线程启动,做两次出栈操作和一次入栈操作,那么结点1和2都被出栈,栈中只剩结点3,然后在把结点1入栈,之后当前栈变成1->3,线程完成任务。然后我们让线程T1唤醒,继续它的任务,此时线程T1执行compareAndSet()方法,虽然方法重新获取一遍tmpTop的值,但发现发现栈顶元素的value还是等于1,与期望值相等,所以把结点1出栈,之后就出问题了,compareAndSet()方法操作是更新栈顶元素,此时线程T1会把栈顶元素也就是tmpTop更新为结点2。这与线程2修改后的栈状态冲突,可能会发生意想不到的灾难。

在实际应用中,哪怕不涉及什么复杂的算法,也会出现足以让你损失巨大的问题,来看这样一个例子:假如通讯服务商做回馈用户活动,在本月为所有当前剩余流量低于50M的用户免费赠送100M的流量,每个用户本月只能赠送一次。设计这样的程序很容易,只要扫描所有用户,查询当前用户的剩余流量,低于50M就赠送100M,可是如果在多线程下,会出现这样的情况,假设在给一位剩余流量低于50M的用户赠送流量时,该用户同时也在使用流量,且使用的流量大于或等于100M,这就使得赠送流量后,用户剩余流量等于甚至低于赠送前的流量,使得赠送线程误以为这个用户没用赠送过流量,然后再次给这个用户赠送100M。表述的不太好,来看看代码是怎么回事:

package atomicstampedreference;

import java.util.concurrent.atomic.AtomicReference;

public class UnSafeAtomicReference {

	static AtomicReference<Integer> flow = new AtomicReference<Integer>();

	public static void main(String[] args) {
		flow.set(45); //初始流量为45M
		
		//模拟服务提供商赠送
		for(int i=0; i<20; i++) {
			Thread T1 = new Thread(new Runnable() {
				@Override
				public void run() {
					while(true) {
						for(;;) {
							Integer f = flow.get();
							if(f<50) {
								System.out.println("剩余流量小于50M,赠送100M....");
								if(flow.compareAndSet(f, f+100)) {
									System.out.println("流量充值成功!当前流量剩余:"+flow.get());
									break;
								}
							} else {
								//System.out.println("剩余流量大于50M,不参与赠送活动.");
								break;
							}
						}
					}	
				}
			});
			T1.start();
		}
		
		//模拟消耗流量
		for(int i=0; i<20; i++) {
			Thread T2 = new Thread(new Runnable() {
				@Override
				public void run() {
					while(true) {
						Integer f = flow.get();
						if(f>50) {
							System.out.println("剩余流量大于50M,使用流量....");
							if(flow.compareAndSet(f, f-25)) {
								System.out.println("流量使用25M,当前流量剩余:"+flow.get());
								break;
							}
						} else {
							System.out.println("剩余流量不足50M,无法使用.");
							break;
						}
						try {
							Thread.sleep(200);
						} catch(InterruptedException e) {
							e.printStackTrace();
						}
					}	
				}
			});
			T2.start();
		}
		
	}

}

初始化时假设用户剩余流量只有45M,符合赠送流量资格,然后多线程T1模拟通信服务商赠送,假如某一用户已经被赠送过100M流量了,其他线程就会因为compareAndSet()方法中的多项flow的值不等于变量f,而赠送失败,保证了每一个用户只会被充值一次。

多线程T2模拟用户使用流量,第44行,只要用户的剩余流量大于50M,就使用25M,否则低于50M就无法使用(虽然这样设计不合理- -、低于50M就无法使用25M流量,但这里为了模拟某一特定场景,就不太较真了,关键是要看多线程在赠送流量后,因为剩余流量仍低于50M而错误地继续赠送)。来看看运行结果:

可以看到,一开始线程检查到用户剩余流量低于50M,于是赠送了100M,接着用户同时不断地消耗流量,在用户剩余流量到45M后,又被重新赠送了一次,这是因为有多线程在不断扫描,且这个赠送流量线程采用的是无锁比较交换算法,只要用户的剩余流量不低于50M,就不断地重新获取用户剩余流量,直到低于50M后,再一次为这个用户赠送流量。

出现这种问题的原因在于,无锁比较交换线程中,对象V的值不等于期望值E,就会重新获取,线程只看重V的值和E是否相等,相等就做修改,如上面无锁栈中所说的,只看重结果,不看过程,假如其他线程把某一对象的值修改一次后,又修改会原来的值,那么其他线程就无法知道这个对象的值是否被修改过,因为它还是等于原来的 值。上面的模拟流量场景也是,虽然赠送线程给用户赠送了100M,可是消耗线程同时也在执行,使得赠送后用户的剩余流量还是低于50M,所以多线程赠送线程误以为该用户没有被赠送流量。

         解决的方法就是要“关注”这个过程信息,Java的atomic包中提供了AtomicStampedReference,与AtomicReference不同,它多了两个参数excpectedStamp和newStamp。这两个参数比较多人称它为版本号,也有人称它为时间戳,使用方法是这样:

package atomicstampedreference;

import java.util.concurrent.atomic.AtomicStampedReference;

public class SafeAtomicStampedReference {

	static AtomicStampedReference<Integer> flow = new AtomicStampedReference<Integer>(45, 0);
	
	public static void main(String[] args) {
		//模拟服务提供商赠送
		for(int i=0; i<1; i++) {
			final int time = flow.getStamp();
			Thread T1 = new Thread(new Runnable() {
				@Override
				public void run() {
					while(true) {
						for(;;) {
							Integer f = flow.getReference();
							 
							if(f<50) {
								//System.out.println("剩余流量小于50M,赠送100M....");
								if(flow.compareAndSet(f, f+100, time, time+10)) {
									System.out.println("流量充值成功!当前流量剩余:"+flow.getReference());
									break;
								}
							} else {
								break;
							}
						}
					}	
				}
			});
			T1.start();
		}
		
		for(int i=0; i<20; i++) {
			Thread T2 = new Thread(new Runnable() {
				@Override
				public void run() {
					while(true) {
						final int time = flow.getStamp();
						Integer f = flow.getReference();
						if(f>50) {
							System.out.println("剩余流量大于50M,使用流量....");
							if(flow.compareAndSet(f, f-25, time, time+10)) {
								System.out.println("流量使用25M,当前流量剩余:"+flow.getReference());
								break;
							}
						} else {
							System.out.println("剩余流量不足50M,无法使用.");
							break;
						}
						try {
							Thread.sleep(200);
						} catch(InterruptedException e) {
							e.printStackTrace();
						}
					}	
				}
			});
			T2.start();
		}
		
	}

}

第22行可以看到,每次做compareAndSet()方法时,要先获得这个版本号/时间戳,做比较交换操作时,除了看对象的值和期望值E外,还要看这个时间戳,如果compareAndSet()方法修改成功后,还要修改这个时间戳,这个时间戳可以用任何数来表示,你可以让它加10,也可以减10。使用AtomicStampedReference,只有当对象的值和时间戳都满足期望值时,才会修改,所以哪怕对象值被多次写入回到原值,但时间戳发生变化,就不会出现上面的问题。

完整代码已上传:

https://github.com/justinzengtm/Java-Multithreading/tree/master/Lock%20optimizations/AtomicStampedReference

发布了97 篇原创文章 · 获赞 71 · 访问量 7万+

猜你喜欢

转载自blog.csdn.net/justinzengTM/article/details/90175796