敞开生长之旅!这是我参与「日新计划 12 月更文应战」的第10天,点击查看活动概况

常见的限流算法分析

限流在咱们日常日子中经常见到,如火车站门口的栏杆一些景点的门票只出售必定的数量 等等。在咱们的开发中也用到了这种思想。

为什么要限流

在确保可用的状况下尽或许多添加进入的人数,其他的人在排队等待,或者返回友好提示,确保里边的进行体系的用户能够正常运用, 避免体系雪崩

限流算法

限流算法很多,常见的有三类,分别是 计数器算法漏桶算法令牌桶算法

(1)计数器:

在一段时刻距离内,处理恳求的最大数量固定,超越部分不做处理。

(2)漏桶:

漏桶巨细固定,处理速度固定,但恳求进入速度不固定(在突发状况恳求过多时,会丢掉过多的恳求)。

(3)令牌桶:

令牌桶的巨细固定,令牌的产生速度固定,可是消耗令牌(即恳求)速度不固定(能够应对一些某些时刻恳求过多的状况);每个恳求都会从令牌桶中取出令牌,假如没有令牌则丢掉该次恳求。

计数器限流

在一段时刻距离内,处理恳求的最大数量固定,超越部分不做处理。

举个,比方咱们规矩对于A接口,咱们1分钟的拜访次数不能超越100次。

那么咱们能够这么做:

在一开 始的时候,咱们能够设置一个计数器counter,每当一个恳求过来的时候,counter就加1,假如counter的值大于100并且该恳求与第一个恳求的距离时刻还在1分钟之内,那么阐明恳求数过多,回绝拜访;

假如该恳求与第一个恳求的距离时刻大于1分钟,且counter的值还在限流范围内,那么就重置 counter,便是这么简单粗暴。

常见的限流算法分析以及手写实现(计数器、漏斗、令牌桶)

代码完成:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
​
//计数器 限流
public class CounterLimiter {
​
  //开始时刻
  private static long startTime = System.currentTimeMillis();
​
  //时刻距离1000ms
  private static long interval = 1000;
​
  //每个时刻距离内,约束数量
  private static long limit = 3;
​
  //累加器
  private static AtomicLong accumulator = new AtomicLong();
​
  /**
   * true 代表放行,恳求可已经过
   * false 代表约束,不让恳求经过
   */
  public static boolean tryAcquire() {
    long nowTime = System.currentTimeMillis();
    //判断是否在上一个时刻距离内
    if (nowTime < startTime + interval) {
      //假如还在上个时刻距离内
      long count = accumulator.incrementAndGet();
      if (count <= limit) {
        return true;
       } else {
        return false;
       }
     } else {
      //假如不在上一个时刻距离内
      synchronized (CounterLimiter.class) {
        //避免重复初始化
        if (nowTime > startTime + interval) {
          startTime = nowTime;
          accumulator.set(0);
         }
       }
      //再次进行判断
      long count = accumulator.incrementAndGet();
      if (count <= limit) {
        return true;
       } else {
        return false;
       }
     }
   }
​
​
  // 测验
  public static void main(String[] args) {
​
    //线程池,用于多线程模仿测验
    ExecutorService pool = Executors.newFixedThreadPool(10);
    // 被约束的次数
    AtomicInteger limited = new AtomicInteger(0);
    // 线程数
    final int threads = 2;
    // 每条线程的履行轮数
    final int turns = 20;
    // 同步器
    CountDownLatch countDownLatch = new CountDownLatch(threads);
    long start = System.currentTimeMillis();
    for (int i = 0; i < threads; i++) {
      pool.submit(() ->
       {
        try {
​
          for (int j = 0; j < turns; j++) {
​
            boolean flag = tryAcquire();
            if (!flag) {
              // 被约束的次数累积
              limited.getAndIncrement();
             }
            Thread.sleep(200);
           }
​
         } catch (Exception e) {
          e.printStackTrace();
         }
        //等待所有线程完毕
        countDownLatch.countDown();
       });
     }
    try {
      countDownLatch.await();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    float time = (System.currentTimeMillis() - start) / 1000F;
    //输出核算成果
    System.out.println("约束的次数为:" + limited.get() +
        ",经过的次数为:" + (threads * turns - limited.get()));
    System.out.println("约束的份额为:" + (float) limited.get() / (float) (threads * turns));
    System.out.println("运转的时长为:" + time + "s");
   }
​
}
​

计数器限流的缺乏:

这个算法虽然简单,可是存在临界问题,咱们看下图:

常见的限流算法分析以及手写实现(计数器、漏斗、令牌桶)

从上图中咱们能够看到,假设有一个歹意用户,他在0:59时,瞬间发送了100个恳求,并且1:00又瞬间发送了100个恳求,那么其实这个用户在 1秒里边,瞬间发送了200个恳求。

咱们方才规矩的是1分钟最多100个恳求(规划的吞吐量),也便是每秒钟最多1.7个恳求,用户经过在时刻窗口的重置节点处突发恳求, 能够瞬间超越咱们的速率约束。

用户有或许经过算法的这个漏洞,瞬间压垮咱们的应用。‍♀️

漏桶限流

✨漏桶算法限流的基本原理为:水(对应恳求)从进水口进入到漏桶里,漏桶以必定的速度出水(恳求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,恳求被回绝。 大致的漏桶限流规矩如下:

(1)进水口(对应客户端恳求)以任意速率流入进入漏桶。

(2)漏桶的容量是固定的,出水(放行)速率也是固定的。

(3)漏桶容量是不变的,假如处理速度太慢,桶内水量会超出了桶的容量,则后面流入的水滴会溢出,表示恳求回绝。

常见的限流算法分析以及手写实现(计数器、漏斗、令牌桶)

⭐漏桶算法其实很简单,能够粗略的认为便是注水漏水进程,往桶中以任意速率流入水,以必定速率流出水,当水超越桶容量(capacity)则丢掉,由于桶容量是不变的,确保了整体的速率。 以必定速率流出水,

常见的限流算法分析以及手写实现(计数器、漏斗、令牌桶)

削峰: 有大量流量进入时,会产生溢出,然后限流保护服务可用

缓冲: 不至于直接恳求到服务器, 缓冲压力

代码完成:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
​
//漏斗限流
public class LeakBucketLimiter {
​
  //桶的巨细
  private static long capacity = 10;
  //流出速率,每秒两个
  private static long rate = 2;
  //开始时刻
  private static long startTime = System.currentTimeMillis();
  //桶中剩下的水
  private static AtomicLong water = new AtomicLong();
​
  /**
   * true 代表放行,恳求可已经过
   * false 代表约束,不让恳求经过
   */
  public synchronized static boolean tryAcquire() {
    //假如桶的余量问0,直接放行
    if (water.get() == 0) {
      startTime = System.currentTimeMillis();
      water.set(1);
      return true;
     }
    //核算从当时时刻到开始时刻流出的水,和现在桶中剩下的水
    //桶中剩下的水
    water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
    //避免出现<0的状况
    water.set(Math.max(0, water.get()));
    //设置新的开始时刻
    startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
    //假如当时水小于容量,表示能够放行
    if (water.get() < capacity) {
      water.incrementAndGet();
      return true;
     } else {
      return false;
     }
   }
​
​
  // 测验
  public static void main(String[] args) {
​
    //线程池,用于多线程模仿测验
    ExecutorService pool = Executors.newFixedThreadPool(10);
    // 被约束的次数
    AtomicInteger limited = new AtomicInteger(0);
    // 线程数
    final int threads = 2;
    // 每条线程的履行轮数
    final int turns = 20;
    // 同步器
    CountDownLatch countDownLatch = new CountDownLatch(threads);
    long start = System.currentTimeMillis();
    for (int i = 0; i < threads; i++) {
      pool.submit(() ->
       {
        try {
​
          for (int j = 0; j < turns; j++) {
​
            boolean flag = tryAcquire();
            if (!flag) {
              // 被约束的次数累积
              limited.getAndIncrement();
             }
            Thread.sleep(200);
           }
​
         } catch (Exception e) {
          e.printStackTrace();
         }
        //等待所有线程完毕
        countDownLatch.countDown();
       });
     }
    try {
      countDownLatch.await();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    float time = (System.currentTimeMillis() - start) / 1000F;
    //输出核算成果
    System.out.println("约束的次数为:" + limited.get() +
        ",经过的次数为:" + (threads * turns - limited.get()));
    System.out.println("约束的份额为:" + (float) limited.get() / (float) (threads * turns));
    System.out.println("运转的时长为:" + time + "s");
   }
​
}

漏桶的缺乏:

漏桶的出水速度固定,也便是恳求放行速度是固定的。 漏桶出口的速度固定,不能灵活的应对后端才能提升。比方,经过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。

令牌桶限流

令牌桶算法中新恳求到来时会从桶里拿走一个令牌,假如桶内没有令牌可拿,就回绝服务。 当然,令牌的数量也是有上限的。令牌的数量与时刻和发放速率强相关,时刻流逝的时刻越长,会不断往桶里加入越多的令牌,假如令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。

令牌桶限流大致的规矩如下:‍

(1)进水口依照某个速度,向桶中放入令牌。

(2)令牌的容量是固定的,可是放行的速度不是固定的,只需桶中还有剩下令牌,一旦恳求过来就能申请成功,然后放行。

(3)假如令牌的发放速度,慢于恳求到来速度,桶内就无牌可领,恳求就会被回绝。

总之,令牌的发送速率能够设置,然后能够对突发的出口流量进行有效的应对。

常见的限流算法分析以及手写实现(计数器、漏斗、令牌桶)

代码完成:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
​
//令牌桶
public class TokenBucketLimiter {
  //桶的容量
  private static long capacity = 10;
  //放入令牌的速率,每秒2个
  private static long rate = 2;
  //上次放置令牌的时刻
  private static long lastTime = System.currentTimeMillis();
  //桶中令牌的余量
  private static AtomicLong tokenNum = new AtomicLong();
​
  /**
   * true 代表放行,恳求可已经过
   * false 代表约束,不让恳求经过
   */
  public synchronized static boolean tryAcquire() {
    //更新桶中剩下令牌的数量
    long now = System.currentTimeMillis();
    tokenNum.addAndGet((now - lastTime) / 1000 * rate);
    tokenNum.set(Math.min(capacity, tokenNum.get()));
    //更新时刻
    lastTime += (now - lastTime) / 1000 * 1000;
    //桶中还有令牌就放行
    if (tokenNum.get() > 0) {
      tokenNum.decrementAndGet();
      return true;
     } else {
      return false;
     }
   }
​
​
  //测验
  public static void main(String[] args) {
​
    //线程池,用于多线程模仿测验
    ExecutorService pool = Executors.newFixedThreadPool(10);
    // 被约束的次数
    AtomicInteger limited = new AtomicInteger(0);
    // 线程数
    final int threads = 2;
    // 每条线程的履行轮数
    final int turns = 20;
    // 同步器
    CountDownLatch countDownLatch = new CountDownLatch(threads);
    long start = System.currentTimeMillis();
    for (int i = 0; i < threads; i++) {
      pool.submit(() ->
       {
        try {
​
          for (int j = 0; j < turns; j++) {
​
            boolean flag = tryAcquire();
            if (!flag) {
              // 被约束的次数累积
              limited.getAndIncrement();
             }
            Thread.sleep(200);
           }
​
         } catch (Exception e) {
          e.printStackTrace();
         }
        //等待所有线程完毕
        countDownLatch.countDown();
       });
     }
    try {
      countDownLatch.await();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    float time = (System.currentTimeMillis() - start) / 1000F;
    //输出核算成果
    System.out.println("约束的次数为:" + limited.get() +
        ",经过的次数为:" + (threads * turns - limited.get()));
    System.out.println("约束的份额为:" + (float) limited.get() / (float) (threads * turns));
    System.out.println("运转的时长为:" + time + "s");
   }
​
}

令牌桶的优点:

令牌桶的优点之一便是能够方便地应对 突发出口流量(后端才能的提升)。

比方,能够改动令牌的发放速度,算法能依照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。