Go Writer 和 Reader接口的设计遵循了Unix的输入和输出,一个程序的输出可以是另外一个程序的输入,与之类比,Reader将流中的数据写进缓冲中,是一种数据的流入;Writer将缓冲的数据写至流,是一种数据的流出。
问题背景
ToB的业务场景中的一种常见模式是私有云部署。对于一些保密性质的机关单位,应用只能在私有网络下部署,因此将数据包、镜像等部署所需的artifact克隆到用户现场便成为私有化交付中重要的一环。
完整流程大致如下:
- 解析对应的应用编排包及依赖,并下载所需的镜像及数据包(artifact)
- 将artifacts(可能需要压缩)、编排包pack成tar包
- 将tar包通过离线存储设备带到现场
- 在现场网络环境下将tar包load到私有化集群中
项目中采用virtual FS来实现封包/解包函数,借助两个helper函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
| // ArchiveFS 将fs封包成tar并写到buf中
func ArchiveFS(buf io.Writer, files fs.FS) error {
tw := tar.NewWriter(buf)
err := fs.WalkDir(files, "", func(path string, entry fs.DirEntry, err error) error {
if err != nil {
return err
}
if path == ".git" || path == ".idea" {
return filepath.SkipDir
}
// don't put "" into tar, since it can't be parsed
if path == "" {
return nil
}
file, err := files.Open(path)
if err != nil {
return err
}
defer file.Close()
info, err := entry.Info()
if err != nil {
return err
}
hdr, err := tar.FileInfoHeader(info, "")
if err != nil {
return err
}
hdr.Name = normalizeFilepathSeparator(path)
if info.IsDir() {
hdr.Mode = 0o755
} else {
hdr.Mode = 0o644
}
if err := tw.WriteHeader(hdr); err != nil {
return err
}
// dir must be writen to tar header for index files within it
if info.IsDir() {
return nil
}
if _, err := io.Copy(tw, file); err != nil {
return err
}
return nil
})
tw.Close()
if err != nil {
return err
}
return nil
}
// LoadArchive 将tar解包到memory fs中
func LoadArchive(r io.Reader) (afero.Fs, error) {
t := tar.NewReader(r)
fs := afero.NewMemMapFs()
for {
hdr, err := t.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if hdr.FileInfo().IsDir() {
path := hdr.Name
fs.MkdirAll(path, 0o755)
fs.Chtimes(path, time.Now(), time.Now())
continue
}
var buf bytes.Buffer
size, err := buf.ReadFrom(t)
if err != nil {
panic("tarfs: reading from tar:" + err.Error())
}
if size != hdr.Size {
panic("tarfs: size mismatch")
}
afero.WriteFile(fs, hdr.Name, buf.Bytes(), 0o644)
}
return fs, nil
}
|
起初,这两个函数只是作为编排包(几KB)的封包/解包helper。后来引入了带artifact的全量打包,为了方便就复用了这两个helper函数。最终在测试时发现在模拟现场加载tar包时出现OOM的问题。
问题分析
上述两个helper的Reader/Writer的入参是bytes.Buffer
,virtual FS都是memory FS。这样就导致每调用一次helper函数时,都会在内存中新增一份全量包的拷贝。对于应用编排包而言,其大小不过几KB,完全可以忽略掉内存的变化。而当引入了镜像及数据包这类size比较大的artifact时,pack出的全量包一度达到20+GB。即使在内存中完整存在一份全量包,都可能引入OOM的风险。因此我们考虑通过对IO进行优化,避免将全量包加载到内存。
经过分析,我们的处理流程中有以下部分可以优化:
- Artifact的下载都是通过http call,通过
ioutil.ReadAll(resp.Body)
copy到bytes.Buffer
中。可以考虑将artifact下载到os.File
中,借助磁盘节省内存。 - 内存里维护memory FS存放各类artifacts,可以考虑通过物理文件系统来维护相应的结构。
- FS的打包是通过上述
ArchiveFS
helper将FS以tar的形式写到bytes.Buffer中,可以考虑直接写到os.File
中。 - Artifact的上传是通过http.Post形式发送大文件,可以改造成流式传输。
FS的优化比较简单,可以直接通过tempDir来构造物理文件系统替换掉memory FS,下面着重讨论相应的IO优化。
IO优化
选用bytes.Buffer
的好处是其实现了io.Reader
与io.Writer
方法,可以灵活地读写。此外,其可以很方便地获取到完整的字节流,便于额外构造一个新的reader,这对于我们既要将reader写入文件,也要通过reader算md5的场景提供了便利。
使用os.File
来替代bytes.Buffer
1
2
3
4
5
6
7
8
9
10
11
12
| file, err := os.Create(tempPath)
defer file.Close()
resp, err := http.Get(url)
// ignoring handle err
defer resp.Body.Close()
// file作为writer,放置下载后的artifact
io.Copy(file, resp.Body)
// file作为reader,计算MD5
hash := md5.New()
io.Copy(hash, file)
// ... other logic
|
上述会将artifact下载到tempPath
下,但是计算出的md5值总为空内容的md5。
究其原因,os.File
写完后不能直接读,其在write后会将offset置为写入末尾处,需要手动将offset seek到文件开头才能读取完整的文件。举个例子:
1
2
3
4
5
6
7
8
| a := "text.txt"
file, _ := os.Create(a)
file.WriteString("test")
// file.Seek(0, io.SeekStart)
buf := new(bytes.Buffer)
buf.ReadFrom(file)
fmt.Println(buf.String())
|
输出结果为""
,去掉第5行注释的话,输出的结果便是test
。
使用io.TeeReader
实现双写
我们的案例中存在两种双写的场景:
- 将下载的内容写入到文件,同时计算md5
- 由于某种artifact存储方式为content-addressable,下载到文件的同时需要写入本地文件系统某个缓存目录(类比docker)
使用bytes.Buffer
可以方便地获取到内容,从而构建另一个reader,完成独立的两次写入。除此之外,io
package为我们提供了专门的双写方法:io.TeeReader
,其实现十分简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
| func TeeReader(r Reader, w Writer) Reader {
return &teeReader{r, w}
}
func (t *teeReader) Read(p []byte) (n int, err error) {
n, err = t.r.Read(p)
if n > 0 {
if n, err := t.w.Write(p[:n]); err != nil {
return n, err
}
}
return
}
|
teeReader
内部wrap了一个writer,在进行read时,同时会将读出来的内容write到内部的writer中。在我们的场景下,以计算md5并写入文件为例:
1
2
3
4
5
6
7
8
| // reader comes from http.Body
hash := md5.New()
reader := io.TeeReader(reader, hash)
if _, err := io.Copy(writer, reader); err != nil {
return err
}
hashsum := fmt.Sprintf("%x", hash.Sum(nil))
// handle hashsum
|
使用io.Pipe实现读写管道
io.Pipe
是native go的一种实现,类似于生产者消费者模式,主要靠channel实现。在使用中,读写需要是异步的,否则会出现死锁的情况。在我们的案例中,加载全量包时会以http post的形式发送大文件。通常我们会在内存中构建body:
1
2
3
4
5
6
7
8
9
10
11
12
| // bytes.Buffer构造post body
buf := new(bytes.Buffer)
writer := multipart.NewWriter(buf)
defer writer.Close()
part, err := writer.CreateFormFile("upload", fn)
if err != nil {
return err
}
if _, err = io.Copy(part, file); err != nil {
return err
}
http.Post(url, writer.FormDataContentType(), buf)
|
当file
的size很大时(我们场景会是GB量级),body占用的内存会增大,增加了OOM的风险,因此我们采用io.Pipe进行内存优化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| r, w := io.Pipe()
m := multipart.NewWriter(w)
// 异步写入,否则会出现死锁
go func() {
defer w.Close()
defer m.Close()
part, err := m.CreateFormFile("upload", fn)
if err != nil {
return
}
if _, err = io.Copy(part, file); err != nil {
return
}
}()
http.Post(url, m.FormDataContentType(), r)
|
观察io.Pipe
的核心实现,主要依赖于一个数据channel流式地进行数据传递:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| func Pipe() (*PipeReader, *PipeWriter) {
p := &pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}
return &PipeReader{p}, &PipeWriter{p}
}
func (p *pipe) write(b []byte) (n int, err error) {
// ignore some control logic
for once := true; once || len(b) > 0; once = false {
select {
// wrCh是数据channel
case p.wrCh <- b:
// rdCh是读写计数channel
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
func (p *pipe) read(b []byte) (n int, err error) {
// ignore some control logic
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
|
一坑多踩
除了OOM的问题,在开发过程中经常出现unexpected EOF的报错。此类问题排查的切入点比较模糊,经过深入地单步调试才得以定位:项目中使用了tar.Writer
进行封包,在使用tar.Reader
解包时报EOF错误。这意味着这个tar包不是一个合法的tar包,使用tar命令去操作此tar包也会有类似的报错。
究其原因,tar会在封包结束后,通过Close()
方法在包尾部加入一层padding:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // Flush is called by tw.Close()
func (tw *Writer) Flush() error {
if tw.err != nil {
return tw.err
}
if nb := tw.curr.logicalRemaining(); nb > 0 {
return fmt.Errorf("archive/tar: missed writing %d bytes", nb)
}
if _, tw.err = tw.w.Write(zeroBlock[:tw.pad]); tw.err != nil {
return tw.err
}
tw.pad = 0
return nil
}
|
我们在封包时,没有调用Close()
,也就不会再尾部添加padding。在解包时,如果没有在尾部发现这个padding,就无法定位此tar包结束的位置,因此会认为此tar包是invalid,因此抛出unexpected EOF
那么,为什么会多此踩到这个坑呢?
除了使用了tar进行封包,我们还是用了zstd.Writer
对artifact进行了压缩,同样忘记了调用Close()
方法。在此总结出一条经验教训:对于ReadCloser
以及WriteCloser
的实现,在读/写完一定要记得调用Close()
方法,否则对于前者读而言,存在内存泄露的风险;对于后者写而言,存在因缺失尾部标识而导致包格式错误。
经验总结
本文描述了与io相关的OOM问题及unexpected EOF问题,总结了如下经验:
- 当待处理的内容足够大时,避免使用
bytes.Buffer
加载到内存中进行处理,借助磁盘或管道减少内存占用 os.File
写入后直接读,需要将offset重置到起始处- 使用http.Post传大文件时,可以使用
io.Pipe
构建body,减少内存占用 - 对于所有的
WriteCloser
的实现,写完记得调用Close()
以防写出的格式不完整
配图引自 https://medium.com/learning-the-go-programming-language/streaming-io-in-go-d93507931185