diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/reader.go')
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/reader.go | 1062 |
1 files changed, 0 insertions, 1062 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/reader.go b/vendor/github.com/klauspost/compress/s2/reader.go deleted file mode 100644 index 2f01a39..0000000 --- a/vendor/github.com/klauspost/compress/s2/reader.go +++ /dev/null | |||
@@ -1,1062 +0,0 @@ | |||
1 | // Copyright 2011 The Snappy-Go Authors. All rights reserved. | ||
2 | // Copyright (c) 2019+ Klaus Post. All rights reserved. | ||
3 | // Use of this source code is governed by a BSD-style | ||
4 | // license that can be found in the LICENSE file. | ||
5 | |||
6 | package s2 | ||
7 | |||
8 | import ( | ||
9 | "errors" | ||
10 | "fmt" | ||
11 | "io" | ||
12 | "io/ioutil" | ||
13 | "math" | ||
14 | "runtime" | ||
15 | "sync" | ||
16 | ) | ||
17 | |||
18 | // ErrCantSeek is returned if the stream cannot be seeked. | ||
19 | type ErrCantSeek struct { | ||
20 | Reason string | ||
21 | } | ||
22 | |||
23 | // Error returns the error as string. | ||
24 | func (e ErrCantSeek) Error() string { | ||
25 | return fmt.Sprintf("s2: Can't seek because %s", e.Reason) | ||
26 | } | ||
27 | |||
28 | // NewReader returns a new Reader that decompresses from r, using the framing | ||
29 | // format described at | ||
30 | // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes. | ||
31 | func NewReader(r io.Reader, opts ...ReaderOption) *Reader { | ||
32 | nr := Reader{ | ||
33 | r: r, | ||
34 | maxBlock: maxBlockSize, | ||
35 | } | ||
36 | for _, opt := range opts { | ||
37 | if err := opt(&nr); err != nil { | ||
38 | nr.err = err | ||
39 | return &nr | ||
40 | } | ||
41 | } | ||
42 | nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize | ||
43 | if nr.lazyBuf > 0 { | ||
44 | nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize) | ||
45 | } else { | ||
46 | nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize) | ||
47 | } | ||
48 | nr.readHeader = nr.ignoreStreamID | ||
49 | nr.paramsOK = true | ||
50 | return &nr | ||
51 | } | ||
52 | |||
// ReaderOption is an option for creating a decoder.
// Options are applied by NewReader; a non-nil return aborts construction
// and the error is returned by subsequent calls on the Reader.
type ReaderOption func(*Reader) error
55 | |||
56 | // ReaderMaxBlockSize allows to control allocations if the stream | ||
57 | // has been compressed with a smaller WriterBlockSize, or with the default 1MB. | ||
58 | // Blocks must be this size or smaller to decompress, | ||
59 | // otherwise the decoder will return ErrUnsupported. | ||
60 | // | ||
61 | // For streams compressed with Snappy this can safely be set to 64KB (64 << 10). | ||
62 | // | ||
63 | // Default is the maximum limit of 4MB. | ||
64 | func ReaderMaxBlockSize(blockSize int) ReaderOption { | ||
65 | return func(r *Reader) error { | ||
66 | if blockSize > maxBlockSize || blockSize <= 0 { | ||
67 | return errors.New("s2: block size too large. Must be <= 4MB and > 0") | ||
68 | } | ||
69 | if r.lazyBuf == 0 && blockSize < defaultBlockSize { | ||
70 | r.lazyBuf = blockSize | ||
71 | } | ||
72 | r.maxBlock = blockSize | ||
73 | return nil | ||
74 | } | ||
75 | } | ||
76 | |||
77 | // ReaderAllocBlock allows to control upfront stream allocations | ||
78 | // and not allocate for frames bigger than this initially. | ||
79 | // If frames bigger than this is seen a bigger buffer will be allocated. | ||
80 | // | ||
81 | // Default is 1MB, which is default output size. | ||
82 | func ReaderAllocBlock(blockSize int) ReaderOption { | ||
83 | return func(r *Reader) error { | ||
84 | if blockSize > maxBlockSize || blockSize < 1024 { | ||
85 | return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024") | ||
86 | } | ||
87 | r.lazyBuf = blockSize | ||
88 | return nil | ||
89 | } | ||
90 | } | ||
91 | |||
// ReaderIgnoreStreamIdentifier will make the reader skip the expected
// stream identifier at the beginning of the stream.
// This can be used when serving a stream that has been forwarded to a specific point.
func ReaderIgnoreStreamIdentifier() ReaderOption {
	return func(r *Reader) error {
		// NewReader copies this into readHeader, so the identifier
		// chunk is treated as already consumed.
		r.ignoreStreamID = true
		return nil
	}
}
101 | |||
102 | // ReaderSkippableCB will register a callback for chuncks with the specified ID. | ||
103 | // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive). | ||
104 | // For each chunk with the ID, the callback is called with the content. | ||
105 | // Any returned non-nil error will abort decompression. | ||
106 | // Only one callback per ID is supported, latest sent will be used. | ||
107 | func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption { | ||
108 | return func(r *Reader) error { | ||
109 | if id < 0x80 || id > 0xfd { | ||
110 | return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)") | ||
111 | } | ||
112 | r.skippableCB[id] = fn | ||
113 | return nil | ||
114 | } | ||
115 | } | ||
116 | |||
// ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
// This trades integrity verification for decode speed.
func ReaderIgnoreCRC() ReaderOption {
	return func(r *Reader) error {
		r.ignoreCRC = true
		return nil
	}
}
124 | |||
// Reader is an io.Reader that can read Snappy-compressed bytes.
type Reader struct {
	r       io.Reader // compressed input
	err     error     // sticky error; once set all calls fail with it
	decoded []byte    // decompressed output buffer
	buf     []byte    // compressed input buffer
	// skippableCB holds callbacks for skippable chunk ids 0x80-0xff,
	// indexed by id-0x80. See ReaderSkippableCB.
	skippableCB [0x80]func(r io.Reader) error
	blockStart  int64 // Uncompressed offset at start of current.
	index       *Index

	// decoded[i:j] contains decoded bytes that have not yet been passed on.
	i, j int
	// maximum block size allowed.
	maxBlock int
	// maximum expected buffer size.
	maxBufSize int
	// alloc a buffer this size if > 0.
	lazyBuf int
	// readHeader is true once the stream identifier has been seen
	// (or when it is configured to be ignored).
	readHeader bool
	// paramsOK is set by NewReader; it guards Reset on zero-value Readers.
	paramsOK bool
	// snappyFrame is true while decoding a frame with the Snappy magic,
	// which enforces the stricter Snappy block-size limit.
	snappyFrame    bool
	ignoreStreamID bool
	ignoreCRC      bool
}
149 | |||
// GetBufferCapacity returns the capacity of the internal buffer.
// This might be useful to know when reusing the same reader in combination
// with the lazy buffer option.
func (r *Reader) GetBufferCapacity() int {
	// Reports the compressed-input buffer, not the decoded buffer.
	return cap(r.buf)
}
156 | |||
157 | // ensureBufferSize will ensure that the buffer can take at least n bytes. | ||
158 | // If false is returned the buffer exceeds maximum allowed size. | ||
159 | func (r *Reader) ensureBufferSize(n int) bool { | ||
160 | if n > r.maxBufSize { | ||
161 | r.err = ErrCorrupt | ||
162 | return false | ||
163 | } | ||
164 | if cap(r.buf) >= n { | ||
165 | return true | ||
166 | } | ||
167 | // Realloc buffer. | ||
168 | r.buf = make([]byte, n) | ||
169 | return true | ||
170 | } | ||
171 | |||
172 | // Reset discards any buffered data, resets all state, and switches the Snappy | ||
173 | // reader to read from r. This permits reusing a Reader rather than allocating | ||
174 | // a new one. | ||
175 | func (r *Reader) Reset(reader io.Reader) { | ||
176 | if !r.paramsOK { | ||
177 | return | ||
178 | } | ||
179 | r.index = nil | ||
180 | r.r = reader | ||
181 | r.err = nil | ||
182 | r.i = 0 | ||
183 | r.j = 0 | ||
184 | r.blockStart = 0 | ||
185 | r.readHeader = r.ignoreStreamID | ||
186 | } | ||
187 | |||
188 | func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { | ||
189 | if _, r.err = io.ReadFull(r.r, p); r.err != nil { | ||
190 | if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { | ||
191 | r.err = ErrCorrupt | ||
192 | } | ||
193 | return false | ||
194 | } | ||
195 | return true | ||
196 | } | ||
197 | |||
198 | // skippable will skip n bytes. | ||
199 | // If the supplied reader supports seeking that is used. | ||
200 | // tmp is used as a temporary buffer for reading. | ||
201 | // The supplied slice does not need to be the size of the read. | ||
202 | func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) { | ||
203 | if id < 0x80 { | ||
204 | r.err = fmt.Errorf("interbal error: skippable id < 0x80") | ||
205 | return false | ||
206 | } | ||
207 | if fn := r.skippableCB[id-0x80]; fn != nil { | ||
208 | rd := io.LimitReader(r.r, int64(n)) | ||
209 | r.err = fn(rd) | ||
210 | if r.err != nil { | ||
211 | return false | ||
212 | } | ||
213 | _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp) | ||
214 | return r.err == nil | ||
215 | } | ||
216 | if rs, ok := r.r.(io.ReadSeeker); ok { | ||
217 | _, err := rs.Seek(int64(n), io.SeekCurrent) | ||
218 | if err == nil { | ||
219 | return true | ||
220 | } | ||
221 | if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { | ||
222 | r.err = ErrCorrupt | ||
223 | return false | ||
224 | } | ||
225 | } | ||
226 | for n > 0 { | ||
227 | if n < len(tmp) { | ||
228 | tmp = tmp[:n] | ||
229 | } | ||
230 | if _, r.err = io.ReadFull(r.r, tmp); r.err != nil { | ||
231 | if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { | ||
232 | r.err = ErrCorrupt | ||
233 | } | ||
234 | return false | ||
235 | } | ||
236 | n -= len(tmp) | ||
237 | } | ||
238 | return true | ||
239 | } | ||
240 | |||
// Read satisfies the io.Reader interface.
// It returns buffered decoded bytes when available, otherwise it reads
// and decodes the next frame chunk before returning.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve from the decoded buffer first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Chunk header: 1 byte type + 3 bytes little-endian length.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The stream must start with a stream identifier chunk.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// Account for the block just consumed before replacing it.
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// First 4 bytes are the (masked) CRC of the decoded data.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			// Snappy frames have a stricter block-size limit than S2.
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept both the S2 and Snappy magic bodies; remember which,
			// since Snappy frames have stricter limits.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
399 | |||
// DecodeConcurrent will decode the full stream to w.
// This function should not be combined with reading, seeking or other operations.
// Up to 'concurrent' goroutines will be used.
// If <= 0, runtime.NumCPU will be used.
// On success the number of bytes decompressed and a nil error is returned.
// This is mainly intended for bigger streams.
func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
	if r.i > 0 || r.j > 0 || r.blockStart > 0 {
		// NOTE(review): this message looks truncated ("called after ") —
		// presumably "called after Read/Skip"; confirm intended wording.
		return 0, errors.New("DecodeConcurrent called after ")
	}
	if concurrent <= 0 {
		concurrent = runtime.NumCPU()
	}

	// Write to output
	var errMu sync.Mutex
	var aErr error
	// setErr records the first error seen. Called with nil it reports
	// whether no error has been recorded yet.
	setErr := func(e error) (ok bool) {
		errMu.Lock()
		defer errMu.Unlock()
		if e == nil {
			return aErr == nil
		}
		if aErr == nil {
			aErr = e
		}
		return false
	}
	// hasErr reports whether any error has been recorded.
	hasErr := func() (ok bool) {
		errMu.Lock()
		v := aErr != nil
		errMu.Unlock()
		return v
	}

	var aWritten int64
	// Buffer/channel recycling pools, all bounded by 'concurrent':
	// toRead:        reusable compressed-input buffers.
	// writtenBlocks: reusable decoded-output buffers.
	// queue:         per-block result channels, enqueued in stream order
	//                so the writer emits blocks in order.
	// reUse:         recycled per-block result channels.
	toRead := make(chan []byte, concurrent)
	writtenBlocks := make(chan []byte, concurrent)
	queue := make(chan chan []byte, concurrent)
	reUse := make(chan chan []byte, concurrent)
	for i := 0; i < concurrent; i++ {
		toRead <- make([]byte, 0, r.maxBufSize)
		writtenBlocks <- make([]byte, 0, r.maxBufSize)
		reUse <- make(chan []byte, 1)
	}
	// Writer
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for toWrite := range queue {
			entry := <-toWrite
			reUse <- toWrite
			if hasErr() {
				// Drain remaining blocks, returning buffers to the pool.
				writtenBlocks <- entry
				continue
			}
			n, err := w.Write(entry)
			want := len(entry)
			writtenBlocks <- entry
			if err != nil {
				setErr(err)
				continue
			}
			if n != want {
				setErr(io.ErrShortWrite)
				continue
			}
			aWritten += int64(n)
		}
	}()

	// Reader
	defer func() {
		// Closing queue stops the writer; wg also covers in-flight
		// decode goroutines (each does wg.Add(1) before launch).
		close(queue)
		if r.err != nil {
			err = r.err
			setErr(r.err)
		}
		wg.Wait()
		if err == nil {
			err = aErr
		}
		written = aWritten
	}()

	for !hasErr() {
		if !r.readFull(r.buf[:4], true) {
			// Clean EOF terminates the stream successfully.
			if r.err == io.EOF {
				r.err = nil
			}
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if chunkLen > r.maxBufSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			orgBuf := <-toRead
			buf := orgBuf[:chunkLen]

			if !r.readFull(buf, false) {
				return 0, r.err
			}

			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			if n > r.maxBlock {
				r.err = ErrCorrupt
				return 0, r.err
			}
			wg.Add(1)

			// Reserve an output buffer and an ordered slot in the queue
			// BEFORE launching, so output order matches stream order.
			decoded := <-writtenBlocks
			entry := <-reUse
			queue <- entry
			go func() {
				defer wg.Done()
				decoded = decoded[:n]
				_, err := Decode(decoded, buf)
				toRead <- orgBuf
				if err != nil {
					writtenBlocks <- decoded
					setErr(err)
					return
				}
				if !r.ignoreCRC && crc(decoded) != checksum {
					writtenBlocks <- decoded
					setErr(ErrCRC)
					return
				}
				entry <- decoded
			}()
			continue

		case chunkTypeUncompressedData:

			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if chunkLen > r.maxBufSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// Grab write buffer
			orgBuf := <-writtenBlocks
			buf := orgBuf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read content.
			n := chunkLen - checksumSize

			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > r.maxBlock {
				r.err = ErrCorrupt
				return 0, r.err
			}
			// Read uncompressed
			buf = orgBuf[:n]
			if !r.readFull(buf, false) {
				return 0, r.err
			}

			if !r.ignoreCRC && crc(buf) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			// No decode needed; hand the block straight to the writer.
			entry := <-reUse
			queue <- entry
			entry <- buf
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
	// Loop exited because an error was recorded; the deferred func
	// fills in written/err from the accumulated state.
	return 0, r.err
}
652 | |||
653 | // Skip will skip n bytes forward in the decompressed output. | ||
654 | // For larger skips this consumes less CPU and is faster than reading output and discarding it. | ||
655 | // CRC is not checked on skipped blocks. | ||
656 | // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped. | ||
657 | // If a decoding error is encountered subsequent calls to Read will also fail. | ||
658 | func (r *Reader) Skip(n int64) error { | ||
659 | if n < 0 { | ||
660 | return errors.New("attempted negative skip") | ||
661 | } | ||
662 | if r.err != nil { | ||
663 | return r.err | ||
664 | } | ||
665 | |||
666 | for n > 0 { | ||
667 | if r.i < r.j { | ||
668 | // Skip in buffer. | ||
669 | // decoded[i:j] contains decoded bytes that have not yet been passed on. | ||
670 | left := int64(r.j - r.i) | ||
671 | if left >= n { | ||
672 | tmp := int64(r.i) + n | ||
673 | if tmp > math.MaxInt32 { | ||
674 | return errors.New("s2: internal overflow in skip") | ||
675 | } | ||
676 | r.i = int(tmp) | ||
677 | return nil | ||
678 | } | ||
679 | n -= int64(r.j - r.i) | ||
680 | r.i = r.j | ||
681 | } | ||
682 | |||
683 | // Buffer empty; read blocks until we have content. | ||
684 | if !r.readFull(r.buf[:4], true) { | ||
685 | if r.err == io.EOF { | ||
686 | r.err = io.ErrUnexpectedEOF | ||
687 | } | ||
688 | return r.err | ||
689 | } | ||
690 | chunkType := r.buf[0] | ||
691 | if !r.readHeader { | ||
692 | if chunkType != chunkTypeStreamIdentifier { | ||
693 | r.err = ErrCorrupt | ||
694 | return r.err | ||
695 | } | ||
696 | r.readHeader = true | ||
697 | } | ||
698 | chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 | ||
699 | |||
700 | // The chunk types are specified at | ||
701 | // https://github.com/google/snappy/blob/master/framing_format.txt | ||
702 | switch chunkType { | ||
703 | case chunkTypeCompressedData: | ||
704 | r.blockStart += int64(r.j) | ||
705 | // Section 4.2. Compressed data (chunk type 0x00). | ||
706 | if chunkLen < checksumSize { | ||
707 | r.err = ErrCorrupt | ||
708 | return r.err | ||
709 | } | ||
710 | if !r.ensureBufferSize(chunkLen) { | ||
711 | if r.err == nil { | ||
712 | r.err = ErrUnsupported | ||
713 | } | ||
714 | return r.err | ||
715 | } | ||
716 | buf := r.buf[:chunkLen] | ||
717 | if !r.readFull(buf, false) { | ||
718 | return r.err | ||
719 | } | ||
720 | checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 | ||
721 | buf = buf[checksumSize:] | ||
722 | |||
723 | dLen, err := DecodedLen(buf) | ||
724 | if err != nil { | ||
725 | r.err = err | ||
726 | return r.err | ||
727 | } | ||
728 | if dLen > r.maxBlock { | ||
729 | r.err = ErrCorrupt | ||
730 | return r.err | ||
731 | } | ||
732 | // Check if destination is within this block | ||
733 | if int64(dLen) > n { | ||
734 | if len(r.decoded) < dLen { | ||
735 | r.decoded = make([]byte, dLen) | ||
736 | } | ||
737 | if _, err := Decode(r.decoded, buf); err != nil { | ||
738 | r.err = err | ||
739 | return r.err | ||
740 | } | ||
741 | if crc(r.decoded[:dLen]) != checksum { | ||
742 | r.err = ErrCorrupt | ||
743 | return r.err | ||
744 | } | ||
745 | } else { | ||
746 | // Skip block completely | ||
747 | n -= int64(dLen) | ||
748 | r.blockStart += int64(dLen) | ||
749 | dLen = 0 | ||
750 | } | ||
751 | r.i, r.j = 0, dLen | ||
752 | continue | ||
753 | case chunkTypeUncompressedData: | ||
754 | r.blockStart += int64(r.j) | ||
755 | // Section 4.3. Uncompressed data (chunk type 0x01). | ||
756 | if chunkLen < checksumSize { | ||
757 | r.err = ErrCorrupt | ||
758 | return r.err | ||
759 | } | ||
760 | if !r.ensureBufferSize(chunkLen) { | ||
761 | if r.err != nil { | ||
762 | r.err = ErrUnsupported | ||
763 | } | ||
764 | return r.err | ||
765 | } | ||
766 | buf := r.buf[:checksumSize] | ||
767 | if !r.readFull(buf, false) { | ||
768 | return r.err | ||
769 | } | ||
770 | checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 | ||
771 | // Read directly into r.decoded instead of via r.buf. | ||
772 | n2 := chunkLen - checksumSize | ||
773 | if n2 > len(r.decoded) { | ||
774 | if n2 > r.maxBlock { | ||
775 | r.err = ErrCorrupt | ||
776 | return r.err | ||
777 | } | ||
778 | r.decoded = make([]byte, n2) | ||
779 | } | ||
780 | if !r.readFull(r.decoded[:n2], false) { | ||
781 | return r.err | ||
782 | } | ||
783 | if int64(n2) < n { | ||
784 | if crc(r.decoded[:n2]) != checksum { | ||
785 | r.err = ErrCorrupt | ||
786 | return r.err | ||
787 | } | ||
788 | } | ||
789 | r.i, r.j = 0, n2 | ||
790 | continue | ||
791 | case chunkTypeStreamIdentifier: | ||
792 | // Section 4.1. Stream identifier (chunk type 0xff). | ||
793 | if chunkLen != len(magicBody) { | ||
794 | r.err = ErrCorrupt | ||
795 | return r.err | ||
796 | } | ||
797 | if !r.readFull(r.buf[:len(magicBody)], false) { | ||
798 | return r.err | ||
799 | } | ||
800 | if string(r.buf[:len(magicBody)]) != magicBody { | ||
801 | if string(r.buf[:len(magicBody)]) != magicBodySnappy { | ||
802 | r.err = ErrCorrupt | ||
803 | return r.err | ||
804 | } | ||
805 | } | ||
806 | |||
807 | continue | ||
808 | } | ||
809 | |||
810 | if chunkType <= 0x7f { | ||
811 | // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). | ||
812 | r.err = ErrUnsupported | ||
813 | return r.err | ||
814 | } | ||
815 | if chunkLen > maxChunkSize { | ||
816 | r.err = ErrUnsupported | ||
817 | return r.err | ||
818 | } | ||
819 | // Section 4.4 Padding (chunk type 0xfe). | ||
820 | // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). | ||
821 | if !r.skippable(r.buf, chunkLen, false, chunkType) { | ||
822 | return r.err | ||
823 | } | ||
824 | } | ||
825 | return nil | ||
826 | } | ||
827 | |||
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker
type ReadSeeker struct {
	// Shallow reference: state changes are shared with the original Reader.
	*Reader
	// readAtMu serializes ReadAt calls, which mutate the shared Reader state.
	readAtMu sync.Mutex
}
834 | |||
// ReadSeeker will return an io.ReadSeeker and io.ReaderAt
// compatible version of the reader.
// If 'random' is specified the returned io.Seeker can be used for
// random seeking, otherwise only forward seeking is supported.
// Enabling random seeking requires the original input to support
// the io.Seeker interface.
// A custom index can be specified which will be used if supplied.
// When using a custom index, it will not be read from the input stream.
// The ReadAt position will affect regular reads and the current position of Seek.
// So using Read after ReadAt will continue from where the ReadAt stopped.
// No functions should be used concurrently.
// The returned ReadSeeker contains a shallow reference to the existing Reader,
// meaning changes performed to one is reflected in the other.
func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
	// Read index if provided.
	if len(index) != 0 {
		if r.index == nil {
			r.index = &Index{}
		}
		if _, err := r.index.Load(index); err != nil {
			return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
		}
	}

	// Check if input is seekable
	rs, ok := r.r.(io.ReadSeeker)
	if !ok {
		if !random {
			// Forward-only seeking works via Skip on any reader.
			return &ReadSeeker{Reader: r}, nil
		}
		return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
	}

	if r.index != nil {
		// Seekable and index, ok...
		return &ReadSeeker{Reader: r}, nil
	}

	// Load from stream.
	r.index = &Index{}

	// Read current position so it can be restored afterwards.
	pos, err := rs.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
	}
	// Attempt to load an index from the stream itself.
	err = r.index.LoadStream(rs)
	if err != nil {
		if err == ErrUnsupported {
			// If we don't require random seeking, reset input and return.
			if !random {
				_, err = rs.Seek(pos, io.SeekStart)
				if err != nil {
					return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
				}
				r.index = nil
				return &ReadSeeker{Reader: r}, nil
			}
			return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
		}
		return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
	}

	// reset position.
	_, err = rs.Seek(pos, io.SeekStart)
	if err != nil {
		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
	}
	return &ReadSeeker{Reader: r}, nil
}
905 | |||
// Seek allows seeking in compressed data.
// Offsets are in uncompressed bytes. Random (backward) seeking requires
// an index and a seekable input; otherwise only forward movement works.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
	if r.err != nil {
		if !errors.Is(r.err, io.EOF) {
			return 0, r.err
		}
		// Reset on EOF
		r.err = nil
	}

	// Calculate absolute offset.
	absOffset := offset

	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		// Current uncompressed position is block start plus buffer index.
		absOffset = r.blockStart + int64(r.i) + offset
	case io.SeekEnd:
		if r.index == nil {
			return 0, ErrUnsupported
		}
		absOffset = r.index.TotalUncompressed + offset
	default:
		r.err = ErrUnsupported
		return 0, r.err
	}

	if absOffset < 0 {
		return 0, errors.New("seek before start of file")
	}

	if !r.readHeader {
		// Make sure we read the header.
		_, r.err = r.Read([]byte{})
		if r.err != nil {
			return 0, r.err
		}
	}

	// If we are inside current block no need to seek.
	// This includes no offset changes.
	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
		r.i = int(absOffset - r.blockStart)
		return r.blockStart + int64(r.i), nil
	}

	rs, ok := r.r.(io.ReadSeeker)
	if r.index == nil || !ok {
		// No index or non-seekable input: only forward movement via Skip.
		currOffset := r.blockStart + int64(r.i)
		if absOffset >= currOffset {
			err := r.Skip(absOffset - currOffset)
			return r.blockStart + int64(r.i), err
		}
		return 0, ErrUnsupported
	}

	// We can seek and we have an index.
	// Find the compressed (c) and uncompressed (u) offsets of the
	// closest indexed block at or before absOffset.
	c, u, err := r.index.Find(absOffset)
	if err != nil {
		return r.blockStart + int64(r.i), err
	}

	// Seek to next block
	_, err = rs.Seek(c, io.SeekStart)
	if err != nil {
		return 0, err
	}

	r.i = r.j                     // Remove rest of current block.
	r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
	if u < absOffset {
		// Forward inside block
		return absOffset, r.Skip(absOffset - u)
	}
	if u > absOffset {
		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
	}
	return absOffset, nil
}
985 | |||
986 | // ReadAt reads len(p) bytes into p starting at offset off in the | ||
987 | // underlying input source. It returns the number of bytes | ||
988 | // read (0 <= n <= len(p)) and any error encountered. | ||
989 | // | ||
990 | // When ReadAt returns n < len(p), it returns a non-nil error | ||
991 | // explaining why more bytes were not returned. In this respect, | ||
992 | // ReadAt is stricter than Read. | ||
993 | // | ||
994 | // Even if ReadAt returns n < len(p), it may use all of p as scratch | ||
995 | // space during the call. If some data is available but not len(p) bytes, | ||
996 | // ReadAt blocks until either all the data is available or an error occurs. | ||
997 | // In this respect ReadAt is different from Read. | ||
998 | // | ||
999 | // If the n = len(p) bytes returned by ReadAt are at the end of the | ||
1000 | // input source, ReadAt may return either err == EOF or err == nil. | ||
1001 | // | ||
1002 | // If ReadAt is reading from an input source with a seek offset, | ||
1003 | // ReadAt should not affect nor be affected by the underlying | ||
1004 | // seek offset. | ||
1005 | // | ||
1006 | // Clients of ReadAt can execute parallel ReadAt calls on the | ||
1007 | // same input source. This is however not recommended. | ||
1008 | func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) { | ||
1009 | r.readAtMu.Lock() | ||
1010 | defer r.readAtMu.Unlock() | ||
1011 | _, err := r.Seek(offset, io.SeekStart) | ||
1012 | if err != nil { | ||
1013 | return 0, err | ||
1014 | } | ||
1015 | n := 0 | ||
1016 | for n < len(p) { | ||
1017 | n2, err := r.Read(p[n:]) | ||
1018 | if err != nil { | ||
1019 | // This will include io.EOF | ||
1020 | return n + n2, err | ||
1021 | } | ||
1022 | n += n2 | ||
1023 | } | ||
1024 | return n, nil | ||
1025 | } | ||
1026 | |||
1027 | // ReadByte satisfies the io.ByteReader interface. | ||
1028 | func (r *Reader) ReadByte() (byte, error) { | ||
1029 | if r.err != nil { | ||
1030 | return 0, r.err | ||
1031 | } | ||
1032 | if r.i < r.j { | ||
1033 | c := r.decoded[r.i] | ||
1034 | r.i++ | ||
1035 | return c, nil | ||
1036 | } | ||
1037 | var tmp [1]byte | ||
1038 | for i := 0; i < 10; i++ { | ||
1039 | n, err := r.Read(tmp[:]) | ||
1040 | if err != nil { | ||
1041 | return 0, err | ||
1042 | } | ||
1043 | if n == 1 { | ||
1044 | return tmp[0], nil | ||
1045 | } | ||
1046 | } | ||
1047 | return 0, io.ErrNoProgress | ||
1048 | } | ||
1049 | |||
1050 | // SkippableCB will register a callback for chunks with the specified ID. | ||
1051 | // ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive). | ||
1052 | // For each chunk with the ID, the callback is called with the content. | ||
1053 | // Any returned non-nil error will abort decompression. | ||
1054 | // Only one callback per ID is supported, latest sent will be used. | ||
1055 | // Sending a nil function will disable previous callbacks. | ||
1056 | func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error { | ||
1057 | if id < 0x80 || id > chunkTypePadding { | ||
1058 | return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)") | ||
1059 | } | ||
1060 | r.skippableCB[id] = fn | ||
1061 | return nil | ||
1062 | } | ||