分布式限流器也是项目开发中常用的,基于redis incr指令可以比较方便快捷的实现一个分布式限流器,因为这个指令自身的原子性,所以不用开发者考虑计数过程的并发问题。下面是go代码实现。

限流器相关属性

counter计数器,这里把计数器单独定义为了一个接口,方便使用不同的方式实现,比如下文中将要介绍基于redis实现。 window限流窗口,指计数时间周期,比如初始化为1000就是1秒内请求不得超过指定阈值limit。
retryTimeout重试超时时间,用来控制达到限流阈值后,继续重试的超时时间。

// Limiter 限流器
type Limiter struct {
	name         string // 限流器名称
	window       int64 // 限流窗口 ms
	limit        int64 // 限流阈值
	retryTimeout int64 // 超时时间
	counter      Count // 计数器
}

// Count 计数器
type Count interface {
	IncrBy(ctx context.Context, key string, num int64) (int64, error)
}

// NewLimiter 初始化
func NewLimiter(counter Count, name string, window int64, limit int64,
 retryTimeout int64) *Limiter {
	return &Limiter{
		counter:      counter,
		name:         name,
		window:       window,
		limit:        limit,
		retryTimeout: retryTimeout,
	}
}

具体方法

Allow 到达阈值后,不再重试,直接返回false,丢弃掉超过阈值的请求。
AllowRetry 到达阈值后,在超时时间范围内进行重试。 wait 每次重试等待时间,保证重试是在下一个窗口,避免不必要的重试。使用随机数是为了尽量打散请求。
getKey 计算当前窗口,根据当前窗口拼接计数key。

// AllowRetry 在超时时间内重试
func (l *Limiter) AllowRetry(ctx context.Context, key string,
 num int64) (bool, error) {
	ctxWithTimeout, _ := context.WithTimeout(ctx, time.Duration(l.retryTimeout)*time.Millisecond)
	var allow bool
	var err error
	for {
		allow, err = l.Allow(ctx, key, num)
		if err == nil {
			return allow, nil
		}
		if ctxWithTimeout.Err() != nil {
			return false, ctxWithTimeout.Err()
		}
		l.wait()
	}
}

// Allow 不重试
func (l *Limiter) Allow(ctx context.Context, key string,
 num int64) (bool, error) {
	return l.getToken(ctx, key, num)
}

// getToken 进行计数
func (l *Limiter) getToken(ctx context.Context, key string,
 num int64) (bool, error) {
	unionKey := l.getKey(key)
	num, err := l.counter.IncrBy(ctx, unionKey, num)
	if err != nil {
		return false, err
	}
	return num < l.limit, nil
}

// wait 每次重试等待时间
func (l *Limiter) wait() {
	randWait := l.window - rand.Int63n(l.window/2)
	time.Sleep(time.Duration(randWait) * time.Millisecond)
}

// getKey 获取计数key
func (l *Limiter) getKey(key string) string {
    // 计算当前窗口
	nowWindow := time.Now().UnixMilli() / l.window
	return fmt.Sprintf("%s:%s:%d", l.name, key, nowWindow)
}

redis 计数器

redis计数器实现就比较简单了,直接调用redis的incr方法。

// RedisCounter 使用redis分布式限流
type RedisCounter struct {
	redisCli *redis.Client
	expire   int64
}

// IncrBy ...计数
func (r *RedisCounter) IncrBy(ctx context.Context, key string,
 num int64) (int64, error) {
	defer func() {
		_ = r.redisCli.Expire(ctx, key, time.Duration(r.expire)*time.Millisecond)
	}()
	return r.redisCli.IncrBy(ctx, key, num).Result()
}

以上就是基于redis实现的分布式限流器,把计数器以接口形式组装进去,可以使代码更加通用,直接基于内存实现就是单机限流。不想基于redis实现,也可以基于其他方式实现。