文章

节流模式

节流设计模式

节流器接口

/**
 * Interface that defines the common structure for the different ways of throttling.
 * Each implementation decides how the limit is actually enforced.
 */
public interface Throttler {

  /**
   * Starts the throttling mechanism. What "starting" means is left to the implementation.
   */
  void start();
}

节流器实现

/**
 * Timer-based implementation of {@link Throttler}. Once started, it resets the supplied
 * {@link CallsCount} right away and then again after every {@code throttlePeriod}
 * milliseconds.
 */
public class ThrottleTimerImpl implements Throttler {

  private final int throttlePeriod;
  private final CallsCount callsCount;

  /**
   * Creates a throttler bound to the given counters.
   *
   * @param throttlePeriod time between two consecutive resets, in milliseconds
   * @param callsCount counters that are reset on every timer tick
   */
  public ThrottleTimerImpl(int throttlePeriod, CallsCount callsCount) {
    this.throttlePeriod = throttlePeriod;
    this.callsCount = callsCount;
  }

  /**
   * Starts a new daemon timer that resets the counters with no initial delay and then
   * repeatedly, once per {@code throttlePeriod} milliseconds. Every call creates another
   * timer.
   */
  @Override
  public void start() {
    // Daemon timer: it does not keep the JVM alive on its own.
    final Timer resetTimer = new Timer(true);
    final TimerTask resetTask = new TimerTask() {
      @Override
      public void run() {
        callsCount.reset();
      }
    };
    resetTimer.schedule(resetTask, 0, throttlePeriod);
  }
}
/**
 * Keeps one call counter per tenant. The counters live in a {@link ConcurrentHashMap} of
 * {@link AtomicLong} values, so individual operations may be invoked from several threads.
 */
@Slf4j
public final class CallsCount {
  private final Map<String, AtomicLong> tenantCallsCount = new ConcurrentHashMap<>();

  /**
   * Registers a tenant with a counter of zero. A tenant that is already registered keeps
   * its current counter.
   *
   * @param tenantName name of the tenant.
   */
  public void addTenant(String tenantName) {
    // computeIfAbsent only allocates a counter when the tenant is really new.
    tenantCallsCount.computeIfAbsent(tenantName, name -> new AtomicLong());
  }

  /**
   * Increments the counter of the given tenant. A tenant that was never registered is
   * registered on the fly instead of failing with a {@link NullPointerException}.
   *
   * @param tenantName name of the tenant.
   */
  public void incrementCount(String tenantName) {
    tenantCallsCount.computeIfAbsent(tenantName, name -> new AtomicLong()).incrementAndGet();
  }

  /**
   * Returns the current counter of the given tenant.
   *
   * @param tenantName name of the tenant.
   * @return the count of the tenant, or {@code 0} when the tenant is not registered.
   */
  public long getCount(String tenantName) {
    var counter = tenantCallsCount.get(tenantName);
    return counter == null ? 0L : counter.get();
  }

  /**
   * Resets the counters of all tenants to zero.
   */
  public void reset() {
    // Zero the existing counters in place rather than swapping in new objects, so a
    // concurrent increment never lands on a counter that is no longer in the map.
    tenantCallsCount.values().forEach(counter -> counter.set(0));
    LOGGER.info("reset counters");
  }
}

资源服务

/**
 * Bartender is the throttled service. It serves {@link BarCustomer} tenants and refuses an
 * order once the tenant's recorded call count has reached the tenant's allowance.
 */
class Bartender {

  private static final Logger LOGGER = LoggerFactory.getLogger(Bartender.class);
  private final CallsCount callsCount;

  /**
   * Creates the service and immediately starts the supplied throttler.
   *
   * @param timer throttler to start
   * @param callsCount per-tenant counters consulted and updated on every order
   */
  public Bartender(Throttler timer, CallsCount callsCount) {
    this.callsCount = callsCount;
    timer.start();
  }

  /**
   * Orders a drink from the bartender on behalf of the given customer.
   *
   * @param barCustomer the tenant placing the order
   * @return a randomly generated customer id in the range 1 to 9999, or {@code -1} when the
   *     customer has already used up the allowance
   */
  public int orderDrink(BarCustomer barCustomer) {
    final String tenantName = barCustomer.getName();
    final long servedSoFar = callsCount.getCount(tenantName);
    final boolean withinLimit = servedSoFar < barCustomer.getAllowedCallsPerSecond();

    if (withinLimit) {
      callsCount.incrementCount(tenantName);
      LOGGER.debug("Serving beer to {} : [{} consumed] ", barCustomer.getName(), servedSoFar + 1);
      return ThreadLocalRandom.current().nextInt(1, 10000);
    }

    LOGGER.error("I'm sorry {}, you've had enough for today!", tenantName);
    return -1;
  }
}

消费者

/**
 * BarCustomer is a tenant with a name and a maximum number of allowed calls (named
 * "per second" here).
 */
@Getter
public class BarCustomer {

  private final String name;
  private final int allowedCallsPerSecond;

  /**
   * Creates the customer and registers it as a tenant in the given counters.
   *
   * @param name Name of the BarCustomer
   * @param allowedCallsPerSecond The number of calls allowed for this particular tenant.
   * @param callsCount Counters in which this customer is registered under its name.
   * @throws InvalidParameterException If number of calls is less than 0, throws exception.
   */
  public BarCustomer(String name, int allowedCallsPerSecond, CallsCount callsCount) {
    // Validate first so that an invalid customer is never registered as a tenant.
    final int checkedLimit = requireNonNegative(allowedCallsPerSecond);
    this.name = name;
    this.allowedCallsPerSecond = checkedLimit;
    callsCount.addTenant(name);
  }

  /** Returns {@code calls} unchanged when it is zero or positive, otherwise throws. */
  private static int requireNonNegative(int calls) {
    if (calls >= 0) {
      return calls;
    }
    throw new InvalidParameterException("Number of calls less than 0 not allowed");
  }
}

测试使用

/**
 * The Throttling pattern is a design pattern for limiting the use of a resource, or even of
 * a complete service, by users or by particular tenants. It allows a system to keep running
 * and to meet its service level agreements even when rising demand puts a load on resources.
 * <p>
 * In this example there is a {@link Bartender} serving beer to {@link BarCustomer}s. The
 * throttling is time based: only a certain number of calls is allowed per second.
 * </p>
 * ({@link BarCustomer}) is the service tenant class having a name and the number of calls allowed.
 * ({@link Bartender}) is the service which is consumed by the tenants and is throttled.
 */
@Slf4j
public class App {

  /**
   * Application entry point. Runs one worker per customer on a two-thread pool and waits up
   * to ten seconds for both to finish.
   *
   * @param args main arguments
   */
  public static void main(String[] args) {
    var callsCount = new CallsCount();
    var human = new BarCustomer("young human", 2, callsCount);
    var dwarf = new BarCustomer("dwarf soldier", 4, callsCount);

    var executorService = Executors.newFixedThreadPool(2);

    executorService.execute(() -> makeServiceCalls(human, callsCount));
    executorService.execute(() -> makeServiceCalls(dwarf, callsCount));

    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
      }
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      // Restore the interrupt status so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Makes 50 calls to the bartender for the given customer, pausing 100 ms after each call.
   * Stops early when the calling thread is interrupted.
   *
   * @param barCustomer customer on whose behalf the drinks are ordered
   * @param callsCount shared per-tenant counters
   */
  private static void makeServiceCalls(BarCustomer barCustomer, CallsCount callsCount) {
    // NOTE(review): every worker builds its own timer over the shared callsCount, so the
    // shared counters are reset by one timer per worker - confirm this is intended.
    var timer = new ThrottleTimerImpl(1000, callsCount);
    var service = new Bartender(timer, callsCount);
    // Sleep is introduced to keep the output in check and easy to view and analyze the results.
    for (int i = 0; i < 50; i++) {
      service.orderDrink(barCustomer);
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        LOGGER.error("Thread interrupted: {}", e.getMessage());
        // An interrupt is a request to stop (e.g. from shutdownNow()): restore the flag
        // and end the loop instead of carrying on with the remaining calls.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

License:  CC BY 4.0