分布式限流器也是项目开发中常用的,基于redis incr指令可以比较方便快捷的实现一个分布式限流器,因为这个指令自身的原子性,所以不用开发者考虑计数过程的并发问题。下面是go代码实现。
限流器相关属性
counter计数器,这里把计数器单独定义为了一个接口,方便使用不同的方式实现,比如下文中将要介绍基于redis实现。
window限流窗口,指计数时间周期,比如初始化为1000就是1秒内请求不得超过指定阈值limit。
retryTimeout重试超时时间,用来控制达到限流阈值后,继续重试的超时时间。
// Limiter 限流器
type Limiter struct {
name string // 限流器名称
window int64 // 限流窗口 ms
limit int64 // 限流阈值
retryTimeout int64 // 超时时间
counter Count // 计数器
}
// Count 计数器
type Count interface {
IncrBy(ctx context.Context, key string, num int64) (int64, error)
}
// NewLimiter 初始化
func NewLimiter(counter Count, name string, window int64, limit int64,
retryTimeout int64) *Limiter {
return &Limiter{
counter: counter,
name: name,
window: window,
limit: limit,
retryTimeout: retryTimeout,
}
}
具体方法
Allow 到达阈值后,不再重试,直接返回false,丢弃掉超过阈值的请求。
AllowRetry 到达阈值后,在超时时间范围内进行重试。
wait 每次重试等待时间,保证重试是在下一个窗口,避免不必要的重试。使用随机数是为了尽量打散请求。
getKey 计算当前窗口,根据当前窗口拼接计数key。
// AllowRetry 在超时时间内重试
func (l *Limiter) AllowRetry(ctx context.Context, key string,
num int64) (bool, error) {
ctxWithTimeout, _ := context.WithTimeout(ctx, time.Duration(l.retryTimeout)*time.Millisecond)
var allow bool
var err error
for {
allow, err = l.Allow(ctx, key, num)
if err == nil {
return allow, nil
}
if ctxWithTimeout.Err() != nil {
return false, ctxWithTimeout.Err()
}
l.wait()
}
}
// Allow 不重试
func (l *Limiter) Allow(ctx context.Context, key string,
num int64) (bool, error) {
return l.getToken(ctx, key, num)
}
// getToken 进行计数
func (l *Limiter) getToken(ctx context.Context, key string,
num int64) (bool, error) {
unionKey := l.getKey(key)
num, err := l.counter.IncrBy(ctx, unionKey, num)
if err != nil {
return false, err
}
return num < l.limit, nil
}
// wait 每次重试等待时间
func (l *Limiter) wait() {
randWait := l.window - rand.Int63n(l.window/2)
time.Sleep(time.Duration(randWait) * time.Millisecond)
}
// getKey 获取计数key
func (l *Limiter) getKey(key string) string {
// 计算当前窗口
nowWindow := time.Now().UnixMilli() / l.window
return fmt.Sprintf("%s:%s:%d", l.name, key, nowWindow)
}
redis 计数器
redis计数器实现就比较简单了,直接调用redis的incr方法。
// RedisCounter 使用redis分布式限流
type RedisCounter struct {
redisCli *redis.Client
expire int64
}
// IncrBy ...计数
func (r *RedisCounter) IncrBy(ctx context.Context, key string,
num int64) (int64, error) {
defer func() {
_ = r.redisCli.Expire(ctx, key, time.Duration(r.expire)*time.Millisecond)
}()
return r.redisCli.IncrBy(ctx, key, num).Result()
}
以上就是基于redis实现的分布式限流器,把计数器以接口形式组装进去,可以使代码更加通用,直接基于内存实现就是单机限流。不想基于redis实现,也可以基于其他方式实现。