一尘不染

如何实现基于行的文件内容的并行处理

go

我正在编写一个POC来处理大约10亿行以上的超大文本文件,并为此进行了尝试。

package main

import (
        "bufio"
        "fmt"
        "log"
        "os"
        "time"
)

// main scans dump10.txt line by line, starts one goroutine per line that
// prints the line, and finally prints the elapsed wall-clock time.
func main() {
        start := time.Now()
        file, err := os.Open("dump10.txt")
        if err != nil {
                log.Fatal(err)
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
                // A new goroutine is launched for every line with no upper
                // bound on how many run at once, and nothing below waits for
                // them, so main may return while lines are still unprinted.
                go fmt.Println(scanner.Text())
        }

        if err := scanner.Err(); err != nil {
                log.Fatal(err)
        }
        secs := time.Since(start).Seconds()
        fmt.Printf("Took %.2fs", secs)
}

但是,当运行此命令时,会出现此错误;

panic: too many concurrent operations on a single file or socket (max 1048575)(即:单个文件或套接字上的并发操作过多,上限为 1048575)

我还没有在网上找到任何可以解决此特定错误的信息。我不确定这是否是文件描述符问题,错误中列出的最大值远高于我的ulimit -n限制500,000。

做这个的最好方式是什么?

可能不太明显,需要说明一下:这里的 fmt.Println 只是一个占位,代表我在处理数据时实际要调用的函数。


阅读 208

收藏
2020-07-02

共1个答案

一尘不染

在考虑并行化过程之前,您应该研究输入和计算以确保它有意义。

需要按顺序处理的输入并不适合并行化:并行处理需要额外的复杂逻辑来保持顺序,而这种策略能否奏效很难事先评估。

同样,要从并行化中获益,每项计算所花费的时间必须长于同步并行任务所需的开销。通过批量处理数据可以摊薄这一开销,但由此得到的算法会更加复杂,并会产生其他不良副作用(例如额外的内存分配)。

否则,请勿并行化。

请参见下面各种计算时间长/短的实现示例及其产生的基准。

结论是:除非每项计算都是耗时较长的任务,其耗时明显超过同步开销,否则顺序处理会更快。

main.go

package main

import (
    "bufio"
    "fmt"
    "io"
    "runtime"
    "strings"
    "sync"
    "time"
)

// main runs every processing strategy once, with statistics enabled, over
// the same synthetic input: 1000 lines of 1000 'a' characters each.
func main() {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)

    // The order is significant for the printed report: strategies run one
    // after another in the same process.
    runs := []func(string, bool){
        run_line_short,
        run_line_long,
        run_line_short_workers,
        run_line_long_workers,
        run_bulk_short,
        run_bulk_long,
        run_seq_short,
        run_seq_long,
    }
    for _, run := range runs {
        run(data, true)
    }
}

// run_line_short pushes data through process with line_handler_short.
// When stat is true, a statistics report is printed around the run.
// It panics if processing returns an error.
func run_line_short(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_line_short")()
    }
    if err := process(strings.NewReader(data), line_handler_short); err != nil {
        panic(err)
    }
}
// run_line_long pushes data through process with line_handler_long.
// When stat is true, a statistics report is printed around the run.
// It panics if processing returns an error.
func run_line_long(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_line_long")()
    }
    if err := process(strings.NewReader(data), line_handler_long); err != nil {
        panic(err)
    }
}
// run_line_short_workers pushes data through processWorkers with
// line_handler_short. When stat is true, a statistics report is printed
// around the run. It panics if processing returns an error.
func run_line_short_workers(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_line_short_workers")()
    }
    if err := processWorkers(strings.NewReader(data), line_handler_short); err != nil {
        panic(err)
    }
}
// run_line_long_workers pushes data through processWorkers with
// line_handler_long. When stat is true, a statistics report is printed
// around the run. It panics if processing returns an error.
func run_line_long_workers(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_line_long_workers")()
    }
    if err := processWorkers(strings.NewReader(data), line_handler_long); err != nil {
        panic(err)
    }
}
// run_bulk_short pushes data through processBulk with bulk_handler_short.
// When stat is true, a statistics report is printed around the run.
// It panics if processing returns an error.
func run_bulk_short(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_bulk_short")()
    }
    if err := processBulk(strings.NewReader(data), bulk_handler_short); err != nil {
        panic(err)
    }
}
// run_bulk_long pushes data through processBulk with bulk_handler_long.
// When stat is true, a statistics report is printed around the run.
// It panics if processing returns an error.
func run_bulk_long(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_bulk_long")()
    }
    if err := processBulk(strings.NewReader(data), bulk_handler_long); err != nil {
        panic(err)
    }
}
// run_seq_short pushes data through processSeq with line_handler_short.
// When stat is true, a statistics report is printed around the run.
// It panics if processing returns an error.
func run_seq_short(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_seq_short")()
    }
    if err := processSeq(strings.NewReader(data), line_handler_short); err != nil {
        panic(err)
    }
}
// run_seq_long pushes data through processSeq with line_handler_long.
// When stat is true, a statistics report is printed around the run.
// It panics if processing returns an error.
func run_seq_long(data string, stat bool) {
    if stat {
        // stats is called right here (banner printed, clock started); only
        // the closure it returns is deferred.
        defer stats("run_seq_long")()
    }
    if err := processSeq(strings.NewReader(data), line_handler_long); err != nil {
        panic(err)
    }
}

// line_handler_short stands in for a per-line computation that costs next
// to nothing: it only reads the length of k. It always returns nil.
func line_handler_short(k string) error {
    size := len(k)
    _ = size
    return nil
}

// line_handler_long stands in for a slow per-line computation: it blocks
// for five milliseconds before reading the length of k. It always returns
// nil.
func line_handler_long(k string) error {
    pause := time.NewTimer(5 * time.Millisecond)
    <-pause.C
    size := len(k)
    _ = size
    return nil
}

// bulk_handler_short is the batch counterpart of a cheap computation: it
// reads the length of every line in b and always returns nil.
func bulk_handler_short(b []string) error {
    for i := range b {
        _ = len(b[i])
    }
    return nil
}

// bulk_handler_long is the batch counterpart of a slow computation: it
// blocks for five milliseconds once per batch, then reads the length of
// every line in b. It always returns nil.
func bulk_handler_long(b []string) error {
    pause := time.NewTimer(5 * time.Millisecond)
    <-pause.C
    for i := range b {
        _ = len(b[i])
    }
    return nil
}

// stats prints a banner containing name, records the current time, and
// returns a function that prints the time elapsed since then followed by
// figures read from runtime.ReadMemStats (sizes are shown in whole MB).
func stats(name string) func() {
    fmt.Printf("======================\n")
    fmt.Printf("%v\n", name)
    begin := time.Now()

    report := func() {
        fmt.Printf("time to run %v\n", time.Since(begin))

        var mem runtime.MemStats
        runtime.ReadMemStats(&mem)

        // Dividing once by 1024*1024 equals dividing twice by 1024 for
        // unsigned integers.
        const mb = 1024 * 1024
        fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
            mem.Alloc/mb, mem.TotalAlloc/mb, mem.Sys/mb)
        fmt.Printf("Mallocs: %d, Frees: %d\n",
            mem.Mallocs, mem.Frees)
        fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
            mem.HeapAlloc/mb, mem.HeapSys/mb, mem.HeapIdle/mb)
        fmt.Printf("HeapObjects: %d\n", mem.HeapObjects)
        fmt.Printf("\n")
    }
    return report
}

// process reads r line by line and runs h on each line in its own
// goroutine, allowing at most four handlers to be in flight at a time.
// It returns the first error received, either from a handler or from the
// scanner, or nil when everything succeeded.
func process(r io.Reader, h func(string) error) error {
    const maxInFlight = 4
    var (
        errs   = make(chan error)
        tokens = make(chan struct{}, maxInFlight)
        wg     sync.WaitGroup
    )

    // handle runs h on one line, reports a failure, then gives its token back.
    handle := func(line string) {
        defer wg.Done()
        if err := h(line); err != nil {
            errs <- err
        }
        <-tokens
    }

    go func() {
        defer close(errs)
        sc := bufio.NewScanner(r)
        for sc.Scan() {
            tokens <- struct{}{} // blocks while maxInFlight handlers are running
            wg.Add(1)
            go handle(sc.Text())
        }
        // All handlers must be finished before errs is closed.
        wg.Wait()
        if err := sc.Err(); err != nil {
            errs <- err
        }
    }()

    // Drain errs until it is closed, remembering only the first error.
    var first error
    for e := range errs {
        if first != nil || e == nil {
            continue
        }
        first = e
    }
    return first
}

// processWorkers reads r line by line and hands each line to one of four
// long-lived worker goroutines that call h. It returns the first error
// received, either from a handler or from the scanner, or nil when
// everything succeeded.
func processWorkers(r io.Reader, h func(string) error) error {
    const workerCount = 4
    var (
        errs  = make(chan error)
        lines = make(chan string)
        wg    sync.WaitGroup
    )

    // worker consumes lines until the channel is closed.
    worker := func() {
        defer wg.Done()
        for line := range lines {
            if err := h(line); err != nil {
                errs <- err
            }
        }
    }
    wg.Add(workerCount)
    for n := 0; n < workerCount; n++ {
        go worker()
    }

    go func() {
        defer close(errs)
        sc := bufio.NewScanner(r)
        for sc.Scan() {
            lines <- sc.Text()
        }
        close(lines)
        // Workers may still send on errs, so wait for them before closing it.
        wg.Wait()
        if err := sc.Err(); err != nil {
            errs <- err
        }
    }()

    // Drain errs until it is closed, remembering only the first error.
    var first error
    for e := range errs {
        if first != nil || e == nil {
            continue
        }
        first = e
    }
    return first
}

// processBulk reads r line by line, groups the lines into batches of up to
// 50, and hands each batch to one of four worker goroutines that call h.
// The final batch may be shorter. It returns the first error received,
// either from a handler or from the scanner, or nil when everything
// succeeded.
func processBulk(r io.Reader, h func([]string) error) error {
    const (
        workerCount = 4
        batchSize   = 50
    )
    errs := make(chan error)
    input := make(chan []string)
    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for bulk := range input {
                if err := h(bulk); err != nil {
                    errs <- err
                }
            }
        }()
    }
    go func() {
        scanner := bufio.NewScanner(r)
        bulk := make([]string, 0, batchSize)
        for scanner.Scan() {
            bulk = append(bulk, scanner.Text())
            if len(bulk) == batchSize {
                input <- bulk
                // The worker now owns the slice just sent; start a new one
                // instead of overwriting it.
                bulk = make([]string, 0, batchSize)
            }
        }
        if len(bulk) > 0 {
            input <- bulk
        }
        close(input)
        // errs must be closed by the same goroutine that reports the scanner
        // error, and only after every worker has returned. Closing it from a
        // separate goroutine races with the send below: the error is either
        // dropped or sent on an already closed channel, which panics.
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

// processSeq reads r line by line and calls h on each line in order, on the
// calling goroutine. It stops at the first handler error and returns it;
// otherwise it returns whatever error the scanner ended with (nil at a
// clean end of input).
func processSeq(r io.Reader, h func(string) error) error {
    sc := bufio.NewScanner(r)
    for {
        if !sc.Scan() {
            return sc.Err()
        }
        if err := h(sc.Text()); err != nil {
            return err
        }
    }
}

main_test.go

package main

import (
    "strings"
    "testing"
)

// Benchmark_run_line_short measures run_line_short, with statistics
// printing disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_line_short(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_line_short(data, false)
    }
}

// Benchmark_run_line_long measures run_line_long, with statistics printing
// disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_line_long(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_line_long(data, false)
    }
}
// Benchmark_run_line_short_workers measures run_line_short_workers, with
// statistics printing disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_line_short_workers(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_line_short_workers(data, false)
    }
}
// Benchmark_run_line_long_workers measures run_line_long_workers, with
// statistics printing disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_line_long_workers(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_line_long_workers(data, false)
    }
}
// Benchmark_run_bulk_short measures run_bulk_short, with statistics
// printing disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_bulk_short(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_bulk_short(data, false)
    }
}
// Benchmark_run_bulk_long measures run_bulk_long, with statistics printing
// disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_bulk_long(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_bulk_long(data, false)
    }
}
// Benchmark_run_seq_short measures run_seq_short, with statistics printing
// disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_seq_short(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_seq_short(data, false)
    }
}
// Benchmark_run_seq_long measures run_seq_long, with statistics printing
// disabled, on 1000 lines of 1000 bytes each.
func Benchmark_run_seq_long(b *testing.B) {
    line := strings.Repeat("a", 1000) + "\n"
    data := strings.Repeat(line, 1000)
    for done := 0; done < b.N; done++ {
        run_seq_long(data, false)
    }
}

结果

$ go run main.go 
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377

======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614

======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905

======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923

======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944

======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367

======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586

======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793

[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4                  1000       1750824 ns/op     1029354 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1747408 ns/op     1029348 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1757826 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1758427 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_long-4                      1    1303037704 ns/op     2253776 B/op       4075 allocs/op
Benchmark_run_line_long-4                      1    1305074974 ns/op     2247792 B/op       4032 allocs/op
Benchmark_run_line_long-4                      1    1305353658 ns/op     2246320 B/op       4013 allocs/op
Benchmark_run_line_long-4                      1    1305725817 ns/op     2247792 B/op       4031 allocs/op
Benchmark_run_line_short_workers-4          1000       2148354 ns/op     1029366 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       2139629 ns/op     1029370 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1983352 ns/op     1029359 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1909968 ns/op     1029363 B/op       1005 allocs/op
Benchmark_run_line_long_workers-4              1    1298321093 ns/op     2247856 B/op       4013 allocs/op
Benchmark_run_line_long_workers-4              1    1299846127 ns/op     2246384 B/op       4012 allocs/op
Benchmark_run_line_long_workers-4              1    1300003625 ns/op     2246288 B/op       4011 allocs/op
Benchmark_run_line_long_workers-4              1    1302779911 ns/op     2246256 B/op       4011 allocs/op
Benchmark_run_bulk_short-4                  2000        704358 ns/op     1082154 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        708563 ns/op     1082147 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        714687 ns/op     1082148 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        705546 ns/op     1082156 B/op       1011 allocs/op
Benchmark_run_bulk_long-4                     50      31411412 ns/op     1051497 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31513018 ns/op     1051544 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31539311 ns/op     1051502 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31564940 ns/op     1051505 B/op       1088 allocs/op
Benchmark_run_seq_short-4                   2000        574346 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572857 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   2000        580493 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572240 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_long-4                       1    5196313302 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5199995649 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5200460425 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5201080570 ns/op     2245792 B/op       4005 allocs/op
PASS
ok      test/bulk   68.944s

注意:令我惊讶的是,run_line_short_workers 比 run_line_short 还慢。我无法解释这一结果,但使用 pprof 做更深入的分析应该能找到答案。

2020-07-02