Modern computer systems generate enormous amounts of data every day. As a system grows, storing all of the debug data it produces in a database becomes impractical: once written, that data is never modified and is only ever read to analyze and troubleshoot failures. Storing it on local disk is therefore a good fit.

In this post we will use Go to read a 16GB txt or log file containing millions of lines. Follow along.

Let's code

First, we open the file with os.Open from Go's standard library, which returns an os.File:

f, err := os.Open(fileName)
if err != nil {
	fmt.Println("cannot read the file:", err)
	return
}
defer f.Close() // do not forget to close the file; deferred only after the error check

Once the file is open, we have two options:

  1. Read the file line by line (sketched below). This keeps memory usage low, but burns a lot of time on IO.
  2. Read the whole file into memory at once and process it there. This uses significantly more memory, but saves a lot of time.

Keep in mind, though, that our file is 16GB, so loading it into memory all at once is out of the question. The first option does not fit either, because we want the processing done within a few seconds.
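For comparison, here is a minimal, self-contained sketch of what option 1 could look like with bufio.Scanner. It is illustrative only; the file name is made up, and this is not the approach the rest of the article takes:

package main

import (
	"bufio"
	"fmt"
	"os"
)

func main() {
	f, err := os.Open("app.log") // hypothetical file name
	if err != nil {
		fmt.Println("cannot read the file:", err)
		return
	}
	defer f.Close()

	// Scanner yields one line per iteration; note its default maximum
	// line length is bufio.MaxScanTokenSize (64KB).
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text() // one log line at a time
		_ = line               // process the line here
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("scan failed:", err)
	}
}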

So, thinking about it a bit more, there is a third option: instead of reading the whole file into memory at once, read it in chunks. And sure enough, Go's standard library has bufio.NewReader() for exactly this:

r := bufio.NewReader(f)
for {
	buf := make([]byte, 4*1024) // the chunk size
	n, err := r.Read(buf)       // load a chunk into the buffer
	buf = buf[:n]
	if n == 0 {
		// per the io.Reader contract, a reader may return data and an
		// error in the same call, so err is only inspected once n == 0
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err)
			break
		}
		return err // assumes this loop lives inside a func returning error
	}
	// process buf here
}

Once we have a chunk, we can spawn a goroutine to process it, so the code above becomes:

// sync.Pools to reuse allocated memory and decrease the pressure
// on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
	lines := make([]byte, 500*1024)
	return lines
}}

stringPool := sync.Pool{New: func() interface{} {
	lines := ""
	return lines
}}

slicePool := sync.Pool{New: func() interface{} {
	lines := make([]string, 100)
	return lines
}}

r := bufio.NewReader(f)

var wg sync.WaitGroup // wait group to keep track of all the spawned goroutines

for {
	buf := linesPool.Get().([]byte)

	n, err := r.Read(buf)
	buf = buf[:n]

	if n == 0 {
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err)
			break
		}
		return err
	}

	// read up to the next newline so a chunk never ends mid-line
	nextUntilNewline, err := r.ReadBytes('\n')

	if err != io.EOF {
		buf = append(buf, nextUntilNewline...)
	}

	wg.Add(1)
	go func() {
		// process each chunk concurrently
		// start -> query start time, end -> query end time
		ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
		wg.Done()
	}()
}

wg.Wait()

The code above applies two optimizations:

  1. sync.Pool eases the pressure on the garbage collector: we reuse already allocated memory, which lowers memory consumption and speeds up processing (a minimal Get/Put sketch follows below).
  2. Goroutines let us process multiple chunks concurrently, which speeds things up significantly.
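If sync.Pool is unfamiliar, here is a minimal, self-contained sketch of the Get/Put cycle the code above relies on (bufPool is an illustrative name):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// A pool that hands out 4KB byte slices. New runs only when the
	// pool has nothing to reuse; otherwise Get returns an old slice.
	bufPool := sync.Pool{New: func() interface{} {
		return make([]byte, 4*1024)
	}}

	buf := bufPool.Get().([]byte) // take a buffer (type-assert from interface{})
	copy(buf, "some chunk data")  // ...use it...
	bufPool.Put(buf)              // return it for reuse instead of leaving it to the GC

	fmt.Println("buffer length:", len(buf))
}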

Next, let's implement the ProcessChunk function, which handles log files in the following format:

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

We will extract only the logs that fall between the timestamps supplied on the command line.
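A quick aside before the implementation: Go parses times against the reference time Mon Jan 2 15:04:05 MST 2006, so the layout matching the timestamps above is "2006-01-02T15:04:05.0000Z". A minimal, self-contained sketch:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The layout string mirrors the shape of one log timestamp.
	t, err := time.Parse("2006-01-02T15:04:05.0000Z", "2020-01-31T20:12:38.1234Z")
	if err != nil {
		fmt.Println("could not parse timestamp:", err)
		return
	}
	fmt.Println(t) // 2020-01-31 20:12:38.1234 +0000 UTC
}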

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {

	// another wait group to track the goroutines processing this chunk
	var wg2 sync.WaitGroup

	logs := stringPool.Get().(string)
	logs = string(chunk)

	linesPool.Put(chunk) // put the chunk back into the pool

	// split the string by "\n", so that we have a slice of log lines
	logsSlice := strings.Split(logs, "\n")

	stringPool.Put(logs) // put the string back into the pool

	chunkSize := 100 // process the logs in batches of 100 per goroutine
	n := len(logsSlice)
	noOfThread := n / chunkSize
	if n%chunkSize != 0 { // account for the remainder batch
		noOfThread++
	}

	// traverse the chunk, one batch per goroutine
	for i := 0; i < noOfThread; i++ {

		wg2.Add(1)
		// process each batch in a separate goroutine
		go func(s int, e int) {
			defer wg2.Done() // deferred to avoid deadlocks if we return early
			for i := s; i < e; i++ {
				text := logsSlice[i]
				if len(text) == 0 {
					continue
				}

				logParts := strings.SplitN(text, ",", 2)
				logCreationTimeString := logParts[0]
				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					fmt.Printf("\n Could not parse the time: %s for log: %v", logCreationTimeString, text)
					return
				}

				// check whether the log's timestamp falls inside our desired period
				if logCreationTime.After(start) && logCreationTime.Before(end) {
					fmt.Println(text)
				}
			}
		}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
		// passing the start and end indexes of each batch
	}

	wg2.Wait() // wait for all batches of this chunk to finish
	logsSlice = nil
}

Opening and processing the 16GB log file with the code above took roughly 25 seconds in testing. The complete code follows:

package main

import (
	"bufio"
	"fmt"
	"io"
	"math"
	"os"
	"strings"
	"sync"
	"time"
)

func main() {
 
	s := time.Now()
	args := os.Args[1:]
	if len(args) != 6 { // expected format: LogExtractor.exe -f "From Time" -t "To Time" -i "Log file location"
		fmt.Println("Please give proper command line arguments")
		return
	}
	startTimeArg := args[1]
	finishTimeArg := args[3]
	fileName := args[5]
 
	file, err := os.Open(fileName)
 
	if err != nil {
		fmt.Println("cannot able to read the file", err)
		return
	}
 
	defer file.Close() //close after checking err
 
	queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
	if err != nil {
		fmt.Println("Could not able to parse the start time", startTimeArg)
		return
	}
 
	queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
	if err != nil {
		fmt.Println("Could not able to parse the finish time", finishTimeArg)
		return
	}
 
	filestat, err := file.Stat()
	if err != nil {
		fmt.Println("Could not able to get the file stat")
		return
	}
 
	fileSize := filestat.Size()
	offset := fileSize - 1
	lastLineSize := 0
 
	// scan backwards from the end of the file until the first newline,
	// to measure the size of the last log line
	for {
		b := make([]byte, 1)
		n, err := file.ReadAt(b, offset)
		if err != nil {
			fmt.Println("Error reading file ", err)
			break
		}
		char := string(b[0])
		if char == "\n" {
			break
		}
		offset--
		lastLineSize += n
	}
 
	lastLine := make([]byte, lastLineSize)
	_, err = file.ReadAt(lastLine, offset+1)
 
	if err != nil {
		fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
		return
	}
 
	logSlice := strings.SplitN(string(lastLine), ",", 2)
	logCreationTimeString := logSlice[0]
 
	lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
	if err != nil {
		fmt.Println("can not able to parse time : ", err)
	}
 
	// only scan the whole file if the last log's timestamp falls inside
	// the query window
	if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
		Process(file, queryStartTime, queryFinishTime)
	}
 
	fmt.Println("\nTime taken - ", time.Since(s))
}
 
func Process(f *os.File, start time.Time, end time.Time) error {
 
	linesPool := sync.Pool{New: func() interface{} {
		lines := make([]byte, 250*1024)
		return lines
	}}
 
	stringPool := sync.Pool{New: func() interface{} {
		lines := ""
		return lines
	}}
 
	r := bufio.NewReader(f)
 
	var wg sync.WaitGroup
 
	for {
		buf := linesPool.Get().([]byte)
 
		n, err := r.Read(buf)
		buf = buf[:n]
 
		if n == 0 {
			if err == io.EOF {
				break
			}
			if err != nil {
				fmt.Println(err)
				break
			}
			return err
		}
 
		// read up to the next newline so the chunk ends on a line boundary
		nextUntilNewline, err := r.ReadBytes('\n')

		if err != io.EOF {
			buf = append(buf, nextUntilNewline...)
		}
 
		wg.Add(1)
		go func() {
			ProcessChunk(buf, &linesPool, &stringPool, start, end)
			wg.Done()
		}()
 
	}
 
	wg.Wait()
	return nil
}
 
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {
 
	var wg2 sync.WaitGroup
 
	logs := stringPool.Get().(string)
	logs = string(chunk)
 
	linesPool.Put(chunk)
 
	logsSlice := strings.Split(logs, "\n")
 
	stringPool.Put(logs)
 
	chunkSize := 300
	n := len(logsSlice)
	noOfThread := n / chunkSize
 
	if n%chunkSize != 0 {
		noOfThread++
	}
 
	for i := 0; i < (noOfThread); i++ {
 
		wg2.Add(1)
		go func(s int, e int) {
			defer wg2.Done() // deferred to avoid deadlocks if we return early
			for i := s; i < e; i++ {
				text := logsSlice[i]
				if len(text) == 0 {
					continue
				}
				logSlice := strings.SplitN(text, ",", 2)
				logCreationTimeString := logSlice[0]
 
				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
					return
				}
 
				if logCreationTime.After(start) && logCreationTime.Before(end) {
					//fmt.Println(text)
				}
			}
 
 
		}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
	}
 
	wg2.Wait()
	logsSlice = nil
}
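For reference, a hypothetical invocation would follow the argument format checked in main; the binary name, timestamps, and file path below are examples only:

go build -o LogExtractor
./LogExtractor -f "2020-01-31T20:12:38.1234Z" -t "2020-01-31T21:12:38.1234Z" -i "/var/log/app.log"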
This article is translated from the Medium post "Reading 16GB File in Seconds, Golang". If you spot any mistakes, corrections are welcome.