共计 2814 个字符,预计需要花费 8 分钟才能阅读完成。
导读 | 最近首度应用 ” 分布式锁 ”,现在想想,分布式锁不是孤立的技能点,这其实就是跨主机的线程同步。 |
最近首度应用 ” 分布式锁 ”,现在想想,分布式锁不是孤立的技能点,这其实就是跨主机的线程同步。
单机服务器可以通过共享某堆内存来标记上锁 / 解锁,线程同步说到底是建立在单机操作系统的用户态 / 内核态对共享内存的访问控制。
而分布式服务器不是在同一台机器上:跨主机,因此需要将锁标记存储在所有机器进程都能看到的地方。
在开发很多业务场景会使用到锁,例如库存控制,抽奖等。
例如库存只剩 1 个商品,有三个用户同时打算购买,谁先购买库存立即清零,不能让其他二人也购买成功。
我们常说的线程安全、线程同步方案,包括此次的分布式锁都是基于
“多线程 / 多进程对特定资源同时有更新操作”。
- 分布式系统,一个锁在同一时间只能被一个服务器获取 (这是分布式锁的基础)
- 具备锁失效机制,防止死锁 (防止某些意外,锁没有得到释放,别人也无法得到锁)
Redis SET resource-name anystring NX EX max-lock-time
是一种最简单的分布式锁实现方案。
SET 命令支持多个参数:
- EX seconds– 设置过期时间 (s)
- NX — 如果 key 不存在,则设置 ……
因为 SET 命令参数可以替代 SETNX,SETEX,GETSET,这些命令在未来可能被废弃。
上面的命令返回 OK(或经过重试),客户端就获取到这个锁;
使用 DEL 命令解锁; 到达超时时间会自动释放锁。
在解锁时,增加一些设计,让系统更加健壮:
3. 不要使用固定的 String 值作为锁标记值,而是使用一个不易被猜中的随机值,业内称为 token
4. 不使用 DEL 命令释放锁,而是发送 script 去移除 key
第 3、4 点是为了解决:“锁提前过期,客户端 A 还没有执行完,然后客户端 B 获取了锁,这时客户端 A 执行完了,会不会在删锁的时候把 B 的锁给删掉”— 4 是 3 技术上的推荐实现。
脚本如下:
if redis.call("get",KEYS1] ==ARGV[1]) | |
then | |
return redis.call("DEL",KEYS[1]) | |
else | |
return 0 | |
end |
下面使用 StackExchange.Redis 写了基于以上考量的代码示例:
/// | |
/// Acquires the lock. | |
/// | |
/// | |
/// 随机值 | |
/// | |
/// 非阻塞锁 | |
static bool Lock(string key, string token,int expireSecond=10, double waitLockSeconds = 0) | |
{ | |
var waitIntervalMs = 50; | |
bool isLock; | |
DateTime begin = DateTime.Now; | |
do | |
{isLock = Connection.GetDatabase().StringSet(key, token, TimeSpan.FromSeconds(expireSecond), When.NotExists); | |
if (isLock) | |
return true; | |
// 不等待锁则返回 | |
if (waitLockSeconds == 0) break; | |
// 超过等待时间,则不再等待 | |
if ((DateTime.Now - begin).TotalSeconds >= waitLockSeconds) break; | |
Thread.Sleep(waitIntervalMs); | |
} while (!isLock); | |
return false; | |
} | |
/// | |
/// Releases the lock. | |
/// | |
/// true, if lock was released, false otherwise. | |
/// Key. | |
/// value | |
static bool UnLock(string key, string value) | |
{string lua_script = @"if (redis.call('GET', KEYS[1]) == ARGV[1]) then | |
redis.call('DEL', KEYS[1]) | |
return true | |
else | |
return false | |
end | |
"; | |
try | |
{var res = Connection.GetDatabase().ScriptEvaluate(lua_script, | |
new RedisKey[] { key}, | |
new RedisValue[] { value}); | |
return (bool)res; | |
} | |
catch (Exception ex) | |
{Console.WriteLine($"ReleaseLock lock fail...{ex.Message}"); | |
return false; | |
} | |
} | |
private static Lazy lazyConnection = new Lazy(() => | |
{ | |
ConfigurationOptions configuration = new ConfigurationOptions | |
{ | |
AbortOnConnectFail = false, | |
ConnectTimeout = 5000, | |
}; | |
configuration.EndPoints.Add("10.100.219.9", 6379); | |
return ConnectionMultiplexer.Connect(configuration.ToString()); | |
}); | |
public static ConnectionMultiplexer Connection => lazyConnection.Value; |
以上代码新增了第五点考量:
5. 为避免无限制抢锁,增加了非阻塞锁:轮询_s 等待锁,未等到则不再抢锁
下面并行开启三个任务,同时减少库存:
static void Main(string[] args) | |
{ | |
// 尝试并行执行 3 个任务 | |
Parallel.For(0, 3, x => | |
{string token = $"loki:{x}"; | |
bool isLocked = Lock("loki", token, 5, 10); | |
if (isLocked) | |
{Console.WriteLine($"{token} begin reduce stocks (with lock) at {DateTime.Now}."); | |
Thread.Sleep(1000); | |
Console.WriteLine($"{token} release lock {UnLock("loki", token)} at {DateTime.Now}."); | |
} | |
else | |
{Console.WriteLine($"{token} begin reduce stocks at {DateTime.Now}."); | |
} | |
}); | |
} |
可以看到三个并行任务依次获取 / 释放锁
本文从基础的线程安全、线程同步,认识到分布式锁是跨主机的资源线程 / 进程同步方案,以步步为营的风格 演示了 RedisSET 命令做分布式锁的设计考量,好记性不如烂笔头。
