bufio 源码解读

bufio包实现了有缓冲的I/O。它包装一个io.Reader或io.Writer接口对象,创建另一个也实现了该接口,且同时还提供了缓冲和一些文本I/O的帮助函数的对象。

Reader 部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
// 设置默认缓存大小
const (
defaultBufSize = 4096
)
// 定义错误类型
var (
ErrInvalidUnreadByte = errors.New("bufio: invalid use of UnreadByte")
ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune")
ErrBufferFull = errors.New("bufio: buffer full")
ErrNegativeCount = errors.New("bufio: negative count")
)
// bufio.Reader 结构包装了一个 io.Reader 对象,提供缓存功能,同时实现了 io.Reader 接口。
// 其结构没有任何导出的字段。
type Reader struct {
buf []byte // 缓存
rd io.Reader // 底层的 io.Reader
r, w int // r:从buf中读走的字节(偏移);w:buf 中填充内容的偏移;
err error // 读过程中遇到的错误
lastByte int // 最后一次读到的字节
lastRuneSize int // 最后一次读到的 Rune 的大小
}
// 设置连续读取最大尝试次数
const maxConsecutiveEmptyReads = 100
// NewReaderSize 将 rd 封装成一个带缓存的 bufio.Reader 对象,
// 缓存大小由 size 指定(如果小于 16 则会被设置为 16)。
// 如果 rd 的基类型就是有足够缓存的 bufio.Reader 类型,则直接将
// rd 转换为基类型返回。
func NewReaderSize(rd io.Reader, size int) *Reader {
// 已经是bufio.Reader类型,且缓存大小不小于 size,则直接返回
b, ok := rd.(*Reader)
// 判断缓存是否足够
if ok && len(b.buf) >= size {
return b
}
// 缓存大小不会小于 minReadBufferSize (16字节)
if size < minReadBufferSize {
size = minReadBufferSize
}
// 构造一个bufio.Reader实例
r := new(Reader)
r.reset(make([]byte, size), rd)
return r
}
// 创建默认大小缓存的 bufio.Reader
func NewReader(rd io.Reader) *Reader {
return NewReaderSize(rd, defaultBufSize)
}
// 提供reset的接口
func (b *Reader) Reset(r io.Reader) {
b.reset(b.buf, r)
}
// Reset丢弃缓冲中的数据,清除任何错误,将b重设为其下层从r读取数据。
func (b *Reader) reset(buf []byte, r io.Reader) {
*b = Reader{
buf: buf,
rd: r,
lastByte: -1,
lastRuneSize: -1,
}
}
// 定义错误:读出字节数为负
var errNegativeRead = errors.New("bufio: reader returned negative count from Read")
// 将新块读入缓冲区
func (b *Reader) fill() {
// 将 r 之前的数据覆盖,即,将现有数据滑动到开始。
if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w])
b.w -= b.r
b.r = 0
}
if b.w >= len(b.buf) {
panic("bufio: tried to fill full buffer")
}
// 将新数据读到 buf 中,如果说读到数据则返回;如果读到数据为空,继续尝试有限次数。
for i := maxConsecutiveEmptyReads; i > 0; i-- {
n, err := b.rd.Read(b.buf[b.w:])
if n < 0 {
panic(errNegativeRead)
}
b.w += n
if err != nil {
b.err = err
return
}
if n > 0 {
return
}
}
b.err = io.ErrNoProgress
}
// 返回读取过程中的错误,调用后清空
func (b *Reader) readErr() error {
err := b.err
b.err = nil
return err
}
// Peek返回输入流的下n个字节,而不会移动读取位置。
// 返回的[]byte只在下一次调用读取操作前合法。
// 如果Peek返回的切片长度比n小,它也会返会一个错误说明原因。
// 如果n比缓冲尺寸还大,返回的错误将是ErrBufferFull。
func (b *Reader) Peek(n int) ([]byte, error) {
// 检查参数
if n < 0 {
return nil, ErrNegativeCount
}
// 当 buf 中数据量小于 n,且 buf 未满时,向其中填充数据,直至填满;如果有错误则停止。
for b.w-b.r < n && b.w-b.r < len(b.buf) && b.err == nil {
b.fill()
}
// 最多读 len(buf) 长度
if n > len(b.buf) {
return b.buf[b.r:b.w], ErrBufferFull
}
// 0 <= n <= len(b.buf)
var err error
if avail := b.w - b.r; avail < n {
// 缓存中没有足够数据,设置错误
n = avail
err = b.readErr()
if err == nil {
err = ErrBufferFull
}
}
return b.buf[b.r : b.r+n], err
}
// Discard 跳过后续的 n 个字节的数据,返回跳过的字节数。
// 如果结果小于 n,将返回错误信息。
// 如果 n 小于缓存中的数据长度,则不会从底层提取数据。
func (b *Reader) Discard(n int) (discarded int, err error) {
// 检查参数
if n < 0 {
return 0, ErrNegativeCount
}
if n == 0 {
return
}
remain := n
for {
// 获取缓存中的数据长度,如果没有数据,添加数据后再次获取长度。
skip := b.Buffered()
if skip == 0 {
b.fill()
skip = b.Buffered()
}
// 如果缓冲长度大于n,设置跳过长度为n。
if skip > remain {
skip = remain
}
b.r += skip
remain -= skip
if remain == 0 {
return n, nil
}
// 如果缓冲长度小于n,返回实际跳过长度及错误。
if b.err != nil {
return n - remain, b.readErr()
}
}
}
// Read 从 b 中读出数据到 p 中,返回读出的字节数和遇到的错误。
// 如果缓存不为空,则只能读出缓存中的数据,不会从底层 io.Reader
// 中提取数据,如果缓存为空,则:
// 1、len(p) >= 缓存大小,则跳过缓存,直接从底层 io.Reader 中读
// 出到 p 中。
// 2、len(p) < 缓存大小,则先将数据从底层 io.Reader 中读取到缓存
// 中,再从缓存读取到 p 中。
// 读取到达结尾时,返回值n将为0而 err 将为 io.EOF。
func (b *Reader) Read(p []byte) (n int, err error) {
n = len(p)
if n == 0 {
return 0, b.readErr()
}
if b.r == b.w {
// 如果缓冲为 0,且错误不为空,直接返回 0 及错误。
if b.err != nil {
return 0, b.readErr()
}
if len(p) >= len(b.buf) {
//跳过缓存,直接从底层 io.Reader 中读取, 避免 copy。
n, b.err = b.rd.Read(p)
if n < 0 {
panic(errNegativeRead)
}
if n > 0 {
b.lastByte = int(p[n-1])
b.lastRuneSize = -1
}
return n, b.readErr()
}
// 先将数据读到 buf 中,再读到 p 中。
// 不要使用 fill ,那将会循环读取。
b.r = 0
b.w = 0
n, b.err = b.rd.Read(b.buf)
if n < 0 {
panic(errNegativeRead)
}
if n == 0 {
return 0, b.readErr()
}
b.w += n
}
// 如果缓存不为空,则将缓存中数据复制到 p 中。
n = copy(p, b.buf[b.r:b.w])
b.r += n
b.lastByte = int(b.buf[b.r-1])
b.lastRuneSize = -1
return n, nil
}
// ReadByte读取一个字节并返回该字节。如果没有可用的数据,会返回错误。
func (b *Reader) ReadByte() (byte, error) {
b.lastRuneSize = -1
for b.r == b.w {
if b.err != nil {
return 0, b.readErr()
}
b.fill() // buffer is empty
}
c := b.buf[b.r]
b.r++
b.lastByte = int(c)
return c, nil
}
// UnreadByte吐出最近一次读取操作读取的最后一个字节。(只能吐出最后一个,多次调用会出问题)
func (b *Reader) UnreadByte() error {
if b.lastByte < 0 || b.r == 0 && b.w > 0 {
return ErrInvalidUnreadByte
}
// b.r > 0 || b.w == 0
if b.r > 0 {
b.r--
} else {
// b.r == 0 && b.w == 0
b.w = 1
}
b.buf[b.r] = byte(b.lastByte)
b.lastByte = -1
b.lastRuneSize = -1
return nil
}
// ReadRune 读取一个 utf-8 编码的 unicode 码值,返回该码值、其编码长度以及可能的错误。
// 如果 utf-8 编码非法,读取位置只移动1字节,返回 U+FFFD,返回值 size 为1而 err 为 nil。
// 如果没有可用的数据,会返回错误。
func (b *Reader) ReadRune() (r rune, size int, err error) {
for b.r+utf8.UTFMax > b.w && !utf8.FullRune(b.buf[b.r:b.w]) && b.err == nil && b.w-b.r < len(b.buf) {
b.fill() // b.w-b.r < len(buf) => buffer is not full
}
b.lastRuneSize = -1
if b.r == b.w {
return 0, 0, b.readErr()
}
r, size = rune(b.buf[b.r]), 1
if r >= utf8.RuneSelf {
r, size = utf8.DecodeRune(b.buf[b.r:b.w])
}
b.r += size
b.lastByte = int(b.buf[b.r-1])
b.lastRuneSize = size
return r, size, nil
}
// UnreadRune 吐出最近一次 ReadRune 调用读取的 unicode 码值。
// 如果最近一次读取不是调用的ReadRune,会返回错误。(从这点看,UnreadRune比UnreadByte严格很多)
func (b *Reader) UnreadRune() error {
if b.lastRuneSize < 0 || b.r < b.lastRuneSize {
return ErrInvalidUnreadRune
}
b.r -= b.lastRuneSize
b.lastByte = -1
b.lastRuneSize = -1
return nil
}
// Buffered 返回缓存中未读取的数据的长度。
func (b *Reader) Buffered() int { return b.w - b.r }
// ReadSlice 在 b 中查找 delim 并返回 delim 及其之前的所有数据。
// 该操作会读出数据,返回的切片是已读出的数据的引用,切片中的数据
// 在下一次读取操作之前是有效的。
//
// 如果找到 delim,则返回查找结果,err 返回 nil。
// 如果未找到 delim,则:
// 1、缓存不满,则将缓存填满后再次查找。
// 2、缓存是满的,则返回整个缓存,err 返回 ErrBufferFull。
//
// 如果未找到 delim 且遇到错误(通常是 io.EOF),则返回缓存中的所
// 有数据和遇到的错误。
//
// 因为返回的数据有可能被下一次的读写操作修改,所以大多数操作应该
// 使用 ReadBytes 或 ReadString,它们返回的是数据的拷贝。
func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
for {
// 在 buf 中查找 delim
if i := bytes.IndexByte(b.buf[b.r:b.w], delim); i >= 0 {
line = b.buf[b.r : b.r+i+1]
b.r += i + 1
break
}
// Pending error?
if b.err != nil {
line = b.buf[b.r:b.w]
b.r = b.w
err = b.readErr()
break
}
// Buffer full?
if b.Buffered() >= len(b.buf) {
b.r = b.w
line = b.buf
err = ErrBufferFull
break
}
b.fill() // buffer is not full
}
// Handle last byte, if any.
if i := len(line) - 1; i >= 0 {
b.lastByte = int(line[i])
b.lastRuneSize = -1
}
return
}
// ReadLine 是一个低水平的行读取原语,大多数情况下,应该使用
// ReadBytes('\n') 或 ReadString('\n'),或者使用一个 Scanner。
//
// ReadLine 通过调用 ReadSlice 方法实现,返回的也是缓存的切片。用于
// 读取一行数据,不包括行尾标记(\n 或 \r\n)。
//
// 只要能读出数据,err 就为 nil。如果没有数据可读,则 isPrefix 返回
// false,err 返回 io.EOF。
//
// 如果找到行尾标记,则返回查找结果,isPrefix 返回 false。
// 如果未找到行尾标记,则:
// 1、缓存不满,则将缓存填满后再次查找。
// 2、缓存是满的,则返回整个缓存,isPrefix 返回 true。
//
// 整个数据尾部“有一个换行标记”和“没有换行标记”的读取结果是一样。
//
// 如果 ReadLine 读取到换行标记,则调用 UnreadByte 撤销的是换行标记,
// 而不是返回的数据。
func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error) {
line, err = b.ReadSlice('\n')
if err == ErrBufferFull {
// Handle the case where "\r\n" straddles the buffer.
if len(line) > 0 && line[len(line)-1] == '\r' {
// Put the '\r' back on buf and drop it from line.
// Let the next call to ReadLine check for "\r\n".
if b.r == 0 {
// should be unreachable
panic("bufio: tried to rewind past start of buffer")
}
b.r--
line = line[:len(line)-1]
}
return line, true, nil
}
if len(line) == 0 {
if err != nil {
line = nil
}
return
}
err = nil
if line[len(line)-1] == '\n' {
drop := 1
if len(line) > 1 && line[len(line)-2] == '\r' {
drop = 2
}
line = line[:len(line)-drop]
}
return
}
// ReadBytes 功能同 ReadSlice,只不过返回的是缓存的拷贝。
func (b *Reader) ReadBytes(delim byte) ([]byte, error) {
// Use ReadSlice to look for array,
// accumulating full buffers.
var frag []byte
var full [][]byte
var err error
for {
var e error
frag, e = b.ReadSlice(delim)
if e == nil { // got final fragment
break
}
if e != ErrBufferFull { // unexpected error
err = e
break
}
// Make a copy of the buffer.
buf := make([]byte, len(frag))
copy(buf, frag)
full = append(full, buf)
}
// Allocate new buffer to hold the full pieces and the fragment.
n := 0
for i := range full {
n += len(full[i])
}
n += len(frag)
// Copy full pieces and fragment in.
buf := make([]byte, n)
n = 0
for i := range full {
n += copy(buf[n:], full[i])
}
copy(buf[n:], frag)
return buf, err
}
// ReadString 功能同 ReadBytes,只不过返回的是字符串。
func (b *Reader) ReadString(delim byte) (string, error) {
bytes, err := b.ReadBytes(delim)
return string(bytes), err
}
// WriteTo方法实现了 io.WriterTo 接口。
func (b *Reader) WriteTo(w io.Writer) (n int64, err error) {
n, err = b.writeBuf(w)
if err != nil {
return
}
// 如果 b.rd 已经实现了 io.WriterTo 接口,直接调用并返回。
if r, ok := b.rd.(io.WriterTo); ok {
m, err := r.WriteTo(w)
n += m
return n, err
}
// 如果 w 已经实现了 io.ReaderFrom 接口,调用并返回。
if w, ok := w.(io.ReaderFrom); ok {
m, err := w.ReadFrom(b.rd)
n += m
return n, err
}
// buffer not full
if b.w-b.r < len(b.buf) {
b.fill()
}
// b.r < b.w => buffer is not empty
for b.r < b.w {
m, err := b.writeBuf(w)
n += m
if err != nil {
return n, err
}
b.fill() // buffer is empty
}
if b.err == io.EOF {
b.err = nil
}
return n, b.readErr()
}
// 定义错误:写入字节数为负
var errNegativeWrite = errors.New("bufio: writer returned negative count from Write")
// 将 Reader.buf 中的数据写到 io.Writer 中。
func (b *Reader) writeBuf(w io.Writer) (int64, error) {
n, err := w.Write(b.buf[b.r:b.w])
if n < 0 {
panic(errNegativeWrite)
}
b.r += n
return int64(n), err
}

Writer 部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Writer实现了 io.Writer 接口并为其添加缓冲。
// 如果在向一个 Writer 类型写入时遇到了错误,该对象将不再接受任何数据,且所有写操作都会返回该错误。
// 在所有数据都写入后,调用者有义务调用 Flush 方法以保证所有的数据都交给了下层的 io.Writer。
type Writer struct {
err error
buf []byte
n int
wr io.Writer
}
// NewWriterSize 将 w 封装成一个带缓存的 bufio.Writer 对象,
// 缓存大小由 size 指定(如果小于 4096 则会被设置为 4096)。
// 如果 wr 的基类型就是有足够缓存的 bufio.Writer 类型,则直接将
// wr 转换为基类型返回。
func NewWriterSize(w io.Writer, size int) *Writer {
// Is it already a Writer?
b, ok := w.(*Writer)
if ok && len(b.buf) >= size {
return b
}
if size <= 0 {
size = defaultBufSize
}
return &Writer{
buf: make([]byte, size),
wr: w,
}
}
// NewWriter 相当于 NewWriterSize(wr, 4096)。
func NewWriter(w io.Writer) *Writer {
return NewWriterSize(w, defaultBufSize)
}
// Reset 将 b 的底层 Writer 重新指定为 w,同时丢弃缓存中的所有数据,复位
// 所有标记和错误信息。相当于创建了一个新的 bufio.Writer。
func (b *Writer) Reset(w io.Writer) {
b.err = nil
b.n = 0
b.wr = w
}
// Flush 将缓存中的数据提交到底层的 io.Writer 中。
func (b *Writer) Flush() error {
if b.err != nil {
return b.err
}
if b.n == 0 {
return nil
}
// 将缓冲数据写入底层数据流
n, err := b.wr.Write(b.buf[0:b.n])
// 当未全部写完时,设置错误
if n < b.n && err == nil {
err = io.ErrShortWrite
}
if err != nil {
if n > 0 && n < b.n {
// 将已写过内容覆盖
copy(b.buf[0:b.n-n], b.buf[n:b.n])
}
b.n -= n
b.err = err
return err
}
b.n = 0
return nil
}
// Available 返回缓存中未使用的空间的长度
func (b *Writer) Available() int { return len(b.buf) - b.n }
// Buffered 返回缓存中未提交的数据的长度
func (b *Writer) Buffered() int { return b.n }
// Write 将 p 的内容写入缓冲。
// 返回写入的字节数。
// 如果返回值 nn < len(p),还会返回一个错误说明原因。
func (b *Writer) Write(p []byte) (nn int, err error) {
for len(p) > b.Available() && b.err == nil {
var n int
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
n, b.err = b.wr.Write(p)
} else {
n = copy(b.buf[b.n:], p)
b.n += n
b.Flush()
}
nn += n
p = p[n:]
}
if b.err != nil {
return nn, b.err
}
n := copy(b.buf[b.n:], p)
b.n += n
nn += n
return nn, nil
}
// WriteByte 写入单个字节。
func (b *Writer) WriteByte(c byte) error {
if b.err != nil {
return b.err
}
if b.Available() <= 0 && b.Flush() != nil {
return b.err
}
b.buf[b.n] = c
b.n++
return nil
}
// WriteRune写入一个unicode码值(的utf-8编码),返回写入的字节数和可能的错误。
func (b *Writer) WriteRune(r rune) (size int, err error) {
if r < utf8.RuneSelf {
err = b.WriteByte(byte(r))
if err != nil {
return 0, err
}
return 1, nil
}
if b.err != nil {
return 0, b.err
}
n := b.Available()
if n < utf8.UTFMax {
if b.Flush(); b.err != nil {
return 0, b.err
}
n = b.Available()
if n < utf8.UTFMax {
// Can only happen if buffer is silly small.
return b.WriteString(string(r))
}
}
size = utf8.EncodeRune(b.buf[b.n:], r)
b.n += size
return size, nil
}
// WriteString 功能同 Write,只不过写入的是字符串。
// 返回写入的字节数。如果返回值nn < len(s),还会返回一个错误说明原因。
func (b *Writer) WriteString(s string) (int, error) {
nn := 0
for len(s) > b.Available() && b.err == nil {
n := copy(b.buf[b.n:], s)
b.n += n
nn += n
s = s[n:]
b.Flush()
}
if b.err != nil {
return nn, b.err
}
n := copy(b.buf[b.n:], s)
b.n += n
nn += n
return nn, nil
}
// ReadFrom实现了io.ReaderFrom接口。
func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) {
if b.Buffered() == 0 {
if w, ok := b.wr.(io.ReaderFrom); ok {
return w.ReadFrom(r)
}
}
var m int
for {
if b.Available() == 0 {
if err1 := b.Flush(); err1 != nil {
return n, err1
}
}
nr := 0
for nr < maxConsecutiveEmptyReads {
m, err = r.Read(b.buf[b.n:])
if m != 0 || err != nil {
break
}
nr++
}
if nr == maxConsecutiveEmptyReads {
return n, io.ErrNoProgress
}
b.n += m
n += int64(m)
if err != nil {
break
}
}
if err == io.EOF {
// If we filled the buffer exactly, flush preemptively.
if b.Available() == 0 {
err = b.Flush()
} else {
err = nil
}
}
return n, err
}

ReadWriter 部分:

1
2
3
4
5
6
7
8
9
10
// ReadWriter类型保管了指向Reader和Writer类型的指针,因此实现了io.ReadWriter接口。
type ReadWriter struct {
*Reader
*Writer
}
// NewReadWriter申请创建一个新的、将读写操作分派给r和w 的ReadWriter。
func NewReadWriter(r *Reader, w *Writer) *ReadWriter {
return &ReadWriter{r, w}
}