分布式锁

为什么要分布式锁

什么是分布式锁

  • 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
  • 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(我觉得分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠。。。一个大坑)
  • 分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。

无锁带来的问题实例

为何需要分布式锁

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

分布式锁的一些特点

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

Redis实现分布式锁

参考文章:文章一

Redis 锁主要利用 Redis 的 setnx 命令。

  • 加锁命令:SETNX key value,当键不存在时,对键进行设置操作并返回成功,否则返回失败。KEY 是锁的唯一标识,一般按业务来决定命名。
  • 解锁命令:DEL key,通过删除键值对释放锁,以便其他线程可以通过 SETNX 命令来获取锁。
  • 锁超时:EXPIRE key timeout, 设置 key 的超时时间,以保证即使锁没有被显式释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。

SETNX的伪代码:

(SETNX 命令对应到 StringRedisTemplate 的 api 是 setIfAbsent)

1
2
3
4
5
6
7
public Boolean setIfAbsent(K key, V value) {
byte[] rawKey = this.rawKey(key);
byte[] rawValue = this.rawValue(value);
return (Boolean)this.execute((connection) -> {
return connection.setNX(rawKey, rawValue);
}, true);
}

上述锁实现方式存在一些问题:

SETNX 和 EXPIRE 非原子性

如果 SETNX 成功,在设置锁超时时间后,服务器挂掉、重启或网络问题等,导致 EXPIRE 命令没有执行,锁没有设置超时时间变成死锁。

解决方式:

有很多开源代码来解决这个问题,比如使用 lua 脚本。示例:

1
2
3
4
5
6
7
8
if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;

// 使用实例
EVAL "if (redis.call('setnx',KEYS[1],ARGV[1]) < 1) then return 0; end; redis.call('expire',KEYS[1],tonumber(ARGV[2])); return 1;" 1 key value 100

锁误解除

如果线程 A 成功获取到了锁,并且设置了过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁;随后 A 执行完成,线程 A 使用 DEL 命令来释放锁,但此时线程 B 加的锁还没有执行完成,线程 A 实际释放的线程 B 加的锁。

解决方法:

通过在 value 中设置当前线程加锁的标识,在删除之前验证 key 对应的 value 判断锁是否是当前线程持有。可生成一个 UUID 标识当前线程,作为当前线程的唯一标示。

超时解锁导致并发

如果线程 A 成功获取锁并设置过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁,线程 A 和线程 B 并发执行

A、B 两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:

  • 将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。
  • 为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。

不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis 可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0 时释放锁。

在本地记录记录重入次数,如 Java 中使用 ThreadLocal 进行重入次数统计,简单示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
// 加锁
public boolean lock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.containsKey(key)) {
lockers.put(key, lockers.get(key) + 1);
return true;
} else {
if (SET key uuid NX EX 30) {
lockers.put(key, 1);
return true;
}
}
return false;
}
// 解锁
public void unlock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.getOrDefault(key, 0) <= 1) {
lockers.remove(key);
DEL key
} else {
lockers.put(key, lockers.get(key) - 1);
}
}

本地记录重入次数虽然高效,但如果考虑到过期时间和本地、Redis 一致性的问题,就会增加代码的复杂性。另一种方式是 Redis Map 数据结构来实现分布式锁,既存锁的标识也对重入次数进行计数。Redission 加锁示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 如果 lock_key 不存在
if (redis.call('exists', KEYS[1]) == 0)
then
// 设置 lock_key 线程标识 1 进行加锁
redis.call('hset', KEYS[1], ARGV[2], 1);
// 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果 lock_key 存在且线程标识是当前欲加锁的线程标识
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
// 自增
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 重置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果加锁失败,返回锁剩余时间
return redis.call('pttl', KEYS[1]);

无法等待锁释放

上述命令执行都是立即返回的,如果客户端可以等待锁释放就无法使用。

  • 可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。
  • 另一种方式是使用 Redis 的发布订阅功能,当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送锁释放消息。如下:

Redisson 实现分布式锁

参考文章:文章一

maven引用

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.2.3</version>
</dependency>

原理图解析

while循环获取锁是CAS机制,等待锁。

-------------本文结束感谢您的阅读-------------