【Redis 分布式锁】(3)完善这把“锁”

在上一篇文章(好用一点的锁)的最后,我们又发现了目前的锁存在两个问题:

  • 由于不可控的异常状况,持有锁的线程不能显示的释放锁,导致锁一直处在被持有的状态。
  • 目前锁不支持在某一段时间内只允许获取一次锁。

我们首先来看第一个问题。为了避免出现这种“长生不老”的锁,我们肯定需要给它设置一个过期时间的。你可能会想到使用 expire 命令对锁设置过期时长,但是 setnxexpire 的两次执行需要是原子性的。为什么需要是原子性的?还是那句话 “往往在生产环境中会出现各种意想不到的问题” ,万一刚执行完 setnx 命令机器突然宕机了呢!所以,这两次操作是需要原子性的。幸好,从 Redis 2.6.12 开始 set 命令增加了可选参数,例如: SET key-with-expire-and-NX "hello" EX 10086 NX 。这样我们在获取锁的时候,同时设置一个过期时长就能避免出现“长生不老”的锁了。获取锁核心代码如下:

/**
  * 默认锁最大存活时长
  */
private final static int LOCK_MAX_TIMEOUT_SEC = 600;

/**
  * 加锁/获取锁
  * @param key 锁名称
  * @param identity 标识
  * @param expireSec 过期时间
  * @return boolean 是否加锁成功
  */
public boolean lock(String key, String identity, int expireSec) {
    boolean lock;
    try {
        lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, identity, Duration.ofSeconds(expireSec)));
    } catch (Exception e) {
        log.error("获取锁:{},出现错误{}", key, e);
        return false;
    }
    return lock;
}

public boolean lock(String key, String identity) {
    return lock(key, identity, LOCK_MAX_TIMEOUT_SEC);
}

lockb 方法的代码,可自行修改。其中 expireSec 的大小,也可以根据业务需要设置一个默认的固定值。技术本身是要为业务服务的,所以并不是一成不变的,代码实现上还要具体问题具体分析。

如果不使用上面的方法,并且也不用 Lua 脚本还有其他办法吗?当然有!不过在实现上就会稍微麻烦些了。前面说的这个问题,是由于“长生锁”的存在导致,后续线程获取锁失败。那么我们可以在获取锁失败后,让当前线程去看看这个锁超时了没?如果超时了,那么就由当前线程直接“结束”它,并且再去获取锁。如果发现没有超时,那就仍然返回 false 乖乖等着吧。这个方案如何来实现呢?代码如下:

/**
  * 默认锁最大存活时长
  */
private final static int LOCK_MAX_TIMEOUT_SEC = 600;

/**
  * 加锁/获取锁,自定义实现[过期时间]
  * @param key 锁名称
  * @param identity 标识
  * @param expireSec 过期时间
  * @return boolean 是否加锁成功
  */
public boolean lockV2(String key, String identity, int expireSec) {
    // 每个节点时间可能不一样,所以使用 Redis 时间
    Long currTime = redisTemplate.getRequiredConnectionFactory().getConnection().time();
    Assert.notNull(currTime, "Redis 服务异常");
    // 超时时间戳
    String expire = String.valueOf(currTime + expireSec);
    String separator = ",";
    // 获取锁
    if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, identity + separator + expire))) {
        return true;
    }
    // 未获取锁,检查锁状态
    Object obj = redisTemplate.opsForValue().get(key);
    String value = obj != null ? obj.toString() : null;
    if (value != null) {
        // 锁已过期
        if (Long.parseLong(value.split(separator)[1]) < currTime) {
            log.error("锁:{},已过期", key);
            // 释放锁
            unlockV2(key, value.split(separator)[0]);
            // 再次尝试获取锁
            return lockV2(key, identity, expireSec);
        }
    }
    return false;
}

public boolean lockV2(String key, String identity) {
    return lockV2(key, identity, LOCK_MAX_TIMEOUT_SEC);
}

/**
  * 释放锁
  * @param key 锁名称
  * @param identity 标识
  */
public void unlockV2(String key, String identity) {
    try {
        Object obj = redisTemplate.opsForValue().get(key);
        String value = obj != null ? obj.toString():null;
        if (value != null){
            if (value.split(",")[0].equals(identity)) {
                redisTemplate.delete(key);
            }
        }
    }catch (Exception e) {
        log.error("释放锁操作失败,key:" + key + "失败原因:",e);
    }
}

虽然有两种办法来解决“长生锁”的问题,但是还会出现一种极端情况。假如原本3秒就能执行完的定时任务,却用时 1000秒,远远大于锁的过期时间。所以,就可能又会出现某个线程正在持有的锁,被其它线程释放掉,或者由于expire设置的时间到期失去对锁的持有。关于这个问题,一是可以通过对项目中使用到锁的业务场景进行分析评估,来动态调整锁的有效期,从而避免出现这种现象,或者即使出现故障也对业务不产生影响或者能够将影响控制到可承受的范围内;二是可采用守护线程,让获取锁的线程开启一个守护线程,来监控当前任务执行的状态以及是否应该延长锁的有效期。当锁即将到期时但任务还未执行完时,守护线程可以自动为其延长时间。当持有锁的线程执行完任务后,会显示的关掉守护线程。配合方案一和二,应该可以应对项目中大部分场景了。

解决完第一个问题,可以来看下第二个问题了!其实第二个问题的本质就是排他锁,为了实现在某个时长内只允许所有线程获取一次锁同样也离不开“时间”这个变量。我们可以利用在 Redis 中存储一个和每个排他锁相关的键值对来记录时间,并决定是否能够获取锁。具体实现:

/**
  * 拥有排他时长的锁
  * @param key 锁名称
  * @param identity 标识
  * @param expireSec 过期时间
  * @param interval 排它时间
  * @return 是否获取锁
  */
public boolean lock(String key, String identity, int expireSec, int interval) {
    if (lock(key, identity, expireSec)) {
        String preTimeStr = redisTemplate.opsForValue().get(key + "t") != null ? redisTemplate.opsForValue().get(key + "t").toString():null;
        String nowTimeStr = Objects.requireNonNull(redisTemplate.opsForValue().get(key)).toString().split(",")[1];
        if (preTimeStr != null) {
            long preTime = Long.parseLong(preTimeStr);
            long nowTime = Long.parseLong(nowTimeStr);
            // 判断是否符合条件
            if (nowTime - preTime < interval * 1000L) {
                unlock(key, identity);
                return false;
            }
        }
        setString(key + "t", nowTimeStr);
        return true;
    }
    return false;
}

至此,我们把发现的问题都解决完了。剩下的事情就是要梳理梳理代码,把不必要对外暴露的方法的修饰符改为 private ,再把相关方法再封装一下让CURD同学使用起来更方便 。

Redis 分布式锁系列文章中的所有代码都已上传至 GitHub 仓库: springboot-demo

Redis 分布式锁系列文章

【Redis 分布式锁】(1)一把简单的“锁”

【Redis 分布式锁】(2)好用一点的“锁”

【Redis 分布式锁】(3)完善这把“锁”

最新文章

发布者

Avatar photo

常轩

总要做点什么吧!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注