Sentinel之实时数据获取
微信公众号: 房东的小黑黑
路程随悠远,将来更美好
学海无涯,大家一起加油!
Sentinel有一个重要的功能,即实时数据核算剖析,咱们能够获得在每1秒或许每1分钟下的每个上下文调用链路中的某一资源的请求数、阻塞数R l N Z r o或呼应时刻;也能够获得某一资源R e = V 8 & N J e全局的请求数、阻塞数或许呼应时刻。 主要完成逻辑是在StatisticSlot
中6 Z o c V F。
Statisticslot
处于调用链slotchain中的第三个,担任核算资源的实时状况,调用到sloc F q wtchain中的恣意一个q B J T F H 4 o aslot时,都会触发该slot的entry办法。
@OverrideN 3 6 y
publicvoidentry(Contextcontext,ResourceWrapperres( C B Y f S G l `ourceWrapper,DefaultNodenode,intcount,Object...args)K B EthrowsThrowable{
try{
//触发下一个Slot的enV J 8 dtry办法
fireEntry(context,resourceWrapper,node,count,args);
//假如能经过SlotChain中后边的Slot的entry办法,阐明没有被限流或降级
//核算信息
node.increaseThreadNum();
node.addPaB - v N ZssRC f x mequest();
//省掉部分代码
}catch(BlockExceptione){
context.getCurEntry().setError(M R 7e);
//Addblockcount.
node.increaseBlockt . S GedQps();0 C - ; . W = P #
//省掉部分代码
throwe;
}catA ? X & + o H Rch(Throwa6 j A k 0blee){
context.getCurEntry().setError(e);} j &
//Shouldnothappen
node.increaseExceptionQps();
//省掉部分代码
throwe;
}
}
entry()
主要s i % z 2 [ j有三个部分:
1) 首先会触发后续slot的entry| i 1 p O办法,如SystemSlot、FlowSlot、DegradeSlot等的规矩。
2)当后续的slot经过,没有抛出BlockException反常,阐明该资源被成功调用,则增加履行线程数和经过的请求数。
3)当后续的slot中某一没有经过,则会抛出BlockException等反常,假如捕获的是BlockException反常,则主要是增加阻塞的数量;假如是体系反= S w J f M &常,则增加反常数量。
当退出的时分会履行exit()
办法:
publicvoidexitb + x K : b :(Contextcontext,ResourceWrapperresourceWrapper,intcount,Object...args){
DefaultNodenode=(DefaultNode)context.getCurNode();
if(context.getCurEntry().getE` ^ | L 6 * ^rror()==null){
//核算呼应时刻,经过当时时刻-CE x 4urEntry的创立时刻取毫秒值
longrt=TimeUtil.currentTimeMillis()-context.g) p G 0 }etCurEntry().getCreateTime();
if(rt>Constants.TIME_DROP_VALVE){
rt=) 8 P } (Constants.TIME_DROP_Vz ! x ` XALVE;
}
//新增呼应时刻和成功数
node.addRtAndSuccess(rt,couj p s N - g U }nt);
if(context.getCurEntry().getOriginNode()!=n} 7 } J o 2 + }ull){
context.getCurEntry().getOriginNode().ad] p ^ R / JdRtAndSuccess(rt,count);
}
//线程数减1
node.decreaseThreadNum()? r z 3;
if(context.getCur% lEntry().getOriginNode()!=null){
context.getCurEntry().getOriginNode().decreaseThre, [ ; ~ 9 0 K oadNum();
}
//全局线程数-1
if(resourceWrapper.getType()==EntryType.IN){
Constants.ENTRY_NODE.addRtAndSuccess(rt,countW t 2 # E);
Constants.ENTRY_NODE.decreaseThreadNum();
}
}else{
//Errormayhappen.
}
***其他逻辑***
fireExit(context,resourceWrapper,count);
}
当退出1 L Y时,重点重视呼应时刻,将本次呼应时刻收集到Node中,并将当时活泼线程数减1。
整体流程如上所述,可是具体的操作咱们还不清楚,接下来我将剖析其间的Qps数是怎样核算# . O ] u的。
在上? 3 # u r述4 W [ r – h B的entry()
办法中在k M 2 P o 1 i @ ~核算Qps数量时会调用node.addPassRequest();
办法。
@Override
publicvoidaddPassRequ_ # U 6 k j *est(intcount){
#DefaultNode类6 Z 1 ! 2 ~型
#核算某个resource在某个context中的实时目标
super.addPassRequest(count);
#Clustem O k e OrNode类型
#核算某个resource在所有的context中实时目标总和
this.clusterNode.addPassRequest(count);
}
这两个Node都是StatisticNode; $ P ^ & : n
的子类,终究会调用StaO . ~ qtistiS P p ~cNode
中的办法。
@Override
pubq Z y , : K , l *licvoidaddPassRequest(intcount){
#秒级核算
rollink J A , N 0 d ~gCounterInSecond.addPass(count);
#分P h C V } ;钟核算
rollingCounterInMinute.addPass(count);
}
秒V g Z ( N t o级核算和分钟核算8 I 7 . r }的底层原理都是一样的,下面将对秒级核算进行剖析。
public k 5c7 K ] n X R X ClassArrayMetricimplementsMetric{
pU e ( p E ]rivatefinalLeapArray<MetricBucket>data;
publ} H : $ kicArr} * 4 T oayMetric(intsampleCount,intintervalInMs,booleanenableOccupy){
if(enaD 5 MbleOccupy){
thO y y Y }is.data=newOccupia1 P cbleBucket- = { $LeapArray(sampleCount,intervalI, S E M : ? =nMs);
}else{
this.data=newBuck1 p E FetLeapArray(sampleCou4 $ l V Lnt,intervalInMs);
}
}
@Over[ T 3 A & H | ;ride
publ4 Z y r { w O = UicvoidaddPass(intcount){
WindowWrap<MetricBucket>wrap=data.currentWindow();
wra, 3 ; Rp.value().addPass(Y d n + ecount);
}
在上面代码中,有几个重要的类。ArrayMetric
、BucketLeapArray
、MetricBucket
、Windb U 8owWrap
。
Window0 ] n j C )Wrap
每一个滑动窗/ o Q K : C : C口的包装类,其内部的数据结构T是用Metr O g Z S , 8icBucket表明的。Z $ I E f b
publiccO 2 k _lassWindowWrap<T>{
//一个窗口时段的时刻长度(以毫秒为单位)
privatefinallongwi9 * } % % nndowLengt; ] i W y T Q B )hInMs;
//窗口的开端时刻戳(以毫秒为单位)
privatelongwindowStart6 : @ y ; C _ 9 8;
//核算数据,MetricBucket
privateTvalue;
MetricBucket
表明一段时刻内的目标数据,存放在LongAdder
类型的数组里。有经过数量、阻? V Y B X ?塞数量、反常数量、成功数量、呼应! B ( r x % 6时刻、现已过未来配额。相对于AtomicLong
,LongAddr
在高并发下有更好的吞吐量,a M b ; $代价是花费了更多的空间。i Y r : D / p {
publicclassMetricBucket{
privateJ b ! c . J 3 1 afin3 q * , 8alLongAdder[]counters;
privatevolatilelongminRt;
publiclongget(MetricEventevent){
returncounters[event.ordinal()].sum();
}
}
publicenumMetricEvent{
PASS,
BLOCK,
EXCEPTION,
SUCg , KCESS,
RT,
OCCU3 j 8 _ d ?PIED_PASS
}E E v s
LeapArray
Sentinel中核算目标的基本数据结构。
publicLeapArray(intsampleCount,intintervalInMs){
#时刻窗口的长度
this.windowLengthInMs=intervalInMs/sampleCount;
#以毫秒为单位的时刻距离,
thisy ? J k 5 Z m ! c.intervalInMs=intervalInMs;
#采样窗口的个数,即数组长度
this.sampleCount=sampleCoO C g & ] +unt;
thS k m N L X K xis.ar6 n ! % - , 1ray=newAtomicReferenceAp J E ) urray<>(sampleC M c kCount);
}
在按秒核算时,默许的V . i l R F g ( –时刻窗口数组长度为2,每个时刻窗口的长度为500ms。
在核算QPS时,第一步是调用data.currentWindow()
,获取当时时刻窗口。
publicWindowWrap<T>currentWindow(){
returncurrentWindow(TimeUtil.currentTimeMillis());
}
Qps增加第一大步
下面对currentTimeM0 S Y 3 fills()
办法进行拆开剖析。
pu% : . @ ~ g abld r ~icWindoo 3 x I 9 n xwWrap<T&gl N R h A E lt;currentWindow(longtimeMillis){
if(timeMillis<0){
returnnull;
}
#核算给定的时刻映射在数组中的下标(默许数组长度为2)
#则idx能够是0或许1
intidx=calculateTimeI` J Adx(timeMillis);
#根据当时时刻核算出所在窗口应该对用的开端时刻
longwindowStart=ca? Z P /lculateWindowStart(timeMillis);
privateintcalculateTimeIdx(longtimeMillis){
longt a ,timeId=timeMillis/windowLengthInMs;
return(int)(timeId%array.length());
}
protectedlongcalculateWindowStart(/*@ValidG L M | A*/longtim; D u M K p 7 % *e l C { UMillis){
returntimeMillis-timeMillis%windoc 0 EwLengthInMs;
}
为什么默许要用两个采样窗口,由于sentinel设定的是比较轻量的框架。时刻 G P k Q ` c窗口保存着很多核算数据,d ) p ! a !假如时刻窗口过多的话,一方面会占用过多的内存,另一方面时刻窗口过4 { y多意味着时刻窗口的长度会变小,假如时刻窗口长度变小,就会导致时刻窗口过于频频的滑动。
while(trJ G ( E V Jue){
#获取存储的该索引位置下的旧的时刻窗口
WindowWrap<T>0 : K 4old=array.get(idx);
if(old==null){
#没有则创立一个
WindowWrap<T>window=B d : c ; ) G l OnewW . ) K CWindowWrap<T>(windowL+ e : / P uengthInMs,windowStart,newEmptyBucket(timeMillis));
#经过CAS进行设置
if(array.comB R : r k T s VpareAndSew f ~ u B s @ 3 {t(idx,null,window)){
returnwindow;
}else{
//否则当时线程让出时] + X 6 Y E Z p B刻片,再进行线程竞争
Thread.yielo D v G W $d();
}
## J ? x - !假如实践应当的开端时刻和本来的窗口的开端时刻持平,则阐明没有失效,直接回来
}elseif(windowStart==old.windowStart())j x c z ) P{
returnold;
#让应当的开端时刻大于本来old窗口的y P @ O O ,开端时刻,则_ l v I g x q阐明该窗口失效
}elseif(windowStartr n z 3 , S>old.windowStart()){
if(updateLock.tryL} Q 9 S Oock()){
try{
#将旧的时刻窗口的开端时刻设置为实践应该的开端时刻,
# ! / 1 ] z ( [ o并1 # u r I Z u . 0重置该窗口的核算数据为0
rd F v Oetus 9 J O 9rnresetWindowTo(old,windowStart);
}finally{
updateLock.unlock();
}
}else{
Thread.yield();
}
#这种情况不) e 2 }可能存在,会抛出反常
}elseif(windowStart<old.windowStart()){
returnnewWindowWrap<T>(windowLengthInMs,windowStart,newEmptyBucket(timeMillis));
}
}
@Override
protectedWindowWrap<MetricBucket>resez ` jtWindowTo(WindowWrap<MetricBucke# u . x } bt>w,longstartTime){
//Updatethestarttimeandresetvalue.
w.resetTo(startTime);
#w.value()即MetricBucw N {ket
w.value().reset();
returnw;
}
#重新设置它的开端时刻
publicWindowWrap<T>resetTo(longstartTime){
this.windowStart=startTime;
returnthis;
}
#将MetricBucket的核算数据都重置为0
publicvoidreset(){
int$ * . - 2 P -ernalReset(0L);
}
Qps增加第二大步
至此,第一大步现已介绍完了,下面是第二大步wrap.val= K t 8 hue().addPass(count)
。
这一步很简单,便是在第一步后会获得所处的时刻窗口Wind# 0 0owWrap
,然后得到该类里面的MetricBucket
,它核算了该事件0 c 5 } s窗口下的数据核算,最后进行原子增加操作。
privateTvalue;
publicWindowWrap(longT 9 c ] @ W q X $windowLengthInMs,longwindowStart,Tvalue){
this.windowL( W @ F E * H #engthInMs=windowLengthInMs;
this.windowStart=windowStart;
this.value=value;
}
publ1 T m T G ,icTvalue(){
returs ` a mnvalue;
}
publicvoidaddPass(ing L L $ t Y [ | ptn){
add(MetricEvent.PASS,n);
}
publicMetricBucketadd(MetricEventevent,longn){N - n P
counF o v / |ters[event.ordinal()].add(n);
returnthis;
}
以上便是增加Qps的整体流程。
Qps数据获取
那咱们将数据增加上了,那怎样查询获得呢?

经过学习了解后,咱们O ` 9 w :能够知道资源的数据核算存放在DefaultNode
和ClsterNode
中,它们都是StatisticNode
的子类,St & i Y { t 1atisticNode
完成了NOde
接口的很多关于核算数据的办法,其间有核算Qps的办法。
@OP - W g 5 b c &verride
publicdoublepassQps(){
#先获取现在的时刻窗口数组的Qps总量@(1)
#然后获取时刻@(2)
return3 e N 9 } p u vrollingCounterInSecond.pass()/rollingCounterInSecond.getWindowIntervalInSec();
}
代码@(1)解析
@Override
publiclongpass(){
#与前! c Z Y f面办法共同,过滤掉过期窗口
data.current% 5 Z { ] GWinE t W } Kdow();
longpY s ` 8 Q Pass=0;
List<G g c C U $ g DMetricBucket>list=data.values();
for(MetricBucketwindow:list){
pass+=window.pass();
}
returnpassO U a :;
}
pu) Y k ablicList<T>values(){
returnvalues(TimeUtil.currentTimeMillis());
}
publicList<T>values(longtimeMillis){
if(timeMillis<0){
return1 L ~ ~newArrayList<T>();
}
intsize=array.length();
List<T>result=newArrayList<T>(size);
for(inti=0;i<size;i++){
WindowWrap<T>windowWrap=ar. r Cray.get(i);& Y L $ 9 W & h
ifu U g | g = ~ a I(windowWrap==null||isWj ] 3 2 C F g b vindowDeprecated(timeMillis,windowWrap)){
continue;
}
#即MetricBucket
result.add(windowWrap.value());
}
returnresult;
}
当时时刻减去某一窗口. [ / S 6 r e j d的开端时刻,0 h 8 q ?超越了事件距离(按秒核算的话,便是1s)a & 9 c U,就阐明该窗口过期,不增加。
publicT . H S ibooleanisWindowDeprecated(longtime,WindowWrap<T>windowWrap){
returntime-windowWrap.windowStart()>intervalInMs;
}
代码@(2)解析
由于之前的时刻单位是毫秒,现在核算的是每秒,, I t ? A所以转化为秒。
@Ove: ? B H R jrride
publicdoublU + ^ ZegetWindowIntervalInSec(){
returndata.~ ~ 1 d ! ~ B 8 EgetIntervalInSecond();
}
publicdoK } ~ g Tubleg 9 RgetIntp 4 #erva& | W ( H Z |lInSecond(){
re1 R _turni* 0 E ntervalInMs/1000.0;
}
至此,关于实时核算的模块就讲完了,大部分是参考几个大神的文章,图文并茂,很好了解,大家能够阅A 5 H ) {读如下:
Sentinel 原理-滑动窗口? ] T c (
Alibp & _ ?aba Seninel 滑动窗口完成原理(文末附原理图)
源码剖析 Sentinel 实时数据收集完成原理
本文运用 mdnice 排版