grpool

    经测试,goroutine池对于业务逻辑的执行效率(降低执行时间/CPU使用率)提升不大,甚至没有原生的goroutine执行快速(池化goroutine执行调度并没有底层go调度器高效,因为池化goroutine的执行调度也是基于底层go调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。

    概念:

    1. Poolgoroutine池,用于管理若干可复用的goroutine协程资源;
    2. Worker:池对象中参与任务执行的goroutine,一个Worker可以执行若干个Job,直到队列中再无等待的Job
    3. Job:添加到池对象的任务队列中等待执行的任务,是一个func()的方法,一个Job同时只能被一个Worker获取并执行;

    使用方式

    使用场景

    管理大量异步任务的场景、需要异步协程复用的场景、需要降低内存使用率的场景。

    接口文档

    https://godoc.org/github.com/gogf/gf/os/grpool

    1. func Add(f func()) error
    2. func Jobs() int
    3. func Size() int
    4. type Pool
    5. func New(limit ...int) *Pool
    6. func (p *Pool) Add(f func()) error
    7. func (p *Pool) Cap() int
    8. func (p *Pool) Close()
    9. func (p *Pool) IsClosed() bool
    10. func (p *Pool) Jobs() int
    11. func (p *Pool) Size() int

    通过grpool.New方法创建一个goroutine池对象,参数limit为非必需参数,用于限定池中的工作goroutine数量,默认为不限制。需要注意的是,任务可以不停地往池中添加,没有限制,但是工作的goroutine是可以做限制的。我们可以通过Size()方法查询当前的工作goroutine数量,使用Jobs()方法查询当前池中待处理的任务数量。

    同时,为便于使用,grpool包提供了默认的goroutine池,默认的池对象不限制goroutine数量,直接通过grpool.Add即可往默认的池中添加任务,任务参数必须是一个 func()类型的函数/方法。

    1. package main
    2. import (
    3. "time"
    4. "fmt"
    5. "github.com/gogf/gf/os/grpool"
    6. "github.com/gogf/gf/os/gtimer"
    7. )
    8. func job() {
    9. time.Sleep(1*time.Second)
    10. }
    11. func main() {
    12. pool := grpool.New(100)
    13. for i := 0; i < 1000; i++ {
    14. }
    15. fmt.Println("worker:", pool.Size())
    16. fmt.Println(" jobs:", pool.Jobs())
    17. gtimer.SetInterval(time.Second, func() {
    18. fmt.Println("worker:", pool.Size())
    19. fmt.Println(" jobs:", pool.Jobs())
    20. fmt.Println()
    21. })
    22. select {}
    23. }

    这段程序中的任务函数的功能是sleep 1秒钟,这样便能充分展示出goroutine数量限制功能。其中,我们使用了gtime.SetInterval定时器每隔1秒钟打印出当前默认池中的工作goroutine数量以及待处理的任务数量。

    2、我们再来看一个新手经常容易出错的例子

    https://github.com/gogf/gf/blob/master/.example/os/grpool/grpool2.go

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "github.com/gogf/gf/os/grpool"
    6. )
    7. func main() {
    8. wg := sync.WaitGroup{}
    9. for i := 0; i < 10; i++ {
    10. grpool.Add(func() {
    11. fmt.Println(i)
    12. wg.Done()
    13. })
    14. }
    15. wg.Wait()
    16. }

    我们这段代码的目的是要顺序地打印出0-9,然而运行后却输出:

    为什么呢?这里的执行结果无论是采用go关键字来执行还是grpool来执行都是如此。原因是,对于异步线程/协程来讲,函数进行异步执行注册时,该函数并未真正开始执行(注册时只在goroutine的栈中保存了变量i的内存地址),而一旦开始执行时函数才会去读取变量i的值,而这个时候变量i的值已经自增到了10。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i。

    改进后的示例代码如下:

    1)、使用go关键字

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. func main() {
    7. wg := sync.WaitGroup{}
    8. for i := 0; i < 10; i++ {
    9. wg.Add(1)
    10. go func(v int){
    11. fmt.Println(v)
    12. wg.Done()
    13. }(i)
    14. }
    15. wg.Wait()
    16. }
    1. 9
    2. 0
    3. 1
    4. 2
    5. 3
    6. 4
    7. 5
    8. 6
    9. 7
    10. 8

    注意,异步执行时并不会保证按照函数注册时的顺序执行,以下同理。

    2)、使用临时变量

    https://github.com/gogf/gf/blob/master/.example/os/grpool/grpool4.go

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "github.com/gogf/gf/os/grpool"
    6. )
    7. func main() {
    8. wg := sync.WaitGroup{}
    9. for i := 0; i < 10; i++ {
    10. wg.Add(1)
    11. v := i
    12. grpool.Add(func() {
    13. fmt.Println(v)
    14. wg.Done()
    15. })
    16. }
    17. wg.Wait()

    执行后,输出结果为:

    这里可以看到,使用grpool进行任务注册时,只能使用func()类型的参数,因此无法在任务注册时把变量i的值注册进去,因此只能采用临时变量的形式来传递当前变量i的值。

    3、最后我们使用程序测试一下grpool和原生的goroutine之间的性能

    1)、grpool

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. "github.com/gogf/gf/os/gtime"
    7. "github.com/gogf/gf/os/grpool"
    8. )
    9. func main() {
    10. start := gtime.Millisecond()
    11. wg := sync.WaitGroup{}
    12. for i := 0; i < 10000000; i++ {
    13. wg.Add(1)
    14. grpool.Add(func() {
    15. time.Sleep(time.Millisecond)
    16. wg.Done()
    17. })
    18. }
    19. wg.Wait()
    20. fmt.Println(grpool.Size())
    21. fmt.Println("time spent:", gtime.Millisecond() - start)
    22. }

    2)、goroutine

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. "github.com/gogf/gf/os/gtime"
    7. )
    8. func main() {
    9. start := gtime.Millisecond()
    10. wg := sync.WaitGroup{}
    11. for i := 0; i < 10000000; i++ {
    12. wg.Add(1)
    13. go func() {
    14. time.Sleep(time.Millisecond)
    15. wg.Done()
    16. }()
    17. }
    18. wg.Wait()
    19. fmt.Println("time spent:", gtime.Millisecond() - start)
    20. }

    3)、运行结果比较

    1. grpool:
    2. goroutine count: 847313
    3. memory spent: ~2.1 G
    4. time spent: 37792 ms
    5. goroutine:
    6. goroutine count: 1000W
    7. time spent: 27085 ms

    可以看到池化过后,执行相同数量的任务,goroutine数量减少很多,相对的内存也降低了一倍以上,CPU时间耗时也勉强可以接受。