优秀的编程知识分享平台

网站首页 > 技术文章 正文

「超级详细」Nacos健康检查源码解析

nanyue 2024-10-21 06:15:05 技术文章 4 ℃

本章内容

客户端

客户端发送心跳请求交互逻辑,如图所示:

客户端发送心跳请求入口

服务启动时向Nacos服务端发送注册请求,如果服务为临时实例,则会向Nacos服务端发送心跳请求。

源码如下:

// com.alibaba.nacos.client.naming.NacosNamingService#registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判断是否为临时实例
    if (instance.isEphemeral()) {
        // 构建心跳信息
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // 向Nacos服务端定时(5秒/次)发送心跳请求
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // 向Nacos服务端注册服务实例
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

BeatInfo

BeatInfo为心跳信息实例类,用于封装服务心跳信息。

主要属性:

public class BeatInfo {
    // 服务实例端口
    private int port;
    // 服务实例IP
    private String ip;
    // 服务实例权重
    private double weight;
    // 服务名
    private String serviceName;
    // 服务集群名
    private String cluster;
    private Map<String, String> metadata;
    private volatile boolean scheduled;
    // 心跳频率
    private volatile long period;
    private volatile boolean stopped;
    // ....
}

BeatReactor

BeatReactor是一个心跳反应器,内部维护了一个ScheduledExecutorService(计划任务线程池),用于将服务实例心跳信息封装成Task(线程),并放入计划任务线程池中,延时指定时间后向Nacos服务端发送心跳请求。

BeatReactor#addBeatInfo方法主要做以下事情:

  • 将心跳信息封装成Task(线程),并放入计划任务线程池中延时指定时间(默认为5秒)后向Nacos服务端发送心跳请求。

源码如下:

// com.alibaba.nacos.client.naming.beat.BeatReactor.addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    // key格式:组名@@服务名#服务IP#服务端口,如:DEFAULT_GROUP@@order-server#192.168.31.106#8081
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    // Map<String, BeatInfo> dom2Beat,key:组名@@服务名#服务IP#服务端口,value:心跳信息
    dom2Beat.put(key, beatInfo);
    // 将心跳信息封装成Task(线程),并放入计划任务线程池中延时指定时间(默认为5秒)后向Nacos服务端发送心跳请求
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

BeatTask

BeatTask实现了Runnable接口,用于封装心跳信息并向Nacos服务端发送心跳请求。

BeatTask#run方法主要做以下事情:

  • 调用心跳检测接口向Nacos服务端发送心跳请求。
  • 根据服务端返回的响应信息判断服务是否已注册,如果服务未注册,则向Nacos服务端发送服务注册请求。
  • 延迟指定时间后继续向Nacos服务端发送心跳请求(即:按指定周期向Naco服务端发送心跳请求)。

源码如下:

// com.alibaba.nacos.client.naming.beat.BeatReactor.BeatTask.run
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    // 获取心跳时间间隔
    long nextTime = beatInfo.getPeriod();
    try {
        // 调用心跳检测接口向Nacos服务端发送心跳请求
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        // 根据服务端返回的响应信息判断服务是否已注册
        // 如果服务未注册,则向Nacos服务端发送服务注册请求
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            // 创建服务注册实例并封装服务注册信息
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                // 调用服务注册接口向Nacos服务端发送服务注册请求
                serverProxy.registerService(beatInfo.getServiceName(),
                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        // ...
    }
    // 延迟指定时间后继续向Nacos服务端发送心跳请求
    executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
// com.alibaba.nacos.client.naming.net.NamingProxy#sendBeat
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    // 组装心跳请求参数
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    /// 调用心跳接口(/nacos/v1/ns/instance/beat)向Nacos服务端发送心跳请求
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

服务端

临时实例

对于临时实例(ephemeral=true),服务端心跳检测主要包含两部分内容:

  • 1)处理客户端心跳请求
  • 2)定时检测服务实例是否已过期。

处理心跳请求

服务端处理心跳请求交互逻辑,如图所示:

InstanceController

Nacos服务端启动类为nacos-console模块中的com.alibaba.nacos.Nacos,在nacos-console模块中引入了nacos-naming模块,该模块中的com.alibaba.nacos.naming.controllers包中提供了服务注册、服务发现、健康检查等核心接口。其中客户端心跳请求的处理方法为InstanceController#beat,该方法主要做以下事情:

  • 解析客户端请求参数,并根据请求参数构建RsInfo对象(客户端心跳信息对象)。
  • 根据namespaceId、serviceName、clusterName、ip、port等信息从注册表中获取服务实例信息,如果服务实例信息不存在,则根据心跳请求参数构建服务实例并向注册表中注册该服务实例。
  • 处理客户端心跳请求。
  • 向客户端返回响应信息。

源码如下:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    // 创建响应对象
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 设置客户端心跳间隔时间
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // 解析客户端请求参数,并根据请求参数构建RsInfo对象
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // 根据请求参数获取集群名
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // 获取客户端IP、端口信息
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // 根据请求参数获取命名空间ID
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 根据请求参数获取服务名
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // 根据namespaceId、serviceName、clusterName、ip、port等信息从注册表中获取服务实例
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 如果服务实例不存在,则根据心跳请求参数构建服务实例并向注册表中注册该服务实例
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        // 向注册表中注册该服务实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // 根据namespaceId,、serviceName获取对应的服务信息
    Service service = serviceManager.getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // 处理客户端心跳请求
    service.processClientBeat(clientBeat);
    // 封装响应信息,并向客户端返回响应信息
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    // 返回响应结果
    return result;
}

心跳请求处理

心跳请求处理方法为Service#processClientBeat,该方法主要做以下事情:

  • 构建客户端心跳处理器(ClientBeatProcessor)对象封装客户端心跳信息。
  • 将客户端心跳处理器放入线程池中进行心跳处理。

源码如下:

// com.alibaba.nacos.naming.core.Service#processClientBeat
public void processClientBeat(final RsInfo rsInfo) {
    // 构建客户端心跳处理器(ClientBeatProcessor)对象封装客户端心跳信息
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // 将客户端心跳处理器放入线程池中进行心跳处理
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor(客户端心跳处理器)实现了Runnable接口,其run()方法主要做以下事情:

  • 根据集群名、IP、Port等信息查找客户端对应的服务实例(即:发送心跳请求的服务实例)。
  • 更新服务实例最后一次心跳时间为系统当前时间。

特别说明:处理心跳请求的核心是更新服务实例的最后一次心跳时间(lastBeat),服务端根据最后一次心跳时间判断服务实例是否已过期。

源码如下:

// com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor#run
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    // 获取服务集群信息
    Cluster cluster = service.getClusterMap().get(clusterName);
    // 获取服务集群中的所有临时实例
    List<Instance> instances = cluster.allIPs(true);
    // 遍历所有临时实例
    for (Instance instance : instances) {
        // 根据集群名、ip、port等信息查找客户端对应的临时实例(即:发送心跳请求的服务实例)
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // 更新临时实例最后一次心跳时间为系统当前时间
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked() && !instance.isHealthy()) {
                // 设置服务实例的健康状态为true
                instance.setHealthy(true);
                Loggers.EVT_LOG
                        .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                cluster.getService().getName(), ip, port, cluster.getName(),
                                UtilsAndCommons.LOCALHOST_SITE);
                // 发布一个服务变更事件
                getPushService().serviceChanged(service);
            }
        }
    }
}

心跳检测

服务端心跳检测交互逻辑,如图所示:

服务端心跳检测入口

服务注册时,如果是第一次注册,会先创建一个Service对象,并根据服务信息构建的心跳检测任务(ClientBeatCheckTask),执行Service#init方法初始化Service。

Service#init()方法主要做以下事情:

  • 开启一个定时任务以5秒/次的频率执行心跳检测任务(ClientBeatCheckTask:创建服务时,根据服务信息构建的心跳检测任务)。
  • 初始化服务集群信息。

源码如下:

public void init() {
    // 开启一个定时任务以5秒/次的频率执行心跳检测任务(创建Service时根据Service信息创建ClientBeatCheckTask对象)
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // 初始化服务集群信息
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
// com.alibaba.nacos.naming.healthcheck.HealthCheckReactor#scheduleCheck(com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask)
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

ClientBeatCheckTask(心跳检测任务)实现了Runnable接口,其run()方法主要做以下事情:

  • 获取服务中的所有临时实例。
  • 判断实例最后一次心跳时间距离当前时间是否超过心跳检测超时时长(默认为15s),超过心跳检测超时时长,则设置实例的健康状态为false,并发布一个服务变更事件和心跳超时事件。
  • 判断实例最后一次心跳时间距离当前时间是否超过删除实例设定的超时时长(默认为30s),超过删除实例设定的超时时长,则将该实例从实例列表中移除。

源码如下:

// com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run
@Override
public void run() {
    try {
        // ...
        // 获取服务中的所有临时实例
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // 判断心跳是否超时,默认超时时长为15秒
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        // 如果心跳超时,设置实例健康状态为false
                        instance.setHealthy(false);
                        // ...日志信息
                        // 发布一个服务变更事件
                        getPushService().serviceChanged(service);
                        // 发布一个心跳超时事件
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // 判断心跳间隔(当前时间 - 最后一次心跳时间)是否大于删除实例设定的超时时长(默认30秒)
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // 超过30s,则删除该服务实例
                // delete instance
                // ...日志信息
                deleteIp(instance);
            }
        } 
    } catch (Exception e) {
        // ...日志信息
    }
}

持久实例

对于持久实例(ephemeral=false),Nacos采用主动检测的方式按指定周期向服务实例发送请求,根据响应信息判断服务实例的健康状态。

服务端主动健康检查交互逻辑,如图所示:

主动健康检查

主动检测入口

服务注册时,如果是第一次注册,会先创建一个Service对象,并执行Service#init方法初始化Service,在init()方法中会调用Cluster#init方法初始化服务集群信息,其中会开启一个延时任务,主动检测集群中持久实例的健康状态。

Cluster#init方法主要做以下事情:

  • 根据集群信息构建健康检查任务(HealthCheckTask)。
  • 开启一个延时任务延时(2000毫秒 + 5000毫秒内的随机数)后执行健康检查任务。

源码如下:

// com.alibaba.nacos.naming.core.Cluster#init
public void init() {
    if (inited) {
        return;
    }
    // 根据集群信息构建健康检查任务(HealthCheckTask)
    checkTask = new HealthCheckTask(this);
    // 开启一个延时任务延时(2000毫秒 + 5000毫秒内的随机数)后执行健康检查任务。
    HealthCheckReactor.scheduleCheck(checkTask);
    inited = true;
}

HealthCheckTask(健康检查任务)实现了Runnable接口,其run()方法主要做以下事情:

  • 基于Distro协议判断当前服务节点是否为服务所对应的责任节点,是则执行健康检查任务(即:向服务发送TCP请求进行健康检查);否则不做处理。
  • 执行完成后,等待指定周期后再次执行健康检查(即:指定周期进行健康检查)。

源码如下:

public HealthCheckTask(Cluster cluster) {
    this.cluster = cluster;
    distroMapper = ApplicationUtils.getBean(DistroMapper.class);
    switchDomain = ApplicationUtils.getBean(SwitchDomain.class);
    healthCheckProcessor = ApplicationUtils.getBean(HealthCheckProcessorDelegate.class);
    initCheckRT();
}


private void initCheckRT() {
    // first check time delay
    //计算主动检测的时间频率,周期为2000毫秒 + 5000毫秒内的随机数
    checkRtNormalized =
            2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
    checkRtBest = Long.MAX_VALUE;
    checkRtWorst = 0L;
}


@Override
public void run() {
    try {
        // 基于Distro协议判断当前服务节点是否为服务所对应的责任节点
        if (distroMapper.responsible(cluster.getService().getName()) && switchDomain
                .isHealthCheckEnabled(cluster.getService().getName())) {
            // 执行健康检查任务(即:向服务发送TCP请求进行健康检查)
            healthCheckProcessor.process(this);
            //...打印日志信息
        }
    } catch (Throwable e) {
        //...打印日志信息
    } finally {
        if (!cancelled) {
            // 执行完成后,等待指定周期后再次执行健康检查(即:指定周期进行健康检查)
            HealthCheckReactor.scheduleCheck(this);
            // ...
        }
    }
}

HealthCheckProcessor是健康检查处理器接口,它有多个实现类,主动检测实现类为TcpSuperSenseProcessor(即:发送TCP请求),TcpSuperSenseProcessor#process方法主要做以下事情:

  • 遍历服务中的所有持久实例,将每个持久实例和健康检查信息封装为Beat,并将其加入到任务队列(阻塞队列)中。

源码如下:

@Override
public void process(HealthCheckTask task) {
    // 获取服务中的所有持久实例
    List<Instance> ips = task.getCluster().allIPs(false);
    // ...
    // 遍历服务下的所有持久实例
    for (Instance ip : ips) {
        // ...
        // 创建Beat对象封装持久实例和对应的健康检查信息
        Beat beat = new Beat(ip, task);
        // 将Beat对象加入到任务队列(阻塞队列)中
        taskQueue.add(beat);
        MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
    }
}

TcpSuperSenseProcessor实现了Runnable接口,其run()方法主要做以下事情:

  • 循环调用TcpSuperSenseProcessor#processTask方法,不停的从任务队列中取出Beat对象(健康检查任务),并将其封装成任务处理器(TaskProcessor)对象加入到tasks集合中。
  • 在线程池中按批量执行tasks集合中的任务处理器(即:向服务发送请求检测服务健康状态)。

源码如下:

// com.alibaba.nacos.naming.healthcheck.TcpSuperSenseProcessor#run
@Override
public void run() {
    while (true) {
        try {
            // 处理健康检查任务
            processTask();
            // ...
        } catch (Throwable e) {
            SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
        }
    }
}


private void processTask() throws Exception {
    Collection<Callable<Void>> tasks = new LinkedList<>();
    do {
        // 不断从任务队列中取出Beat对象(健康检查任务)
        Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
        if (beat == null) {
            return;
        }
        // 将Beat对象封装成任务处理器(TaskProcessor)对象加入到tasks集合中
        tasks.add(new TaskProcessor(beat));
    } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
    // 在线程池中按批量执行tasks集合中的任务处理器(即:向服务发送请求检测服务健康状态)
    for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
        f.get();
    }
}

TaskProcessor实现了Callable接口,其call()方法主要做以下事情:

  • 通过NIO的方式向客户端发送健康检查请求。

源码如下:

@Override
    public Void call() {
        // 获取健康检查任务已经等待时长
        long waited = System.currentTimeMillis() - beat.getStartTime();
        if (waited > MAX_WAIT_TIME_MILLISECONDS) {
            Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
        }
        SocketChannel channel = null;
        try {
            // 获取服务实例
            Instance instance = beat.getIp();
            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.setBeingChecked(false);
                    return null;
                }
                beatKey.key.cancel();
                beatKey.key.channel().close();
            }
            // 通过NIO建立TCP连接
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            // only by setting this can we make the socket close event asynchronous
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            Cluster cluster = beat.getTask().getCluster();
            int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
            channel.connect(new InetSocketAddress(instance.getIp(), port));
            // 向Selector中注册连接、读取事件
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));      
            beat.setStartTime(System.currentTimeMillis());
            GlobalExecutor
                    .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // ...
        }
        return null;
    }
}

健康检查总结

Nacos健康检测有两种模式:

  • 临时实例:
    • 采用客户端心跳检测模式,心跳周期5秒。
    • 心跳间隔超过15秒则将服务实例标记为不健康。
    • 心跳间隔超过30秒则将服务实例从服务列表删除。
  • 持久实例:
    • 采用服务端主动健康检测方式。
    • 检测周期为2000毫秒 + 5000毫秒内的随机数。
    • 检测异常只会标记为不健康,不会删除服务实例。

【阅读推荐】

更多精彩内容,如:

  • Redis系列
  • 数据结构与算法系列
  • Nacos系列
  • MySQL系列
  • JVM系列
  • Kafka系列

请移步【南秋同学】个人主页进行查阅。内容持续更新中......

【作者简介】

一枚热爱技术和生活的老贝比,专注于Java领域,关注【南秋同学】带你一起学习成长~

最近发表
标签列表