在Go语言中处理任何stream数据时,我已经深陷io.Readerio.Writer的灵活性中不能自拔。同时我在有一点上又或多或少的受了些折磨,挑战我的reader interface在你看来可能会觉得很简单:那就是怎么样拆分读操作。

我甚至不知道使用“拆分(split)”这个词是否正确,我就是想通过io.Reader多次读取接收到的东西,有时候可能还需要并行操作。但是由于readers不一定会暴露Seek方法重置读取位置,我需要一个方法来复制它。或者可以算是clone或fork么?

现状

假设你有一个web服务允许用户上传一个文件。这个服务将会把文件存储在云端。但是在存储前需要对这个文件进行一些简单的处理。对于接下来的所有请求,你都不得不使用io.Reader去处理。

解决方案

当然,有不止一种方法可以处理这种情况。根据文件的类型,服务的吞吐量,以及文件需要的处理方式的不同有些方式可能比其他的更合适。下面,我给出了5中不同复杂度和灵活性的方法。可以想象还会有更多的方法,但是这几个会是一个不错的起点。

bytes.Reader
Seekbytes.Reader
func handleUpload(u io.Reader)(err error) {
    //capture all bytes from upload
    b, err := ioutil.ReadAll(u)
    if err != nil {
        return err
    }

    //wrap the bytes in a ReaderSeeker
    r := bytes.NewReader(b)

    //process the metadata
    err = processMetaData(r)
    if err != nil {
        return err
    }

    r.Seek(0, 0)

    //upload the data
    err = uploadFile(r)
    if err != nil {
        return err
    }

    return nil
}
bytes.Reader
  • 优点:最简单的方案
  • 缺点:同步,无法适应你期望的很多、很大的文件。

Solution #2:可靠的文件系统

ioutil.TempFile
func handleUpload(u io.Reader)(err error) {
    //create a temporary file for the upload
    f, err := ioutil.TempFile("", "upload")
    if err != nil {
        return err
    }

    //destroy the file once done
    defer func() {
        n := f.name()
        f.Close()
        os.Remove(n)
    }()

    //transfer the bytes to the file
    _, err := io.Copy(f, u)
    if err != nil {
        return err
    }

    //rewind the file
    f.Seek(0.0)

    //upload the file
    err = uploadFile(f)
    if err != nil{
        return err
    }

    return nil
}

如果最终是要将文件存储在service运行的文件系统中,这种方法可能是最好的选择(尽管会产生一个真实的临时文件),但是我们假设它最终将落在云上。继续,如果这个文件同样很大,则将产生显著的,但是不必要的IO。同时,你还将面临机器上单个文件错误或宕机的风险,所以如果你的数据比较敏感,我也不推荐这种方式。

  • 优点:避免大量内存占用保存整个文件
  • 缺点:同步,潜在的占用大量IO、磁盘空间以及数据单点故障
io.MultiReader
0xFF 0xD8io.MultiReaderio.MultiReader
func handleUpload(u io.Reader)(err error) {
    //read in the first 2 bytes
    b := make([]byte, 2)
    _, err := u.Read(b)
    if err != nil {
        return err
    }

    //check that they match the JPEG header
    jpg := []byte{0xFF, 0xD8}
    if !bytes.Equal(b, jpg) {
        return errors.New("not a JPEG")
    }

    //glue those bytes back onto the reader
    r := io.MultiReader(bytes.NewReader(b), u)

    //upload the file 
    err = uploadFile(r)
    if err != nil {
        return err
    }

    return nil
}
io.MultiReader
bufio.Reader.PeekMultiReader
  • 优点:快速且是对文件头的脏读,可以作为文件上传的门槛。
  • 缺点:不适用于不定长读取,处理整个文件,密集任务,或和很多第三方包一同使用。
io.TeeReaderio.Pipe
io.Reader
io.TeeReaderfunc TeeReader(r Reader, w Writer) Readerio.Pipeio.PipeWriterio.PipeReader
func HandleUpload(u io.Reader) (err error) {
    //create the pipe and tee reader
    pr, pw := io.Pipe()
    tr := io.TeeReader(u, pw)

    //Create channels to synchronize
    done := make(chan bool)
    errs := make(chan error)
    defer close(done)
    defer close(errs)

    go func() {
        //close the PipeWriter after the 
        //TeeReader completes to trigger EOF
        defer pw.Close()

        //upload the original MP4 data
        err := uploadFile(tr)
        if err != nil {
            errs <- err
            return
        }

        done <- true
    }()

    go func() {
        //transcode to WebM
        webmr, err := transcode(pr)
        if err != nil {
            errs <- err
            return
        }

        //upload to storage
        err = uploadFile(webmr)
        if err != nil {
            errs <- err
            return
        }

        done <- true
    }()

    //wait until both are done
    //or an error occurs
    for c := 0; c < 2; {
        select {
        case err := <-errs:
            return err
        case <- done:
            c++
        }
    }

    return nil
}
trio.Pipeio.Pipefatal error;all goroutines are asleep - deadlockio.PipeWriterTeeReader

这个示例同样采用了channel来进行goroutines之间的“doneness”和error的同步。如果你期望在执行过程中有一些更具体的值返回,你可以使用更合适的类型替换chan bool。

  • 优点:完全独立的,并行的处理相同的数据流
  • 缺点,使用goroutines和channel增加了复杂度
io.MultiWriterio.Copy
io.TeeReaderio.MultiWriterio.TeeReaderio.Copy
func handleUpload(u io.Reader)(err error) {
    //create the pipes
    mp4R, mp4W := io.Pipe()
    webmR, webmW := io.Pipe()
    oggR, oggW := io.Pipe()
    wavR, wavW := io.Pipe()

    //create channels to syschronize
    done := make(chan bool)
    errs := make(chan error)
    defer close(done)
    defer close(err)

    //spawn all the task goroutines. these looks identical to
    //the TeeReader example, but pulled out into separate 
    //methods for clarity
    go uploadMP4(mp4R, done, errs)
    go transcodeAndUploadWebM(webmR, done, errs)
    go transcodeAndUploadOgg(webmR, done, errs)
    go transcodeAndUploadWav(webmR, done, errs)

    go func() {
        // after completing the copy, we need to close
        // the PipeWriters to propagate the EOF to all 
        // PipeReaders to avoid deadlock
        defer mp4W.Close()
        defer webmW.Close()
        defer oggW.Close()
        defer wavW.Close()

        //build the multiwriter for all the pipes
        mw := io.MultiWriter(mp4W, webmW, oggW, wavW)

        //copy the data into the multiwriter
        _, err := io.Copy(mw, u)
        if err != nil {
            errs <- err
        }
    }()

     // wait until all are done
     // or an error occurs
     for c := 0; c < 4; c++ {
        select {
        case err := <-errs:
            return err
        case <-done:
        }
    }
    return nil
}

这个方法和前面的方法有点类似,但是当数据需要被克隆多次时,这种方法明显的更加简洁。因为使用了PIPEs,同样需要使用goroutines和同步channel,以防止死锁。我们在copy完成了关闭了所有的pipes。

  • 优点:可以根据需要fork多份原始数据
  • 缺点:过多的依赖goroutines和channel进行协调。

关于channels?

Channels是Go提供的独特的,强大的并发工具之一。它是goroutines之间的桥梁,同时兼顾了通信和同步。你可以创建带buffer和不带buffer的channel,来实现数据共享。那么,为什么我不提供一个充分利用Channels的解决方案,而不仅仅是用作同步呢?

查阅了一些标准库的top-level包,发现channels很少出现在函数签名中:

timeclose
io.Pipesync.Mutex

当开发一个可重复利用的包的时候,我会像标准库一样在我公开的API中避免使用Channels,但是会在内部使用它们用作同步。如果复杂度足够的低,使用mutex替代channel也许更加理想。这也就是说,在程序开发中,channel是更完美的抽象,比lock更好使用,更加灵活。

抛砖迎玉

io.Reader