我正在编写一个程序来处理文本文件中的数百万行,500k耗时5秒来验证文件,我想加快速度。
我想遍历所有项目并异步处理它们中的x,然后等待响应以查看是否应该继续。
我已经写了一些伪代码,我不确定我写的内容是否有意义,这看起来似乎很复杂,是否有更简单,更优雅的方法来做到这一点。
package main import ( "fmt" "sync" "time" ) func main() { // Need an object to loop over // need a loop to read the response items := 100000 concurrency := 20 sem := make(chan bool, concurrency) returnChan := make(chan error) finChan := make(chan bool) var wg sync.WaitGroup go func() { for x := 0; x < items; x++ { // loop over all items // only do maxitems at a time wg.Add(1) sem <- true go delayFunc(x, sem, returnChan, &wg) } wg.Wait() finChan <- true }() var err error finished := false for { select { case err = <-returnChan: if err != nil { break } case _ = <-finChan: finished = true break default: continue } if err != nil || finished == true { break } } fmt.Println(err) } func delayFunc(x int, sem chan bool, returnChan chan error, wg *sync.WaitGroup) { //fmt.Printf("PROCESSING (%v)\n", x) time.Sleep(10 * time.Millisecond) <-sem // release the lock wg.Done() if x == 95000 { returnChan <- fmt.Errorf("Something not right") } else { returnChan <- nil } }
您的代码看起来不错,您实现了Go模式中常用的代码。缺点是- 您为每个项目生成工作程序goroutine。廉价地生成goroutine不是免费的。另一种方法是生成N个工人并通过渠道向他们提供物品。像这样
package main import ( "fmt" "time" ) func main() { items := 100 concurrency := 10 in := make(chan int) ret := make(chan error) for x := 0; x < concurrency; x++ { go worker(in, ret) } go func() { for x := 0; x < items; x++ { // loop over all items in <- x } close(in) }() for err := range ret { if err != nil { fmt.Println(err.Error()) break } } } func worker(in chan int, returnChan chan error) { //fmt.Printf("PROCESSING (%v)\n", x) for x := range in { if x == 95 { returnChan <- fmt.Errorf("Something not right") } else { returnChan <- nil } time.Sleep(10 * time.Millisecond) } returnChan <- fmt.Errorf("The End") }
操场