04无锁_作业

实现一个无锁的Stack

实现一个无锁的Stack,并写一段测试代码(多线程访问),证明这个Stack是线程安全的。给出程序以及运行的截图。

先实现一个不安全的Stack

提供基本的push,pop功能。push没有返回值,Stack为空时pop,返回null。

/**
 * 不安全的栈
 */
public class UnSafeStack<E> {
    Node top = null;


    public void push(E e) {
        Node n = new Node(e);
        n.next = top;
        top = n;
    }

    public E pop() {
        if(top == null) return null;
        Node<E> p = top;
        Node<E> q = top.next;
        top = q;
        return p.ele;
    }

    public String toString() {
        StringBuffer sb = new StringBuffer();
        Node<E> n = top;
        while (n != null) {
            sb.append(n.ele.toString()).append(",");
            n = n.next;
        }
        if(sb.length() > 0) sb.setLength(sb.length() - 1);
        return sb.toString();
    }

    /**
     * 栈元素封装
     */
    static class Node<E> {
        E ele;
        Node next;

        public Node(E ele) {
            this.ele = ele;
        }
    }
}

使用demo(只在主线程中串行使用):

public class Main {
    public static void main(String[] args) {
        System.out.println("一个不安全的栈:");
        UnSafeStack<Integer> unsafeStack = new UnSafeStack<>();
        for(int i = 0; i < 10; i++) {
            unsafeStack.push(i);
        }
        for(int i = 0; i < 3; i++) {
            System.out.println("pop: " + unsafeStack.pop());
        }
        System.out.println("当前栈情况:" + unsafeStack);
    }
}

验证多线程下不安全(测试代码):

public class Main2 {
    final static long PRODUCE_NUM = 10000;
    final static long CONSUME_NUM = 10000;
    static long failCount = 0;
    public static void main(String [] args) throws InterruptedException {
        UnSafeStack<Long> stack = new UnSafeStack<>();
        //LockSafeStack<Long> stack = new LockSafeStack<>();
        Runnable pTask = new Producer(stack);
        Runnable cTask = new Consumer(stack);
        Thread pThread = new Thread(pTask);
        Thread cThread = new Thread(cTask);
        pThread.start();
        cThread.start();
        pThread.join();
        cThread.join();

        long bias = PRODUCE_NUM - CONSUME_NUM + failCount - countStackSize(stack);
        System.out.println("生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:" + bias);
        System.out.println("验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:" + (bias == 0));
        //System.out.println("当前栈情况:" + stack);
    }

    private static long countStackSize(UnSafeStack stack) {
        int count = 0;
        UnSafeStack.Node n = stack.top;
        while(n != null) {
            count ++;
            n = n.next;
        }
        return count;
    }

    static class Producer implements Runnable {
        UnSafeStack<Long> stack;

        public Producer(UnSafeStack<Long> stack) {
            this.stack = stack;
        }
        @Override
        public void run() {
            for(long i = 0; i < PRODUCE_NUM; i ++) {
                stack.push(i);
            }
        }
    }

    static class Consumer implements Runnable {
        UnSafeStack<Long> stack;
        public Consumer(UnSafeStack<Long> stack) {
            this.stack = stack;
        }
        @Override
        public void run() {
            for(long i = 0; i < CONSUME_NUM; i++ ) {
                Long ele = stack.pop();
                if(ele == null) failCount ++;
            }
        }
    }
}

运行结果:

生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:97
验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:false

如果对栈的push,pop操作加锁(监视整个栈对象,临界资源是整个栈),则可以将这个栈变为线程安全的。

public class LockSafeStack<E> extends UnSafeStack<E> {
    @Override
    public synchronized void push(E e) {
        super.push(e);
    }

    @Override
    public synchronized E pop() {
        return super.pop();
    }

    @Override
    public synchronized String toString() {
        return super.toString();
    }
}

继续使用上面的测试,将UnSafeStack替换为LockSafeStack

public class Main2 {
    //...
    public static void main(String [] args) throws InterruptedException {
        //UnSafeStack<Long> stack = new UnSafeStack<>();
        LockSafeStack<Long> stack = new LockSafeStack<>();
        Runnable pTask = new Producer(stack);
        Runnable cTask = new Consumer(stack);
        //...
    }
    //...
}

运行结果为:

生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:0
验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:true

看起来已经是线程安全的了。但是这种方式使用的是synchronized关键字相关的隐式锁。

如下所示几种情况,都有加锁。

//实例方法上加synchronized,锁定类对象(隐式锁)
synchronized fun() {
    //不安全操作
    //...
}

//静态方法上加synchronized,锁定类对象(隐式锁)
static synchronized fun() {
    //不安全操作
    //...
}

//代码块synchronized(obj),锁定指定的对象(隐式锁)
synchronized(obj) {
    //不安全操作
    //...
}
//代码块synchronized(Obj.class),锁定指定的类(隐式锁)
synchronized(Obj.class) {
    //不安全操作
    //...
}

//(显式锁)
//创建锁对象
ReentrantLock lock = new ReentrantLock();
//使用锁
lock.lock();
try {
    //不安全的操作
    //...
} finally {
    lock.unlock();
}

下面我们要想想怎么做一个无锁的Stack。

无锁的Stack版本V1

无锁算法基本都是CAS实现的。(compare and set)

基于前面不安全的Stack开始考虑,哪些操作是不安全的,需要特殊注意。哪些引用更新需要用CAS测试更新。

import java.util.concurrent.atomic.AtomicReference;

/**
 * 无锁栈的简单实现
 * 参考: https://towriting.com/blog/2014/08/18/common-pitfalls-in-writing-lock-free-algorithms/
 * 参考: https://www.singlestore.com/blog/common-pitfalls-in-writing-lock-free-algorithms/
 *
 * @param <E>
 */
public class LockFreeStack<E> {
    AtomicReference<Node> top = new AtomicReference<>(null);


    public void push(E e) {
        Node n = new Node(e);
        do {
            n.next = top.get();
        } while (!top.compareAndSet(n.next, n));
    }

    public E pop() {
        Node<E> p = null;
        Node<E> q = null;
        do {
            if ((p = top.get()) == null) return null;
            q = p.next; //如果是c++,p.next的获取可能会有问题,p可能已经被其他线程手动delete掉了 (但是java不会,这里还保留有引用,不会被gc)
        } while(!top.compareAndSet(p, q));
        return p.node;
    }

    /**
     * 栈元素封装
     */
    static class Node<E> {
        E node;
        Node next;

        public Node(E node) {
            this.node = node;
        }
    }
}

使用前面类似的代码试下:

public class Main3 {
    final static long PRODUCE_NUM = 1000000;
    final static long CONSUME_NUM = 1000000;
    static long failCount = 0;
    public static void main(String [] args) throws InterruptedException {
        LockFreeStack<Long> stack = new LockFreeStack<>();
        Runnable pTask = new Producer(stack);
        Runnable cTask = new Consumer(stack);
        Thread pThread = new Thread(pTask);
        Thread cThread = new Thread(cTask);
        pThread.start();
        cThread.start();
        pThread.join();
        cThread.join();

        long bias = PRODUCE_NUM - CONSUME_NUM + failCount - countStackSize(stack);
        System.out.println("生产,消费成功及消费失败次数,与栈中剩余元素个数偏差:" + bias);
        System.out.println("验证生产,消费成功及消费失败次数,与栈中剩余元素个数是否一致:" + (bias == 0));
        //System.out.println("当前栈情况:" + stack);
    }

    private static long countStackSize(LockFreeStack stack) {
        int count = 0;
        LockFreeStack.Node n = (LockFreeStack.Node) stack.top.get();
        while(n != null) {
            count ++;
            n = n.next;
        }
        return count;
    }

    static class Producer implements Runnable {
        LockFreeStack<Long> stack;

        public Producer(LockFreeStack<Long> stack) {
            this.stack = stack;
        }
        @Override
        public void run() {
            for(long i = 0; i < PRODUCE_NUM; i ++) {
                stack.push(i);
            }
        }
    }

    static class Consumer implements Runnable {
        LockFreeStack<Long> stack;
        public Consumer(LockFreeStack<Long> stack) {
            this.stack = stack;
        }
        @Override
        public void run() {
            for(long i = 0; i < CONSUME_NUM; i++ ) {
                Long ele = stack.pop();
                if(ele == null) failCount ++;
            }
        }
    }
}

暂时看起来正常,但是,是不是会有ABA问题?

想了想,LockFreeStack里AtomicReference<Node> top封装的是Node,每次push都是新建Node, 每次pop都直接丢掉前面的Node,不会有top处的Node从A改成B再改回A的情况。

参考:实现无锁算法的常见陷阱: https://towriting.com/blog/2014/08/18/common-pitfalls-in-writing-lock-free-algorithms/ 参考: https://www.singlestore.com/blog/common-pitfalls-in-writing-lock-free-algorithms/ 参考:https://www.cnblogs.com/catch/p/3164829.html 参考: https://www.cnblogs.com/catch/p/3176636.html#3315157

以上参考还没看完,但上面确实提到了些大坑。

img

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注