一尘不染

缓冲区为空后,关闭“工作者”执行例程

go

我希望我的go常规工作者(ProcessToDo()在下面的代码中)在关闭所有“排队”工作之前等待。

工作例程具有一个“待办事项”通道(已缓冲),通过该通道将工作发送给它。它有一个“完成”通道来告诉它开始关闭。该文档说,如果满足多个选择之一,则通道上的选择将选择一个“伪随机值”……这意味着在所有缓冲工作完成之前将触发关闭(返回)。

在下面的代码示例中,我希望所有20条消息都可以打印…

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
                return
            }
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!\n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}

阅读 247

收藏
2020-07-02

共1个答案

一尘不染

done您完全不需要使用通道,因为您可以通过关闭todo通道本身来发出关闭信号。

for range在通道上使用,它将迭代直到通道关闭且其缓冲区为空。

您应该有一个done通道,但只有这样,goroutine本身才能发出信号,表明它已完成工作,因此主goroutine可以继续或退出。

此变体与您的变体等效,简单得多,并且不需要time.Sleep()调用来等待其他goroutine(反正太错误和不确定)。在GoPlayground上尝试一下:

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}

还要注意,工作程序goroutine应该使用来表示信号已完成,defer因此,如果主工作程序以某种意外的方式或发生紧急情况返回,则不会阻塞主工作程序。因此,它应该这样开始:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()

您还可以sync.WaitGroup用于将主goroutine同步到worker(以等待它)。实际上,如果您打算使用多个工作程序例程,那比从done通道读取多个值更干净。同样,WaitGroup由于它带有一个Done()方法(这是一个函数调用),因此用来表示完成也更简单,因此您不需要匿名函数:

defer wg.Done()

有关完整的示例,请参见JimB的答案WaitGroup

使用forrange信道同步,因此你不需要任何额外的代码,将同步访问:如果你想使用多工作够程也是地道的todo通道或在收到该职位。并且,如果您关闭中的todo频道main(),则会正确地向所有工作程序发出信号。但是,当然,所有排队的作业将只被接收和处理一次。

现在,使用WaitGroup用于使主goroutine等待工作程序使用的变体(JimB的答案):如果您希望有1个以上的工作程序goroutine,该怎么办?同时(最有可能并行)处理您的工作?

您需要在代码中添加/更改的唯一一件事是:真正启动多个代码:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

现在,您无需更改任何其他内容,便有了一个正确的并发应用程序,该应用程序使用10个并发goroutine接收并处理您的作业。而且我们没有使用任何“丑陋的”time.Sleep()(我们只使用了“丑陋的”,而只是模拟了缓慢的处理,而不是等待其他goroutine),并且您不需要任何额外的同步。

2020-07-02