最近经常用go写一些脚本,为了执行效率,一般会开启多个goroutine并发执行,脚本中通常会调用一些线上服务或者数据库读写数据,为了不影响线上服务运行,为了不影响线上服务运行就要对执行的goroutine数量进行限制,避免并发太大打垮线上服务。下面就简单写一个goroutine并发控制器。
控制器实现
limiter属性以及初始化部分如下,
// ConcurrentLimiter 并发控制
type ConcurrentLimiter struct {
max int64 // 最大并发数
currNum atomic.Int64 // 当前已启动并发数
wg sync.WaitGroup // 保证全部goroutine执行完后退出
}
func NewConcurrentLimiter(max int64) *ConcurrentLimiter {
return &ConcurrentLimiter{
max: max,
currNum: atomic.Int64{},
wg: sync.WaitGroup{},
}
}
具体执行方法, 传入goroutine要执行的方法,以及所需参数。
func (c *ConcurrentLimiter) Do(params interface{}, f func(params interface{})) {
c.waitLimit()
c.wg.Add(1)
// 当前并发数量
c.currNum.Add(1)
go func(params interface{}) {
defer func() {
c.wg.Done()
c.currNum.Add(-1)
}()
f(params)
}(params)
}
// 当前并发达到最大时,就无限等待
func (c *ConcurrentLimiter) waitLimit() {
if c.max == 0 {
return
}
for {
if c.currNum.Load() < c.max {
return
}
time.Sleep(time.Microsecond * 10)
}
}
因为都是直接启动goroutine异步执行,为了脚本能够优雅退出,我们需要在程序最后用WaitGroup控制一下miangoroutine 退出。
func (c *ConcurrentLimiter) Wait() {
c.wg.Wait()
}
最后再加一个获取当前正在执行的goroutine数量的方法,方便统计
func (c *ConcurrentLimiter) CurrNum() int64 {
return c.currNum.Load()
}
使用例子
简单写个test
func TestConcurrentLimiter_Do(t *testing.T) {
cl := NewConcurrentLimiter(2)
f := func(params interface{}) {
fmt.Println("val: ", params)
}
go func() {
for {
fmt.Println("curr num: ", cl.CurrNum())
time.Sleep(time.Microsecond * 100)
}
}()
i := 0
for {
cl.Do(i, f)
i++
}
}
通过输出,可以看到goroutine数量被控制在设置最大值以内
=== RUN TestConcurrentLimiter_Do
curr num: 2
curr num: 2
curr num: 1
curr num: 2