实现一个无锁的Stack
实现一个无锁的Stack,并写一段测试代码(多线程访问),证明这个Stack是线程安全的。给出程序以及运行的截图。
先实现一个不安全的Stack
提供基本的push,pop功能。push没有返回值,Stack为空时pop,返回null。
/**
* 不安全的栈
*/
public class UnSafeStack<E> {
Node top = null;
public void push(E e) {
Node n = new Node(e);
next = top;
n.
top = n;
}
public E pop() {
if(top == null) return null;
Node<E> p = top;
Node<E> q = top.next;
top = q;return p.ele;
}
public String toString() {
StringBuffer sb = new StringBuffer();
Node<E> n = top;
while (n != null) {
append(n.ele.toString()).append(",");
sb.next;
n = n.
}if(sb.length() > 0) sb.setLength(sb.length() - 1);
return sb.toString();
}
/**
* 栈元素封装
*/
static class Node<E> {
E ele;Node next;
public Node(E ele) {
this.ele = ele;
}
} }
使用demo(只在主线程中串行使用):
public class Main {
public static void main(String[] args) {
System.out.println("一个不安全的栈:");
Integer> unsafeStack = new UnSafeStack<>();
UnSafeStack<for(int i = 0; i < 10; i++) {
push(i);
unsafeStack.
}for(int i = 0; i < 3; i++) {
System.out.println("pop: " + unsafeStack.pop());
}System.out.println("当前栈情况:" + unsafeStack);
} }
验证多线程下不安全(测试代码):
public class Main2 {
final static long PRODUCE_NUM = 10000;
final static long CONSUME_NUM = 10000;
static long failCount = 0;
public static void main(String [] args) throws InterruptedException {
Long> stack = new UnSafeStack<>();
UnSafeStack<//LockSafeStack<Long> stack = new LockSafeStack<>();
Runnable pTask = new Producer(stack);
Runnable cTask = new Consumer(stack);
Thread pThread = new Thread(pTask);
Thread cThread = new Thread(cTask);
start();
pThread.start();
cThread.join();
pThread.join();
cThread.
long bias = PRODUCE_NUM - CONSUME_NUM + failCount - countStackSize(stack);
System.out.println("生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:" + bias);
System.out.println("验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:" + (bias == 0));
//System.out.println("当前栈情况:" + stack);
}
private static long countStackSize(UnSafeStack stack) {
int count = 0;
Node n = stack.top;
UnSafeStack.while(n != null) {
count ++;next;
n = n.
}return count;
}
static class Producer implements Runnable {
Long> stack;
UnSafeStack<
public Producer(UnSafeStack<Long> stack) {
this.stack = stack;
}@Override
public void run() {
for(long i = 0; i < PRODUCE_NUM; i ++) {
push(i);
stack.
}
}
}
static class Consumer implements Runnable {
Long> stack;
UnSafeStack<public Consumer(UnSafeStack<Long> stack) {
this.stack = stack;
}@Override
public void run() {
for(long i = 0; i < CONSUME_NUM; i++ ) {
Long ele = stack.pop();
if(ele == null) failCount ++;
}
}
} }
运行结果:
生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:97
验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:false
如果对栈的push,pop操作加锁(监视整个栈对象,临界资源是整个栈),则可以将这个栈变为线程安全的。
public class LockSafeStack<E> extends UnSafeStack<E> {
@Override
public synchronized void push(E e) {
super.push(e);
}
@Override
public synchronized E pop() {
return super.pop();
}
@Override
public synchronized String toString() {
return super.toString();
} }
继续使用上面的测试,将UnSafeStack替换为LockSafeStack
public class Main2 {
//...
public static void main(String [] args) throws InterruptedException {
//UnSafeStack<Long> stack = new UnSafeStack<>();
Long> stack = new LockSafeStack<>();
LockSafeStack<Runnable pTask = new Producer(stack);
Runnable cTask = new Consumer(stack);
//...
}//...
}
运行结果为:
0
生产,消费成功及消费失败次数,与栈中剩余元素个数偏差: 验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:true
看起来已经是线程安全的了。但是这种方式使用的是synchronized关键字相关的隐式锁。
如下所示几种情况,都有加锁。
//实例方法上加synchronized,锁定类对象(隐式锁)
synchronized fun() {
//不安全操作
//...
}
//静态方法上加synchronized,锁定类对象(隐式锁)
static synchronized fun() {
//不安全操作
//...
}
//代码块synchronized(obj),锁定指定的对象(隐式锁)
synchronized(obj) {
//不安全操作
//...
}//代码块synchronized(Obj.class),锁定指定的类(隐式锁)
synchronized(Obj.class) {
//不安全操作
//...
}
//(显式锁)
//创建锁对象
ReentrantLock lock = new ReentrantLock();
//使用锁
lock();
lock.try {
//不安全的操作
//...
finally {
} unlock();
lock. }
下面我们要想想怎么做一个无锁的Stack。
无锁的Stack版本V1
无锁算法基本都是CAS实现的。(compare and set)
基于前面不安全的Stack开始考虑,哪些操作是不安全的,需要特殊注意。哪些引用更新需要用CAS测试更新。
import java.util.concurrent.atomic.AtomicReference;
/**
* 无锁栈的简单实现
* 参考: https://towriting.com/blog/2014/08/18/common-pitfalls-in-writing-lock-free-algorithms/
* 参考: https://www.singlestore.com/blog/common-pitfalls-in-writing-lock-free-algorithms/
*
* @param <E>
*/
public class LockFreeStack<E> {
AtomicReference<Node> top = new AtomicReference<>(null);
public void push(E e) {
Node n = new Node(e);
do {
next = top.get();
n.while (!top.compareAndSet(n.next, n));
}
}
public E pop() {
Node<E> p = null;
Node<E> q = null;
do {
if ((p = top.get()) == null) return null;
next; //如果是c++,p.next的获取可能会有问题,p可能已经被其他线程手动delete掉了 (但是java不会,这里还保留有引用,不会被gc)
q = p.while(!top.compareAndSet(p, q));
} return p.node;
}
/**
* 栈元素封装
*/
static class Node<E> {
E node;Node next;
public Node(E node) {
this.node = node;
}
} }
使用前面类似的代码试下:
public class Main3 {
final static long PRODUCE_NUM = 1000000;
final static long CONSUME_NUM = 1000000;
static long failCount = 0;
public static void main(String [] args) throws InterruptedException {
Long> stack = new LockFreeStack<>();
LockFreeStack<Runnable pTask = new Producer(stack);
Runnable cTask = new Consumer(stack);
Thread pThread = new Thread(pTask);
Thread cThread = new Thread(cTask);
start();
pThread.start();
cThread.join();
pThread.join();
cThread.
long bias = PRODUCE_NUM - CONSUME_NUM + failCount - countStackSize(stack);
System.out.println("生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:" + bias);
System.out.println("验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:" + (bias == 0));
//System.out.println("当前栈情况:" + stack);
}
private static long countStackSize(LockFreeStack stack) {
int count = 0;
Node n = (LockFreeStack.Node) stack.top.get();
LockFreeStack.while(n != null) {
count ++;next;
n = n.
}return count;
}
static class Producer implements Runnable {
Long> stack;
LockFreeStack<
public Producer(LockFreeStack<Long> stack) {
this.stack = stack;
}@Override
public void run() {
for(long i = 0; i < PRODUCE_NUM; i ++) {
push(i);
stack.
}
}
}
static class Consumer implements Runnable {
Long> stack;
LockFreeStack<public Consumer(LockFreeStack<Long> stack) {
this.stack = stack;
}@Override
public void run() {
for(long i = 0; i < CONSUME_NUM; i++ ) {
Long ele = stack.pop();
if(ele == null) failCount ++;
}
}
} }
暂时看起来正常,但是,是不是会有ABA问题?
想了想,LockFreeStack里AtomicReference<Node> top
封装的是Node,每次push都是新建Node, 每次pop都直接丢掉前面的Node,不会有top处的Node从A改成B再改回A的情况。
参考:实现无锁算法的常见陷阱: https://towriting.com/blog/2014/08/18/common-pitfalls-in-writing-lock-free-algorithms/ 参考: https://www.singlestore.com/blog/common-pitfalls-in-writing-lock-free-algorithms/ 参考:https://www.cnblogs.com/catch/p/3164829.html 参考: https://www.cnblogs.com/catch/p/3176636.html#3315157
以上参考还没看完,但上面确实提到了些大坑。

发表回复