在这篇文章中,我们会使用一些 Go 的著名并行范例(Goroutine 和 WaitGroup),高效地遍历有大量文件的目录。所有代码都可以在 GitHub 找到。

filepath.Walk()findfilepath.Walk()

递归版本

find

首先,打开目录:

func lsFiles(dir string) {
 file, err := os.Open(dir)
 if err != nil {
  fmt.Println("error opening directory")
 }
 defer file.Close()

然后,获取这个文件中的子文件切片(Slice,也就是其他语言中的列表或数组)。

files, err := file.Readdir(-1)
if err != nil {
 fmt.Println("error reading directory")
}

接着,我们将遍历这些文件,并再次调用我们的函数。

for _, f := range files {
  if f.IsDir() {
   lsFiles(dir + "/" + f.Name())
  }
  fmt.Println(dir + "/" + f.Name())
 }
}

可以看到,只有当文件是一个目录时,我们才会调用我们的函数,否则,只是打印出该文件的路径和名称。

初步测试

time
$ find /Users/alexkreidler
	274165

real	0m2.046s
user	0m0.416s
sys	0m1.640s

$ ./recursive /Users/alexkreidler
	274165

real	0m13.127s
user	0m1.751s
sys	0m10.294s
filepath.Walk()
func main() {
 err := filepath.Walk(os.Args[1], func(path string, fi os.FileInfo, err error) error {
  if err != nil {
   return err
  }
  fmt.Println(path)
  return nil
 })
 if err != nil {
  log.Fatal(err)
 }
}

./walk /Users/alexkreidler
	274165

real	0m13.287s
user	0m2.033s
sys	0m10.863s

Goroutine

好了,是时候并行化了。如果我们试着将递归调用改为 goroutine,会怎样呢?

只是

if f.IsDir() {
 lsFiles(dir + "/" + f.Name())
}

改成

if f.IsDir() {
 go lsFiles(dir + "/" + f.Name())
}

哎呀,不好了!现在,它只是列出一些顶级文件。这个程序生成了很多 goroutine,但是随着 main 函数的结束,程序并不会等待 goroutine 完成。我们需要让程序等待所有的 goroutine 结束。

WaitGroup

sync.WaitGroup
WaitGroup
var wg sync.WaitGroup
lsFiles()mainwg
wg.Add(1)
lsFiles(dir)
wg.Wait()

现在,为我们产生的每一个 goroutine 往 WaitGroup 加一:

if f.IsDir() {
 wg.Add(1)
 go lsFiles(dir + "/" + f.Name())
}
lsFileswg.Done()
defer wg.Done()

好啦!现在,在它打印每一个文件之前,它应该会处于等待状态了。

ulimits 和信号量 Channel

ulimits

在一台最近生产的有更多内核的计算机上,Go 调度器可能会同时创建超过 10,240 个 goroutine。每个 goroutine 都会打开文件,因此你会获得这样的错误:

too many open files

要解决这个问题,我们将使用一个信号量 channel:

var semaphoreChan = make(chan struct{}, runtime.GOMAXPROCS(runtime.NumCPU()))

这个 channel 的大小限制为我们机器上的 CPU 或者核心数。

func lsFiles(dir string) {
 // 满的时候阻塞
 semaphoreChan <- struct{}{}
 defer func() {
   // 读取以释放槽
   <-semaphoreChan
   wg.Done()
 }()
 ...

当我们试图发送到这个 channel 时,将会被阻塞。然后当完成之后,从该 channel 读取以释放槽。详细信息,请参阅。

测试和基准

$ ./benchmark.sh
CPUs/Cores: 2
GOMAXPROCS: 2
find /Users/alexkreidler
 274165

real 0m2.046s
user 0m0.416s
sys 0m1.640s
./recursive /Users/alexkreidler
 274165

real 0m13.127s
user 0m1.751s
sys 0m10.294s
./parallel /Users/alexkreidler
 274165

real 0m9.120s
user 0m4.781s
sys 0m10.676s
./walk /Users/alexkreidler
 274165

real 0m13.287s
user 0m2.033s
sys 0m10.863s

总而言之

findfilepath.Walk()

希望这篇文章说明了如何利用 Go 中的一些强大的功能来构建并行系统。我们讨论了:

* Goroutine
* WaitGroup
* Channel (信号量)
filepath.Walkfilepath

译者:

校对:

微信公众号:Go语言中文网: