最近经常用go写一些脚本,为了执行效率,一般会开启多个goroutine并发执行,脚本中通常会调用一些线上服务或者数据库读写数据,为了不影响线上服务运行,为了不影响线上服务运行就要对执行的goroutine数量进行限制,避免并发太大打垮线上服务。下面就简单写一个goroutine并发控制器。

控制器实现

limiter属性以及初始化部分如下,

// ConcurrentLimiter 并发控制
type ConcurrentLimiter struct {
	max   int64 // 最大并发数
	currNum atomic.Int64 // 当前已启动并发数
	wg      sync.WaitGroup // 保证全部goroutine执行完后退出
}

func NewConcurrentLimiter(max int64) *ConcurrentLimiter {
	return &ConcurrentLimiter{
		max:     max,
		currNum: atomic.Int64{},
		wg:      sync.WaitGroup{},
	}
}

具体执行方法, 传入goroutine要执行的方法,以及所需参数。

func (c *ConcurrentLimiter) Do(params interface{}, f func(params interface{})) {
	c.waitLimit()
	c.wg.Add(1)
	// 当前并发数量
	c.currNum.Add(1)
	go func(params interface{}) {
		defer func() {
			c.wg.Done()
			c.currNum.Add(-1)
		}()
		f(params)
	}(params)
}

// 当前并发达到最大时,就无限等待
func (c *ConcurrentLimiter) waitLimit() {
	if c.max == 0 {
		return
	}
	for {
		if c.currNum.Load() < c.max {
			return
		}
		time.Sleep(time.Microsecond * 10)
	}
}

因为都是直接启动goroutine异步执行,为了脚本能够优雅退出,我们需要在程序最后用WaitGroup控制一下miangoroutine 退出。

func (c *ConcurrentLimiter) Wait() {
	c.wg.Wait()
}

最后再加一个获取当前正在执行的goroutine数量的方法,方便统计

func (c *ConcurrentLimiter) CurrNum() int64 {
	return c.currNum.Load()
}

使用例子

简单写个test

func TestConcurrentLimiter_Do(t *testing.T) {
	cl := NewConcurrentLimiter(2)
	f := func(params interface{}) {
		fmt.Println("val: ", params)
	}
	go func() {
		for {
			fmt.Println("curr num: ", cl.CurrNum())
			time.Sleep(time.Microsecond * 100)
		}
	}()
	i := 0
	for {
		cl.Do(i, f)
		i++
	}
}

通过输出,可以看到goroutine数量被控制在设置最大值以内

=== RUN   TestConcurrentLimiter_Do
curr num:  2
curr num:  2
curr num:  1
curr num:  2