Consider this function:
    func doAllWork() error {
        var wg sync.WaitGroup
        wg.Add(2) // must match the number of goroutines started below
        for i := 0; i < 2; i++ {
            go func() {
                defer wg.Done()
                for j := 0; j < 10; j++ {
                    _, err := work(j)
                    if err != nil {
                        // can't use `return err` here
                        // what should I put instead?
                        os.Exit(0)
                    }
                }
            }()
        }
        wg.Wait()
        return nil
    }
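In each goroutine, work() is called 10 times. If one call to work() returns an error in any of the running goroutines, I want all the goroutines to stop immediately and the program to exit. Is it OK to use os.Exit() here? How should I handle this?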
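Edit: this question is different from "how to stop a goroutine", because here I need to stop all the goroutines if an error occurs in any one of them.
You can use the context package, which was created for exactly this kind of thing ("carries deadlines, cancelation signals...").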
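Create a context capable of issuing a cancellation signal with context.WithCancel() (the parent context can be the one returned by context.Background()). This returns a cancel() function that can be used to cancel the worker goroutines (or, more precisely, to signal the intent to cancel to them). In each worker goroutine you must check whether that intent has been signaled, by checking whether the channel returned by Context.Done() is closed. The easiest way to do that is to try to receive from it, which proceeds immediately if the channel is closed. To make the check non-blocking, so the worker can carry on if the channel is not closed, use a select statement with a default branch.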
I will use the following work() implementation, which simulates a 10% chance of failure and 1 second of work:
    func work(i int) (int, error) {
        if rand.Intn(100) < 10 { // 10% chance of failure
            return 0, errors.New("random error")
        }
        time.Sleep(time.Second)
        return 100 + i, nil
    }
And doAllWork() may look like this:
    func doAllWork() error {
        var wg sync.WaitGroup

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel() // Make sure it's called to release resources even if no errors

        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()

                for j := 0; j < 10; j++ {
                    // Check if any error occurred in any other goroutine:
                    select {
                    case <-ctx.Done():
                        return // Error somewhere, terminate
                    default: // The default branch is required to avoid blocking
                    }
                    result, err := work(j)
                    if err != nil {
                        fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                        cancel()
                        return
                    }
                    fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
                }
            }(i)
        }
        wg.Wait()

        return ctx.Err()
    }
This is how it can be tested:
    func main() {
        rand.Seed(time.Now().UnixNano() + 1) // +1 because the Playground's time is fixed
        fmt.Printf("doAllWork: %v\n", doAllWork())
    }
Output (try it on the Go Playground):
    Worker #0 finished 0, result: 100.
    Worker #1 finished 0, result: 100.
    Worker #1 finished 1, result: 101.
    Worker #0 finished 1, result: 101.
    Worker #0 finished 2, result: 102.
    Worker #1 finished 2, result: 102.
    Worker #1 finished 3, result: 103.
    Worker #1 during 4, error: random error
    Worker #0 finished 3, result: 103.
    doAllWork: context canceled
If there are no errors, for example when using the following work() function:
    func work(i int) (int, error) {
        time.Sleep(time.Second)
        return 100 + i, nil
    }
The output looks like this (try it on the Go Playground):
    Worker #0 finished 0, result: 100.
    Worker #1 finished 0, result: 100.
    Worker #1 finished 1, result: 101.
    Worker #0 finished 1, result: 101.
    Worker #0 finished 2, result: 102.
    Worker #1 finished 2, result: 102.
    Worker #1 finished 3, result: 103.
    Worker #0 finished 3, result: 103.
    Worker #0 finished 4, result: 104.
    Worker #1 finished 4, result: 104.
    Worker #1 finished 5, result: 105.
    Worker #0 finished 5, result: 105.
    Worker #0 finished 6, result: 106.
    Worker #1 finished 6, result: 106.
    Worker #1 finished 7, result: 107.
    Worker #0 finished 7, result: 107.
    Worker #0 finished 8, result: 108.
    Worker #1 finished 8, result: 108.
    Worker #1 finished 9, result: 109.
    Worker #0 finished 9, result: 109.
    doAllWork: <nil>
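Notes:
Basically we only used the Done() channel of the context, so it might seem that we could just as easily (if not more easily) use a plain done channel instead of the Context, with closing that channel doing what cancel() does in the solution above.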
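This is not true. A plain done channel can only be used this way if exactly one goroutine may close it, but in our case any of the workers may do so. Attempting to close an already closed channel panics. So you would have to add some kind of synchronization / mutual exclusion around close(done), which makes the code less readable and more complex. In fact, this is exactly what the cancel() function does under the hood, hidden / abstracted away from you, so cancel() may be called multiple times, which keeps your code simpler to write and use.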
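To illustrate the extra synchronization a plain done channel would need, here is a rough sketch that guards close(done) with sync.Once. The doneSignal type and its methods are hypothetical names invented for this example, and this is only one possible way to do it, not a description of how cancel() is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// doneSignal is a hypothetical wrapper around a done channel that is
// safe to close from several goroutines.
type doneSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newDoneSignal() *doneSignal {
	return &doneSignal{ch: make(chan struct{})}
}

// Close closes the underlying channel at most once; later calls are no-ops,
// so no "close of closed channel" panic can occur.
func (d *doneSignal) Close() {
	d.once.Do(func() { close(d.ch) })
}

// Done returns the channel that is closed once Close has been called.
func (d *doneSignal) Done() <-chan struct{} {
	return d.ch
}

func main() {
	d := newDoneSignal()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.Close() // every worker tries to close; only the first one does
		}()
	}
	wg.Wait()

	<-d.Done() // does not block: the channel is closed
	fmt.Println("done channel closed without a panic")
}
```

This is the boilerplate that context.WithCancel() spares you from writing yourself.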
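If you want doAllWork() to return the actual error rather than just "context canceled", you can use an error channel for that: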
errs := make(chan error, 2) // Buffer for 2 errors
And inside the worker, when an error is encountered, send it on the channel instead of printing it:
    result, err := work(j)
    if err != nil {
        // Error messages should not end with a newline.
        errs <- fmt.Errorf("Worker #%d during %d, error: %v", i, j, err)
        cancel()
        return
    }
And after the loop, if there was an error, return it (and nil otherwise):
    // Return (first) error, if any:
    if ctx.Err() != nil {
        return <-errs
    }
    return nil
Output this time (try it on the Go Playground):
    Worker #0 finished 0, result: 100.
    Worker #1 finished 0, result: 100.
    Worker #1 finished 1, result: 101.
    Worker #0 finished 1, result: 101.
    Worker #0 finished 2, result: 102.
    Worker #1 finished 2, result: 102.
    Worker #1 finished 3, result: 103.
    Worker #0 finished 3, result: 103.
    doAllWork: Worker #1 during 4, error: random error
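Note that I used a buffered channel whose buffer size equals the number of workers, which ensures that sending on it never blocks. It also makes it possible to receive and process all the errors, not just one (e.g. the first). Another option is to use a buffered channel that holds only 1 error and do a non-blocking send on it, like this: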
    errs := make(chan error, 1) // Buffered only for the first error

    // ...and inside the worker:
    result, err := work(j)
    if err != nil {
        // Non-blocking send: if the buffer is already full, drop this error.
        select {
        case errs <- fmt.Errorf("Worker #%d during %d, error: %v", i, j, err):
        default:
        }
        cancel()
        return
    }