我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。
在我的服务器中,我有一个goroutine(无限循环),它接受连接,并且所有连接都由一个通道接收。
go func() { for { conn, _ := listener.Accept() ch <- conn } }()
然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代通道来广播到所有连接。
for c := range ch { conn.Write(msg) }
但是,我无法广播,因为(我认为通过阅读文档)通道需要在迭代之前关闭。我不确定何时应该关闭频道,因为我要不断接受新的连接,而关闭频道将不允许我这样做。如果有人可以帮助我,或者提供更好的方法向所有连接的客户端广播消息,将不胜感激。
您正在执行的是扇出模式,也就是说,多个端点正在侦听单个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是close渠道。close所有的听众都将认识到这一点,因此是“广播”。
close
但是您想要做的是广播从连接读取的消息,因此我们可以执行以下操作:
让每个工作人员收听专用广播频道,并将消息从主频道分发到每个专用广播频道。
type worker struct { source chan interface{} quit chan struct{} } func (w *worker) Start() { w.source = make(chan interface{}, 10) // some buffer size to avoid blocking go func() { for { select { case msg := <-w.source // do something with msg case <-quit: // will explain this in the last section return } } }() }
然后我们可能会有很多工人:
workers := []*worker{&worker{}, &worker{}} for _, worker := range workers { worker.Start() }
然后启动我们的监听器:
和调度员:
go func() { for { msg := <- ch for _, worker := workers { worker.source <- msg } } }()
在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当需要新工作人员时,都需要创建一个新工作人员,将其启动,然后将其推入workers切片。但是此方法需要一个线程安全的切片,该切片周围需要一个锁。一种实现可能如下所示:
workers
type threadSafeSlice struct { sync.Mutex workers []*worker } func (slice *threadSafeSlice) Push(w *worker) { slice.Lock() defer slice.Unlock() workers = append(workers, w) } func (slice *threadSafeSlice) Iter(routine func(*worker)) { slice.Lock() defer slice.Unlock() for _, worker := range workers { routine(worker) } }
每当您想开始工作时:
w := &worker{} w.Start() threadSafeSlice.Push(w)
您的调度员将更改为:
go func() { for { msg := <- ch threadSafeSlice.Iter(func(w *worker) { w.source <- msg }) } }()
好的做法之一是:永远不要离开悬空的goroutine。因此,当您听完后,需要关闭所有触发的goroutine。这将通过以下quit渠道进行worker:
quit
worker
首先,我们需要创建一个全局quit信令通道:
globalQuit := make(chan struct{})
每当我们创建一个worker时,我们都会为其分配globalQuit通道作为其退出信号:
globalQuit
worker.quit = globalQuit
然后,当我们要关闭所有工作程序时,我们只需执行以下操作:
close(globalQuit)
由于close所有侦听的goroutine都可以识别(这是您理解的重点),因此将返回所有goroutine。记住也要关闭调度程序,但我会留给您:)