分布式锁的场景
1、互斥
在并发情况下,同一个时刻锁只能被一个线程占有;
2、防止死锁
当出现服务宕机,无法释放持有锁时,需要有超时策略来保证锁自动释放;
3、性能
只需要锁定核心的共享资源,防止锁的范围太大,业务处理的时间过长,导致并发情况下大量的线程阻塞;
4、可重入
可重入指的是线程在获得锁之后,再次获取该锁不需要阻塞;
Redission锁实现原理
为什么要使用Lua脚本?
因为整个加锁与解锁的过程操作的命令过多,redis又是单线程执行执行命令的, 通过Lua脚本是为了保证我们的操作的原子性。
Redis默认的存储结构是怎么样的?
1.采用Hash的存储结构,hash存储的方式为<key,<key1,value>>,为了程序的可读性;
2.key为 commandExecutor.getConnectionManager().getId()+线程ID,记录线程的ID为了实现可重入
3.value为2, 记录当前线程获取锁的次数,目前当前是获取了2次锁;
Redis分布式锁的缺点
在哨兵模式或者主从模式下,如果 master实例宕机的时候,此时 master的锁没有同步到slave,可能导致多个客户端同时完成加锁。
Redisson的源码
/**
* lock源码:
**/
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
// 通过Lua脚本
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//定时任务给锁续期
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// 通过lua脚本访问Redis,保证操作的原子性
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
/**
* unlock源码:
**/
//通过lua脚本访问Redis,保证操作的原子性
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
lock与unlock都是使用Lua脚本去操作的。
<!-- maven版本号 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.2</version>
</dependency>
整个加锁与解锁Demo
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
/**
* RedissonLock 使用
*/
public class RedissonDemo {
public static void main(String[] args) {
//配置信息
Config config = new Config();
String url = "redis://xxxxxx:6379";
config.useSingleServer().setAddress(url)
.setPassword("xxxxxx")
.setDatabase(0);
//通过配置获取连接对象
RedissonClient redissonClient = Redisson.create( config );
//获RLock锁对象
RLock rLock = redissonClient.getLock("Alock");
//加锁
rLock.lock();
System.out.println( "threadId=" + Thread.currentThread().getName() );
try {
System.out.println( "获取锁成功" );
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
rLock.unlock();
}
System.out.println( "释放锁成功" );
}
}
觉得可以的话可以给个点赞+关注,编程的路上一起前行[呲牙],谢谢!