在http://marcio.io/2015/07/handling-1-million-requests-per-minute-with- golang/提供的示例中,很多地方都引用了该示例。
func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } }
MaxWorker分派服务完许多工作后,工人池(chan chan工作)会不会耗尽?因为<-d.WorkerPool从信道和信道工作拉出第一类型后没有被补充dispatcher.Run()被调用的第一次?还是我想念/误读了什么?如何为WorkerPool补充可用的工作渠道?
MaxWorker
<-d.WorkerPool
dispatcher.Run()
go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job)
如果您仔细阅读了工人的守则,您会发现
w.WorkerPool <- w.JobChannel
每次循环开始时,工作人员本身的渠道都会被退回
我在下面复制整个功能:
func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() }