技术要点
- 临时顺序节点
- 监听事件
- 线程间通信
回顾ZK知识点
节点类型
持久节点:节点被创建后,一直存在,不会随着会话(session)失效而消失。 临时节点:临时节点的生命周期和会话(session)绑定,一旦会话失效,临时节点会被自动清除掉。注意:会话失效不等于连接断开,通过下面两个异常来理解下这句话。我们经常遇见这两类异常CONNECTIONLOSS(连接断开)和SESSIONEXPIRED(Session过期),连接断开(CONNECTIONLOSS):一般发生在网络闪断或所连接zk服务器挂掉;Session过期(SESSIONEXPIRED):一般发生在连接断开后,超过SESSION_TIMEOUT后还没有成功连接上zk服务器。 顺序节点:创建节点时,自动给节点名加上一个数字编号后缀。
监听器(watcher)
监听节点/数据的变化,类似于发布订阅功能。
Watcher事件类型:
事件 | 触发时机 |
NodeCreated | 节点被创建时 |
NodeDeleted | 节点被删除时 |
NodeDataChanged | 节点内容发生修改时 |
NodeChildrenChanged | 子节点列表发生变更时 |
注册方法和事件对应关系:
注册方法 | Created | ChildrenChanged | DataChanged | Deleted |
zk.exists("/node",watcher) | √ | √ | √ | |
zk.getData("/node",watcher) | √ | √ | ||
zk.getChildren("/node",watcher) | √ | √ |
实现原理
zk分布式锁实现原理是利用临时顺序节点特性和watch机制。为什么要用临时顺序节点?临时的作用类似于设置超时时间,防止宕机出现死锁。顺序:我们知道分布式锁的特点是 “互斥“,redis分布式锁的互斥通过set NX PX命令,DB分布式锁的互斥是通过for update和版本号,而zk有自己顺序节点,可以让抢锁者按照一定顺序来获取锁,也是一种互斥。让我们来看看具体实现原理:
- 创建锁根目录(/zk_lock)
- 在根目录(/zk_lock)下创建临时顺序节点。
- 获取根目录(/zk_lock)下的所有子节点并排序,如果自己排在第一位,则抢锁成功,反之,没有抢到锁,则监听前一个节点是否释放锁。
代码实现
@Slf4j
public class ZkLock {
private final String LOCK_ROOT = "/zk_lock";
private final ConcurrentMap<Thread, String> threadData = Maps.newConcurrentMap();
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
notifyFromWatcher();
}
};
/**
* 加锁
* @param key
* @param timeOutMs 超时时间,单位:毫秒
* @return
*/
public boolean tryLock(String key, Long timeOutMs) {
Long startTime = System.currentTimeMillis();
String lockPath = String.format("%s/%s",LOCK_ROOT, key);
String ourPath = ZkManager.createEphemeralSeqNode(lockPath);
boolean isSuccess = innerLock(startTime, timeOutMs, ourPath);
if (isSuccess) {
threadData.put(Thread.currentThread(), ourPath);
}
return isSuccess;
}
/**
* 解锁
*/
public void unLock(){
String lockPath = threadData.get(Thread.currentThread());
if (!StringUtils.isEmpty(lockPath)){
ZkManager.delNode(lockPath);
}
}
private boolean innerLock(Long startTime, Long timeOutMs, String ourPath){
log.error("innerLock ourPath:" + ourPath);
// 是否获取的锁
boolean haveTheLock = false;
// 是否删除超时节点
boolean isDeleteTimeOutNode = false;
try {
while (!haveTheLock){
String sequenceNodeName = ourPath.substring(LOCK_ROOT.length() + 1);
List<String> childNodes = ZkManager.getNodeChild(LOCK_ROOT);
Collections.sort(childNodes);
int index = childNodes.indexOf(sequenceNodeName);
if (index == 0) {
haveTheLock = true;
break;
}
synchronized (this){
// watch前一个节点
String previousNodePath = String.format("%s/%s",LOCK_ROOT, childNodes.get(index-1));
log.error("innerLock previousNodePath:" + previousNodePath);
try {
ZkManager.getClient().getData().usingWatcher(watcher).forPath(previousNodePath);
if (timeOutMs == null) {
wait();
}
timeOutMs -= (System.currentTimeMillis() - startTime);
log.warn("innerLock timeOutMs:" + timeOutMs);
startTime = System.currentTimeMillis();
if (timeOutMs <= 0) {
isDeleteTimeOutNode = true;
break;
}
wait(timeOutMs);
} catch (KeeperException.NoNodeException e) {
// e.printStackTrace();
}
}
}
} catch (Exception e) {
isDeleteTimeOutNode = true;
e.printStackTrace();
} finally {
if (isDeleteTimeOutNode) {
ZkManager.delNode(ourPath);
}
}
return haveTheLock;
}
/**
* 通知waiter
*/
private synchronized void notifyFromWatcher(){
notifyAll();
}
}
总结:
- 实现了互斥、锁超时自动失效、阻塞等待、公平性。
- 不足之处:未实现可重入(只需本地变量控制下就可以了)
锁超时并发问题
产生的原因是:GC停顿导致临时节点释放,如下图所示:
客户端1发生GC停顿的时候,Zookeeper检测不到心跳,也是有可能出现多个客户端同时操作共享资源的情形。当然,你可以说,我们可以通过JVM调优,避免GC停顿出现。但是注意了,我们所做的一切,只能尽可能避免多个客户端操作共享资源,无法完全消除。
不存在集群宕机并发执行问题
- 所有写操作,都是由leader来完成的,可以认为全局串行写操作。
- leader宕机,会选出新的leader,继续上面写操作。
实战演练
100个线程,使用zk分布式锁对count进行加1操作,验证结果是否是100?
@Slf4j
public class ZkLockTest {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
int threadCount = 100;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
ZkLock zkLock = new ZkLock();
try {
boolean isSuccess = zkLock.tryLock("testLock",60000L);
if (!isSuccess) {
log.error("innerLock 加锁失败!");
return;
}
log.error("innerLock 加锁成功!" + Thread.currentThread().getName());
count++;
log.error("thread:"+ Thread.currentThread().getName() +" count:" + count);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
zkLock.unLock();
log.error("innerLock 解锁成功!");
countDownLatch.countDown();
}
}
});
thread.start();
}
countDownLatch.await();
}
}
结果正确:
[11/08/21 03:15:26:026 CST] Thread-83 ERROR zk.ZkLockTest: thread:Thread-83 count:100
遇到的坑
- 使用NodeCache监听时,锁已经释放了,但是被通知的节点还是进入了等待,而没有立即获取到锁,如图所示:
问题就出在NodeCache竟然可以监听不存在的节点的,所以才会进入等待。