用 Redis 实现分布式锁 与 实现任务队列

转载于 用 Redis 实现分布式锁 与 实现任务队列

这一次总结和分享用Redis实现分布式锁 与 实现任务队列 这两大强大的功能. 先扯点个人观点, 之前我看了一篇博文说博客园的文章大部分都是分享代码, 博文里强调说分享思路比分享代码更重要 (貌似大概是这个意思, 若有误请谅解) , 但我觉得, 分享思路固然重要, 但有了思路, 却没有实现的代码, 那会让人觉得很浮夸的, 在工作中的程序猿都知道, 你去实现一个功能模块, 一段代码, 虽然你有了思路, 但是实现的过程也是很耗时的, 特别是代码调试, 还有各种测试等等. 所以我认为, 思路+代码, 才是一篇好博文的主要核心.

直接进入主题.

一、前言

双十一刚过不久, 大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动, 例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时, 会迎来一个用户请求的高峰期, 可能会有几十万几百万的并发量, 来抢这个手机, 在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力, 严重时说不定就宕机了, 另一个问题是, 秒杀的东西都是有量的, 例如一款手机只有10台的量秒杀, 那么, 在高并发的情况下, 成千上万条数据更新数据库 (例如10台的量被人抢一台就会在数据集某些记录下 减1) , 那次这个时候的先后顺序是很乱的, 很容易出现10台的量, 抢到的人就不止10个这种严重的问题. 那么, 以后所说的问题我们该如何去解决呢? 接下来我所分享的技术就可以拿来处理以上的问题: 分布式锁 和 任务队列.

二、实现思路

1.Redis 实现分布式锁思路

思路很简单, 主要用到的 redis 函数是 setnx() , 这个应该是实现分布式锁最主要的函数. 首先是将某一任务标识名 (这里用 Lock:order 作为标识名的例子) 作为键存到 redis 里, 并为其设个过期时间, 如果是还有 Lock:order 请求过来, 先是通过 setnx() 看看是否能将 Lock:order 插入到 redis 里, 可以的话就返回 true, 不可以就返回 false. 当然, 在我的代码里会比这个思路复杂一些, 我会在分析代码时进一步说明.

2.Redis 实现任务队列

这里的实现会用到上面的 Redis 分布式的锁机制, 主要是用到了Redis 里的有序集合这一数据结构. 例如入队时, 通过 zsetadd() 函数进行入队, 而出对时, 可以用到 zsetgetScore() 函数. 另外还可以弹出顶部的几个任务.

以上就是实现 分布式锁 和 任务队列 的简单思路, 如果你看完有点模棱两可, 那请看接下来的代码实现.

三、代码分析

(一) 先来分析Redis分布式锁的代码实现  

  • (1) 为避免特殊原因导致锁无法释放, 在加锁成功后, 锁会被赋予一个生存时间 (通过 lock方法的参数设置或者使用默认值) , 超出生存时间锁会被自动释放锁的生存时间默认比较短 (秒级) , 因此, 若需要长时间加锁, 可以通过 expire 方法延长锁的生存时间为适当时间, 比如在循环内.

  • (2) 系统级的锁当进程无论何种原因时出现 crash 时, 操作系统会自己回收锁, 所以不会出现资源丢失, 但分布式锁不用, 若一次性设置很长时间, 一旦由于各种原因出现进程crash 或者其他异常导致unlock未被调用时, 则该锁在剩下的时间就会变成垃圾锁, 导致其他进程或者进程重启后无法进入加锁区域.

先看加锁的实现代码:这里需要主要两个参数, 一个是 $timeout,这个是循环获取锁的等待时间, 在这个时间内会一直尝试获取锁知道超时, 如果为 0, 则表示获取锁失败后直接返回而不再等待;另一个重要参数的 $expire, 这个参数指当前锁的最大生存时间, 以秒为单位的, 它必须大于0, 如果超过生存时间锁仍未被释放, 则系统会自动强制释放. 这个参数的最要作用请看上面的 (1) 里的解释.

这里先取得当前时间, 然后再获取到锁失败时的等待超时的时刻 (是个时间戳), 再获取到锁的最大生存时刻是多少. 这里 redis 的 key 用这种格式:Lock: 锁的标识名, 这里就开始进入循环了, 先是插入数据到redis里, 使用 setnx() 函数, 这函数的意思是, 如果该键不存在则插入数据, 将最大生存时刻作为值存储, 假如插入成功, 则对该键进行失效时间的设置, 并将该键放在 $lockedName 数组里, 返回 true, 也就是上锁成功;如果该键存在, 则不会插入操作了, 这里有一步严谨的操作, 那就是取得当前键的剩余时间, 假如这个时间小于 0, 表示 key 上没有设置生存时间 (key是不会不存在的, 因为前面setnx会自动创建) 如果出现这种状况, 那就是进程的某个实例 setnx 成功后 crash 导致紧跟着的 expire 没有被调用, 这时可以直接设置expire并把锁纳为己用. 如果没设置锁失败的等待时间或者已超过最大等待时间了, 那就退出循环, 反之则隔 $waitIntervalUs 后继续请求. 这就是加锁的整一个代码分析.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 加锁
* @param [type] $name 锁的标识名
* @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为 0表示失败后直接返回不等待
* @param integer $expire 当前锁的最大生存时间(秒),必须大于 0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
* @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)
* @return [type] [description]
*/
public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
if ($name == null) return false;

//取得当前时间
$now = time();
//获取锁失败时的等待超时时刻
$timeoutAt = $now + $timeout;
//锁的最大生存时刻
$expireAt = $now + $expire;

$redisKey = "Lock:{$name}";
while (true) {
//将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放
$result = $this->redisString->setnx($redisKey, $expireAt);

if ($result != false) {
//设置key的失效时间
$this->redisString->expire($redisKey, $expireAt);
//将锁标志放到lockedNames数组里
$this->lockedNames[$name] = $expireAt;
return true;
}

//以秒为单位,返回给定key的剩余生存时间
$ttl = $this->redisString->ttl($redisKey);

//ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
//如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
//这时可以直接设置expire并把锁纳为己用
if ($ttl < 0) {
$this->redisString->set($redisKey, $expireAt);
$this->lockedNames[$name] = $expireAt;
return true;
}

/*****循环请求锁部分*****/
//如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

//隔 $waitIntervalUs 后继续 请求
usleep($waitIntervalUs);

}

return false;
}