Token Bucket Rate Limiting

# Preface

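Recently at work I took on some technical-improvement tasks. One of them was to rate-limit cache writes from loosely related businesses during traffic peaks, so that cache reads are protected and the business stays stable. Given how the services to be limited were deployed, we settled early on per-instance (single-machine) rate limiting. After some further research, we decided to use the mature token-bucket rate limiter from Google's Guava library.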

# The token bucket algorithm

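The token bucket limits traffic as follows:

  • The bucket generates permits at a configured rate and stores them; once it holds its maximum number of permits, it stops generating more;

  • Every incoming request must first take a permit from the bucket, and only requests that get one are let through;

  • How a request is treated depends on how it acquires its permit, which can be blocking or non-blocking:

    • Blocking: if the request gets a permit it proceeds; otherwise its thread blocks until a permit becomes available;
    • Non-blocking: if the request gets a permit it proceeds; otherwise it fails immediately.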
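The steps above can be sketched as a small, self-contained Java class. This is a simplified illustration, not Guava's implementation: `SimpleTokenBucket` and its methods are hypothetical names, the bucket is assumed to start full, and time comes from an injected nanosecond clock so the behaviour can be checked deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical minimal token bucket (illustration only, not Guava's RateLimiter). */
final class SimpleTokenBucket {
    private final double capacity;       // maximum number of tokens the bucket can hold
    private final double tokensPerNano;  // refill rate
    private final LongSupplier clock;    // nanosecond clock, e.g. System::nanoTime
    private double tokens;               // tokens currently in the bucket
    private long lastRefillNanos;        // when tokens were last added

    SimpleTokenBucket(double capacity, double tokensPerSecond, LongSupplier clock) {
        this.capacity = capacity;
        this.tokensPerNano = tokensPerSecond / TimeUnit.SECONDS.toNanos(1);
        this.clock = clock;
        this.tokens = capacity;          // assumption: the bucket starts full
        this.lastRefillNanos = clock.getAsLong();
    }

    // Add the tokens generated since the last refill; stop at capacity.
    private void refill() {
        long now = clock.getAsLong();
        if (now > lastRefillNanos) {
            tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * tokensPerNano);
            lastRefillNanos = now;
        }
    }

    /** Non-blocking: take a token if one is available, otherwise fail immediately. */
    synchronized boolean tryAcquire() {
        refill();
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    /** Blocking: sleep (outside the lock) until a token is available, then take it. */
    void acquire() throws InterruptedException {
        while (true) {
            long waitNanos;
            synchronized (this) {
                refill();
                if (tokens >= 1.0) {
                    tokens -= 1.0;
                    return;
                }
                // Time until the missing fraction of a token has been generated.
                waitNanos = (long) Math.ceil((1.0 - tokens) / tokensPerNano);
            }
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

With `tryAcquire` the caller gets an immediate yes or no; with `acquire` the calling thread absorbs the wait itself, matching the two acquisition modes listed above.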

# Core API

# Creating a rate limiter
    // Create a SmoothBursty (smooth bursty) rate limiter
    public static RateLimiter create(double permitsPerSecond) {
        return create(RateLimiter.SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
    }

    static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
        RateLimiter rateLimiter = new SmoothRateLimiter.SmoothBursty(stopwatch, 1.0);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }

    SmoothBursty(RateLimiter.SleepingStopwatch stopwatch, double maxBurstSeconds) {
        super(stopwatch, null);
        this.maxBurstSeconds = maxBurstSeconds;
    }
    
    // Create a SmoothWarmingUp (smooth warm-up) rate limiter
    static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor) {
        RateLimiter rateLimiter = new SmoothRateLimiter.SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }

    SmoothWarmingUp(RateLimiter.SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
        super(stopwatch, null);
        this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
        this.coldFactor = coldFactor;
    }

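RateLimiter has two implementations, SmoothBursty and SmoothWarmingUp, i.e. smooth bursty limiting and smooth warm-up limiting. The former generates permits at a constant rate; the latter has a warm-up period during which the permit rate increases gradually until it reaches the configured stable rate. The public factory for the warm-up variant is RateLimiter.create(permitsPerSecond, warmupPeriod, unit), which delegates to the package-private create method above with a coldFactor of 3.0.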
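To make "the rate increases gradually" concrete, the sketch below reproduces the per-permit cost implied by the SmoothWarmingUp `doSetRate` excerpt in the next part (the form with `halfPermits` and a cold interval of 3 × the stable interval). `WarmupCurve` is a hypothetical helper: it returns the instantaneous interval charged for the next permit, whereas Guava integrates this line over the permits actually taken.

```java
/** Hypothetical helper: per-permit interval in the halfPermits-based warm-up model. */
final class WarmupCurve {
    final double stableIntervalMicros; // interval once fully warmed up
    final double maxPermits;           // warmupPeriodMicros / stableIntervalMicros
    final double halfPermits;          // below this, permits cost the stable interval
    final double slope;                // extra micros per stored permit above halfPermits

    WarmupCurve(double permitsPerSecond, long warmupPeriodMicros) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = warmupPeriodMicros / stableIntervalMicros;
        this.halfPermits = maxPermits / 2.0;
        double coldIntervalMicros = stableIntervalMicros * 3.0; // fixed cold factor of 3
        this.slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
    }

    /** Interval (micros) charged for the next permit when storedPermits are in the bucket. */
    double intervalMicrosAt(double storedPermits) {
        if (storedPermits <= halfPermits) {
            return stableIntervalMicros; // warmed up: constant rate
        }
        return stableIntervalMicros + (storedPermits - halfPermits) * slope; // still cold
    }
}
```

A freshly created warm-up limiter starts with `storedPermits = maxPermits`, so its first permits are priced at roughly 3 × the stable interval; as stored permits are consumed the interval falls linearly to `stableIntervalMicros`. Idle time refills the bucket, so the limiter cools down again.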

# Setting the rate
    // Public entry point
    public final void setRate(double permitsPerSecond) {
        Preconditions.checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
        synchronized(this.mutex()) {
            this.doSetRate(permitsPerSecond, this.stopwatch.readMicros());
        }
    }

    // Internal implementation
    final void doSetRate(double permitsPerSecond, long nowMicros) {
        this.resync(nowMicros);
        double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
        this.stableIntervalMicros = stableIntervalMicros;
        this.doSetRate(permitsPerSecond, stableIntervalMicros);
    }

    // resync: credit the permits generated since nextFreeTicketMicros
    private void resync(long nowMicros) {
        if (nowMicros > this.nextFreeTicketMicros) {
            this.storedPermits = Math.min(this.maxPermits, this.storedPermits + (double)(nowMicros - this.nextFreeTicketMicros) / this.stableIntervalMicros);
            this.nextFreeTicketMicros = nowMicros;
        }
    }
    
    // SmoothBursty implementation
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            this.storedPermits = this.maxPermits;
        } else {
            this.storedPermits = oldMaxPermits == 0.0 ? 0.0 : this.storedPermits * this.maxPermits / oldMaxPermits;
        }
    }
    
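    // SmoothWarmingUp implementation (this excerpt matches older Guava releases, which use halfPermits and a fixed cold factor of 3.0)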
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        this.maxPermits = (double)this.warmupPeriodMicros / stableIntervalMicros;
        this.halfPermits = this.maxPermits / 2.0;
        double coldIntervalMicros = stableIntervalMicros * 3.0;
        this.slope = (coldIntervalMicros - stableIntervalMicros) / this.halfPermits;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            this.storedPermits = 0.0;
        } else {
            this.storedPermits = oldMaxPermits == 0.0 ? this.maxPermits : this.storedPermits * this.maxPermits / oldMaxPermits;
        }
    }

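When the rate is set, permitsPerSecond (permits generated per second) is converted into stableIntervalMicros, the number of microseconds needed to generate one permit: stableIntervalMicros = 1,000,000 / permitsPerSecond. Before switching to the new interval, doSetRate calls resync, which adjusts storedPermits (the permits currently stored in the bucket) and nextFreeTicketMicros (the earliest time the next permit can be handed out): if the current time is later than nextFreeTicketMicros, the permits generated in the meantime at the old rate are added to storedPermits (capped at maxPermits), and nextFreeTicketMicros is moved to the current time.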

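  • In SmoothBursty, the new maximum maxPermits is maxBurstSeconds (how many seconds' worth of permits the bucket may hold; the factory passes 1.0) multiplied by permitsPerSecond. The stored permits are then scaled in proportion to the new capacity: storedPermits = oldStoredPermits * maxPermits / oldMaxPermits, or 0 when the old maximum was 0, as on first creation.
  • SmoothWarmingUp computes maxPermits and storedPermits differently, based on the warm-up period warmupPeriodMicros. In the excerpt above maxPermits = warmupPeriodMicros / stableIntervalMicros, and a newly created limiter starts full (storedPermits = maxPermits), i.e. in its cold state.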
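The bookkeeping described above can be re-created with plain fields. `BurstyState` is a hypothetical stdlib-only class that mirrors the `resync` and SmoothBursty `doSetRate` excerpts (it is not Guava's class and omits locking); times are in microseconds and `maxBurstSeconds` is fixed at 1.0 as in `RateLimiter.create(double)`.

```java
/** Hypothetical mirror of SmoothBursty's resync/doSetRate bookkeeping (no locking). */
final class BurstyState {
    double maxBurstSeconds = 1.0;  // seconds' worth of permits the bucket may hold
    double stableIntervalMicros;   // micros per permit
    double maxPermits;
    double storedPermits;
    long nextFreeTicketMicros;

    // Credit permits generated since nextFreeTicketMicros, capped at maxPermits.
    void resync(long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }

    // Resync at the old rate first, then rescale storedPermits to the new capacity.
    void setRate(double permitsPerSecond, long nowMicros) {
        resync(nowMicros);
        stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        double oldMaxPermits = maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            storedPermits = maxPermits;
        } else {
            storedPermits = oldMaxPermits == 0.0 ? 0.0 : storedPermits * maxPermits / oldMaxPermits;
        }
    }
}
```

For example, at 10 permits/s an idle half second stores 5 permits; doubling the rate to 20 permits/s then doubles both the capacity (10 to 20) and the stored permits (5 to 10), so the bucket stays equally "full" in relative terms.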

# Getting the rate
    // Public entry point
    public final double getRate() {
        synchronized(this.mutex()) {
            return this.doGetRate();
        }
    }
    
    // Internal implementation
    final double doGetRate() {
        return (double)TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
    }
    
    // Lazily created mutex (double-checked locking; the field is volatile in Guava)
    private Object mutex() {
        Object mutex = this.mutexDoNotUseDirectly;
        if (mutex == null) {
            synchronized(this) {
                mutex = this.mutexDoNotUseDirectly;
                if (mutex == null) {
                    this.mutexDoNotUseDirectly = mutex = new Object();
                }
            }
        }
        return mutex;
    }

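The rate is computed by dividing the number of microseconds in one second by the interval between permits: rate = 1,000,000 / stableIntervalMicros (stableIntervalMicros is an interval, not a rate). For thread safety, both getRate and setRate synchronize on a mutex object that is created lazily with double-checked locking.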
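A compact illustration of both points: `RateHolder` is a hypothetical class, but its `mutex()` follows the double-checked locking pattern of the excerpt, with the field declared `volatile`, which the pattern needs to be safe under the Java memory model.

```java
/** Hypothetical holder showing getRate's formula and a lazily created mutex. */
final class RateHolder {
    private static final double MICROS_PER_SECOND = 1_000_000.0;
    private double stableIntervalMicros;
    // volatile is what makes the double-checked locking below safe
    private volatile Object mutexDoNotUseDirectly;

    Object mutex() {
        Object mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
            synchronized (this) {
                mutex = mutexDoNotUseDirectly;
                if (mutex == null) {
                    mutexDoNotUseDirectly = mutex = new Object();
                }
            }
        }
        return mutex;
    }

    void setRate(double permitsPerSecond) {
        synchronized (mutex()) {
            stableIntervalMicros = MICROS_PER_SECOND / permitsPerSecond;
        }
    }

    double getRate() {
        synchronized (mutex()) {
            // rate = microseconds per second / microseconds per permit
            return MICROS_PER_SECOND / stableIntervalMicros;
        }
    }
}
```

At 5 permits/s the interval is 200,000 µs, and 1,000,000 / 200,000 gives back 5.0.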

# Blocking acquisition
    // Acquire one permit
    public double acquire() {
        return acquire(1);
    }
    
    // Acquire the given number of permits
    public double acquire(int permits) {
        long microsToWait = this.reserve(permits);
        this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0 * (double)microsToWait / (double)TimeUnit.SECONDS.toMicros(1L);
    }

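acquire blocks until the requested permits are granted and returns the time spent sleeping, in seconds (0.0 if it did not have to wait). Note that reserve returns the wait owed by earlier requests: a request for many permits is granted without delay, and it is the next request that waits longer to pay for it.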
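This "pay later" behaviour is easiest to see in a simplified model of SmoothBursty's reservation logic, where a request is delayed only by permits reserved before it. `PayLaterBucket` is a hypothetical, stdlib-only sketch (no real sleeping, times in microseconds, one second of burst capacity); `reserve` returns how long the caller would have to wait, which `acquire` would then sleep.

```java
/** Hypothetical simplified model of SmoothBursty's reservation (no sleeping, explicit time). */
final class PayLaterBucket {
    private final double stableIntervalMicros; // microseconds per permit
    private final double maxPermits;           // one second's worth of permits
    private double storedPermits;              // starts empty, like SmoothBursty
    private long nextFreeTicketMicros;         // when the next request may proceed

    PayLaterBucket(double permitsPerSecond) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = permitsPerSecond;
    }

    // Credit permits accumulated while idle, capped at maxPermits.
    private void resync(long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }

    /** Reserve permits; return the wait (micros) caused by earlier reservations. */
    synchronized long reserve(int permits, long nowMicros) {
        resync(nowMicros);
        long momentAvailable = nextFreeTicketMicros;
        double fromStore = Math.min(permits, storedPermits);
        double fresh = permits - fromStore;
        // The cost of fresh permits is pushed onto whoever comes next.
        nextFreeTicketMicros += (long) (fresh * stableIntervalMicros);
        storedPermits -= fromStore;
        return Math.max(momentAvailable - nowMicros, 0L);
    }
}
```

At 1 permit/s, a request for 5 permits at time 0 proceeds immediately and the following request waits 5 seconds; after a long idle period a stored permit is available and the wait is 0 again. Guava's `acquire` converts such a wait to seconds before returning it.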

# Non-blocking acquisition
    // Try to acquire one permit, waiting at most timeout
    public boolean tryAcquire(long timeout, TimeUnit unit) {
        return this.tryAcquire(1, timeout, unit);
    }

    // Try to acquire the given number of permits without waiting
    public boolean tryAcquire(int permits) {
        return this.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
    }

    // Try to acquire one permit without waiting
    public boolean tryAcquire() {
        return this.tryAcquire(1, 0L, TimeUnit.MICROSECONDS);
    }

    // Try to acquire the given number of permits, waiting at most timeout
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
        long timeoutMicros = Math.max(unit.toMicros(timeout), 0L);
        checkPermits(permits);
        long microsToWait;
        synchronized(this.mutex()) {
            long nowMicros = this.stopwatch.readMicros();
            if (!this.canAcquire(nowMicros, timeoutMicros)) {
                return false;
            }

            microsToWait = this.reserveAndGetWaitLength(permits, nowMicros);
        }

        this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return true;
    }

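Non-blocking acquisition lets you specify the number of permits and a timeout. tryAcquire first checks, via canAcquire, whether the permits could be obtained within the timeout. If not, it returns false immediately without waiting or reserving anything; if so, it reserves the permits, sleeps for the required wait (at most the timeout) and returns true. With a timeout of 0, as in the overloads without a timeout, it never sleeps.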
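The decision can be modelled in a few lines. `TryAcquireSketch` is hypothetical and ignores stored permits for brevity; its `canAcquire` uses the same comparison as the excerpt (earliest available moment minus timeout must not lie in the future), and an empty result corresponds to `tryAcquire` returning false.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of tryAcquire's decision; stored permits are ignored for brevity. */
final class TryAcquireSketch {
    private final double stableIntervalMicros;
    private long nextFreeTicketMicros;

    TryAcquireSketch(double permitsPerSecond) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
    }

    // Can the permits be had within the timeout? (earliest moment - timeout <= now)
    private boolean canAcquire(long nowMicros, long timeoutMicros) {
        return nextFreeTicketMicros - timeoutMicros <= nowMicros;
    }

    /**
     * Empty: tryAcquire would return false at once, and nothing is reserved.
     * Present: microseconds to sleep before tryAcquire returns true.
     */
    synchronized OptionalLong tryReserve(int permits, long nowMicros, long timeoutMicros) {
        if (!canAcquire(nowMicros, Math.max(timeoutMicros, 0L))) {
            return OptionalLong.empty();
        }
        long momentAvailable = Math.max(nextFreeTicketMicros, nowMicros);
        nextFreeTicketMicros = momentAvailable + (long) (permits * stableIntervalMicros);
        return OptionalLong.of(momentAvailable - nowMicros);
    }
}
```

At 1 permit/s, a second zero-timeout attempt at the same instant is rejected, while the same attempt with a 2-second timeout is accepted and sleeps 1 second.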

# Practice

# Making the parameters configurable

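In production the token-bucket parameters are configured dynamically; the main ones are whether rate limiting is enabled and the QPS limit. To make the limiter reusable across businesses, the configuration is stored as JSON key-value pairs: each key is a business type, and its value object holds that limiter's parameters, for example: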

```json
{
  "business_a": {
    "switchOn": false,
    "qps": 0.5
  },
  "business_b": {
    "switchOn": true,
    "qps": 10
  }
}
```

A method that reads this configuration:

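```java
import com.alibaba.fastjson.JSON;          // assumption: fastjson 1.x is the JSON library in use
import com.alibaba.fastjson.TypeReference;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Optional;

@Component("rateLimiterConfig")
public class RateLimiterConfig {

    /**
     * Dynamically configured token-bucket parameters (the JSON string above);
     * how the config center binds this field is not shown
     */
    private String rateLimiterConfig;

    /**
     * Default QPS limit
     */
    public static final double DEFAULT_LIMITED_QPS = 1.0d;

    /**
     * Get the token-bucket configuration for one business type
     *
     * @param rateLimiterType rate limiter type
     * @return the configuration; limiting is off at DEFAULT_LIMITED_QPS if nothing is configured
     */
    public LimiterConfigInfo getRateLimiterConfig(GuavaRateLimiter.RateLimiterType rateLimiterType) {
        LimiterConfigInfo configInfo = new LimiterConfigInfo().setSwitchOn(false).setQps(DEFAULT_LIMITED_QPS);
        if (StringUtils.isNotBlank(rateLimiterConfig)) {
            TypeReference<HashMap<String, LimiterConfigInfo>> typeRef = new TypeReference<HashMap<String, LimiterConfigInfo>>() {
            };
            Optional.ofNullable(
                            Optional.ofNullable(JSON.parseObject(rateLimiterConfig, typeRef))
                                    .orElse(Maps.newHashMap())
                                    .get(rateLimiterType.getName())
                    )
                    .ifPresent(
                            config -> {
                                // Only override the defaults with values that are actually set
                                Optional.ofNullable(config.getSwitchOn()).ifPresent(configInfo::setSwitchOn);
                                Optional.ofNullable(config.getQps()).ifPresent(configInfo::setQps);
                            }
                    );
        }
        return configInfo;
    }

    /**
     * Rate limiter configuration entry
     */
    @Data
    @Accessors(chain = true)
    public static class LimiterConfigInfo {

        /**
         * Whether rate limiting is enabled
         */
        private Boolean switchOn;

        /**
         * Rate limit in QPS
         */
        private Double qps;
    }
}
```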
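One caveat with this configuration: `setRate` (and therefore `RateLimiter.create`) rejects a rate that is not positive or is NaN with an `IllegalArgumentException`, so an entry such as `"qps": 0` would make limiter creation fail. A hypothetical guard like `QpsGuard.sanitize` below could fall back to the default before the value reaches Guava; the fallback policy is an assumption, not part of the original code.

```java
/** Hypothetical guard that keeps invalid QPS values away from RateLimiter.create/setRate. */
final class QpsGuard {
    /** Same default as RateLimiterConfig.DEFAULT_LIMITED_QPS. */
    static final double DEFAULT_LIMITED_QPS = 1.0d;

    private QpsGuard() {
    }

    /** Returns the configured QPS if it is a positive number, otherwise the default. */
    static double sanitize(Double configuredQps) {
        if (configuredQps == null || configuredQps.isNaN() || configuredQps <= 0.0) {
            return DEFAULT_LIMITED_QPS;
        }
        return configuredQps;
    }
}
```

In `getRateLimiterConfig`, the `configInfo::setQps` consumer could then become `qps -> configInfo.setQps(QpsGuard.sanitize(qps))`.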

# Listening for config changes and wrapping the limiter

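To go with dynamic configuration updates, the limiter's rate has to be adjusted at runtime whenever the configuration changes. RateLimiter is also wrapped in a thin business-level layer: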

```java
import com.alibaba.fastjson.JSON;          // assumption: fastjson 1.x is the JSON library in use
import com.alibaba.fastjson.TypeReference;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Property is the change-event type of the config center in use (not shown in the original).

@Slf4j
@SuppressWarnings("all")
@Service("guavaRateLimiter")
public class GuavaRateLimiter {

  /**
   * Rate limiters by business type
   */
  @Getter
  private final ConcurrentHashMap<RateLimiterType, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();

  /**
   * Rate limiter configuration
   */
  @Resource(name = "rateLimiterConfig")
  private RateLimiterConfig rateLimiterConfig;

  /**
   * Listener for rate limiter configuration changes
   *
   * @param rateLimiterConfigProperty the changed configuration property
   */
  private void onRateLimiterConfigPropertyChange(Property rateLimiterConfigProperty) {
    if (Objects.isNull(rateLimiterConfigProperty)) {
      return;
    }
    String configKey = rateLimiterConfigProperty.getKey();
    String configValue = Optional.ofNullable(rateLimiterConfigProperty.getValue()).map(o -> (String) o).orElse(null);
    log.info("Rate limiter config changed, key: {}, value: {}", configKey, configValue);
    TypeReference<HashMap<String, RateLimiterConfig.LimiterConfigInfo>> typeRef = new TypeReference<HashMap<String, RateLimiterConfig.LimiterConfigInfo>>() {
    };
    Map<String, RateLimiterConfig.LimiterConfigInfo> rateLimiterConfigMap = JSON.parseObject(configValue, typeRef);
    if (MapUtils.isEmpty(rateLimiterConfigMap)) {
      rateLimiterMap.clear();
      log.info("Rate limiter config is empty, all local limiters cleared");
      // Without this return, iterating over a null map below throws a NullPointerException
      return;
    }
    for (Map.Entry<String, RateLimiterConfig.LimiterConfigInfo> entry : rateLimiterConfigMap.entrySet()) {
      RateLimiterType rateLimiterType = RateLimiterType.ofName(entry.getKey());
      if (Objects.isNull(rateLimiterType)) {
        continue;
      }
      RateLimiterConfig.LimiterConfigInfo value = entry.getValue();
      if (value != null && Boolean.TRUE.equals(value.getSwitchOn())) {
        double newQps = Optional.ofNullable(value.getQps()).orElse(RateLimiterConfig.DEFAULT_LIMITED_QPS);
        // A single get() instead of containsKey() + get(): a concurrent removal cannot cause an NPE
        RateLimiter rateLimiter = rateLimiterMap.get(rateLimiterType);
        if (rateLimiter != null) {
          double oldQps = rateLimiter.getRate();
          rateLimiter.setRate(newQps);
          log.info("[{} limiter] switch on, rate updated: {} -> {}", rateLimiterType.getDesc(), oldQps, newQps);
        } else {
          rateLimiterMap.put(rateLimiterType, RateLimiter.create(newQps));
          log.info("[{} limiter] initialized", rateLimiterType.getDesc());
        }
      } else {
        rateLimiterMap.remove(rateLimiterType);
        log.info("[{} limiter] switch off, local limiter removed", rateLimiterType.getDesc());
      }
    }
  }

  /**
   * Create the rate limiter for a type if it does not exist yet
   *
   * @param rateLimiterType rate limiter type
   * @return the existing or newly created rate limiter
   */
  private RateLimiter putRateLimiter(RateLimiterType rateLimiterType) {
    // computeIfAbsent makes check-and-create atomic, unlike containsKey() followed by put()
    return rateLimiterMap.computeIfAbsent(rateLimiterType, type -> {
      RateLimiter rateLimiter = RateLimiter.create(rateLimiterConfig.getRateLimiterConfig(type).getQps());
      log.info("[{} limiter] initialized", type.getDesc());
      return rateLimiter;
    });
  }

  /**
   * Get the rate limiter
   *
   * @param rateLimiterType rate limiter type
   * @return the rate limiter, created on first use
   */
  private RateLimiter getRateLimiter(RateLimiterType rateLimiterType) {
    return this.putRateLimiter(rateLimiterType);
  }

  /**
   * Acquire a permit, blocking if necessary
   *
   * @param rateLimiterType rate limiter type
   */
  public void acquire(RateLimiterType rateLimiterType) {
    if (Boolean.TRUE.equals(rateLimiterConfig.getRateLimiterConfig(rateLimiterType).getSwitchOn())) {
      // Rate limiting is on: take a permit, waiting if needed
      double waitingSeconds = this.getRateLimiter(rateLimiterType).acquire();
      log.info("[{} limiter] switch on, permit acquired after waiting {}s", rateLimiterType.getDesc(), waitingSeconds);
    }
  }

  /**
   * Rate limiter type
   */
  @Getter
  public enum RateLimiterType {

    // Codes must be unique, otherwise ofCodeStr always returns the first match
    BUSINESS_A(1, "business_a", "Business A"),
    BUSINESS_B(2, "business_b", "Business B");

    private final int code;
    private final String name;
    private final String desc;

    RateLimiterType(int code, String name, String desc) {
      this.code = code;
      this.name = name;
      this.desc = desc;
    }

    /**
     * Look up an enum constant by its code string
     *
     * @param codeStr code as a string
     * @return the matching constant, or null
     */
    public static RateLimiterType ofCodeStr(String codeStr) {
      for (RateLimiterType value : values()) {
        if (String.valueOf(value.getCode()).equals(codeStr)) {
          return value;
        }
      }
      return null;
    }

    /**
     * Look up an enum constant by its name (the key used in the JSON config)
     *
     * @param name name
     * @return the matching constant, or null
     */
    public static RateLimiterType ofName(String name) {
      for (RateLimiterType value : values()) {
        if (String.valueOf(value.getName()).equals(name)) {
          return value;
        }
      }
      return null;
    }
  }
}
```
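The wrapper's core pattern (one limiter per business type, created lazily and re-rated or dropped when the config changes) can be exercised without Guava or Spring. In the sketch below `StubLimiter` is a hypothetical stand-in for `RateLimiter` that only records its rate, and `LimiterRegistry` is a hypothetical name; `computeIfAbsent` makes first-time creation atomic, so concurrent callers share one limiter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for Guava's RateLimiter: it only stores the configured rate. */
final class StubLimiter {
    private volatile double rate;

    StubLimiter(double rate) {
        this.rate = rate;
    }

    double getRate() {
        return rate;
    }

    void setRate(double rate) {
        this.rate = rate;
    }
}

/** Hypothetical per-business registry mirroring GuavaRateLimiter's map handling. */
final class LimiterRegistry {
    private final Map<String, StubLimiter> limiters = new ConcurrentHashMap<>();

    /** Lazily create the limiter for a business; concurrent callers get the same instance. */
    StubLimiter get(String business, double qps) {
        return limiters.computeIfAbsent(business, key -> new StubLimiter(qps));
    }

    /** Apply one config entry: re-rate or create when enabled, drop when disabled. */
    void onConfigChange(String business, boolean switchOn, double qps) {
        if (!switchOn) {
            limiters.remove(business);
            return;
        }
        limiters.compute(business, (key, existing) -> {
            if (existing == null) {
                return new StubLimiter(qps);
            }
            existing.setRate(qps); // keep the instance so holders of it see the new rate
            return existing;
        });
    }

    boolean contains(String business) {
        return limiters.containsKey(business);
    }
}
```

Re-rating the existing instance, as the listener does with `setRate`, keeps any permits already stored and avoids handing callers a fresh, empty bucket on every config push.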

# Afterword

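This article gave a brief introduction to Guava's token-bucket rate limiter and applied it to single-machine rate limiting in production. The principles and design of distributed rate limiting will be covered in a later post.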