优秀的编程知识分享平台

网站首页 > 技术文章 正文

Redisson分布式锁(二):源码分析(redisson分布式锁秒杀)

nanyue 2024-09-01 00:06:21 技术文章 6 ℃

一、实现分布式锁的要求

  • 确保互斥:在同一时刻,必须保证锁至多只能被一个客户端持有。
  • 不能死锁:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。
  • 避免活锁:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
  • 实现更多锁特性:锁中断、锁重入、锁超时等。
  • 确保客户端只能解锁自己持有的锁。

二、redission加锁、锁等待、解锁

1、加锁

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
        "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

KEYS[1]为getName();
ARGV[1]为internalLockLeaseTime;
ARGV[2]为getLockName(threadId)。

org.redisson.RedissonLock类tryLockInnerAsync通过eval命令执行Lua代码完成加锁操作。KEYS[1]为锁在redis中的key,key对应value为map结构,ARGV[1]为锁超时时间,ARGV[2]为锁value中的key。ARGV[2]由UUID+threadId组成,用来标记锁被谁持有。

(1) 第一个If判断key是否存在,不存在完成加锁操作

  • redis.call(‘hset’, KEYS[1], ARGV[2], 1);创建key[1] map中添加key:ARGV[2] ,value:1
  • redis.call(‘pexpire’, KEYS[1], ARGV[1]);设置key[1]过期时间,避免发生死锁。

eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。可避免第一条命令执行成功第二条命令执行失败导致死锁。

(2)第二个if判断key存在且当前线程已经持有锁, 重入:

  • redis.call(‘hexists’, KEYS[1],ARGV[2]);判断redis中锁的标记值是否与当前请求的标记值相同,相同代表该线程已经获取锁。
  • redis.call(‘hincrby’, KEYS[1], ARGV[2],1);记录同一线程持有锁之后累计加锁次数实现锁重入。

(3)最后的return表示key存在被其他线程获取的锁, 等待:

  • redis.call(‘pexpire’, KEYS[1], ARGV[1]); 重制锁超时时间。

2、锁等待

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    //步骤1、这里调用加锁操作,若没有返回null表名有其它线程占用锁,获得锁超时时间,执行锁等待逻辑
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    //步骤2、步骤一中加锁操作失败,订阅消息,利用redis的pubsub提供一个通知机制来减少不断的重试,避免发生活锁。
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            //步骤3、getLath()获取RedissionLockEntry实例latch变量,由于permits为0,所以调用acquire()方法后线程阻塞。
            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }

        }
    } finally {
        unsubscribe(future, threadId);
    }
//  get(lockAsync(leaseTime, unit));
}

其实很多这种循环阻塞的逻辑都不会使用空轮询,如果用空轮询的话太耗费CPU的性能,比如我们有定时去查询数据表中是否有数据,会使用while无线循环,但是配合Thread.sleep来防止空轮询,还有的会借助java的阻塞队列来防止空轮询,这里猜测阻塞队列是借助操作系统内核的中断来唤醒的,这个具体得去研究系统底层实现,这里就不多说,而redisssion是通过消息订阅来阻塞的,也防止了空轮询。总之,尽量避免使用空轮询。

3、解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end;" +
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "else " +
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
        "end; " +
        "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

KEYS[1]为getName();
KEYS[2]为getChannelName();
ARGV[1]为LockPubSub.unlockMessage;
ARGV[2]为internalLockLeaseTime;
ARGV[3]为getLockName(threadId);

  • (1)第一个if判断锁对应key是否存在,不存在则publish消息,将获取锁被阻塞的线程恢复重新获取锁;
  • (2)第二个if判断锁对应key存在,value中是否存在当前要释放锁的标示,不存在返回nil,确保锁只能被持有的线程释放。
  • (3)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1,第三个if判断锁标示对应的值是否大于0,大于0,表示有锁重入情况发生,重新设置锁过期时间。
  • (4)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1后等于0,调用del操作释放锁,并publish消息,将获取锁被阻塞的线程恢复重新获取锁;

订阅者将会收到如下消息。

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
    ...
    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            value.getLatch().release();
        } else if (message.equals(readUnlockMessage)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
            //订阅者接收到publish消息后,执行release操作,调用acquire被阻塞的线程将继续执行获取锁操作。
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
    ...
}

这里就跟步骤2的所等待逻辑互相呼应了

...
RFuture<RedissonLockEntry> future = subscribe(threadId);
...
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
...

4、watch dog自动延期机制

redisson默认超时时间是30s,假设我们的业务处理逻辑超过了30秒怎么办?redisson是这样实现的,当客户端
一获得锁,则会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

源码如下:



看门狗因为是新启动一个后台线程在运行,会对性能有一定的影响。

三、redisson实现分布式锁是否满足要求

1、确保互斥

要求:在同一时刻,必须保证锁至多只能被一个客户端持有。

redisson实现:redisson用lua脚本来保证原子操作,判断key是否存在,存在者锁等待,所以是满足于该要求的。

2、不能死锁

要求:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。

redisson实现:redisson对锁设置了超时时间,再客户端奔溃的情况下,超时后会自动被清除,所以不会死锁,满足要求。

3、避免活锁

要求:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
redisson实现:redisson通过消息订阅机制来防止空轮询,满足要求。

4、实现更多锁特性:锁中断、锁重入、锁超时等。

要求:确保客户端只能解锁自己持有的锁。
redisson实现:redisson通过map的结构来实现锁的重入,同一先线程进入只不过value+1,满足要求并且还有看门狗防止客户端还存在的情况下锁超时。

Tags:

最近发表
标签列表