一尘不染

开始:一个生产者众多消费者

go

因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go
并发中的经典fanIn函数。

我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。

有没有正确/推荐的方法来实现这一目标?


阅读 205

收藏
2020-07-02

共1个答案

一尘不染

您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。

去游乐场:https :
//play.golang.org/p/jwdtDXVHJk

package main

import (
    "fmt"
    "time"
)

func producer(iters int) <-chan int {
    c := make(chan int)
    go func() {
        for i := 0; i < iters; i++ {
            c <- i
            time.Sleep(1 * time.Second)
        }
        close(c)
    }()
    return c
}

func consumer(cin <-chan int) {
    for i := range cin {
        fmt.Println(i)
    }
}

func fanOut(ch <-chan int, size, lag int) []chan int {
    cs := make([]chan int, size)
    for i, _ := range cs {
        // The size of the channels buffer controls how far behind the recievers
        // of the fanOut channels can lag the other channels.
        cs[i] = make(chan int, lag)
    }
    go func() {
        for i := range ch {
            for _, c := range cs {
                c <- i
            }
        }
        for _, c := range cs {
            // close all our fanOut channels when the input channel is exhausted.
            close(c)
        }
    }()
    return cs
}

func fanOutUnbuffered(ch <-chan int, size int) []chan int {
    cs := make([]chan int, size)
    for i, _ := range cs {
        // The size of the channels buffer controls how far behind the recievers
        // of the fanOut channels can lag the other channels.
        cs[i] = make(chan int)
    }
    go func() {
        for i := range ch {
            for _, c := range cs {
                c <- i
            }
        }
        for _, c := range cs {
            // close all our fanOut channels when the input channel is exhausted.
            close(c)
        }
    }()
    return cs
}

func main() {
    c := producer(10)
    chans := fanOutUnbuffered(c, 3)
    go consumer(chans[0])
    go consumer(chans[1])
    consumer(chans[2])
}

需要注意的重要部分是一旦输入通道用完后如何关闭输出通道。同样,如果输出通道之一阻塞了发送,它将阻止其他输出通道的发送。我们通过设置通道的缓冲区大小来控制延迟量。

2020-07-02