节流模式
节流设计模式
节流器接口
/**
 * Contract for throttlers. Implementations define the concrete way in which calls are limited.
 */
public interface Throttler {
/**
 * Starts the throttler. What "starting" involves is left to the implementation.
 */
void start();
}
节流器实现
/**
 * {@link Throttler} implementation backed by a {@link Timer}. Once started, it resets the given
 * {@link CallsCount} repeatedly, once per {@code throttlePeriod} milliseconds.
 */
public class ThrottleTimerImpl implements Throttler {

  private final int throttlePeriod;
  private final CallsCount callsCount;

  /**
   * Creates a throttler; nothing is scheduled until {@link #start()} is called.
   *
   * @param throttlePeriod interval between two consecutive resets, in milliseconds
   * @param callsCount counters that are reset on every tick
   */
  public ThrottleTimerImpl(int throttlePeriod, CallsCount callsCount) {
    this.throttlePeriod = throttlePeriod;
    this.callsCount = callsCount;
  }

  /**
   * Schedules the periodic reset. The first reset is scheduled with no delay and later ones
   * follow every {@code throttlePeriod} milliseconds. Every invocation creates a new timer.
   */
  @Override
  public void start() {
    // The timer thread is a daemon, so it does not keep the JVM alive on its own.
    final Timer daemonTimer = new Timer(true);
    final TimerTask resetTask = new TimerTask() {
      @Override
      public void run() {
        callsCount.reset();
      }
    };
    daemonTimer.schedule(resetTask, 0, throttlePeriod);
  }
}
/**
 * Keeps one call counter per tenant. The counters live in a {@link ConcurrentHashMap} and are
 * {@link AtomicLong}s, so the individual operations may be invoked from several threads.
 */
@Slf4j
public final class CallsCount {

  private final Map<String, AtomicLong> tenantCallsCount = new ConcurrentHashMap<>();

  /**
   * Registers a tenant with a count of zero. A tenant that is already registered keeps its
   * current count.
   *
   * @param tenantName name of the tenant; must not be {@code null}
   */
  public void addTenant(String tenantName) {
    // computeIfAbsent only allocates a counter when the tenant is actually new.
    tenantCallsCount.computeIfAbsent(tenantName, name -> new AtomicLong());
  }

  /**
   * Increments the count of the given tenant. A tenant that has not been registered yet is
   * registered on the fly instead of failing with a {@link NullPointerException}.
   *
   * @param tenantName name of the tenant; must not be {@code null}
   */
  public void incrementCount(String tenantName) {
    tenantCallsCount.computeIfAbsent(tenantName, name -> new AtomicLong()).incrementAndGet();
  }

  /**
   * Returns the current count of the given tenant.
   *
   * @param tenantName name of the tenant; must not be {@code null}
   * @return the count of the tenant, or {@code 0} when the tenant is unknown
   */
  public long getCount(String tenantName) {
    final AtomicLong counter = tenantCallsCount.get(tenantName);
    return counter == null ? 0L : counter.get();
  }

  /**
   * Resets the count of every registered tenant to zero.
   */
  public void reset() {
    // Zero the existing counters in place rather than swapping in new objects: an increment
    // racing with the reset then always lands on the live counter instead of a discarded one.
    tenantCallsCount.values().forEach(counter -> counter.set(0));
    LOGGER.info("reset counters");
  }
}
资源服务
/**
 * Bartender is the throttled service. It serves {@link BarCustomer}s (the tenants) as long as
 * their recorded call count is below the number of calls they are allowed.
 */
class Bartender {

  private static final Logger LOGGER = LoggerFactory.getLogger(Bartender.class);

  private final CallsCount callsCount;

  /**
   * Creates the service and starts the supplied throttler right away.
   *
   * @param timer throttler to start
   * @param callsCount per-tenant counters consulted and updated on every order
   */
  public Bartender(Throttler timer, CallsCount callsCount) {
    this.callsCount = callsCount;
    timer.start();
  }

  /**
   * Orders a drink from the bartender.
   *
   * @param barCustomer the customer placing the order
   * @return a random customer id in the range [1, 10000), or {@code -1} when the customer has
   *     already used up the allowed number of calls
   */
  public int orderDrink(BarCustomer barCustomer) {
    final var tenantName = barCustomer.getName();
    final var servedSoFar = callsCount.getCount(tenantName);
    final boolean withinQuota = servedSoFar < barCustomer.getAllowedCallsPerSecond();

    if (withinQuota) {
      callsCount.incrementCount(tenantName);
      LOGGER.debug("Serving beer to {} : [{} consumed] ", barCustomer.getName(), servedSoFar + 1);
      return ThreadLocalRandom.current().nextInt(1, 10000);
    }

    LOGGER.error("I'm sorry {}, you've had enough for today!", tenantName);
    return -1;
  }
}
消费者
/**
 * BarCustomer is a tenant with a name and a number of allowed calls per second.
 */
@Getter
public class BarCustomer {

  private final String name;
  private final int allowedCallsPerSecond;

  /**
   * Creates the customer and registers it as a tenant in the given counters.
   *
   * @param name name of the BarCustomer
   * @param allowedCallsPerSecond number of calls allowed for this particular tenant
   * @param callsCount counters in which the customer is registered under {@code name}
   * @throws InvalidParameterException if {@code allowedCallsPerSecond} is less than 0
   */
  public BarCustomer(String name, int allowedCallsPerSecond, CallsCount callsCount) {
    // Validation runs first, so an invalid customer is never registered.
    this.allowedCallsPerSecond = requireNonNegative(allowedCallsPerSecond);
    this.name = name;
    callsCount.addTenant(name);
  }

  /** Returns {@code calls} unchanged, or throws when it is negative. */
  private static int requireNonNegative(int calls) {
    if (calls >= 0) {
      return calls;
    }
    throw new InvalidParameterException("Number of calls less than 0 not allowed");
  }
}
使用示例
/**
 * The Throttling pattern limits the use of a resource, or even of a whole service, by a user or
 * by a particular tenant. This lets the system keep running and meet its service level
 * agreements even when an increase in demand puts a load on resources.
 *
 * <p>In this example a {@link Bartender} serves beer to {@link BarCustomer}s. The throttling is
 * time based: only a certain number of calls is allowed per second.
 *
 * <p>{@link BarCustomer} is the service tenant class having a name and the number of calls
 * allowed. {@link Bartender} is the service which is consumed by the tenants and is throttled.
 */
@Slf4j
public class App {

  /** Interval between two counter resets, in milliseconds. */
  private static final int THROTTLE_PERIOD_MILLIS = 1000;

  /** Number of orders every customer places. */
  private static final int CALLS_PER_CUSTOMER = 50;

  /** Pause between two orders of the same customer, in milliseconds. */
  private static final long PAUSE_BETWEEN_CALLS_MILLIS = 100;

  /**
   * Application entry point.
   *
   * @param args main arguments
   */
  public static void main(String[] args) {
    var callsCount = new CallsCount();
    var human = new BarCustomer("young human", 2, callsCount);
    var dwarf = new BarCustomer("dwarf soldier", 4, callsCount);

    // A single timer and a single bartender are shared by both customers. Creating one timer per
    // customer would start two independent timers that both reset the same shared counters.
    var timer = new ThrottleTimerImpl(THROTTLE_PERIOD_MILLIS, callsCount);
    var service = new Bartender(timer, callsCount);

    var executorService = Executors.newFixedThreadPool(2);
    executorService.execute(() -> makeServiceCalls(human, service));
    executorService.execute(() -> makeServiceCalls(dwarf, service));
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
      }
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      // Restore the interrupt flag so the caller can still observe the interruption.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Makes a fixed number of calls to the bartender on behalf of one customer. Stops early when
   * the calling thread is interrupted.
   *
   * @param barCustomer the customer placing the orders
   * @param service the bartender that receives the orders
   */
  private static void makeServiceCalls(BarCustomer barCustomer, Bartender service) {
    for (int call = 0; call < CALLS_PER_CUSTOMER; call++) {
      service.orderDrink(barCustomer);
      try {
        // Sleep keeps the output in check, so the results are easy to view and analyze.
        Thread.sleep(PAUSE_BETWEEN_CALLS_MILLIS);
      } catch (InterruptedException e) {
        LOGGER.error("Thread interrupted: {}", e.getMessage());
        // Honor the interruption (e.g. from shutdownNow) instead of finishing the loop.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
License:
CC BY 4.0