优秀的编程知识分享平台

网站首页 > 技术文章 正文

实现ZK分布式锁,你注意到这个坑了吗?

nanyue 2024-09-01 20:39:15 技术文章 9 ℃


技术要点

  • 临时顺序节点
  • 监听事件
  • 线程间通信

回顾ZK知识点

节点类型

持久节点:节点被创建后,一直存在,不会随着会话(session)失效而消失。 临时节点:临时节点的生命周期和会话(session)绑定,一旦会话失效,临时节点会被自动清除掉。注意:会话失效不等于连接断开,通过下面两个异常来理解下这句话。我们经常遇见这两类异常CONNECTIONLOSS(连接断开)和SESSIONEXPIRED(Session过期),连接断开(CONNECTIONLOSS):一般发生在网络闪断或所连接zk服务器挂掉;Session过期(SESSIONEXPIRED):一般发生在连接断开后,超过SESSION_TIMEOUT后还没有成功连接上zk服务器。 顺序节点:创建节点时,自动给节点名加上一个数字编号后缀。

监听器(watcher)

监听节点/数据的变化,类似于发布订阅功能。

Watcher事件类型

事件

触发时机

NodeCreated

节点被创建时

NodeDeleted

节点被删除时

NodeDataChanged

节点内容发生修改时

NodeChildrenChanged

子节点列表发生变更时

注册方法和事件对应关系

注册方法

Created

ChildrenChanged

DataChanged

Deleted

zk.exists("/node",watcher)


zk.getData("/node",watcher)



zk.getChildren("/node",watcher)



实现原理

zk分布式锁实现原理是利用临时顺序节点特性和watch机制。为什么要用临时顺序节点?临时的作用类似于设置超时时间,防止宕机出现死锁。顺序:我们知道分布式锁的特点是 “互斥“,redis分布式锁的互斥通过set NX PX命令,DB分布式锁的互斥是通过for update和版本号,而zk有自己顺序节点,可以让抢锁者按照一定顺序来获取锁,也是一种互斥。让我们来看看具体实现原理:

  • 创建锁根目录(/zk_lock)
  • 在根目录(/zk_lock)下创建临时顺序节点。
  • 获取根目录(/zk_lock)下的所有子节点并排序,如果自己排在第一位,则抢锁成功,反之,没有抢到锁,则监听前一个节点是否释放锁。

代码实现

@Slf4j
public class ZkLock {

    private final String LOCK_ROOT = "/zk_lock";
    private final ConcurrentMap<Thread, String> threadData = Maps.newConcurrentMap();

    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            notifyFromWatcher();
        }
    };

    /**
     * 加锁
     * @param key
     * @param timeOutMs 超时时间,单位:毫秒
     * @return
     */
    public boolean tryLock(String key, Long timeOutMs) {

        Long startTime = System.currentTimeMillis();

        String lockPath = String.format("%s/%s",LOCK_ROOT, key);
        String ourPath = ZkManager.createEphemeralSeqNode(lockPath);
        boolean isSuccess = innerLock(startTime, timeOutMs, ourPath);
        if (isSuccess) {
            threadData.put(Thread.currentThread(), ourPath);
        }

        return isSuccess;
    }

    /**
     * 解锁
     */
    public void unLock(){
        String lockPath = threadData.get(Thread.currentThread());
        if (!StringUtils.isEmpty(lockPath)){
            ZkManager.delNode(lockPath);
        }
    }

    private boolean innerLock(Long startTime, Long timeOutMs, String ourPath){

        log.error("innerLock ourPath:" + ourPath);

        // 是否获取的锁
        boolean haveTheLock = false;
        // 是否删除超时节点
        boolean isDeleteTimeOutNode = false;

        try {
            while (!haveTheLock){

                String sequenceNodeName = ourPath.substring(LOCK_ROOT.length() + 1);
                List<String> childNodes = ZkManager.getNodeChild(LOCK_ROOT);
                Collections.sort(childNodes);
                int index = childNodes.indexOf(sequenceNodeName);
                if (index == 0) {
                    haveTheLock = true;
                    break;
                }

                synchronized (this){

                    // watch前一个节点
                    String previousNodePath = String.format("%s/%s",LOCK_ROOT, childNodes.get(index-1));
                    log.error("innerLock previousNodePath:" + previousNodePath);
                    try {
                        ZkManager.getClient().getData().usingWatcher(watcher).forPath(previousNodePath);

                        if (timeOutMs == null) {
                            wait();
                        }

                        timeOutMs -= (System.currentTimeMillis() - startTime);
                        log.warn("innerLock timeOutMs:" + timeOutMs);

                        startTime = System.currentTimeMillis();
                        if (timeOutMs <= 0) {
                            isDeleteTimeOutNode = true;
                            break;
                        }
                        wait(timeOutMs);

                    } catch (KeeperException.NoNodeException e) {
//                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            isDeleteTimeOutNode = true;
            e.printStackTrace();
        } finally {
            if (isDeleteTimeOutNode) {
                ZkManager.delNode(ourPath);
            }
        }

        return haveTheLock;
    }

    /**
     * 通知waiter
     */
    private synchronized void notifyFromWatcher(){
        notifyAll();
    }
}

总结:

  • 实现了互斥、锁超时自动失效、阻塞等待、公平性。
  • 不足之处:未实现可重入(只需本地变量控制下就可以了)

锁超时并发问题

产生的原因是:GC停顿导致临时节点释放,如下图所示:

客户端1发生GC停顿的时候,Zookeeper检测不到心跳,也是有可能出现多个客户端同时操作共享资源的情形。当然,你可以说,我们可以通过JVM调优,避免GC停顿出现。但是注意了,我们所做的一切,只能尽可能避免多个客户端操作共享资源,无法完全消除

不存在集群宕机并发执行问题

  • 所有写操作,都是由leader来完成的,可以认为全局串行写操作。
  • leader宕机,会选出新的leader,继续上面写操作。

实战演练

100个线程,使用zk分布式锁对count进行加1操作,验证结果是否是100?

@Slf4j
public class ZkLockTest {

    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {

        int threadCount = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {

            Thread thread = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    ZkLock zkLock = new ZkLock();
                    try {
                        boolean isSuccess = zkLock.tryLock("testLock",60000L);
                        if (!isSuccess) {
                            log.error("innerLock 加锁失败!");
                            return;
                        }
                        log.error("innerLock 加锁成功!" + Thread.currentThread().getName());
                        count++;
                        log.error("thread:"+ Thread.currentThread().getName() +" count:" + count);

                    } catch (Exception ex) {
                        ex.printStackTrace();
                    } finally {
                        zkLock.unLock();
                        log.error("innerLock 解锁成功!");
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        countDownLatch.await();
    }
}

结果正确:

[11/08/21 03:15:26:026 CST] Thread-83 ERROR zk.ZkLockTest: thread:Thread-83 count:100

遇到的坑

  • 使用NodeCache监听时,锁已经释放了,但是被通知的节点还是进入了等待,而没有立即获取到锁,如图所示:

问题就出在NodeCache竟然可以监听不存在的节点的,所以才会进入等待。

Tags:

最近发表
标签列表