分布式锁【Redission】

发布时间 2023-11-03 14:24:05作者: 木乃伊人

一、简介

        Redission,一个基于Redis实现的分布式工具,为 Redis 官网分布式解决方案。

        Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。企业级开发中使用Redis的最佳范本。

       采用Redis分布式锁,未必能真的加锁成功,我们有个案例就是发优惠券,程序员采用了Redis,但是却是出了事故,分布式锁没有锁住,导致了优惠券多发,损失了数十万RMB,然后那个同事就被GG了,测试被牵连给了通报批评,项目经理被警告。所以需要更加安全的使用Redission。

         官网:Redisson: Redis Java client with features of In-Memory Data Grid

         快速入门:github.com/redisson/re…

         Github的Redission系列:github.com/orgs/rediss…

二、功能分布        

三、Maven配置     

<!--Maven-->
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.10.4</version>
</dependency> 

四、基本使用

// 1. Create config object
Config = ...
// 2. Create Redisson instance
RedissonClient redisson = Redisson.create(config);
// 3. Get Redis based object or service you need
RMap<MyKey, MyValue> map = redisson.getMap("myMap");
RLock lock = redisson.getLock("myLock")
lock.lock();
//业务代码
lock.unlock();

五、官方源码API

       RedissionLock类    

        

       RLock红锁类     

        

      Redission采用Lua脚本执行枷锁逻辑

        Redission是通过lua脚本来访问Redis来确保业务逻辑执行的原子性的。

      【lua脚本加锁】

if (redis.call('exists', KEYS[1]) == 0) then 
        redis.call('hset', KEYS[1], ARGV[2], 1);
         redis.call('pexpire', KEYS[1], ARGV[1]); 
         return nil;
         end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return nil;
        end;
return redis.call('pttl', KEYS[1]);
  1. KYYS[1]:表示枷锁的key,只需要判断key值是否存在就能知道锁是否被线程持有。
  2. ARGV[1]:表示锁的有效期,默认30s。
  3. ARGV[2]:表示表示加锁的客户端ID。
  4. 首先判断该锁的key值是否存在,如果不存在,那就可以直接加锁。如果已存在,就要判断一下持有锁的线程是不是当前线程。所以用hexist来判断这个hash中是否存在当前线程的ID,如果存在就说持有锁的就是当前线程,则可以再次进入。
  5. 将value值加1并延长锁的有效时间。如果不是当前线程的ID,那么就会返回剩余的生存时间,当前线程就会进入一个循环,不断的去尝试获取锁。

       【lua脚本释放锁】          

if (redis.call('exists', KEYS[1]) == 0) then
       redis.call('publish', KEYS[2], ARGV[1]);
        return 1; 
        end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
     return nil;
     end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then
     redis.call('pexpire', KEYS[1], ARGV[2]); 
     return 0; 
else redis.call('del', KEYS[1]); 
     redis.call('publish', KEYS[2], ARGV[1]); 
     return 1;
     end;
return nil;
  1. key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1。
  2. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil。
  3.  因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。
  4.   释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。

六、分布式锁

        分布式锁是并发业务刚需,Zookeeper有Znode节点,数据库有表级锁和乐观锁/悲观锁。Redis有setNX。

       传统锁的get和del操作非原子性,并发一旦大了,无法保证进程安全。可采用Lua脚本。

       6.1、Lua脚本

                Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval/evalsha命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。

                Lock.Del.lua如下:               

if redis.call('get', KEYS[1]) == ARGV[1] 
    then 
    -- 执行删除操作
        return redis.call('del', KEYS[1]) 
    else 
    -- 不成功,返回0
        return 0 
end

              delete操作时执行Lua命令

// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));

// 执行lua脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);

        6.2、可重入锁

               可重入:同一个线程多次获取同一把锁,不会造成死锁,

               Lua脚本使用可重入锁,需要注意一下方面:

              1、需要存储锁名称lockName、获得该锁的线程id和对应线程的进入次数count。

              2、加锁:

                            每次线程获取锁时,判断是否存在该锁:

                             a、不存在,则设定Hash的key为线程ID,Value初始化为1,设置过期时间,返回获取锁成功true。

                             b、存在,继续判断是否存在当前线程id的hash key。如果存在,线程key的value + 1,重入次数增加1,设置过期时间。如果不存在,返回加锁失败。

              3、解锁:

                            每次线程来解锁时,判断是否存在该锁:

                             a、如存在,检查是否有该锁的id的hash key,有则减1,无这返回解锁失败。

                             b、减1后,判断生育的count是否为0,为0则说明不再需要这把锁,执行del命令删除。

       6.3、计数器的加减

                当同一个线程获取同一把锁,我们需要对对应线程的计数器count做加减。

                判断一个redis key是否存在,可以用exists,而判断一个hash的key是否存在,可以用hexists                

                    

               而redis也有hash自增命令 hincrby

               每次自增1时,hincrby lockname1 threadid 1,自减1时 hincrby lockname1 threadid -1              

                    

       6.4、解锁的判断

                当锁不再被需要了。每次解锁一次,count减1,知道为0,执行删除。

                综合上述的存储结构和判断流程,加锁和解锁的Lua如下:

                加锁lock.lua:          

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];

-- lockname不存在
if(redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;

-- 当前线程已id存在
if(redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
return 0;

             解锁 unlock.lua:

local key = KEYS[1];
local threadId = ARGV[1];

-- lockname、threadId不存在
if (redis.call('hexists', key, threadId) == 0) then
    return nil;
end;

-- 计数器-1
local count = redis.call('hincrby', key, threadId, -1);

-- 删除lock
if (count == 0) then
    redis.call('del', key);
    return nil;
end;

     代码:

/**
 * @description 原生redis实现分布式锁
 * @date 2021/2/6 10:51 下午
 **/
@Getter
@Setter
public class RedisLock {

    private RedisTemplate redisTemplate;
    private DefaultRedisScript<Long> lockScript;
    private DefaultRedisScript<Object> unlockScript;

    public RedisLock(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        // 加载加锁的脚本
        lockScript = new DefaultRedisScript<>();
        this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
        this.lockScript.setResultType(Long.class);
        // 加载释放锁的脚本
        unlockScript = new DefaultRedisScript<>();
        this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
    }

    /**
     * 获取锁
     */
    public String tryLock(String lockName, long releaseTime) {
        // 存入的线程信息的前缀
        String key = UUID.randomUUID().toString();

        // 执行脚本
        Long result = (Long) redisTemplate.execute(
                lockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId(),
                releaseTime);

        if (result != null && result.intValue() == 1) {
            return key;
        } else {
            return null;
        }
    }

    /**
     * 解锁
     * @param lockName
     * @param key
     */
    public void unlock(String lockName, String key) {
        redisTemplate.execute(unlockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId()
                );
    }
}

          至此分布式锁,互斥、可重入、防死锁基本有个了解了。

          当然会有一些问题需要考虑。比如进程A在获取锁时,因为业务操作时间太长,锁到期释放了但是业务还在执行,而此刻进程B又刚好正常获取到锁,两个进程操作就会依旧有共享资源问题。

          且存储该分布式锁的Redis节点宕机后,而且该锁正好处于锁住状态,该锁就会出现死锁状态。这些情况,就要考虑锁续约问题。即可以延长锁的releaseTime,来延迟释放锁直到完成业务。

         况且在性能(锁的最大等待时间)、优雅(无效锁申请)、重试(失败重试机制)等方面还要下功夫,为何不用Redission呢。

七、Redission分布式锁

        7.1、流程简介

  1. A、B线程争抢一把锁,A获取到锁后,B阻塞。
  2. B线程阻塞并非主动CAS,通过Pub/Sub方式订阅该锁的广播。
  3. A操作完成释放锁,B线程收到订阅消息通知。
  4. B被唤醒开始继续抢锁拿到锁。

        7.2、详细的加锁解锁流程总结如下:    

               

     关键字:锁续约、WatchDog、公平锁

      具体可以查询更加详细的资料哈。