优秀的编程知识分享平台

网站首页 > 技术文章 正文

慢谈 Redis 实现分布式锁 以及 Redisson 源码解析

nanyue 2024-09-01 00:06:46 技术文章 6 ℃

# 产生背景

Distributed locks are a very useful primitive in many environments where different processes must operate with shared resources in a mutually exclusive way.

在某些场景中,多个进程必须以互斥的方式独占共享资源,这时用分布式锁是最直接有效的。

随着互联网技术快速发展,数据规模增大,分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),在有些场景中,为了保证数据不重复,要求在同一时刻,同一任务只在一个节点上运行,即保证某一方法同一时刻只能被一个线程执行。在单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。


实现分布式锁的三种选择

  • 基于数据库实现分布式锁
  • 基于zookeeper实现分布式锁
  • 基于Redis缓存实现分布式锁

以上三种方式都可以实现分布式锁,其中,从健壮性考虑, 用 zookeeper 会比用 Redis 实现更好,但从性能角度考虑,基于 Redis 实现性能会更好,如何选择,还是取决于业务需求。

# 基于 Redis 实现分布式锁的三种方案

  • 用 Redis 实现分布式锁的正确姿势(实现一)
  • 用 Redisson 实现分布式可重入锁(RedissonLock)(实现二)
  • 用 Redisson 实现分布式锁(红锁 RedissonRedLock)(实现三)

本文主要探讨基于 Redis 实现分布式锁的方案,主要分析并对比了以上三种方案,并大致分析了 Redisson 的 RedissonLock 、 RedissonRedLock 源码。

# 分布式锁需满足四个条件

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
  4. 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。

# 用 Redis 实现分布式锁的正确姿势(实现一)

主要思路

通过 set key value px milliseconds nx 命令实现加锁, 通过Lua脚本实现解锁。核心实现命令如下:

1
//获取锁(unique_value可以是UUID等)
2
SET resource_name unique_value NX PX  30000
3
 
4
//释放锁(lua脚本中,一定要比较value,防止误解锁)
5
if redis.call("get",KEYS[1]) == ARGV[1] then
6
return redis.call("del",KEYS[1])
7
else
8
return 0
9
end

这种实现方式主要有以下几个要点:

  • set 命令要用 set key value px milliseconds nx,替代 setnx + expire 需要分两次执行命令的方式,保证了原子性,
  • value 要具有唯一性,可以使用UUID.randomUUID().toString()方法生成,用来标识这把锁是属于哪个请求加的,在解锁的时候就可以有依据;
  • 释放锁时要验证 value 值,防止误解锁;
  • 通过 Lua 脚本来避免 Check And Set 模型的并发问题,因为在释放锁的时候因为涉及到多个Redis操作 (利用了eval命令执行Lua脚本的原子性);

完整代码实现如下:

01
public class RedisTool {
02
<pre><code>private static final String LOCK_SUCCESS = "OK";
03
private static final String SET_IF_NOT_EXIST = "NX";
04
private static final String SET_WITH_EXPIRE_TIME = "PX";
05
private static final Long RELEASE_SUCCESS = 1L;
06
 
07
/**
08
 * 获取分布式锁(加锁代码)
09
 * @param jedis Redis客户端
10
 * @param lockKey 锁
11
 * @param requestId 请求标识
12
 * @param expireTime 超期时间
13
 * @return 是否获取成功
14
 */
15
public static boolean getDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
16
 
17
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
18
 
19
    if (LOCK_SUCCESS.equals(result)) {
20
        return true;
21
    }
22
    return false;
23
}
24
 
25
/**
26
 * 释放分布式锁(解锁代码)
27
 * @param jedis Redis客户端
28
 * @param lockKey 锁
29
 * @param requestId 请求标识
30
 * @return 是否释放成功
31
 */
32
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
33
 
34
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else               return 0 end";
35
 
36
    Object result = jedis.eval(script, Collections.singletonList(lockKey), C                                                   ollections.singletonList(requestId));
37
 
38
    if (RELEASE_SUCCESS.equals(result)) {
39
        return true;
40
    }
41
    return false;
42
 
43
}</code></pre>
44
}

加锁代码分析

首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,用来标识这把锁是属于哪个请求加的,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。

解锁代码分析

将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。在执行的时候,首先会获取锁对应的value值,检查是否与requestId相等,如果相等则解锁(删除key)。

这种方式仍存在单点风险

以上实现在 Redis 正常运行情况下是没问题的,但如果存储锁对应key的那个节点挂了的话,就可能存在丢失锁的风险,导致出现多个客户端持有锁的情况,这样就不能实现资源的独享了。

  1. 客户端A从master获取到锁
  2. 在master将锁同步到slave之前,master宕掉了(Redis的主从同步通常是异步的)。
  3. 主从切换,slave节点被晋级为master节点
  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。导致存在同一时刻存不止一个线程获取到锁的情况。

所以在这种实现之下,不论Redis的部署架构是单机模式、主从模式、哨兵模式还是集群模式,都存在这种风险。因为Redis的主从同步是异步的。 运行的是,Redis 之父 antirez 提出了 redlock算法 可以解决这个问题。

# Redisson 实现分布式可重入锁及源码分析 (RedissonLock)(实现二)

什么是 Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson 分布式重入锁用法

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例:

01
// 1.构造redisson实现分布式锁必要的Config
02
Config config = new Config();
03
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
04
// 2.构造RedissonClient
05
RedissonClient redissonClient = Redisson.create(config);
06
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
07
RLock rLock = redissonClient.getLock(lockKey);
08
try {
09
/**
10
* 4.尝试获取锁
11
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
12
* leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
13
*/
14
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
15
if (res) {
16
//成功获得锁,在这里处理业务
17
}
18
} catch (Exception e) {
19
throw new RuntimeException("aquire lock fail");
20
}finally{
21
//无论如何, 最后都要解锁
22
rLock.unlock();
23
}

加锁源码分析

1.通过 getLock 方法获取对象

org.redisson.Redisson#getLock()

01
@Override
02
public RLock getLock(String name) {
03
/**
04
*  构造并返回一个 RedissonLock 对象
05
* commandExecutor: 与 Redis 节点通信并发送指令的真正实现。需要说明一下,CommandExecutor 实现是通过 eval 命令来执行 Lua 脚本
06
* name: 锁的全局名称
07
* id: Redisson 客户端唯一标识,实际上就是一个 UUID.randomUUID()
08
*/
09
return new RedissonLock(commandExecutor, name, id);
10
}

2.通过tryLock方法尝试获取锁

tryLock方法里的调用关系大致如下:

org.redisson.RedissonLock#tryLock

01
@Override
02
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
03
//取得最大等待时间
04
long time = unit.toMillis(waitTime);
05
//记录下当前时间
06
long current = System.currentTimeMillis();
07
//取得当前线程id(判断是否可重入锁的关键)
08
long threadId = Thread.currentThread().getId();
09
//1.尝试申请锁,返回还剩余的锁过期时间
10
Long ttl = tryAcquire(leaseTime, unit, threadId);
11
//2.如果为空,表示申请锁成功
12
if (ttl == null) {
13
return true;
14
}
15
//3.申请锁的耗时如果大于等于最大等待时间,则申请锁失败
16
time -= System.currentTimeMillis() - current;
17
if (time Completed ,通知 Future 异步执行已完成
18
*/
19
acquireFailed(threadId);
20
return false;
21
}
22
<pre><code>    current = System.currentTimeMillis();
23
 
24
    /**
25
     * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
26
     * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
27
     * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
28
     * 当 this.await返回true,进入循环尝试获取锁
29
     */
30
    RFuture&lt;RedissonLockEntry&gt; subscribeFuture = subscribe(threadId);
31
    //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)
32
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
33
        if (!subscribeFuture.cancel(false)) {
34
            subscribeFuture.onComplete((res, e) -&gt; {
35
                if (e == null) {
36
                    unsubscribe(subscribeFuture, threadId);
37
                }
38
            });
39
        }
40
        acquireFailed(threadId);
41
        return false;
42
    }
43
 
44
    try {
45
        //计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败
46
        time -= System.currentTimeMillis() - current;
47
        if (time &lt;= 0) {
48
            acquireFailed(threadId);
49
            return false;
50
        }
51
 
52
        /**
53
         * 5.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
54
         * 获取锁成功,则立马返回true,
55
         * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环
56
         */
57
        while (true) {
58
            long currentTime = System.currentTimeMillis();
59
            // 再次尝试申请锁
60
            ttl = tryAcquire(leaseTime, unit, threadId);
61
            // 成功获取锁则直接返回true结束循环
62
            if (ttl == null) {
63
                return true;
64
            }
65
 
66
            //超过最大等待时间则返回false结束循环,获取锁失败
67
            time -= System.currentTimeMillis() - currentTime;
68
            if (time &lt;= 0) {
69
                acquireFailed(threadId);
70
                return false;
71
            }
72
 
73
            /**
74
             * 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
75
             */
76
            currentTime = System.currentTimeMillis();
77
            if (ttl &gt;= 0 &amp;&amp; ttl &lt; time) {
78
                //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
79
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
80
            } else {
81
                //则就在wait time 时间范围内等待可以通过信号量
82
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
83
            }
84
 
85
            //7.更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
86
            time -= System.currentTimeMillis() - currentTime;
87
            if (time &lt;= 0) {
88
                acquireFailed(threadId);
89
                return false;
90
            }
91
        }
92
    } finally {
93
        //7.无论是否获得锁,都要取消订阅解锁消息
94
        unsubscribe(subscribeFuture, threadId);
95
    }
96
}</code></pre>

其中 tryAcquire 内部通过调用 tryLockInnerAsync 实现申请锁的逻辑。申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请则直接获取并返回,如果获取到时间,则进入等待竞争逻辑。

org.redisson.RedissonLock#tryLockInnerAsync

加锁流程图:

实现源码:

01
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
02
internalLockLeaseTime = unit.toMillis(leaseTime);
03
<pre><code>    /**
04
     * 通过 EVAL 命令执行 Lua 脚本获取锁,保证了原子性
05
     */
06
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
07
              // 1.如果缓存中的key不存在,则执行 hset 命令(hset key UUID+threadId 1),然后通过 pexpire 命令设置锁的过期时间(即锁的租约时间)
08
              // 返回空值 nil ,表示获取锁成功
09
              "if (redis.call('exists', KEYS[1]) == 0) then " +
10
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
11
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
12
                  "return nil; " +
13
              "end; " +
14
               // 如果key已经存在,并且value也匹配,表示是当前线程持有的锁,则执行 hincrby 命令,重入次数加1,并且设置失效时间
15
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
16
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
17
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
18
                  "return nil; " +
19
              "end; " +
20
               //如果key已经存在,但是value不匹配,说明锁已经被其他线程持有,通过 pttl 命令获取锁的剩余存活时间并返回,至此获取锁失败
21
              "return redis.call('pttl', KEYS[1]);",
22
               //这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
23
               Collections.&lt;Object&gt;singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
24
}</code></pre>

参数说明:

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key;
  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间(持有锁的有效时间),默认30s;
  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值 value,即UUID+threadId。

解锁源码分析

unlock 内部通过 get(unlockAsync(Thread.currentThread().getId())) 调用 unlockInnerAsync 解锁。

org.redisson.RedissonLock#unlock

01
@Override
02
public void unlock() {
03
try {
04
get(unlockAsync(Thread.currentThread().getId()));
05
} catch (RedisException e) {
06
if (e.getCause() instanceof IllegalMonitorStateException) {
07
throw (IllegalMonitorStateException) e.getCause();
08
} else {
09
throw e;
10
}
11
}
12
}

get方法利用是 CountDownLatch 在异步调用结果返回前将当前线程阻塞,然后通过 Netty 的 FutureListener 在异步调用完成后解除阻塞,并返回调用结果。

org.redisson.command.CommandAsyncService#get

01
@Override
02
public  V get(RFuture future) {
03
if (!future.isDone()) {   //任务还没完成
04
// 设置一个单线程的同步控制器
05
CountDownLatch l = new CountDownLatch(1);
06
future.onComplete((res, e) -&gt; {
07
//操作完成时,唤醒在await()方法中等待的线程
08
l.countDown();
09
});
10
<pre><code>        boolean interrupted = false;
11
        while (!future.isDone()) {
12
            try {
13
                //阻塞等待
14
                l.await();
15
            } catch (InterruptedException e) {
16
                interrupted = true;
17
                break;
18
            }
19
        }
20
 
21
        if (interrupted) {
22
            Thread.currentThread().interrupt();
23
        }
24
    }
25
 
26
    if (future.isSuccess()) {
27
        return future.getNow();
28
    }
29
 
30
    throw convertException(future);
31
}</code></pre>

org.redisson.RedissonLock#unlockInnerAsync

解锁流程图:

实现源码:

01
protected RFuture unlockInnerAsync(long threadId) {
02
/**
03
* 通过 EVAL 命令执行 Lua 脚本获取锁,保证了原子性
04
*/
05
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
06
//如果分布式锁存在,但是value不匹配,表示锁已经被其他线程占用,无权释放锁,那么直接返回空值(解铃还须系铃人)
07
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
08
"return nil;" +
09
"end; " +
10
//如果value匹配,则就是当前线程占有分布式锁,那么将重入次数减1
11
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
12
//重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只能更新失效时间,还不能删除
13
"if (counter &gt; 0) then " +
14
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
15
"return 0; " +
16
"else " +
17
//重入次数减1后的值如果为0,这时就可以删除这个KEY,并发布解锁消息,返回1
18
"redis.call('del', KEYS[1]); " +
19
"redis.call('publish', KEYS[2], ARGV[1]); " +
20
"return 1; "+
21
"end; " +
22
"return nil;",
23
//这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
24
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
25
<pre><code>}</code></pre>

解锁消息处理

org.redisson.pubsub#onMessage

01
public class LockPubSub extends PublishSubscribe {
02
<pre><code>public static final Long UNLOCK_MESSAGE = 0L;
03
public static final Long READ_UNLOCK_MESSAGE = 1L;
04
 
05
public LockPubSub(PublishSubscribeService service) {
06
    super(service);
07
}
08
 
09
@Override
10
protected RedissonLockEntry createEntry(RPromise&lt;RedissonLockEntry&gt; newPromise) {
11
    return new RedissonLockEntry(newPromise);
12
}
13
 
14
@Override
15
protected void onMessage(RedissonLockEntry value, Long message) {
16
 
17
    /**
18
     * 判断是否是解锁消息
19
     */
20
    if (message.equals(UNLOCK_MESSAGE)) {
21
        Runnable runnableToExecute = value.getListeners().poll();
22
        if (runnableToExecute != null) {
23
            runnableToExecute.run();
24
        }
25
 
26
        /**
27
         * 释放一个信号量,唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
28
         */
29
        value.getLatch().release();
30
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
31
        while (true) {
32
            /**
33
             * 如果还有其他Listeners回调,则也唤醒执行
34
             */
35
            Runnable runnableToExecute = value.getListeners().poll();
36
            if (runnableToExecute == null) {
37
                break;
38
            }
39
            runnableToExecute.run();
40
        }
41
 
42
        value.getLatch().release(value.getLatch().getQueueLength());
43
    }
44
}</code></pre>
45
}

总结对比

通过 Redisson 实现分布式可重入锁(实现二),比纯自己通过set key value px milliseconds nx +lua 实现(实现一)的效果更好些,虽然基本原理都一样,因为通过分析源码可知,RedissonLock
是可重入的,并且考虑了失败重试,可以设置锁的最大等待时间, 在实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。

需要特别注意的是,RedissonLock 同样没有解决 节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的 RedissonRedLock,RedissonRedLock 真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock 搭建Redis环境。

所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。

# redlock算法

Redis 官网对 redLock 算法的介绍大致如下:

The Redlock algorithm

在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在我们的例子里面我们把N设成5,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个尝试从某个Reids实例获取锁的最大等待时间(超过这个时间,则立马询问下一个实例),这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁消耗的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的总耗时小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key的真正有效时间 = 有效时间(获取锁时设置的key的自动超时时间) – 获取锁的总耗时(询问各个Redis实例的总耗时之和)(步骤3计算的结果)。
  5. 如果因为某些原因,最终获取锁失败(即没有在至少 “N/2+1 ”个Redis实例取到锁或者“获取锁的总耗时”超过了“有效时间”),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

# 用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)

这里以三个单机模式为例,需要特别注意的是他们完全互相独立,不存在主从复制或者其他集群协调机制。

01
Config config1 = new Config();
02
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
03
RedissonClient redissonClient1 = Redisson.create(config1);
04
<pre><code>    Config config2 = new Config();
05
    config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
06
    RedissonClient redissonClient2 = Redisson.create(config2);
07
 
08
    Config config3 = new Config();
09
    config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
10
    RedissonClient redissonClient3 = Redisson.create(config3);
11
 
12
    /**
13
     * 获取多个 RLock 对象
14
     */
15
    RLock lock1 = redissonClient1.getLock(lockKey);
16
    RLock lock2 = redissonClient2.getLock(lockKey);
17
    RLock lock3 = redissonClient3.getLock(lockKey);
18
 
19
    /**
20
     * 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
21
     */
22
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
23
 
24
    try {
25
        /**
26
         * 4.尝试获取锁
27
         * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
28
         * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
29
         */
30
        boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
31
        if (res) {
32
            //成功获得锁,在这里处理业务
33
        }
34
    } catch (Exception e) {
35
        throw new RuntimeException("aquire lock fail");
36
    }finally{
37
        //无论如何, 最后都要解锁
38
        redLock.unlock();
39
    }</code></pre>

最核心的变化就是需要构建多个 RLock ,然后根据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建立在多个互相独立的 Redis 环境之上的(为了区分可以叫为 Redission node),Redission node 节点既可以是单机模式(single),也可以是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。这就意味着,不能跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 额外搭建多几套独立的 Redission 节点。 比如可以搭建3个 或者5个 Redission节点,具体可看视资源及业务情况而定。

下图是一个利用多个 Redission node 最终 组成 RedLock分布式锁的例子,需要特别注意的是每个 Redission node 是互相独立的,不存在任何复制或者其他隐含的分布式协调机制。

# Redisson 实现redlock算法源码分析(RedLock)

加锁核心代码

org.redisson.RedissonMultiLock#tryLock

001
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
002
long newLeaseTime = -1;
003
if (leaseTime != -1) {
004
newLeaseTime = unit.toMillis(waitTime)*2;
005
}
006
<pre><code>    long time = System.currentTimeMillis();
007
    long remainTime = -1;
008
    if (waitTime != -1) {
009
        remainTime = unit.toMillis(waitTime);
010
    }
011
    long lockWaitTime = calcLockWaitTime(remainTime);
012
    /**
013
     * 1. 允许加锁失败节点个数限制(N-(N/2+1))
014
     */
015
    int failedLocksLimit = failedLocksLimit();
016
    /**
017
     * 2. 遍历所有节点通过EVAL命令执行lua加锁
018
     */
019
    List&lt;RLock&gt; acquiredLocks = new ArrayList&lt;&gt;(locks.size());
020
    for (ListIterator&lt;RLock&gt; iterator = locks.listIterator(); iterator.hasNext();) {
021
        RLock lock = iterator.next();
022
        boolean lockAcquired;
023
        /**
024
         *  3.对节点尝试加锁
025
         */
026
        try {
027
            if (waitTime == -1 &amp;&amp; leaseTime == -1) {
028
                lockAcquired = lock.tryLock();
029
            } else {
030
                long awaitTime = Math.min(lockWaitTime, remainTime);
031
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
032
            }
033
        } catch (RedisResponseTimeoutException e) {
034
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
035
            unlockInner(Arrays.asList(lock));
036
            lockAcquired = false;
037
        } catch (Exception e) {
038
            // 抛出异常表示获取锁失败
039
            lockAcquired = false;
040
        }
041
 
042
        if (lockAcquired) {
043
            /**
044
             *4. 如果获取到锁则添加到已获取锁集合中
045
             */
046
            acquiredLocks.add(lock);
047
        } else {
048
            /**
049
             * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
050
             * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
051
             * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
052
             */
053
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
054
                break;
055
            }
056
 
057
            if (failedLocksLimit == 0) {
058
                unlockInner(acquiredLocks);
059
                if (waitTime == -1 &amp;&amp; leaseTime == -1) {
060
                    return false;
061
                }
062
                failedLocksLimit = failedLocksLimit();
063
                acquiredLocks.clear();
064
                // reset iterator
065
                while (iterator.hasPrevious()) {
066
                    iterator.previous();
067
                }
068
            } else {
069
                failedLocksLimit--;
070
            }
071
        }
072
 
073
        /**
074
         * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
075
         */
076
        if (remainTime != -1) {
077
            remainTime -= System.currentTimeMillis() - time;
078
            time = System.currentTimeMillis();
079
            if (remainTime &lt;= 0) {
080
                unlockInner(acquiredLocks);
081
                return false;
082
            }
083
        }
084
    }
085
 
086
    if (leaseTime != -1) {
087
        List&lt;RFuture&lt;Boolean&gt;&gt; futures = new ArrayList&lt;&gt;(acquiredLocks.size());
088
        for (RLock rLock : acquiredLocks) {
089
            RFuture&lt;Boolean&gt; future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
090
            futures.add(future);
091
        }
092
 
093
        for (RFuture&lt;Boolean&gt; rFuture : futures) {
094
            rFuture.syncUninterruptibly();
095
        }
096
    }
097
 
098
    /**
099
     * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
100
     */
101
    return true;
102
}</code></pre>

Tags:

最近发表
标签列表