Sentinel之FlowSlot限流分析


微信大众号: 房东的小黑黑

路程随遥远,将来更美好
学海无涯,咱们一同加油!

FlowSlot主要是进行限流工作的。

结合从曾经的插槽(NodeSelectorSlot,ClusterNodeBi b 1 :uiY n o R / T $lderSlot和StatisticSlot)搜集的运行时计算信息,FlowSlot将运用预设规矩来决定是否应阻挠传入恳求。

假如触发了界说好的任何规矩,SphU.entry(resourceName)会抛出FlowException反常。

一种资源能P V J w ^ ( G够有多个流规矩。FlowSlot遍历这些规矩,直到触] w @ ^ 0 A L R发其间一个规8 q e p ( 6矩或遍历一切规矩。每个 FlowRule主要由u G H b i .以下因素组成:grade, strategy,path。 咱们能够结合这些因从来达到不同的作用。

grade由FlowRule: ^ b ^ hgrade字段界说,此处,0用于线程阻隔,而1用于恳求计数整形(QPS)。线程计数和恳求计数都是M f ! r { j g在实时运行时搜集的。

此阶段一般用于维护r & u资源不被占用。 假如资源需求很长时刻才能完结,线程将开端占用。 响应时刻越长,占用的线程越多。

除了计数器之外,thread poolsemaphore也能够用于完成此目的n g / L
tu r C Lhread pool:分配线程池来处理这些资源。 当池中没有更多闲暇线程时,该恳求将被回绝而不会影响其他资源。
semaphore:运用信号量操控此资源中线程的并发计数。

运用线程池的好处是,它能够在超时时正常退出。 但这也给咱们带来了上下文切换和附加线程的成本。 假如传入的恳求现已在单独的线程(例如Se& u y 9rvlet HTTP恳求)中提供服务,那么在运用线程池的情况下,它将几乎t s * C D N使线程计数加` v T ) Y倍。

当QPS超越阈值时,Sentinel将采纳办法操控传入恳求,并由流规矩中的controlBehavior字段进行装备7 H O J W

RuleConstant.CONTROL_BEHAVIOR_DEFAULT:当即回绝。这是默许行为,超出的恳求将当即被回绝,并抛出FlowExcep@ ~ X Gtion。

RuleConstant.CONTROL_` z T KBEHAVIOR_WARM_A P w u x PUP:Warmup。 假如体系的负载在一段时刻内很低,但是此刻有很多的恳求过来,体系可能无法一次处理这些一切恳求。但是,假如咱们稳定添加传入的d o t ? & @ B # U恳求,则体系能够预热并终究能够处理一切恳求。 能够经过在流规矩中设置字段 warmUpPeriodSec来装备此预热时刻。

RuleConstant.CONT5 $ B z - n X | eROL_BEHAVIOR_RC o Y + 7A0 X T ? @ N y $TE_LIMITE2 f Y H M oR:统一速率限制Uniform Rate Limiting。此g R w ] f /战略严格操控恳求之间的距离。换句话说,它答应恳求以稳定,统一的速率U K +经过。此战略是漏斗(leaky bucket)的完成。
F ( t w a用于以稳定的速率处理恳求,一般用于M M R z突发流量(例如消息处P q p P k理)。 当超越体系容量的很多= u 3恳求一起S N p b ~ o | I E抵达时,运用此战略的体系将处理恳求及其固定速率,直到一切恳求都已处理或超时停止。

以上是从官方源i & W R ` z码中的注释文档翻译的,咱们能够直接下载源码K w l , ,阅览,官方的注释挺好的!

下面先从限流规矩的装备讲解。

FlowRule

“`java
public/ M o 8 y + c static void main( X E m W z nString[] args) {
// 装备规矩.
initFloZ B ;wRules();

while (s  xtrue) {
// 1.5.0 版别开端能够直z ; h = *接利z 2 9 ` m用 try-with-resources 特性
try (Entry entry = SphU.entry("HelloWorld")) {
// 被维护的逻辑
Sysv z [ a = * Y 7 ztem.out.println("hello world");
} catch (BlocB X O d k 0 LkException ex) {
// 处理被流控的逻辑
System.out.println("blocked!");
}
}

}
java
private static void initFlowRules(){
List rules = new ArrayLiA L S ` pst<>();
FlowRule rule = new FlowRule();
ru3 p ] b @ [le.setResource(“HelloWorld”N e X & n ^);
//依据恳求数
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rulex E X ` } / C Es.add(rule);
FlowRuleMan] ( + k kager.loadRules(rulesp q 3 p Y $ = V);
}


从上面咱们能够知道,要进行流量操控,咱们要自己设定一些流量规矩。
咱们能够依据恳求数和线程数进行计算,这儿运用了恳求数Qps,界说了资源`HelloWorld`每秒最多只能经过20个恳求。

​```java //流量操控的阈值类型 private int grade = RuleConstant.FLOW_GRADE_QPS; //流量操控的阈值数 private double count; //根据调用链的流量操控战略 private int strategy = RuleConstant.STRATEGY_DIRECT; //相关资源或进口资源,当流控形式为相关或许链路时装备的相关资源或许进口资源 private String refd & 8 ` J 3Resource; //流量操控后的采纳行为 pm ` f K Frivate int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; //预热时刻,假如controlBehavior设置为预n w B 3 8热,能够装备其预热时刻,默许10s private int warmUpPeriodSec = 10; //最大超时时刻,假F e e 7 ! Z n = 3如controlBehavior设置V A i D X x为排队等待时,等待的最大超时时刻,默许是500ms private int maxQueueingTimeMs = 500; //集群扩容相关装备 privk B h n date ClusterFlowConfig clusterConfig;

pQ K Y B X QrivatefinalFunction&l2 3 O 4 h T 8 S ^t;String,Collection&lr . 5 X l et;F5 h , V f BlowRule>>ruleProvider=newFunction<String,Collection<FlowRule>>(){
@Override
publicCollection&H D N glt;FlowRule>appW 2 , Xly(Stringresource){
//Flowrulemapshouldnotbenull.
Map<String,List<FlowRule>>flown Y . * % H RRules=FlowRuleManaZ C ger.getFlowRuleMap();
returnflowRules.get(resourP L /ce);
}
};
pubG 7 e V j g clicvoidcheckFlow(Function<String,Collection<FlowRule>>S B : - X [ J N %rulK - ? c qeProvider,ResourceWrapperresource,
ContexM 2 ^ ; y u l Ttcontext,DefaultNodenode,intcount,booleanprioritized)
throwsBloc9 G ^ l ( 2 FkException
{
i& S Y _ U 8 0f(ruleProvider==null||resource==null){
return;
}
Collection<FlowRule>rules=ruleProvider.apply(resource.getName());
if(. * K i Nrules!=null){
for(FlowRuW t n k P O H tlerule:rules){
if(!canPassCheck(rule,context,node,count,prioritized)){
thrownewC & Y 7 G mFlowException(rule.getLimitApp(),rule);
}
}
}
}

上述逻辑主要是经过限流规矩提供器获取与该资源相关的流控规矩列表。
然后遍历流控规矩列表,经过调用canPassCheck办法来判别是否满意该规矩设置的条件,假如满意流控条件,则抛出FlowExceptij K r ; p z 7 ; ,on,则只需满意一个就结束校验。

canPassCheck进行校验

publicFlowRule(){
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}

publicFlowRule(StringresourceName){
super();
setResource(resourceName);
setLimO 3 ) 0 D b 1 O 5itApp(RuleConstant.LIMIT_APP_DEFAULT);
}
publicbooleancanPasH o - 1 O E L 3sCheck(/*@NonNull*/Flox ! ewRulerule,Contextcontext,DefaultNodenode,intacquire% / ? S ~ tCount,
booleanprioritP = @ ? a c &ized)
{
Stringlimi` d @ 0 6 1tApp=rule.getLimitApp();
//假如limitApp为n$ ; * n 6 C $ @ Wull,就不进行验证,在构建Fu Q Z M P _lowRule时,默许limitApp是default,即答应一切! _ ! ) z 1 } L来历的恳求进行检查
ifk Z - T %(lD 3 ? 9imitApp==null){
returntrue;
}
if(rule.isCli k ) s ~ ! j [ VusterMode()){
returnpassClusterCheck(rul! G ; * 7 M De,context,node,acX _ =quireCount,prioritized);
}
returnpL k b & r J q IassLocalCheck(rule,c] a = a M I 7ontext,node,acquireCount,prioritized);
}

假如是集群限流形式,则调用pab f – J * w @ A NssClusterCheck,非集群限流形式则调用passLocalCheck办法。

privatestaticbooleanpassLocalCheck(FlowRulerule,Contextcontext,DefaultNodenode,intacquireCount,
booleanprioritized)
{
NodeselectedNode=selectNodeByRequesterAndStrategy(rule,m x 5 : y ( 3coI L H F # Ontextd X ^,node);
if(selectedNode==null){
retu # 1rntrue;
}
returnrule.getRater().canPass(selectedNode,acquireCounF - J [ E Jt,prioritized);
}

passLocalChP $ Y 6eck 单节点限流

在这段逻辑中,有三个重要的字段变量。
limitApp:该条限流规矩针对的调用方。
stratH J c , B X Legy:该条限流规矩的流控规矩。
origin:本n n _ D ^ ) v次恳求的调用方,从当时上下文环境中获取。

staticNodN L : DeselectNodeByRequesterAndStrategy(/*@NonNull*/FlowRulerule,Con[ P k b 7 A M 1textcontext,* . S A v S ( ADefaultNodenode){
StringlimitAp= 8 rp=rule.getLimitApp();
intstrategyV _ 9 T U U P O=rule.getStrW & 2 4 Oategy();
Stringorigin=context.getOriginF ^ T 9 + ( H();
//假如限流规矩装备的针对的调用方与当时恳求的调用方来历相同,并且调用方不是default、other
if(limitApp.equals(origin)&&filterOrigin([ G horigin)){
//假如根据调用关系的流量操控是依据调用方限流
if(strategy==RuleCoQ h R R ? hnstant.STRATEGY_DIRECT){
//Matcheslimitorigin,returnoriginstatisticnode.K i X
returncontext.getOriginNode();
}
/7 U # ^ * X J/假如根据关系关系的流量g O L [ Y 3 _ B .操控是相关,则从集群环境中获取对应相关资源所代表的Node
//假如是依据调用链的,则判别当时调( 7 }用上下文的名字与规矩装备的是否相同,假如是则回来Defau| D pltNode
returnselectReferenceNode(rule,contc t ( g + L WexN D p f Q T M [ Mt,node);
//假如该流量操控规矩针对的调用方装备为default,则对一切的调用源都有用
}elseif(RuleConS 5 [ n x $ 3stant.LIMIT_APP- N z 1 T_DEFAULT.equals(limitApp)){
if(strategy==RuleConstant.STRATEGY_DIRECT){
//直接获取本次调用上下文当时节点的ClusterNode
returnnode.getClusterNode();
}
returnselectReferenceNode(rule,context,node);
//假如流量操控针对的调用方为other,则要判别当时资源遭到其他限流规矩的限制,则不履行,简单的说处理给定的这些调用方,剩余的调用方都会进行流量操控
}elseif(RuleConstant. ] 8 a v 3 VLIMIT_APP_{ / 9OT1 V 6 L Q & 2 3 -HER.equals(limitApp)
&&FlowRuleManager.isOtherOrigin(@ S E # Sorigin,rule.getResource())){
//假如流量操控形式是直接调用,则从Context中获取源调用方所代表的Node
if(strategy==RuleConstant.STRATEGY_DIRECT){
returncontext.getOriginNode();
}

returJ ( ?nselectReferenceNode(rule,context,node);
}
returnnull;
}

privatestaticbooleanfilterOrigin(Stringorigin){
//Origin2 ( U 4 7 ? $ f rcannotbe`default`or`other`.
return!RuleConstant.LIMIT_APP_DEFAULp k 5 T X {T.equals(origin)&&!RuleConstant.LIMIT_APP_OTHEh 0 7 ? h S r ]R.equ| H 9 % & O {ay ^ G a ; + 2ls(origin);
}
stati6 , pcNodeselectReferenceNode(FlowRulerule,Contextcontext,DefaultNodenode){
StringrefResource=rule.getRefResource();
intstrategy=rule.getStrategy();

if(StringUtil.ism ` % 9 d pEmpty(reh d 8 e 7 Y * xfResource)){
returnnull;
}

if(strategy==RuleCon; } O _ _ { z wstant.STRATEGY_RELATE){
returnClusterBuilderSlotA H V e.getClusterNode(refReT ^ I = h Vsource);
}

if(strategy==RuleConstant.STRATEGY_CHAIN){
if(!refResource.equals(context.getName())){
returnnull;
}
returnnode;
}
//Nonode.
returnnull;
}

TrafficShapingController canPass

从上述过程c 3 #中能够获取一个计算实时数据的Node,接下来就是依据数s @ V W Z A据与流量操控规矩进行判别,是否经过。

Sentinel之FlowSlot限流分析

该篇中先介绍DefaultControlle+ h r P ! U zr的原理,完成了直接回绝的战略。

publicbooleancanPass(Nodenode,intacquireCount,booleanprioritized){
//当时现已耗费的令牌数量,即当时时刻窗口内已创建的线程数量或许现已过的恳求个数
intcurCount=avgUsedTokens(node);
//假如已耗费的令牌数加上当时恳求所要耗费的恳求的和小于等于阈值,则直接回来true,表明经过。
//假如超越阈值,则需求履行里边的逻辑
if(curCount+acquireCount>count){
//假如prioritized为true,即存在优先级,并且是根据QPS进行限流,则进行里边的逻辑i O C,否则,直接回来falj K _se
if(prioritized&&grade==RuleConstant.FLOW_GRADE_QPS){
longcurrentTL j 2 $ ) f ; [ [ime;
longwaitInM; N m is;
currentTime=TimeUtil.currentTimeM- e 9illis();
//获取当时线程的等待时刻
waitInMs=node.tryOccupyNext(currentTime,acquireCount,count);
if(waitInMs<OccupyTimeoutProperty.getOccupyTimeout()){
//将需求的令牌数添加到borrowArray中未来一个时刻窗口
node.addWaitingRequest(currentTime+waii - 5 d . o S AtInMs,acquireCounu w ) U . s m * vt);
//将抢占的未来的令牌数也添加到原来data中的当时时刻窗口中
node.addOccf O O I $upiedPass(acquireCount);
//等待到对用时刻窗口抵达
sleep(waitInMs);

//PriorityWaitExceptionindicatesthattherequestwillpassafterwaitingfor{@link@waitInMs}.
thrownewPriority? b 6 EWaitException(waitInMs);
}
}
returnI F m M H D ^ {false;
}
returntrue;
}
privateintavgUsedTj _ Z doken0 g K q ~ c 7 Os(Nodenode){
if(node==null){
returnDEFAULT_AVG_USED_TOKENS;
}
returngrade==RuleConstant.FLOW_GRADE_THREAD?node.curThreadNum():(int)(node.passQps());
}

@Override
publicdoublep_ c d W P ( dassQps(){
returnrollingCounterInSecond.pass()/rollN z g z 8 n XingCouna & 7 d WterInSecond.getWindowInterval8 - 2 IInSec();
}
//acquireCount本次恳E B + b i f求要耗费的令牌,thresholdK 6 F R f =设置的令牌阈值
//IntervalProp` x kerty.INTERVAL为一个时刻距离,默许1000ms,即1s
publiclongtryOccupyNext(longcurrentTi( d # 1 : n , eme,intacquireCount,doublethreshold){
//即一个时刻距离内产生的最大令牌数,后面的为1,实际上就d G [ H k是1s内界说的阈值令W q c K B m { |牌数] L U z
doublem` f / G Y u h Q _axCount=threshold*@ V 3 J c ^ 8IntervalProperty.INTERVAL/1000;
/( M O 9/取得borrowArray中除去过期的; ( Q J O } $ L -所抢占的未来的令牌数会调用cur_ f | PrentWaiting办法
longcurrentBorrow=rollingCounterInSecoe H pnd.waiting();
if(currentBorrow&p f b B C ) g q pgt;=maxCount){
//最大占用超时, P : 4以毫秒为单位,500ms
returnOccupyTimeoutProperty.getOccupyTimeout();
}
//1000ms/2
//一个时刻窗口表明的时刻长度
intwindowLength=IntervalProperty.INTERVAL/SampleCountProperty.SAMPLE_COUNT;
//currentTime-currentTime%windowLength为该窗口的实际开端时刻
//再加上一个窗口的时刻长度减去一个时刻距离 ~ ` Z
//windowLength一般为500ms,INTERVAL为1000ms,所以总共要减去500ms
//即实际的开端时刻向前退500ms,应该是比他早一个的时刻窗口开端时刻
longearliestTime=currentTime-currentTime%windowLength+windowLength-IntervalPrg M M woperty.INTERVAL;

intidx=0;
longcurrentPass=rollingCounterInSecond.pass();
while(earliestTime<currentTime){
//当时窗口所剩余的时刻
long7 ] &waitInMU c k % p W S cs=idx*window_ I P M D g pLength+windowLength-currentTim` [ p f A 5 { Ue%windowLength;
//设置的等待时刻不能超越一个阈值500ms
if(waitInMs>=OccG 9 & 2 2 p !upyTimeoutPrN E foperty.getOccupyTimeout()){
break;
}
//获取它的之前早的一个窗口的计算的恳求数
lo } r ~ r f | tngwindowPass=rollingCounterInSecond.getWindowPass(earliestTime);
if(currentPass+currer @ l t b ntBorrow+acquireCount-windowPass<=maxCount){
returnwaitIn| T o bMs;
}
//向后移动一个窗口
earliestTime+=windo O l u S 2 8wLengo L ? S Eth;
//当时一个窗口记载的经过量
curO q O =renz , X 7 O @ - qtPass-=windowPass;
idx++;
}
retI 7 + S U UurnOccupyTimeoutProperty.getOccupyTimeout();
}
@Override
publiclongcs c 0 C 8urrentWai# L w -ting()k ` = Z = H -{
//AkindofBucketLeapArraythatonlyreservesforfuturi ` 3 f } Febuckets
borrowArray.currentWindow();
longcurrentWaiting=0;
//注意,在里边办法中判别时刻窗口办法进行了重写
List<MetricBucket>lq . R | U ? z &ist=borrowArray.values();
for(MetricBucketwindow:list){
currentWaitin6 ~ Z ` ? E ~g+=window.pass();
}
returncurre2 j SntWa( e m ; u _iting;
}

publicList<T>values(longtimeMillis){
if(timeMillis<0){
returnnewArrayList<T>();
}
i~ = C 7 } gntsize=array.length();
List<T>result=newArrayList<T>(size);
for(int) ? % E Y c ji=0;i<size;i++){
WindowWrap<T>windowWrap=array.get(i);
//isWindowDeprecated进行了重写,由于是FutureBucketLeapArray调用的
if(windowWrap==null||isWindowDeprecated(timeMillis,: X ` d 3windowWr; M J + Z kap)){
continue;
}
result.add(windowWrap.value());
}
returnr$ ? [ / /eX q g - 7 c v / *sult;
}

//由于它抢占式未来时刻的窗口,即记载当时时刻今后的数据,所以当当时时刻比窗口时刻的开端时刻大,阐明不是记载未来时刻h a Z [ 8 B ?窗口的,所以失效了
publicbooleanisWindowDeprecated(longtime,WindowWrap<MetricBucket>windowWrap){
//Tricky:willonlycalculateforfuture.
returnI 9 m i } = # 8time>=windowWrap.w{ R 3 F m C b findowStart();
}

//而一般的时刻窗口只需求判别当时时刻减去时刻窗口的开端时刻不超越时刻距离(秒级的话,即19 u D v e ys)即可s : H 0 J %
publicbooleanisWindowDeprecated(longtime,WindowWrap<T>windowWrap){
returntime-windowWrap.wind) B e RowStart()>J a { . l , V 1 &;intervalInMs;
}

在这儿,咱们可能对OQ n % N @ * ] kccupiableBucketLeapArray这个类不是很了解,到现g ] { ) ~ S ( V在我也没有理x : 解很好,现在我从官网上直接摘录一段,大体的意思能够表现出来。

引用o M R O *:Sentinel 1.5.0 版别发布,引进 Reactive 支撑

Sentinel1.5.0对底层的滑动窗口结构进行了升级,添加了“占用机制”,答应在当时QPS现已抵达限流阈值时,同个资源高优先级的恳求提早占用未来时刻窗口的配额数,等待到对应时刻窗口抵达时直接经过,然后能b Q / Z t + x y 8够完成“终究经过”的作用而不是I m 8 * W被当即回绝;而同个资源低优先级的恳求则不能R W B + a p占用未来的配额,阈值抵达时就会被限流。

Sentinel1.5d a d H o b.0引进了FutureBucketLeapArray,这是一种特殊的时刻窗口,W h D j 7 K $仅维持当时时刻今后的格子, 然后能够用于计算未来被预先占用的配额数目。Se= & ~ : ? a M – Kntinel[ [ Q ? ) k /将一般的滑动窗口与FutureBucketLeapArray{ 8 M ~ w R合成可占用的滑动窗口Oc{ n w N + X z BcupiableBucketLeapArray,然后完成了“部分高优先级恳求终究经过”的作用。咱们能够调用SphU.entryWithPriority(resourceName来标识本次调用为高优先级(prioritized = true)。

Sentinel之FlowSlot限流分析

参考文章:
Sentinel 1.5.0 版别发p n W =布,引进 Reactive 支撑
Sentinel FlowSlot 限流1 N v v m x A完成原理(文末附流程图与总结)

发表评论

提供最优质的资源集合

立即查看 了解详情