Ribbon
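Ribbon is an open-source cloud middle-tier service project released by Netflix. Its main purpose is to provide client-side load-balancing algorithms. The Ribbon client component offers a comprehensive set of configuration options, such as connection timeouts and retries. Put simply, Ribbon is a client-side load balancer: you list all the machines behind the load balancer in a configuration file, and Ribbon connects to them for you according to some rule (for example simple round-robin or random selection). It is also easy to implement a custom load-balancing algorithm with Ribbon.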
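Load-balancing strategies
IRule: the parent interface of all load-balancing strategies. Its core method is choose, which selects a service instance.
AbstractLoadBalancerRule: an abstract class that mainly holds an ILoadBalancer. The load balancer helps the load-balancing strategy select a suitable server instance.
Spring Cloud Ribbon provides roughly the following load-balancing strategies.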
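1. RoundRobinRule: selects servers in round-robin order.
2. RandomRule: selects a server at random.
3. RetryRule: first obtains a server using the specified strategy; if that fails, it retries within a specified time window to obtain an available server.
4. WeightedResponseTimeRule: an extension of RoundRobinRule. The faster an instance responds, the larger its weight and the more likely it is to be selected.
5. BestAvailableRule: first filters out servers whose circuit breaker has tripped because of repeated failures, then selects the server with the fewest concurrent requests.
6. AvailabilityFilteringRule: filters out back-end servers that are marked as circuit tripped because of repeated connection failures, and also filters out servers with high concurrency (active connections exceeding the configured threshold).
7. ZoneAvoidanceRule: selects a server by jointly evaluating the performance of the zone the server is in and the availability of the server.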
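To make the relationship between IRule, AbstractLoadBalancerRule and a concrete strategy easier to picture, here is a minimal sketch of a custom rule. It does not use the Ribbon library: DemoServer, DemoLoadBalancer, DemoRule, DemoAbstractRule and FirstAliveRule are hypothetical stand-ins invented for this illustration, and the real Ribbon types have more methods.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Ribbon's Server, ILoadBalancer, IRule and
// AbstractLoadBalancerRule. They only mirror the shape described above.
class DemoServer {
    final String id;
    final boolean alive;

    DemoServer(String id, boolean alive) {
        this.id = id;
        this.alive = alive;
    }
}

interface DemoLoadBalancer {
    List<DemoServer> getAllServers();
}

interface DemoRule {
    // Core method of every strategy: select one server instance.
    DemoServer choose(Object key);
}

abstract class DemoAbstractRule implements DemoRule {
    private DemoLoadBalancer lb;

    void setLoadBalancer(DemoLoadBalancer lb) {
        this.lb = lb;
    }

    DemoLoadBalancer getLoadBalancer() {
        return lb;
    }
}

// A custom strategy: always pick the first server that is alive.
class FirstAliveRule extends DemoAbstractRule {
    @Override
    public DemoServer choose(Object key) {
        DemoLoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return null;
        }
        for (DemoServer s : lb.getAllServers()) {
            if (s.alive) {
                return s;
            }
        }
        return null;
    }
}

class CustomRuleDemo {
    static String pick(List<DemoServer> servers) {
        FirstAliveRule rule = new FirstAliveRule();
        rule.setLoadBalancer(() -> servers);
        DemoServer chosen = rule.choose(null);
        return chosen == null ? null : chosen.id;
    }

    public static void main(String[] args) {
        List<DemoServer> servers = Arrays.asList(
                new DemoServer("a:8080", false),
                new DemoServer("b:8080", true));
        System.out.println(pick(servers)); // prints "b:8080"
    }
}
```

With the real library, a custom rule would typically extend AbstractLoadBalancerRule in the same way and implement choose(Object key).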
Source code analysis
RoundRobinRule:
public class RoundRobinRule extends AbstractLoadBalancerRule {
    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;
    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        this.nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        this.setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        } else {
            Server server = null;
            int count = 0;
            while(true) {
                if (server == null && count++ < 10) {
                    List<Server> reachableServers = lb.getReachableServers();
                    List<Server> allServers = lb.getAllServers();
                    int upCount = reachableServers.size();
                    int serverCount = allServers.size();
                    if (upCount != 0 && serverCount != 0) {
                        int nextServerIndex = this.incrementAndGetModulo(serverCount);
                        server = (Server)allServers.get(nextServerIndex);
                        if (server == null) {
                            Thread.yield();
                        } else {
                            if (server.isAlive() && server.isReadyToServe()) {
                                return server;
                            }
                            server = null;
                        }
                        continue;
                    }
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }
                if (count >= 10) {
                    log.warn("No available alive servers after 10 tries from load balancer: " + lb);
                }
                return server;
            }
        }
    }

    private int incrementAndGetModulo(int modulo) {
        int current;
        int next;
        do {
            current = this.nextServerCyclicCounter.get();
            next = (current + 1) % modulo;
        } while(!this.nextServerCyclicCounter.compareAndSet(current, next));
        return next;
    }

    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
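The overall logic of this class's choose(ILoadBalancer lb, Object key) method is as follows:
It starts a counter, count, and looks up servers from the server list inside a while loop.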
while(true) {
    if (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        ........
After obtaining the lists, it gets an index through the incrementAndGetModulo method:
private int incrementAndGetModulo(int modulo) {
    int current;
    int next;
    do {
        current = this.nextServerCyclicCounter.get();
        next = (current + 1) % modulo;
    } while(!this.nextServerCyclicCounter.compareAndSet(current, next));
    return next;
}
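The index is produced by adding 1 to the current counter value and taking the result modulo the total number of servers in the list, so it never goes out of bounds.
The index is then used to fetch a server from the list of all servers. The counter count is incremented on every iteration; if no usable server has been obtained after 10 attempts, a warning is logged: No available alive servers after 10 tries from load balancer: XXXX.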
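The counter logic can be tried in isolation. The following is a minimal sketch that copies the compare-and-set loop from incrementAndGetModulo and applies it to a plain list of strings. RoundRobinSketch and its choose method are hypothetical names for this illustration; liveness checks and the 10-attempt limit are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Same CAS loop as incrementAndGetModulo: advance the counter by one,
    // wrap it with the modulo, and retry if another thread got there first.
    int nextIndex(int modulo) {
        int current;
        int next;
        do {
            current = counter.get();
            next = (current + 1) % modulo;
        } while (!counter.compareAndSet(current, next));
        return next;
    }

    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(nextIndex(servers.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a", "b", "c");
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            picks.append(rr.choose(servers));
        }
        System.out.println(picks); // prints "bcabc"
    }
}
```

Because the counter starts at 0 and is incremented before use, the first pick is the element at index 1, not index 0.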
RandomRule:
public class RandomRule extends AbstractLoadBalancerRule {
    public RandomRule() {
    }

    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;
            while(server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }
                int index = this.chooseRandomInt(serverCount);
                server = (Server)upList.get(index);
                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }
                    server = null;
                    Thread.yield();
                }
            }
            return server;
        }
    }

    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
As in RoundRobinRule, the list of reachable servers and the list of all registered servers are first obtained from the ILoadBalancer.
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
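RandomRule uses ThreadLocalRandom to pick a server at random.
Generating random numbers is a very common need. Java provides Random for this, but its performance in multithreaded environments is not high.
Put simply, Random performs poorly under multithreading because multiple threads share the same Random instance and contend for it.
To address this limitation, Java introduced the ThreadLocalRandom class in JDK 7 for generating random numbers in multithreaded environments.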
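As a small illustration of the point about ThreadLocalRandom, the sketch below has several threads draw random server indices at the same time, each through its own thread-local generator. RandomIndexSketch and countPicks are hypothetical names for this example; only chooseRandomInt mirrors the method shown in RandomRule.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerArray;

class RandomIndexSketch {
    // Mirrors RandomRule.chooseRandomInt: a value in [0, serverCount).
    static int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    // Lets several threads pick indices concurrently and counts the hits.
    static AtomicIntegerArray countPicks(int serverCount, int threads, int picksPerThread) {
        AtomicIntegerArray hits = new AtomicIntegerArray(serverCount);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < picksPerThread; i++) {
                    hits.incrementAndGet(chooseRandomInt(serverCount));
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Prints three counts that add up to 4000; the split varies per run.
        System.out.println(countPicks(3, 4, 1000));
    }
}
```

Each thread calls ThreadLocalRandom.current() itself, so no generator state is shared between threads.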
RetryRule:
public class RetryRule extends AbstractLoadBalancerRule {
    IRule subRule = new RoundRobinRule();
    long maxRetryMillis = 500L;

    public RetryRule() {
    }

    public RetryRule(IRule subRule) {
        this.subRule = (IRule)(subRule != null ? subRule : new RoundRobinRule());
    }

    public RetryRule(IRule subRule, long maxRetryMillis) {
        this.subRule = (IRule)(subRule != null ? subRule : new RoundRobinRule());
        this.maxRetryMillis = maxRetryMillis > 0L ? maxRetryMillis : 500L;
    }

    public void setRule(IRule subRule) {
        this.subRule = (IRule)(subRule != null ? subRule : new RoundRobinRule());
    }

    public IRule getRule() {
        return this.subRule;
    }

    public void setMaxRetryMillis(long maxRetryMillis) {
        if (maxRetryMillis > 0L) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500L;
        }
    }

    public long getMaxRetryMillis() {
        return this.maxRetryMillis;
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.subRule.setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + this.maxRetryMillis;
        Server answer = null;
        answer = this.subRule.choose(key);
        if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) {
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());
            while(!Thread.interrupted()) {
                answer = this.subRule.choose(key);
                if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) {
                    break;
                }
                Thread.yield();
            }
            task.cancel();
        }
        return answer != null && answer.isAlive() ? answer : null;
    }

    public Server choose(Object key) {
        return this.choose(this.getLoadBalancer(), key);
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
RetryRule defines a subRule field, whose default implementation is RoundRobinRule.
IRule subRule = new RoundRobinRule();
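Another load-balancing strategy can also be passed in; if none is passed, RoundRobinRule is used by default.
In RetryRule's choose(ILoadBalancer lb, Object key) method, each attempt still uses the choose logic of the configured sub-rule to select a service instance. If the selected instance is healthy, it is returned. If the selected instance is null or no longer alive, the selection is retried repeatedly until the deadline; if no instance has been obtained by the deadline, null is returned.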
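The retry-until-deadline idea can be sketched without Ribbon as follows. This is a simplified illustration, not RetryRule itself: the sub-rule is modelled as a java.util.function.Supplier, a null result stands for "no usable server", and the InterruptTask that the real class uses to interrupt the thread at the deadline is replaced by a plain time check. RetrySketch and chooseWithRetry are hypothetical names.

```java
import java.util.function.Supplier;

class RetrySketch {
    // Keep asking the sub-rule until it returns a non-null answer
    // or the deadline (now + maxRetryMillis) has passed.
    static <T> T chooseWithRetry(Supplier<T> subRule, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = subRule.get();
        while (answer == null && System.currentTimeMillis() < deadline) {
            Thread.yield();
            answer = subRule.get();
        }
        return answer;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical sub-rule: fails on the first two calls, then succeeds.
        String server = chooseWithRetry(() -> ++calls[0] < 3 ? null : "server-1", 500L);
        System.out.println(server + " after " + calls[0] + " calls");
    }
}
```

On a machine where three calls complete well within the 500 ms window, this should print "server-1 after 3 calls". A sub-rule that never succeeds makes the method return null once the window has elapsed.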
WeightedResponseTimeRule:
public class WeightedResponseTimeRule extends RoundRobinRule {
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
        public String key() {
            return "ServerWeightTaskTimerInterval";
        }

        public String toString() {
            return this.key();
        }

        public Class<Integer> type() {
            return Integer.class;
        }
    };
    public static final int DEFAULT_TIMER_INTERVAL = 30000;
    private int serverWeightTaskTimerInterval = 30000;
    private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);
    private volatile List<Double> accumulatedWeights = new ArrayList();
    private final Random random = new Random();
    protected Timer serverWeightTimer = null;
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
    String name = "unknown";

    public WeightedResponseTimeRule() {
    }

    public WeightedResponseTimeRule(ILoadBalancer lb) {
        super(lb);
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            this.name = ((BaseLoadBalancer)lb).getName();
        }
        this.initialize(lb);
    }

    void initialize(ILoadBalancer lb) {
        if (this.serverWeightTimer != null) {
            this.serverWeightTimer.cancel();
        }
        this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
        this.serverWeightTimer.schedule(new WeightedResponseTimeRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);
        WeightedResponseTimeRule.ServerWeight sw = new WeightedResponseTimeRule.ServerWeight();
        sw.maintainWeights();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name);
                WeightedResponseTimeRule.this.serverWeightTimer.cancel();
            }
        }));
    }

    public void shutdown() {
        if (this.serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + this.name);
            this.serverWeightTimer.cancel();
        }
    }

    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(this.accumulatedWeights);
    }

    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;
            while(server == null) {
                List<Double> currentWeights = this.accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }
                int serverIndex = 0;
                double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1);
                if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) {
                    double randomWeight = this.random.nextDouble() * maxTotalWeight;
                    int n = 0;
                    for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                        Double d = (Double)var13.next();
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        }
                    }
                    server = (Server)allList.get(serverIndex);
                } else {
                    server = super.choose(this.getLoadBalancer(), key);
                    if (server == null) {
                        return server;
                    }
                }
                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }
                    server = null;
                }
            }
            return server;
        }
    }

    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
    }

    class ServerWeight {
        ServerWeight() {
        }

        public void maintainWeights() {
            ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
            if (lb != null) {
                if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    try {
                        WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                        AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                        LoadBalancerStats stats = nlb.getLoadBalancerStats();
                        if (stats != null) {
                            double totalResponseTime = 0.0D;
                            ServerStats ss;
                            for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                                Server server = (Server)var6.next();
                                ss = stats.getSingleServerStat(server);
                            }
                            Double weightSoFar = 0.0D;
                            List<Double> finalWeights = new ArrayList();
                            Iterator var20 = nlb.getAllServers().iterator();
                            while(var20.hasNext()) {
                                Server serverx = (Server)var20.next();
                                ServerStats ssx = stats.getSingleServerStat(serverx);
                                double weight = totalResponseTime - ssx.getResponseTimeAvg();
                                weightSoFar = weightSoFar + weight;
                                finalWeights.add(weightSoFar);
                            }
                            WeightedResponseTimeRule.this.setWeights(finalWeights);
                            return;
                        }
                    } catch (Exception var16) {
                        WeightedResponseTimeRule.logger.error("Error calculating server weights", var16);
                        return;
                    } finally {
                        WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
                    }
                }
            }
        }
    }

    class DynamicServerWeightTask extends TimerTask {
        DynamicServerWeightTask() {
        }

        public void run() {
            WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight();
            try {
                serverWeight.maintainWeights();
            } catch (Exception var3) {
                WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3);
            }
        }
    }
}
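WeightedResponseTimeRule is a subclass of RoundRobinRule and extends its functionality. It computes a weight for each instance based on how that instance is performing, and then selects instances according to those weights, which allows better instance selection. WeightedResponseTimeRule contains a timer task named DynamicServerWeightTask that, by default, recomputes the weight of every service instance every 30 seconds. The weighting rule is simple: the shorter an instance's average response time, the larger its weight, and the higher the probability that the instance is selected.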
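A small worked example may help with the weight arithmetic. The sketch below follows the two steps visible in maintainWeights and choose: each server's weight is the total of all average response times minus its own average, the weights are accumulated into a running sum, and a random value in [0, max) selects the first server whose accumulated weight is greater than or equal to it. WeightedChoiceSketch and its method names are hypothetical, and the random value is passed in as a parameter so the result is reproducible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WeightedChoiceSketch {
    // weight_i = totalResponseTime - avg_i, accumulated into a running sum.
    static List<Double> accumulatedWeights(List<Double> avgResponseTimes) {
        double total = 0.0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> result = new ArrayList<>();
        double soFar = 0.0;
        for (double t : avgResponseTimes) {
            soFar += total - t;
            result.add(soFar);
        }
        return result;
    }

    // Pick the first index whose accumulated weight is >= randomWeight.
    static int pickIndex(List<Double> accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.size(); i++) {
            if (accumulated.get(i) >= randomWeight) {
                return i;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Average response times of three servers, in milliseconds.
        List<Double> acc = accumulatedWeights(Arrays.asList(10.0, 40.0, 50.0));
        System.out.println(acc);                  // prints [90.0, 150.0, 200.0]
        System.out.println(pickIndex(acc, 95.0)); // prints 1
    }
}
```

With these numbers the fastest server owns the range [0, 90] out of 200, so it is chosen about 45% of the time, while the slowest server owns (150, 200] and is chosen about 25% of the time.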
BestAvailableRule:
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
    private LoadBalancerStats loadBalancerStats;

    public BestAvailableRule() {
    }

    public Server choose(Object key) {
        if (this.loadBalancerStats == null) {
            return super.choose(key);
        } else {
            List<Server> serverList = this.getLoadBalancer().getAllServers();
            int minimalConcurrentConnections = 2147483647;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            Iterator var7 = serverList.iterator();
            while(var7.hasNext()) {
                Server server = (Server)var7.next();
                ServerStats serverStats = this.loadBalancerStats.getSingleServerStat(server);
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof AbstractLoadBalancer) {
            this.loadBalancerStats = ((AbstractLoadBalancer)lb).getLoadBalancerStats();
        }
    }
}
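BestAvailableRule extends ClientConfigEnabledRoundRobinRule. It uses the instance status information stored in loadBalancerStats to filter out instances whose circuit breaker has tripped, and at the same time finds the instance with the fewest concurrent requests. However, loadBalancerStats may be null; in that case BestAvailableRule falls back to the selection strategy of its parent class, ClientConfigEnabledRoundRobinRule (linear round-robin).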
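The selection loop reduces to "skip tripped servers, keep the one with the fewest active requests". Here is a minimal sketch of that loop on plain data. LeastConnectionsSketch and its Stat class are hypothetical stand-ins for ServerStats, carrying only the two values the rule reads.

```java
import java.util.Arrays;
import java.util.List;

class LeastConnectionsSketch {
    // Hypothetical stand-in for the per-server statistics the rule consults.
    static class Stat {
        final String server;
        final boolean circuitTripped;
        final int activeRequests;

        Stat(String server, boolean circuitTripped, int activeRequests) {
            this.server = server;
            this.circuitTripped = circuitTripped;
            this.activeRequests = activeRequests;
        }
    }

    // Returns the non-tripped server with the fewest active requests,
    // or null when every server is tripped (the rule then falls back
    // to its parent's round-robin choice).
    static String chooseBest(List<Stat> stats) {
        int minimal = Integer.MAX_VALUE;
        String chosen = null;
        for (Stat s : stats) {
            if (!s.circuitTripped && s.activeRequests < minimal) {
                minimal = s.activeRequests;
                chosen = s.server;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<Stat> stats = Arrays.asList(
                new Stat("a", true, 0),   // idle, but circuit breaker tripped
                new Stat("b", false, 7),
                new Stat("c", false, 3));
        System.out.println(chooseBest(stats)); // prints "c"
    }
}
```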
AvailabilityFilteringRule:
public Server choose(Object key) {
    int count = 0;
    for(Server server = this.roundRobinRule.choose(key); count++ <= 10; server = this.roundRobinRule.choose(key)) {
        if (this.predicate.apply(new PredicateKey(server))) {
            return server;
        }
    }
    return super.choose(key);
}
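It picks a server by round-robin and checks whether it satisfies the predicate. If it does, the server is returned; once the loop's attempts are used up (the condition count++ <= 10 allows up to 11 tries), the parent class's choose method is called instead.
The apply rule:
Two conditions must be met: the circuit breaker has not tripped, and the number of concurrent requests to the server is below the limit.
If no server qualifies after these attempts, the parent class's choose method is called. Here is the choose implementation in PredicateBasedRule: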
public Server choose(Object key) {
    ILoadBalancer lb = this.getLoadBalancer();
    Optional<Server> server = this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    return server.isPresent() ? (Server)server.get() : null;
}
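How is chooseRoundRobinAfterFiltering implemented?
- Get all service instances
- Iterate over the server list and filter out those that do not satisfy the predicate
- Apply round-robin to the remaining eligible servers to select one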
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = this.getEligibleServers(servers, loadBalancerKey);
    return eligible.size() == 0 ? Optional.absent() : Optional.of(eligible.get(this.incrementAndGetModulo(eligible.size())));
}

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
    } else {
        List<Server> results = Lists.newArrayList();
        Iterator var4 = servers.iterator();
        while(var4.hasNext()) {
            Server server = (Server)var4.next();
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;
    }
}
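AvailabilityFilteringRule builds on round-robin and selects a server that satisfies the predicate. If none has been found after the attempts are used up, it filters the full server list by the predicate and applies round-robin to the eligible servers.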
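The "filter first, then round-robin over what is left" step can be sketched on its own. The example below uses java.util.function.Predicate in place of Ribbon's predicate classes, returns null instead of an Optional, and reuses the counter pattern shown earlier for RoundRobinRule. FilterThenRoundRobinSketch is a hypothetical name, and the predicate here is a simple string check chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class FilterThenRoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // CAS-based cyclic counter, as in the RoundRobinRule listing.
    private int incrementAndGetModulo(int modulo) {
        int current;
        int next;
        do {
            current = counter.get();
            next = (current + 1) % modulo;
        } while (!counter.compareAndSet(current, next));
        return next;
    }

    // Step 1: keep only the eligible servers. Step 2: round-robin over them.
    <T> T choose(List<T> servers, Predicate<T> eligible) {
        List<T> filtered = new ArrayList<>();
        for (T s : servers) {
            if (eligible.test(s)) {
                filtered.add(s);
            }
        }
        if (filtered.isEmpty()) {
            return null;
        }
        return filtered.get(incrementAndGetModulo(filtered.size()));
    }

    public static void main(String[] args) {
        FilterThenRoundRobinSketch rule = new FilterThenRoundRobinSketch();
        List<String> servers = Arrays.asList("a", "b", "c", "d");
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            // Pretend "b" is unavailable (tripped or overloaded).
            picks.append(rule.choose(servers, s -> !s.equals("b")));
        }
        System.out.println(picks); // prints "cdac"
    }
}
```

The excluded server never appears in the output, and the remaining three are visited in a fixed cycle.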
ZoneAvoidanceRule:
public class ZoneAvoidanceRule extends PredicateBasedRule {
    private static final Random random = new Random();
    // Uses CompositePredicate to filter the list of server instances.
    // Composite filter condition
    private CompositePredicate compositePredicate;

    public ZoneAvoidanceRule() {
        super();
        // Primary filter condition
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        // Secondary filter condition
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
    ......
}
public class CompositePredicate extends AbstractServerPredicate {
    // Primary filter condition
    private AbstractServerPredicate delegate;
    // List of secondary (fallback) filter conditions
    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
    private int minimalFilteredServers = 1;
    private float minimalFilteredPercentage = 0;

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        return delegate.apply(input);
    }
    ......
    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        // Filter all instances with the primary filter and return the filtered list
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        // Try the secondary filters in turn; each one is applied to the original server list.
        // After the primary filter and after every secondary filter, the two conditions below are checked.
        // Once both conditions hold, filtering stops and the current result is returned
        // for the linear round-robin algorithm to choose from.
        // Condition 1: number of filtered instances >= minimal filtered servers (default 1)
        // Condition 2: number of filtered instances > total instances * minimal filtered percentage (default 0)
        while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }
}
// The zone where the server instance is located must be available
public class ZoneAvoidancePredicate extends AbstractServerPredicate {
    ......
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        if (serverZone == null) {
            // there is no zone information from the server, we do not want to filter
            // out this server
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            // no stats available, do not filter
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            // only one zone is available, do not filter
            return true;
        }
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.keySet().contains(serverZone)) {
            // The server zone is unknown to the load balancer, do not filter it out
            return true;
        }
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        // The zone where the server instance is located must be available
        if (availableZones != null) {
            return availableZones.contains(input.getServer().getZone());
        } else {
            return false;
        }
    }
}
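ZoneAvoidanceRule is an implementation of PredicateBasedRule, but with an additional filter condition. Its filter is a composite predicate, CompositePredicate, made up of ZoneAvoidancePredicate as the primary filter and AvailabilityPredicate as the secondary filter. After filtering succeeds, a server is selected from the filtered result by linear round-robin.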
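The primary-plus-fallback filtering in CompositePredicate.getEligibleServers can be sketched with plain predicates. This is a simplified illustration under stated assumptions: CompositeFilterSketch is a hypothetical name, java.util.function.Predicate replaces Ribbon's predicate classes, only the minimal-server-count condition is modelled (the percentage condition is omitted), and the zone names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

class CompositeFilterSketch {
    static <T> List<T> filter(List<T> servers, Predicate<T> p) {
        List<T> out = new ArrayList<>();
        for (T s : servers) {
            if (p.test(s)) {
                out.add(s);
            }
        }
        return out;
    }

    // Apply the primary filter first. While the result holds fewer than
    // minimalFilteredServers entries, try each fallback filter in turn,
    // always against the original server list.
    static <T> List<T> eligible(List<T> servers, Predicate<T> primary,
                                List<Predicate<T>> fallbacks, int minimalFilteredServers) {
        List<T> result = filter(servers, primary);
        for (Predicate<T> fallback : fallbacks) {
            if (result.size() >= minimalFilteredServers) {
                break;
            }
            result = filter(servers, fallback);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("zoneA-1", "zoneB-1", "zoneB-2");
        // Primary: only servers in a zone that has no members here, so nothing passes.
        Predicate<String> zoneOk = s -> s.startsWith("zoneC");
        // Fallback: availability only; pretend zoneB-2 is unavailable.
        Predicate<String> available = s -> !s.equals("zoneB-2");
        System.out.println(eligible(servers, zoneOk,
                Collections.singletonList(available), 1)); // prints [zoneA-1, zoneB-1]
    }
}
```

When the primary filter already leaves enough servers, the fallbacks are never consulted; the resulting list is what the round-robin step then chooses from.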