Contents
分布式锁的三种实现的对比
转载来源: https://www.jianshu.com/p/c2b4aa7a12f1
锁是开发过程中十分常见的工具,在处理高并发请求的时候和订单数据的时候往往需要锁来帮助我们保证数据的安全。
场景1.前端点击太快,导致后端重复调用接口。两次调用一个接口,这样就会产生同一个请求执行了两次,而从用户的角度出发,他是因为太卡而点了两次,他的目标是执行一次请求。
场景2.对于高并发场景,我们往往需要引入分布式缓存,来加快整个系统的响应速度。但是缓存是有失效机制的,如果某一时刻缓存失效,而此时有大量的请求过来,那么所有的请求会瞬间直接打到DB上,那么这么大的并发量,DB可能是扛不住的。那么这里需要引入一个保护机制。当发生“缓存击穿”的时候加锁,从而保护DB不被拖垮。
看完了上面的场景,其实分布式锁的场景一直在我们身边。说分布式锁之前,应该先说一下java提供的锁,比较能单机解决的并发问题,没必要引入分布式的解决方案。
java提供了两种内置的锁的实现,一种是由JVM实现的synchronized和JDK提供的Lock,当你的应用是单机或者说单进程应用时,可以使用synchronized或Lock来实现锁。
但是,当你的应用涉及到多机、多进程共同完成时,例如现在的互联网架构,一般都是分布式的RPC框架来支撑,那么这样你的Server有多个,由于负载均衡的路由规则随机,相同的请求可能会打到不同的Server上进行处理,那么这时候就需要一个全局锁来实现多个线程(不同的进程)之间的同步。
实现全局的锁需要依赖一个第三方系统,此系统需要满足高可用、一致性比较强同时能应付高并发的请求。
常见的处理办法有三种:数据库、缓存、分布式协调系统。数据库和缓存是比较常用的,但是分布式协调系统是不常用的。
数据库实现分布式锁
利用DB来实现分布式锁,有两种方案。两种方案各有好坏,但是总体效果都不是很好。但是实现还是比较简单的。
- 利用主键唯一规则: 我们知道数据库是有唯一主键规则的,主键不能重复,对于重复的主键会抛出主键冲突异常。 了解JDK reentrantlock的人都知道,reentrantlock是利用了OS的CAS特性实现的锁。主要是维护一个全局的状态,每次竞争锁都会CAS修改锁的状态,修改成功之后就占用了锁,失败的加入到同步队列中,等待唤醒。 其实这和分布式锁实现方案基本是一致的,首先我们利用主键唯一规则,在争抢锁的时候向DB中写一条记录,这条记录主要包含锁的id、当前占用锁的线程名、重入的次数和创建时间等,如果插入成功表示当前线程获取到了锁,如果插入失败那么证明锁被其他人占用,等待一会儿继续争抢,直到争抢到或者超时为止。
这里我主要写了一个简单的实现:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 利用mysql实现可重入分布式锁
*/
public class MysqlprimaryLock {
private static Connection connection;
static {
try {
Class.forName("com.mysql.jdbc.Driver");
catch (ClassNotFoundException e) {
} printStackTrace();
e.
}String url = "jdbc:mysql://10.0.0.212:3308/dbwww_lock?user=lock_admin&password=lock123";
try {
DriverManager.getConnection(url);
connection = catch (SQLException e) {
} printStackTrace();
e.
}
}
/**
* 加锁
* @param lockID
*/
public void lock(String lockID) {
acquire(lockID);
}
/**
* 获取锁
* @param lockID
* @return
*/
public boolean acquire(String lockID) {
String sql = "insert into test_lock('id','count','thName','addtime') VALUES (?,?,?,?)";
while (true) {
try {
PreparedStatement statement = connection.prepareStatement(sql);
setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
statement.boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
if (ifsucess)
return true;
catch (SQLException e) {
} printStackTrace();
e.
}try {
Thread.sleep(1000);
catch (InterruptedException e) {
} printStackTrace();
e.
}continue;
}
}
/**
* 超时获取锁
* @param lockID
* @param timeOuts
* @return
* @throws InterruptedException
*/
public boolean acquire(String lockID, long timeOuts) throws InterruptedException {
String sql = "insert into test_lock('id','count','thName','addtime') VALUES (?,?,?,?)";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
statement.boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
if (ifsucess)
return true;
catch (SQLException e) {
} printStackTrace();
e.
}await(timerange, TimeUnit.MILLISECONDS);
latch.System.currentTimeMillis();
ranmain = futureTime - if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}continue;
}return false;
}
/**
* 释放锁
* @param lockID
* @return
* @throws SQLException
*/
public boolean unlock(String lockID) throws SQLException {
String sql = "DELETE from test_lock where id = ?";
PreparedStatement statement = connection.prepareStatement(sql);
setString(1, lockID);
statement.boolean ifsucess = statement.execute();
if (ifsucess)
return true;
return false;
}
}
这里是利用主键冲突规则,加入了id’,‘count’,‘thName’,‘addtime’,count主要是为了重入计数,thName为了判断占用锁的线程,addtime是记录占用时间。上面代码没有实现重入的逻辑。
重入主要实现思路是,在每次获取锁之前去取当前锁的信息,如果锁的线程是当前线程,那么更新锁的count+1,并且执行锁之后的逻辑。如果不是当前锁,那么进行重试。释放的时候也要进行count-1,最后减到0时,删除锁标识释放锁。
优点:实现简单
缺点:没有超时保护机制,mysql存在单点,并发量大的时候请求量太大、没有线程唤醒机制,用异常去控制逻辑多少优点恶心。
对于超时保护:如果可能,可以采用定时任务去扫描超过一定阈值的锁,并删除。但是也会存在,锁住的任务执行时间很长,删除锁会导致并发问题。所以需要对超时时间有一个很好的预估。
对于单点问题:有条件可以搞一个主从,但是为了一个锁来搞一个主从是不是优点浪费?同时主从切换的时候系统不可用,这也是一个问题。
并发量大的时候请求量太大:因为这种实现方式是没有锁的唤醒机制的,不像reentrantlock在同步队列中的节点,可以通过唤醒来避免多次的循环请求。但是分布式环境数据库这种锁的实现是不能做到唤醒的。所以只能将获取锁的时间间隔调高,避免死循环给系统和DB带来的巨大压力。这样也牺牲了系统的吞吐量,因为总会有一定的间隔锁是空闲的。
用异常去控制逻辑多少优点恶心:就不说了,每次失败都抛异常…..
- 利用Mysql行锁的特性:
Mysql是有表锁、页锁和行锁的机制的,可以利用这个机制来实现锁。这里尽量使用行锁,它的吞吐量是最高的。
/**
* 超时获取锁
* @param lockID
* @param timeOuts
* @return
* @throws InterruptedException
*/
public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException {
String sql = "SELECT id from test_lock where id = ? for UPDATE ";
long futureTime = System.currentTimeMillis() + timeOuts;
long ranmain = timeOuts;
long timerange = 500;
setAutoCommit(false);
connection.while (true) {
CountDownLatch latch = new CountDownLatch(1);
try {
PreparedStatement statement = connection.prepareStatement(sql);
setString(1, lockID);
statement.setInt(2, 1);
statement.setLong(1, System.currentTimeMillis());
statement.boolean ifsucess = statement.execute();//如果成功,那么就是获取到了锁
if (ifsucess)
return true;
catch (SQLException e) {
} printStackTrace();
e.
}await(timerange, TimeUnit.MILLISECONDS);
latch.System.currentTimeMillis();
ranmain = futureTime - if (ranmain <= 0)
break;
if (ranmain < timerange) {
timerange = ranmain;
}continue;
}return false;
}
/**
* 释放锁
* @param lockID
* @return
* @throws SQLException
*/
public void unlockforUpdtate(String lockID) throws SQLException {
commit();
connection.
}
利用for update加显式的行锁,这样就能利用这个行级的排他锁来实现分布式锁了,同时unlock的时候只要释放commit这个事务,就能达到释放锁的目的。
优点:实现简单
缺点:连接池爆满和事务超时的问题单点的问题,单点问题,行锁升级为表锁的问题,并发量大的时候请求量太大、没有线程唤醒机制。
连接池爆满和事务超时的问题单点的问题:利用事务进行加锁的时候,query需要占用数据库连接,在行锁的时候连接不释放,这就会导致连接池爆满。同时由于事务是有超时时间的,过了超时时间自动回滚,会导致锁的释放,这个超时时间要把控好。
对于单点问题:同上。
并发量大的时候请求量太大:同上。
行锁升级为表锁的问题:Mysql行锁默认需要走索引,如果不走索引会导致锁表,如果可以,在sql中可以强制指定索引。
缓存分布式锁
缓存实现分布式锁还是比较常见的,因为缓存比较轻量,并且缓存的响应快、吞吐高。最重要的是还有自动失效的机制来保证锁一定能释放。
缓存的分布式锁主要通过Redis实现,当然其他的缓存也是可以的。关于缓存有两种实现吧:
- 基于SetNX实现: setNX是Redis提供的一个原子操作,如果指定key存在,那么setNX失败,如果不存在会进行Set操作并返回成功。我们可以利用这个来实现一个分布式的锁,主要思路就是,set成功表示获取锁,set失败表示获取失败,失败后需要重试。
具体看下代码:
import redis.clients.jedis.Jedis;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Redis分布式锁
*/
public class RedisLockTest {
private Jedis jedisCli = new Jedis("localhost",6381);
private int expireTime = 1;
/**
* 获取锁
* @param lockID
* @return
*/
public boolean lock(String lockID){
while(true){
long returnFlag = jedisCli.setnx(lockID,"1");
if (returnFlag == 1){
System.out.println(Thread.currentThread().getName() + " get lock....");
return true;
}System.out.println(Thread.currentThread().getName() + " is trying lock....");
try {
Thread.sleep(2000);
catch (InterruptedException e) {
} printStackTrace();
e.return false;
}
}
}
/**
* 超时获取锁
* @param lockID
* @param timeOuts
* @return
*/
public boolean lock(String lockID,long timeOuts){
long current = System.currentTimeMillis();
long future = current + timeOuts;
long timeStep = 500;
CountDownLatch latch = new CountDownLatch(1);
while(future > current){
long returnFlag = jedisCli.setnx(lockID,"1");
if (returnFlag == 1){
System.out.println(Thread.currentThread().getName() + " get lock....");
expire(lockID,expireTime);
jedisCli.return true;
}System.out.println(Thread.currentThread().getName() + " is trying lock....");
try {
await(timeStep, TimeUnit.MILLISECONDS);
latch.catch (InterruptedException e) {
} printStackTrace();
e.
}
current = current + timeStep;
}return false;
}
public void unlock(String lockId){
long flag = jedisCli.del(lockId);
if (flag>0){
System.out.println(Thread.currentThread().getName() + " release lock....");
else {
}System.out.println(Thread.currentThread().getName() + " release lock fail....");
}
}
/**
* 线程工厂,命名线程
*/
public static class MyThreadFactory implements ThreadFactory{
public static AtomicInteger count = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
getAndIncrement();
count.Thread thread = new Thread(r);
setName("Thread-lock-test "+count);
thread.return thread;
}
}
public static void main(String args[]){
final String lockID = "test1";
Runnable task = () ->{
new RedisLockTest();
RedisLockTest testCli = lock(lockID);
testCli.try {
Thread.sleep(2000);
catch (InterruptedException e) {
} printStackTrace();
e.
}unlock(lockID);
testCli.
};
new MyThreadFactory();
MyThreadFactory factory = ExecutorService services = Executors.newFixedThreadPool(10);
for (int i = 0;i<3;i++)
execute(factory.newThread(task));
services.
}
}
看看结果:
-1-thread-3 is trying lock....
pool-1-thread-2 get lock....
pool-1-thread-1 is trying lock....
pool-1-thread-3 is trying lock....
pool-1-thread-2 release lock....
pool-1-thread-1 get lock....
pool-1-thread-3 is trying lock....
pool-1-thread-1 release lock....
pool-1-thread-3 get lock....
pool-1-thread-3 release lock.... pool
可以看到,几个线程很好的进行了同步。
这种方式也是有优点和缺点:
优点:实现简单,吞吐量十分客观,对于高并发情况应付自如,自带超时保护,对于网络抖动的情况也可以利用超时删除策略保证不会阻塞所有流程。
缺点:单点问题、没有线程唤醒机制、网络抖动可能会引起锁删除失败。
对单点问题:因为redis一般都是单实例使用,那么对于单点问题,可以做一个主从。当然主从切换的时候也是不可用的,因为主从同步是异步的,可能会并发问题。如果对于主从还是不能保证可靠性的话,可以上Redis集群,对于Redis集群,因为使用了类一致性Hash算法,虽然不能避免节点下线的并发问题(当前的任务没有执行完,其他任务就开始执行),但是能保证Redis是可用的。可用性的问题是出了问题之后的备选方案,如果我们系统天天都出问题还玩毛啊,对于突发情况牺牲一两个请求还是没问题的。
对于线程唤醒机制:分布式锁大多都是这样轮训获取锁的,所以控制住你的重试频率,也不会导致负载特别高的。可能就是吞吐量低点而已。
对于锁删除失败:分布式锁基本都有这个问题,可以对key设置失效时间。这个超时时间需要把控好,过大那么系统吞吐量低,很容易导致超时。如果过小那么会有并发问题,部分耗时时间比较长的任务就要遭殃了。
基于Zookeeper的分布式锁
Zookeeper是一个分布式一致性协调框架,主要可以实现选主、配置管理和分布式锁等常用功能,因为Zookeeper的写入都是顺序的,在一个节点创建之后,其他请求再次创建便会失败,同时可以对这个节点进行Watch,如果节点删除会通知其他节点抢占锁。
Zookeeper实现分布式锁虽然是比较重量级的,但实现的锁功能十分健全,由于Zookeeper本身需要维护自己的一致性,所以性能上较Redis还是有一定差距的。
Zookeeper实现分布式锁有几种形式,后面会单独的总结一下。
对比:
Mysql实现比较简单,不需要引入第三个应用,但实现多少有些重,性能不是很好。 Redis的话实现比较简单,同时性能很好,引入集群可以提高可用性。同时定期失效的机制可以解决因网络抖动锁删除失败的问题,所以我比较倾向Redis实现。 Zookeeper实现是有些重的,同时我们还需要维护Zookeeper集群,实现起来还是比较复杂的,实现不好的话还会引起“羊群效应”。如果不是原有系统就依赖Zookeeper,同时压力不大的情况下。一般不使用Zookeeper实现分布式锁。
使用分布式锁的正确姿势
转载来源: https://xie.infoq.cn/article/a3d1f29c0d71b4e06f3aae51b
1 背景
应用开发时,如果需要在同进程内的不同线程并发访问某项资源,可以使用各种互斥锁、读写锁;如果一台主机上的多个进程需要并发访问某项资源,则可以使用进程间同步的原语,例如信号量、管道、共享内存等。但如果多台主机需要同时访问某项资源,就需要使用一种在全局可见并具有互斥性的锁了。这种锁就是分布式锁,可以在分布式场景中对资源加锁,避免竞争资源引起的逻辑错误。

2 分布式锁的特性

3 分布式锁的实现方式
根据锁资源本身的安全性,我们将分布式锁分为三个大类:
- 单点读写系统
- 基于异步复制的分布式系统,例如 mysql,redis 等;
- 基于分布式一致性协议的分布式系统,例如 zookeeper,etcd,consul 等;

三种实现方式对比:
- 单点系统性能最好,可用性最差,适用于锁故障对业务影响相对可控的服务。
- 基于异步复制的分布式系统,存在数据丢失(丢锁)的风险,不够安全,往往通过 TTL 的机制承担细粒度的锁服务,该系统接入简单,适用于对时间很敏感,期望设置一个较短的有效期,执行短期任务,丢锁对业务影响相对可控的服务。
- 基于分布式一致性协议的分布式系统,通过分布式一致性协议保证数据的多副本,数据安全性高,往往通过租约(会话)的机制承担粗粒度的锁服务,该系统需要一定的门槛,适用于对安全性很敏感,希望长期持有锁,不期望发生丢锁现象的服务。
PS:redis 作为异步复制的分布式系统,为了解决数据丢失的问题,引入了 redlock 机制和 WAIT 命令,具体实现方式粘贴在附录中
4 分布式锁注意事项
- 加锁和解锁的操作要保证原子性

这种实现方式把加锁和设置过期时间的步骤分成两步,他们并不是原子操作,如果加锁成功之后程序崩溃、服务宕机等异常情况,导致没有设置过期时间,那么就会导致死锁的问题,其他线程永远都无法获取这个锁。
正确用法(解锁操作同理)

- 服务端每把锁都和唯一的会话绑定,每个会话设置唯一编号,避免误删除

- 设置合理的超时时间,通过其他的线程自动续租,为将要过期的锁延长持有时间
不续租可能导致同一个锁被多个客户端持有

在异步线程中续期,避免操作时间太长续期不及时


- 处理十分重要的数据时,引入 io fence 机制
在极端情况下,设置了合理的超时时间和续租也没有办法保证完全正确,如下图所示,Client1 获取了锁,在操作数据的时候发生了 GC,在 GC 完成时候丢失了锁的所有权,造成了数据不一致。(错误举例:系统GC导致秒杀商品超卖的例子)

需要分布式锁系统、业务系统和底层存储同时协作来完成一个完全正确的互斥访问,在存储系统引入 IO Fence 能力,如下图所示,全局锁服务提供全局自增的 token,Client1 拿到锁返回的 token 是 33,并带入存储系统,发生 GC,当 Client2 抢锁成功返回 34,带入存储系统,存储系统会拒绝 token 较小的请求,那么经过了长时间 full gc 重新恢复后的 Client 1 再次写入数据的时候,因为存储层记录的 Token 已经更新,携带 token 值为 33 的请求将被直接拒绝,从而达到了数据保护的效果。


5 分布式锁的方案选择

对于并发不高并且比较简单的场景,有什么现成的就用什么
6 如果分布式锁出现故障了,系统怎么保证不故障的
- 数据分类,根据每个业务操作的数据特点,进行分类

- 业务处理,针对不同的数据分类,选择不同的故障处理方式

目前想到的处理方式包括:数据库加锁、数据原子操作、降级为单机锁直接执行、IO Fence、直接报错不执行。
- 分布式锁出故障的地方要留有足够的日志,方便及时发现故障和事后数据的修正。
附录:
redis 的分布式锁的特殊用法:
redlock 方式
因为在 Redis 的主从架构下,主从同步是异步的,如果在 Master 节点加锁成功后,指令还没有同步到 Slave 节点,此时 Master 挂掉,Slave 被提升为 Master,新的 Master 上并没有锁的数据,其他的客户端仍然可以加锁成功。对于这种问题,Redis 作者提出了 RedLock 红锁的概念。
RedLock 的理念下需要至少 2 个 Master 节点,多个 Master 节点之间完全互相独立,彼此之间不存在主从同步和数据复制。
主要步骤如下:
- 获取当前 Unix 时间
- 按照顺序依次尝试从多个节点锁,如果获取锁的时间小于超时时间,并且超过半数的节点获取成功,那么加锁成功。这样做的目的就是为了避免某些节点已经宕机的情况下,客户端还在一直等待响应结果。举个例子,假设现在有 5 个节点,过期时间=100ms,第一个节点获取锁花费 10ms,第二个节点花费 20ms,第三个节点花费 30ms,那么最后锁的过期时间就是 100-(10+20+30),这样就是加锁成功,反之如果最后时间<0,那么加锁失败
- 如果加锁失败,那么要释放所有节点上的锁
红锁的问题在于:
- 加锁和解锁的延迟较大。
- 占用的资源过多,为了实现红锁,需要创建多个互不相关的云 Redis 实例或者自建 Redis。
- 节点崩溃重启时会导致异常情况,比如有 1~5 号五个节点,并且没有开启持久化,客户端 A 在 1,2,3 号节点加锁成功,此时 3 号节点崩溃宕机后发生重启,就丢失了加锁信息,客户端 B 在 3,4,5 号节点加锁成功。
使用 WAIT 命令
Redis 的 WAIT 命令会阻塞当前客户端,直到这条命令之前的所有写入命令都成功从 master 同步到指定数量的 replica,命令中可以设置单位为毫秒的等待超时时间。在云 Redis 版中使用 WAIT 命令提高分布式锁一致性的示例如下:
SET resource_1 random_value NX EX 5
WAIT 1 5000
使用以上代码,客户端在加锁后会等待数据成功同步到 replica 才继续进行其它操作,最大等待时间为 5000 毫秒。执行 WAIT 命令后如果返回结果是 1 则表示同步成功,无需担心数据不一致。相比红锁,这种实现方法极大地降低了成本。
需要注意的是:
- WAIT 只会阻塞发送它的客户端,不影响其它客户端。
- WAIT 返回正确的值表示设置的锁成功同步到了 replica,但如果在正常返回前发生高可用切换,数据还是可能丢失,此时 WAIT 只能用来提示同步可能失败,无法保证数据不丢失。您可以在 WAIT 返回异常值后重新加锁或者进行数据校验。
- 解锁不一定需要使用 WAIT,因为锁只要存在就能保持互斥,延迟删除不会导致逻辑问题。
两种方式对比
- 使用红锁实现成本高,优势是 Redis 节点越多则一致性越强。
- 使用 WAIT 命令最大优势是实现成本低,但是 redis 数据复制是有成本的,一个 master 节点下面无法挂很多 slave 节点,一致性不如红锁
基于 Redis 的分布式锁实现及其踩坑案例
转载来源:https://learn.lianglianglee.com/%E4%B8%93%E6%A0%8F/%E5%88%86%E5%B8%83%E5%BC%8F%E4%B8%AD%E9%97%B4%E4%BB%B6%E5%AE%9E%E8%B7%B5%E4%B9%8B%E8%B7%AF%EF%BC%88%E5%AE%8C%EF%BC%89/08%20%E5%9F%BA%E4%BA%8E%20Redis%20%E7%9A%84%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%AE%9E%E7%8E%B0%E5%8F%8A%E5%85%B6%E8%B8%A9%E5%9D%91%E6%A1%88%E4%BE%8B.md
分布式锁的实现,目前常用的方案有以下三类:
- 数据库乐观锁;
- 基于分布式缓存实现的锁服务,典型代表有 Redis 和基于 Redis 的 RedLock;
- 基于分布式一致性算法实现的锁服务,典型代表有 ZooKeeper、Chubby 和 ETCD。
基于 Redis 实现分布式锁,网上可以查到很多相关资料,我最初也借鉴了这些资料,但是,在分布式锁的实现和使用过程中意识到这些资料普遍存在问题,容易误导初学者,鉴于此,撰写了本文,希望为对分布式锁感兴趣的读者提供一篇切实可用的参考文档。
1. 分布式锁原理介绍
1.1 分布式锁基本约束条件
为了确保锁服务可用,通常,分布式锁需同时满足以下四个约束条件。
- 互斥性:在任意时刻,只有一个客户端能持有锁;
- 安全性:即不会形成死锁,当一个客户端在持有锁的期间崩溃而没有主动解锁的情况下,其持有的锁也能够被正确释放,并保证后续其它客户端能加锁;
- 可用性:就 Redis 而言,当提供锁服务的 Redis Master 节点发生宕机等不可恢复性故障时,Slave 节点能够升主并继续提供服务,支持客户端加锁和解锁;对基于分布式一致性算法实现的锁服务(如 ETCD)而言,当 Leader 节点宕机时,Follow 节点能够选举出新的 Leader 继续提供锁服务;
- 对称性:对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。
1.2 基于 Redis 实现分布式锁(以 Redis 单机模式为例)
基于 Redis 实现锁服务的思路比较简单。我们把锁数据存储在分布式环境中的一个节点,所有需要获取锁的调用方(客户端),都需访问该节点,如果锁数据(Key-Value 键值对)已经存在,则说明已经有其它客户端持有该锁,可等待其释放(Key-Value 被主动删除或者因过期而被动删除)再尝试获取锁;如果锁数据不存在,则写入锁数据(Key-Value),其中 Value 需要保证在足够长的一段时间内在所有客户端的所有获取锁的请求中都是唯一的,以便释放锁的时候进行校验;锁服务使用完毕之后,需要主动释放锁,即删除存储在 Redis 中的 Key-Value 键值对。其架构如下:
1.3 加解锁流程
根据 Redis 官方的文档,获取锁的操作流程如下。
**步骤1,向 Redis 节点发送命令,请求锁。**代码如下:
SET lock_name my_random_value NX PX 30000
下面解释下各参数的意义。
lock_name
,即锁名称,这个名称应是公开的,在分布式环境中,对于某一确定的公共资源,所有争用方(客户端)都应该知道对应锁的名字。对于 Redis 而言,lock_name
就是 Key-Value 中的 Key,具有唯一性。my_random_value
是由客户端生成的一个随机字符串,它要保证在足够长的一段时间内,且在所有客户端的所有获取锁的请求中都是唯一的,用于唯一标识锁的持有者。- NX 表示只有当
lock_name(key)
不存在的时候才能 SET 成功,从而保证只有一个客户端能获得锁,而其它客户端在锁被释放之前都无法获得锁。 - PX 30000 表示这个锁节点有一个 30 秒的自动过期时间(目的是为了防止持有锁的客户端故障后,无法主动释放锁而导致死锁,因此要求锁的持有者必须在过期时间之内执行完相关操作并释放锁)。
步骤2,如果步骤 1 的命令返回成功,则代表获取锁成功,否则获取锁失败。
对于一个拥有锁的客户端,释放锁流程如下。
(1)向 Redis 节点发送命令,获取锁对应的 Value,代码如下:
GET lock_name
(2)如果查询回来的 Value 和客户端自身的 my_random_value
一致,则可确认自己是锁的持有者,可以发起解锁操作,即主动删除对应的 Key,发送命令:
DEL lock_name
通过 Redis-cli 执行上述命令,显示如下:
100.X.X.X:6379> set lock_name my_random_value NX PX 30000
OK100.X.X.X:6379> get lock_name
"my_random_value"
100.X.X.X:6379> del lock_name
(integer) 1100.X.X.X:6379> get lock_name
(nil)
2. 基于 Redis 的分布式锁的安全性分析
2.1 预防死锁
我们看下面这个典型死锁场景。
一个客户端获取锁成功,但是在释放锁之前崩溃了,此时该客户端实际上已经失去了对公共资源的操作权,但却没有办法请求解锁(删除 Key-Value 键值对),那么,它就会一直持有这个锁,而其它客户端永远无法获得锁。
我们的解决方案是:在加锁时为锁设置过期时间,当过期时间到达,Redis 会自动删除对应的 Key-Value,从而避免死锁。需要注意的是,这个过期时间需要结合具体业务综合评估设置,以保证锁的持有者能够在过期时间之内执行完相关操作并释放锁。
2.2 设置锁自动过期时间以预防死锁存在的隐患
为了避免死锁,可利用 Redis 为锁数据(Key-Value)设置自动过期时间,虽然可以解决死锁的问题,但却存在隐患。
我们看下面这个典型场景。
- 客户端 A 获取锁成功;
- 客户端 A 在某个操作上阻塞了很长时间(对于 Java 而言,如发生 Full-GC);
- 过期时间到,锁自动释放;
- 客户端 B 获取到了对应同一个资源的锁;
- 客户端 A 从阻塞中恢复过来,认为自己依旧持有锁,继续操作同一个资源,导致互斥性失效。
这时我们可采取的解决方案见下。
- 存在隐患的方案。第 5 步中,客户端 A 恢复后,可以比较下目前已经持有锁的时间,如果发现已经过期,则放弃对共享资源的操作即可避免互斥性失效的问题。但是,客户端 A 所在节点的时间和 Redis 节点的时间很可能不一致(比如客户端与 Redis 节点不在同一台服务器,而不同服务器时间通常不完全同步),因此,严格来讲,任何依赖两个节点时间比较结果的互斥性算法,都存在隐患。目前网上很多资料都采用了这种方案,鉴于其隐患,不推荐。
- 可取的方案。既然比较时间不可取,那么,还可以比较
my_random_value
,即客户端 A 恢复后,在操作共享资源前应比较目前自身所持有锁的my_random_value
与 Redis 中存储的my_random_value
是否一致,如果不相同,说明已经不再持有锁,则放弃对共享资源的操作以避免互斥性失效的问题。
2.3 解锁操作的原子性
为了保证每次解锁操作都能正确进行,需要引入全局唯一变量 my_random_value
。具体而言,解锁需要两步,先查询(GET)锁对应的 Value,与自己加锁时设置的 my_random_value
进行对比,如果相同,则可确认这把锁是自己加的,然后再发起解锁(DEL)。需要注意的是,GET 和 DEL 是两个操作,非原子性,那么解锁本身也会存在破坏互斥性的可能。
下面是典型场景。
- 客户端 A 获取锁成功;
- 客户端 A 访问共享资源;
- 客户端 A 为了释放锁,先执行 GET 操作获取锁对应的随机字符串的值;
- 客户端 A 判断随机字符串的值,与预期的值相等;
- 客户端 A 由于某个原因阻塞了很长时间;
- 过期时间到了,锁自动释放了;
- 客户端 B 获取到了对应同一个资源的锁;
- 客户端 A 从阻塞中恢复过来,执行 DEL 操纵,释放掉了客户端 B 持有的锁。
下面给出解决方案。
如何保障解锁操作的原子性呢?在实践中,我总结出两种方案。
1. 使用 Redis 事务功能,使用 Watch 命令监控锁对应的 Key,释放锁则采用事务功能(Multi 命令),如果持有的锁已经因过期而释放(或者过期释放后又被其它客户端持有),则 Key 对应的 Value 将改变,释放锁的事务将不会被执行,从而避免错误的释放锁,示例代码如下:
new Jedis("127.0.0.1", 6379);
Jedis jedis =
// “自旋”,等待锁
null;
String result = while (true)
{// 申请锁,只有当“lock_name”不存在时才能申请成功,返回“OK",锁的过期时间设置为5s
set("lock_name", "my_random_value", SET_IF_NOT_EXIST,
result = jedis.5000);
SET_WITH_EXPIRE_TIME, if ("OK".equals(result))
{break;
}
}
// 监控锁对应的 Key,如果其它的客户端对这个 Key 进行了更改,那么本次事务会被取消。
watch("lock_name");
jedis.// 成功获取锁,则操作公共资源,自定义流程
// to do something...
// 释放锁之前,校验是否持有锁
if (jedis.get("lock_name").equals("my_random_value"))
{// 开启事务功能,
multi();
Transaction multi = jedis.// 模拟客户端阻塞10s,锁超时,自动清除
try
{sleep(5000);
Thread.
}catch (InterruptedException e)
{printStackTrace();
e.
}// 客户端恢复,继续释放锁
del("lock_name");
multi.// 执行事务(如果其它的客户端对这个Key进行了更改,那么本次事务会被取消,不会执行)
exec();
multi.
}
// 释放资源
unwatch();
jedis.close(); jedis.
2. Redis 支持 Lua 脚本并保证其原子性,使用 Lua 脚本实现锁校验与释放,并使用 Redis 的 eval 函数执行 Lua 脚本,代码如下:
new Jedis("127.0.0.1", 6379);
Jedis jedis =
// “自旋”,等待锁
String result = null;
while (true)
{// 申请锁,只有当“lock_name”不存在时才能申请成功,返回“OK",锁的过期时间设置为 5s
set("lock_name", "my_random_value", SET_IF_NOT_EXIST,
result = jedis.5000);
SET_WITH_EXPIRE_TIME, if ("OK".equals(result))
{break;
}
}// 成功获取锁,则操作公共资源,自定义流程
// to do something...
// Lua脚本,用于校验并释放锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
try
{// 模拟客户端阻塞10s,锁超时,自动清除
Thread.sleep(10000);
}catch (InterruptedException e)
{printStackTrace();
e.
}
// 执行Lua脚本,校验并释放锁
eval(script, Collections.singletonList("lock_name"),
jedis.Collections.singletonList("my_random_value"));
close(); jedis.
2.4 Redis 节点故障后,主备切换的数据一致性
考虑 Redis 节点宕机,如果长时间无法恢复,则导致锁服务长时间不可用。为了保证锁服务的可用性,通常的方案是给这个 Redis 节点挂一个 Slave(多个也可以),当 Master 节点不可用的时候,系统自动切到 Slave 上。但是由于 Redis 的主从复制(Replication)是异步的,这可能导致在宕机切换过程中丧失锁的安全性。
我们看下典型场景。
- 客户端 A 从 Master 获取了锁;
- Master 宕机了,存储锁的 Key 还没有来得及同步到 Slave 上;
- Slave 升级为 Master;
- 客户端 B 从新的 Master 获取到了对应同一个资源的锁;
- 客户端 A 和客户端 B 同时持有了同一个资源的锁,锁的安全性被打破。
解决方案有两个。
方案1,设想下,如果要避免上述情况,可以采用一个比较“土”的方法,即自认为持有锁的客户端在对敏感公共资源进行写操作前,先进行校验,确认自己是否确实持有锁,校验的方式前面已经介绍过——通过比较自己的 my_random_value
和 Redis 服务端中实际存储的 my_random_value
。
显然,这里仍存在一个问题。如果校验完毕后,Master 数据尚未同步到 Slave 的情况下 Master 宕机,该如何是好?诚然,我们可以为 Redis 服务端设置较短的主从复置周期,以尽量避免上述情况出现,但是,隐患还是客观存在的。
方案2,针对该问题场景,Redis 的作者 Antirez 提出了 RedLock,其原理基于分布式一致性算法的核心理念:多数派思想。下面对 RedLock 做简要介绍。
2.5 RedLock 简要介绍
2.4 节介绍了基于单 Redis 节点的分布式锁在主从故障倒换(Failover)时会产生安全性问题。针对问题场景,Redis 的作者 Antirez 提出了 RedLock,它基于 N 个完全独立的 Redis 节点,其原理基于分布式一致性算法的核心理念:多数派思想,不过,RedLock 目前还不成熟,争议较大,本节仅作简要介绍。
运行 Redlock 算法的客户端依次执行以下步骤,来进行加锁的操作:
- 获取当前系统时间(毫秒数)。
- 按顺序依次向 N 个 Redis 节点执行获取锁的操作。这个获取操作跟前面基于单 Redis 节点获取锁的过程相同,包含随机字符串
my_random_value
,也包含过期时间(比如 PX 30000,即锁的有效时间)。为了保证在某个 Redis 节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(Time Out),它要远小于锁的有效时间(几十毫秒量级)。客户端在向某个 Redis 节点获取锁失败以后,应该立即尝试下一个 Redis 节点。这里的失败,应该包含任何类型的失败,比如该 Redis 节点不可用。 - 计算获取锁的整个过程总共消耗了多长时间,计算方法是用当前时间减去第 1 步记录的时间。如果客户端从大多数 Redis 节点(
>=N/2+1
)成功获取到了锁,并且获取锁总共消耗的时间没有超过锁的有效时间(Lock Validity Time),那么这时客户端才认为最终获取锁成功;否则,认为最终获取锁失败。 - 如果最终获取锁成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第 3 步计算出来的获取锁消耗的时间。
- 如果最终获取锁失败了(可能由于获取到锁的 Redis 节点个数少于
N/2+1
,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有 Redis 节点发起释放锁的操作(即前面介绍的 Redis Lua 脚本)。
**我们再来了解下解锁步骤。**上面描述的只是获取锁的过程,而释放锁的过程比较简单,即客户端向所有 Redis 节点发起释放锁的操作,不管这些节点在获取锁的时候成功与否。
该方法在理论上的可靠性如何呢?
N 个 Redis 节点中的大多数能正常工作,就能保证 Redlock 正常工作,因此理论上它的可用性更高。2.4 节中所描述的问题在 Redlock 中就不存在了,但如果有节点发生崩溃重启,还是会对锁的安全性有影响的。
它有哪些潜在问题呢,我们来看下面这个例子。
从加锁的过程,读者应该可以看出:RedLock 对系统时间是强依赖的,那么,一旦节点系统时间出现异常(Redis 节点不在同一台服务器上),问题便又来了,如下场景,假设一共有 5 个 Redis 节点:A、B、C、D、E。
- 客户端 1 成功锁住了 A、B、C,获取锁成功(但 D 和 E 没有锁住)。
- 节点 C 时间异常,导致 C 上的锁数据提前到期,而被释放。
- 客户端 2 此时尝试获取同一把锁:锁住了C、D、E,获取锁成功。
3. 加锁的正确方式及典型错误
3.1 客户端选择
这里,我选用了 Redis 开源客户端 Jedis,读者在运行示例代码前,需在对应的 Maven 工程的 Pom 文件中加入如下依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
Jedis 是一个优秀的基于 Java 语言的 Redis 客户端。但是,其不足也很明显,Jedis 在实现上是直接连接 Redis-Server,在多个线程间共享一个 Jedis 实例时是线程不安全的,如果想要在多线程场景下使用 Jedis,需要使用连接池,每个线程都使用自己的 Jedis 实例,当连接数量增多时,会消耗较多的物理资源。本文中使用 Jedis,采用的是连接池模式。如下代码:
new JedisPoolConfig();
JedisPoolConfig config = // 设置最大连接数
setMaxTotal(200);
config.// 设置最大空闲数
setMaxIdle(8);
config.// 设置最大等待时间
setMaxWaitMillis(1000 * 100);
config.// 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
setTestOnBorrow(true);
config.// 创建连接池
new JedisPool(config, "127.0.0.1", 6379, 3000); JedisPool jedisPool =
3.2 正确的加锁方式
基于第 2 节《基于 Redis 的分布式锁的安全性分析》,我们很容易写出以下加锁代码:
public class DistributedLock
{private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 加锁
* @param jedisPool jedis 连接池
* @param lockName 锁名,对应被争用的公共资源
* @param myRandomValue 需保持全局唯一,以校验锁的持有者
* @param expireTime 过期时间。过期将自动删除(释放锁)
*/
public static void Lock(JedisPool jedisPool, String lockName, String myRandomValue,
int expireTime)
{null;
Jedis jedis = try
{getResource();
jedis = jedisPool.// "自旋",等待锁
while (true)
{String result = jedis.set(lockName, myRandomValue, SET_IF_NOT_EXIST,
SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result))
{return;
}
}
}catch (Exception e)
{throw e;
}finally
{if (null != jedis)
{close();
jedis.
}
}
} }
加锁核心方法为:
jedis.set(String key, String value, String nxxx, String expx, int time)
接下来说明下各个参数的意义。
- key:Redis 是 Key-Value 型数据库,key 具有唯一性,因此,用 key 作为锁。
- value:即例子中的
my_random_value
,在 2.2 节《设置锁自动过期时间以预防死锁存在的隐患》中,我分析了隐患场景并给出了解决方案。为了保障可靠性,在解锁时,仅仅依赖 Key 是不够的,为了避免错误得释放锁,释放前需要进行校验,即根据 Key 取出 Value,将其与自己加锁时设置的my_random_value
进行对比,以便确认是否是自己持有的锁。my_random_value
可以使用特定的随机算法生成,如UUID.randomUUID().toString()
。 - nxxx:根据 Redis 文档,这个参数填 NX,意思是 SET IF NOT EXIST,即当 Key 不存在时方可进行 SET 操作;若 Key 已经存在,则不做任何操作;
- expx:这个参数传的是 PX,表示给 Key 设置过期时间,具体时间由参数 time 决定。
- time:与参数 expx 相呼应,代表 Key 的过期时间,单位为毫秒。
最后,我们做下小结。
通过上述说明,set(…) 方法可以满足加锁的安全性,执行 set(…) 方法有两种结果。
- 如果被争用的公共资源没有锁(即 Key 不存在),那么就进行加锁操作,并对锁设置个有效期,同时用具有特异性(一段时间内具有唯一性)Value 来标识加锁的客户端,以便解锁时进行校验。
- 如果被争用的公共资源已经被加锁(即 Key 存在),则不做任何操作,通常的做法是等待锁释放,采用不断轮询的方式来确定锁是否释放,这种方式也被称为“自旋”等待。此外,还可以设置一个超时时间,如果在超时时间内未能加锁成功则退出。
3.3 典型错误案例
分别使用 jedis.setnx()
和 jedis.expire()
组合实现加锁,代码如下:
public static void lock(JedisPool jedisPool, String lockName, String myRandomValue, int expireTime)
{getResource();
Jedis jedis = jedisPool.// 如果锁不存在,则加锁
Long result = jedis.setnx(lockName, myRandomValue);
if (result == 1)
{// 为锁设置过期时间,由于加锁和设置过期时间是两步完成的,非原子操作
expire(lockName, expireTime);
jedis.
} }
setnx() 方法的作用就是 SET IF NOT EXIST,expire() 方法就是给锁加一个过期时间。初看,似乎没有什么问题,但经不起推敲:加锁实际上使用了两条 Redis 命令,非原子性,如果程序在执行完 setnx() 之后突然崩溃,导致锁没有设置过期时间,那么将会造成死锁。
网上很多资料中采用的就是这种最初级的实现方式,读者切勿仿效。
4. 解锁代码
在 2.3 节《解锁操作的原子性》中,我曾分析了解锁操作可能出现的异常,并给出了两种解决方案,在此,我们再介绍下完整代码。
4.1 正确的解锁方式一
Redis 支持 Lua 脚本并保证其原子性,使用 Lua 脚本实现锁校验与释放,并使用 Redis 的 eval() 函数执行 Lua 脚本,代码如下:
public class DistributedLock
{// 释放锁成功标志
= 1L;
private static final Long RELEASE_SUCCESS /**
* 释放锁
* @param jedisPool jedis连接池
* @param lockName 锁名,对应被争用的公共资源
* @param myRandomValue 需保持全局唯一,以校验锁的持有者
* @return 是否释放成功
*/
unLock(JedisPool jedisPool, String lockName, String myRandomValue)
public static boolean
{
= null;
Jedis jedis // Lua脚本,用于校验并释放锁
= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String script
try
{= jedisPool.getResource();
jedis = jedis.eval(script, Collections.singletonList(lockName),
Object result .singletonList(myRandomValue));
Collections
// 注意:如果脚本顺利执行将返回1,如果执行脚本中,其它的客户端对这个lockName对应的值进行了更改,那么将返回0
if (RELEASE_SUCCESS.equals(result))
{;
return true
}
}catch (Exception e)
{;
throw e
}
finally
{if (null != jedis)
{.close();
jedis
}
}
;
return false
} }
从上面的示例代码可以看出,解锁操作只用了两行代码。
第一行使用了 Lua 脚本,其语义为通过 GET 命令访问参数 KEYS[1]
对应的锁,获得锁对应的 Value,并将其与参数 ARGV[1]
对比,如果相同则调用 DEL 命令删除 KEYS[1]
对应的键值对(即释放锁操作)。
// Lua脚本,用于校验并释放锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
第二行通过 Redis 的 eval() 函数执行 Lua 脚本,其中入参 lockName 赋值给参数 KEYS[1]
,myRandomValue 赋值给 ARGV[1]
,eval() 函数将 Lua 脚本交给 Redis 服务端执行。
jedis.eval(script, Collections.singletonList(lockName), Collections.singletonList(myRandomValue));
根据 Redis 官网文档说明,通过 eval() 执行 Lua 代码时,Lua 代码将被当成一个命令去执行(可保证原子性),并且直到 eval 命令执行完成,Redis 才会执行其他命令。因此,通过 Lua 脚本结合 eval 函数,可以科学得实现解锁操作的原子性,避免误解锁。
4.2 正确的解锁方式二
使用 Redis 事务功能,通过 Watch 命令监控锁对应的 Key,释放锁则采用事务功能(Multi 命令),如果持有的锁已经因过期而释放(也可能释放后又被其它客户端持有),则 Key 对应的 Value 将改变,释放锁的事务将不会被执行,从而保证原子性,同时避免错误的释放锁,示例代码如下:
public class DistributedLock
{private static final Long RELEASE_SUCCESS = 1L;
/**
* 释放锁
* @param jedisPool jedis连接池
* @param lockName 锁名,对应被争用的公共资源
* @param myRandomValue 需保持全局唯一,以校验锁的持有者
* @return 是否释放成功
*/
public static boolean unLockII(JedisPool jedisPool, String lockName, String myRandomValue)
{null;
Jedis jedis = try
{getResource();
jedis = jedisPool.
// 监控锁对应的Key,如果其它的客户端对这个Key进行了更改,那么本次事务会被取消。
watch(lockName);
jedis.// 成功获取锁,则操作公共资源,自定义流程
// to do something...
// 校验是否持有锁
if (myRandomValue.equals(jedis.get(lockName)))
{// 开启事务功能,
multi();
Transaction multi = jedis.// 释放锁
del(lockName);
multi.// 执行事务(如果其它的客户端对这个Key进行了更改,那么本次事务会被取消,不会执行)
// 如果正常执行,由于只有一个删除操作,返回的list将只有一个对象。
List<Object> result = multi.exec();
if (RELEASE_SUCCESS.equals(result.size()))
{return true;
}
}
}catch (Exception e)
{throw e;
}finally
{if (null != jedis)
{unwatch();
jedis.close();
jedis.
}
}
return false;
} }
这里稍微解释下。
参考百度百科,所谓事务,应该具有 4 个属性,即原子性、一致性、隔离性、持久性。这四个属性通常称为 ACID 特性。
- 原子性(Atomicity):一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
- 一致性(Consistency):事务必须使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
- 隔离性(Isolation):一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
- 持久性(Durability):持久性也称永久性(Permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
Redis 支持事务功能,根据事务所具有特征,读者应该可以发现,我们解锁时最关心的所有问题,事务都可以解决。这也是我介绍事务功能解锁的原因。Redis 使用事务功能,通常采用的步骤如下。
步骤1,Watch 命令监控锁。
监控锁对应的 key(lockName),事务开启后,如果其它的客户端对这个 Key 进行了更改,那么本次事务会被取消而不会执行 jedis.watch(lockName)
。
步骤2,开启事务功能,代码如下:
jedis.multi()
步骤3,释放锁。
注意,事务开启后,释放锁的操作便是事务中的一个元素,隶属于该事务,代码如下:
del(lockName); multi.
步骤4,执行事务,代码如下:
exec(); multi.
步骤5,释放资源,代码如下:
jedis.unwatch();
jedis.close();
4.3 典型解锁错误案例一
直接使用 jedis.del()
方法删除锁,而没有进行校验。在 2.3 节所述的异常场景下,这种不校验锁的拥有者而直接解锁的方式,会导致锁被错误的释放,从而破坏互斥性,如下面代码所示。
unLock(JedisPool jedisPool, String lockName)
public static void
{= jedisPool.getResource();
Jedis jedis .del(lockName);
jedis }
4.4 典型解锁错误案例二
如下解锁方式相较于上一种已经有了明显进步,在解锁之前进行了校验。但是问题并没有解决,整个解锁过程仍然是独立的两条命令,并非原子操作。代码如下:
unLock1(JedisPool jedisPool, String lockName, String myRandomValue)
public static void
{= jedisPool.getResource();
Jedis jedis // 判断加锁与解锁是不是同一个客户端
if (myRandomValue.equals(jedis.get(lockName)))
{// 解锁,如果在此之前出现异常而使客户端阻塞,锁已经过期被自动释放,本客户端已经不再持有锁,则会误解锁
.del(lockName);
jedis
} }
致谢
本文引用了以下文档中的一些图片和文字,一一列出,以表敬意。
- 官方文档:Distributed locks with Redis
- 官方文档:EVAL command
发表回复