04无锁

无锁

无锁类的原理

无锁类的使用

无锁类算法详解

这一节关注资源的无锁使用。

无锁类的原理

无锁是无障碍的运行(获取版本号,进入临界区,提交前检查版本号,有更新则重新获取版本号重新操作;版本号无更新则直接提交),无锁在此基础上要求每次竞争一定有一个线程可以胜出。

CAS

image-20230309184136193

CAS compare and swap

CAS(V,E,N) 其中V表示某个变量的指针或句柄(或者基于此的一定偏移位置);E表示变量的期待值(旧值),N表示变量的新值。

CAS是一条CPU级别的原子操作,不会被打断。

如果V指定位置代表的数据,仍为期待值E,就更新为新值N。

cas抱有乐观态度,基本认为 变量没有被其他线程改变

cas算法:

循环:
{
先读;
。。。;
然后cas 比较,设值;(这一步对应一条cpu指令)
成功就跳出
}

//eg:
void getAndIncrement() {
    for(;;) {
        E = get();
        N = E + 1;
        if(compareAndSet(V, E, N)) {
            return E;
        }
    }
}

CPU指令

CAS 对应一条cpu指令,cmxchg,一个时钟周期完成。同一时钟周期内只能有一个cpu操作同一个地址,这就保证了原子性。

这条cpu指令类似完成如下操作,但是是在一个时钟周期完成。

image-20230309184708331

无锁类的使用

Java提供了一些无锁类,底层是使用比较交换指令实现的。

没有使用锁,理论上讲,如果不是手动挂起线程,基本不会被阻塞。只会失败不断重试。

一般认为,有锁的方式,是阻塞的方式。

AtomicInteger

Unsafe

AtomicReference

AtomicStampedReference

AtomicIntegerArray

AtomicIntegerFieldUpdater

java 1.5引进原子类,在java.util.concurrent.atomic包下一共提供了13个类,分为4种类型,分别是:

原子更新基本类型:AtomicLong、AtomicInteger、AtomicBoolean 原子更新数组:AtomicLongArray、AtomicIntegerArray、AtomicReferenceArray 原子更新引用:AtomicReference、AtomicMarkableReference、AtomicStampedReference 原子更新属性:AtomicLongFieldUpdater、AtomicIntegerFieldUpdater、AtomicReferenceUpdater 原子类也是java实现同步的一套解决方案。既然已经有了synchronized关键字和lock,为什么还要引入原子类呢?或者什么场景下使用原子类更好呢?

在很多时候,我们需要的仅仅是一个简单的、高效的、线程安全的递增或者递减方案,这个方案一般需要满足以下要求:

简单:操作简单,底层实现简单 高效:占用资源少,操作速度快 安全:在高并发和多线程环境下要保证数据的正确性 对于是需要简单的递增或者递减的需求场景,使用synchronized关键字和lock固然可以实现,但代码写的会略显冗余,且性能会有影响,此时用原子类更加方便。

以上类型中提供了一些无锁的方法。

AtomicInteger

compareAndSet

weakCompareAndSet

getAndIncrement

incrementAndGet

Unsafe

Unsafe提供了一些不太安全的操作。

eg:

比如根据偏移量设置值 park() 底层的CAS操作 非公开API,可能在不同版本jdk中有较大变化

image-20230310002341862

c语言struct和Java中class的比较

image-20230310000351674

获取Unsafe对象

Java魔法类:Unsafe应用解析

AtomicReference

AtomicStampedReference

CAS可能会有ABA问题:

线程1获取到某个资源的最初值A; 然后线程2可能随后将这个资源的值变为B; 然后线程3再讲这个资源的值变为A; 线程1此时使用CAS将A更新为C。

如果此过程对过程状态是敏感的,就会有问题。

解决方法是,在资源上再加一个stamp,对每个版本进行区分。

image-20230310211150303

实例:

import java.util.concurrent.atomic.AtomicStampedReference;

public class Main {
    static AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19, 0);

    public static void main(String[] args) {
        System.out.println("Hello world!");

        Thread producer = new Producer();
        Thread consumer = new Consumer();
        producer.start();
        consumer.start();
    }

    static class Producer extends Thread {
        @Override
        public void run() {
//            final int timestamp = money.getStamp();
            while (true) {
                final int timestamp = money.getStamp();
                Integer curMoney = money.getReference();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
                }
                if (curMoney < 20) {
                    if(money.compareAndSet(curMoney, curMoney + 20, timestamp, timestamp + 1)) {
                        System.out.println("少于20元,充值20元");
                    } else {
                        System.out.println("充值失败");
                    }
                }
                else {
                    System.out.println("大于等于20元,不充值");
                }
            }
        }
    }

    static class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                final int timestamp = money.getStamp();
                Integer curMoney = money.getReference();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
                }
                if (curMoney > 10) {
                    if (money.compareAndSet(curMoney, curMoney - 10, timestamp, timestamp + 1)) {
                        System.out.println("当前金额大于10元,消费10元");
                    } else {
                        System.out.println("消费失败");
                    }
                }
                else {
                    System.out.println("当前金额小于等于10元,停止消费");
                }
            }
        }
    }
}

AtomicIntegerArray

相比AtomicInteger相比,需要多传一个下标。

需要数组的length

数组中的每个元素都是线程安全的。

实例:

import java.util.concurrent.atomic.AtomicIntegerArray;

public class Main {
    static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello world!");
        Thread [] threads = new Thread[10];
        for(int i = 0; i< 10; i++ ) {
            threads[i] = new AddThread();
            threads[i].start();
        }
        for(int i = 0; i < 10 ; i++ ) {
            threads[i].join();
        }
        System.out.println(atomicIntegerArray);

    }

    static class AddThread extends Thread {
        @Override
        public void run() {
            for(int i = 0; i < 10000000; i++) {
                atomicIntegerArray.getAndIncrement(i %atomicIntegerArray.length());
            }
        }
    }
}

AtomicIntegerFieldUpdater

让普通变量也可以享受原子操作。

可能某个变量最初只是定义为了普通Integer,使用过程中突然想使用原子操作方法。

AtomicIntegerFieldUpdater.newUpdater()

incrementAndGet()

实例:为了使Candidate类实例的score可以无锁的自增,而改动又要尽可能小,可以使用AtomicIntegerFieldUpdarter

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class Main {
    static AtomicInteger scoreCopy = new AtomicInteger();
    static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    static Candidate stu = new Candidate();
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello world!");

        Thread[] threads = new Thread[1000];
        for(int i = 0; i< 1000; i++) {
            threads[i] = new AddThread();
            threads[i].start();
        }
        for(int i = 0; i < 1000; i++) {
            threads[i].join();
        }
        System.out.println("score of AtomicInteger:" + scoreCopy);
        System.out.println("score of stu:" + stu.score);
    }

    static class Candidate {
        int id;
        //int score;
        volatile int score;
    }

    static class AddThread extends Thread {
        @Override
        public void run() {
            if(Math.random() > 0.4) {
                scoreCopy.incrementAndGet();
                scoreUpdater.incrementAndGet(stu);
            }
        }
    }
}

无锁的实际的应用案例

无锁的Vector的实现(注意jdk中vector是有锁的)


评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注