背景
将入侵4.0 java代码迁移到入侵5.0 golang项目时,有些并行处理的功能用到了java的CompletetableFuture框架,但是golang中好像没有类似的框架(可能我没有找到),所以打算手动写一个简易的golang版本的CompletetableFuture。
在入侵java代码中用到CompletetableFuture的地方:
1、多个任务同时执行,设置了超时时间,最后获取每个任务的执行结果

2、多个任务同时执行,设置了超时时间,不需要获取每个任务的执行结果

当然CompletetableFuture还有很多其他特性,但是java版本目前主要是用到了这两个,或者是基于这两个的一些变种。
实现思路
CompletetableFuture特性
在实现一个golang版本的CompletetableFuture框架前,先了解下对于以上2种场景,CompletetableFuture的特性:
- 任务执行超时后,会抛出TimeoutException,如果catch住异常,调用方还是可以取到每个任务的执行结果
- 某一个任务报错,会抛出第一个异常,如果catch住异常,还是能取到其他正常任务的执行结果
场景分析
- 对于上面的第1种场景,需要封装。
- 对于上面的第2种场景,因为不需要返回值,其实直接写几个goroutine就行了,不需要封装。
实现思路
golang中的并发框架默认的有goroutine、sync.WaitGroup,以及第三方库golang.org/x/sync/errgroup,可以考虑封装这些并发框架
实现目标
- 整个任务结束后,可以获取每个任务的返回值
- 可以设置整个任务的超时时间,任务超时后,会抛出一个timeout的error,调用方还是可以获取超时前正常执行任务的执行结果
- 当有任务出现error时,返回出现的第一个error,调用方还是可以获取每个任务的执行结果
实现代码
CompletableFuture:任务及执行结果
package async type CompletableFuture[T any] struct { Supplier Supplier[T] Result chan T } // Get 获取任务执行结果 func (c *CompletableFuture[T]) Get() T { var defaultVal T if len(c.Result) == 0 { return defaultVal } val, ok := <-c.Result if !ok { return defaultVal } return val } func SupplyAsync[T any](supplier Supplier[T]) *CompletableFuture[T] { return &CompletableFuture[T]{ Supplier: supplier, Result: make(chan T, 1), } } func All[T any](futures ...*CompletableFuture[T]) []*CompletableFuture[T] { if len(futures) == 0 { return nil } futureList := make([]*CompletableFuture[T], len(futures)) for index, future := range futures { futureList[index] = future } return futureList } // Close 关闭每个任务中的通道 func Close[T any](futures []*CompletableFuture[T]) { if len(futures) == 0 { return } for _, future := range futures { close(future.Result) } }
Supplier:任务
package async type Supplier[T any] struct { Apply func(param any) (T, error) } func NewSupplier[T any](f func(any) T) Supplier[T] { supplier := Supplier[T]{ Apply: func(p any) (T, error) { return f(p), nil }, } return supplier }
实现多任务并行执行,有三种方式:
- 使用 atomic + goroutine + select + channel 实现多个任务并行执行
- 使用 WaitGroup + goroutine + select + channel 实现多个任务并行执行(推荐)
- 使用 errgroup + goroutine + select + channel 实现多个任务并行执行
第1种方式:
package async import ( "context" "errors" "sync/atomic" "time" ) // Execute 使用 goroutine + select + channel 实现多个任务并行执行 func Execute[T any](futures []*CompletableFuture[T], timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() doneChan := make(chan struct{}) errChan := make(chan error) var counter int32 for _, future := range futures { supplier := future.Supplier result := future.Result go func() { defer func() { if atomic.AddInt32(&counter, 1) == int32(len(futures)) { closeChan(doneChan, errChan) } }() res, err := supplier.Apply(nil) if err != nil { errChan <- err return } result <- res }() } select { case err := <-errChan: // 返回遇到的第一个error return err case <-ctx.Done(): // ctx超时后,通道没有关闭,所以这里需要关闭通道 closeChan(doneChan, errChan) // 返回ctx timeout return errors.New("ctx timeout") case <-doneChan: return nil } } func closeChan(doneChan chan struct{}, errChan chan error) { close(doneChan) close(errChan) }
第2种方式:
package async import ( "context" "errors" "log" "sync" "time" ) // ExecuteWithWaitGroup 使用 WaitGroup + goroutine + select + channel 实现多个任务并行执行 func ExecuteWithWaitGroup[T any](futures []*CompletableFuture[T], timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() wg := &sync.WaitGroup{} doneChan := make(chan struct{}) errChan := make(chan error) for _, future := range futures { wg.Add(1) supplier := future.Supplier result := future.Result go func() { defer wg.Done() res, err := supplier.Apply(nil) if err != nil { errChan <- err return } result <- res }() } go func() { wg.Wait() closeChan(doneChan, errChan) }() select { case err := <-errChan: // 返回遇到的第一个error return err case <-ctx.Done(): // ctx超时后,不会执行wg.Wait()后的代码,所以这里需要关闭通道 closeChan(doneChan, errChan) // 返回ctx timeout return errors.New("ctx timeout") case <-doneChan: return nil } } func closeChan(doneChan chan struct{}, errChan chan error) { close(doneChan) close(errChan) }
第3种方式:
package async import ( "context" "errors" "golang.org/x/sync/errgroup" "time" ) // ExecuteWithErrGroup 使用 errgroup + goroutine + select + channel 实现多个任务并行执行 func ExecuteWithErrGroup[T any](futures []*CompletableFuture[T], timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() doneChan := make(chan struct{}) errChan := make(chan error) eg, ctx := errgroup.WithContext(ctx) for _, future := range futures { supplier := future.Supplier result := future.Result eg.Go(func() error { res, err := supplier.Apply(nil) if err != nil { errChan <- err return err } result <- res return nil }) } go func() { if err := eg.Wait(); err != nil { // do nothing } closeChan(doneChan, errChan) }() select { case err := <-errChan: // 返回遇到的第一个error return err case <-ctx.Done(): // ctx超时后,不会执行wg.Wait()后的代码,所以这里需要关闭通道 closeChan(doneChan, errChan) // 返回ctx timeout return errors.New("ctx timeout") case <-doneChan: return nil } } func closeChan(doneChan chan struct{}, errChan chan error) { close(doneChan) close(errChan) }
测试
测试场景
测试场景分为以下几种:
- 多个任务,都在超时之前正常执行完成
- 多个任务,有一个任务在超时前未完成
- 多个任务,在超时前,有一个任务返回error
- 多个任务,在超时后,有一个任务返回error
场景一
测试代码:
package test import ( "git.qingteng.cn/ms-public/go_study/pkg/util/async" "log" "testing" "time" ) type ResultInfo struct { Msg string } func TestAsyncUtil(t *testing.T) { param := "my param" param2 := "my param2" param3 := "my param3" // 超时时间 timeout := 5 * time.Second future1 := async.SupplyAsync(async.NewSupplier[ResultInfo](func(p any) ResultInfo { return supplier1(param) })) future2 := async.SupplyAsync(async.NewSupplier[ResultInfo](func(p any) ResultInfo { return supplier2(param2) })) future3 := async.SupplyAsync(async.NewSupplier[ResultInfo](func(p any) ResultInfo { return supplier3(param3) })) futures := async.All(future1, future2, future3) defer async.Close(futures) err := async.Execute(futures, timeout) if err != nil { log.Printf("execute failed:%v\n", err) // 如果出现error,希望任务立马停止,这里可以放开return // return } // 打印每个任务的结果 log.Printf("future1的结果是:%v\n", future1.Get()) log.Printf("future2的结果是:%v\n", future2.Get()) log.Printf("future3的结果是:%v\n", future3.Get()) } func supplier1(param string) ResultInfo { time.Sleep(1500 * time.Millisecond) // 模拟任务执行时间 log.Println("任务A完成,param --> " + param) return ResultInfo{Msg: "A"} } func supplier2(param string) ResultInfo { time.Sleep(2500 * time.Millisecond) // 模拟任务执行时间 log.Println("任务B完成,param --> " + param) return ResultInfo{Msg: "B"} } func supplier3(param string) ResultInfo { time.Sleep(3500 * time.Millisecond) // 模拟任务执行时间 log.Println("任务C完成,param --> " + param) return ResultInfo{Msg: "C"} }
测试结果:
2023/11/27 18:35:53 任务A完成,param --> my param 2023/11/27 18:35:54 任务B完成,param --> my param2 2023/11/27 18:35:55 任务C完成,param --> my param3 2023/11/27 18:35:55 future1的结果是:{A} 2023/11/27 18:35:55 future2的结果是:{B} 2023/11/27 18:35:55 future3的结果是:{C}
场景二
对于场景二,只需要调整场景一单测代码中的超时时间为3秒即可。
测试结果:
2023/11/27 19:02:19 任务A完成,param --> my param 2023/11/27 19:02:20 任务B完成,param --> my param2 2023/11/27 19:02:20 execute failed:ctx timeout 2023/11/27 19:02:20 future1的结果是:{A} 2023/11/27 19:02:20 future2的结果是:{B} 2023/11/27 19:02:20 future3的结果是:{}
场景三
对于场景三,需要修改supplier3方法:
future3 := async.SupplyAsync(async.Supplier[ResultInfo]{ Apply: func(p any) (ResultInfo, error) { return supplier3(param3) }, }) func supplier3(param string) (ResultInfo, error) { time.Sleep(3500 * time.Millisecond) // 模拟任务执行时间 log.Println("任务C完成,param --> " + param) return ResultInfo{Msg: "C"}, errors.New("supplier3 error") }
测试结果:
2023/11/27 19:01:53 任务A完成,param --> my param 2023/11/27 19:01:54 任务B完成,param --> my param2 2023/11/27 19:01:55 任务C完成,param --> my param3 2023/11/27 19:01:55 execute failed:supplier3 error 2023/11/27 19:01:55 future1的结果是:{A} 2023/11/27 19:01:55 future2的结果是:{B} 2023/11/27 19:01:55 future3的结果是:{}
场景四
对于场景四,只需要调整场景三单测代码中的超时时间为3秒即可。
测试结果(和场景二是一样的):
2023/11/27 19:02:19 任务A完成,param --> my param 2023/11/27 19:02:20 任务B完成,param --> my param2 2023/11/27 19:02:20 execute failed:ctx timeout 2023/11/27 19:02:20 future1的结果是:{A} 2023/11/27 19:02:20 future2的结果是:{B} 2023/11/27 19:02:20 future3的结果是:{}