aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/klauspost/compress/s2/reader.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/reader.go')
-rw-r--r--vendor/github.com/klauspost/compress/s2/reader.go1062
1 files changed, 1062 insertions, 0 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/reader.go b/vendor/github.com/klauspost/compress/s2/reader.go
new file mode 100644
index 0000000..2f01a39
--- /dev/null
+++ b/vendor/github.com/klauspost/compress/s2/reader.go
@@ -0,0 +1,1062 @@
1// Copyright 2011 The Snappy-Go Authors. All rights reserved.
2// Copyright (c) 2019+ Klaus Post. All rights reserved.
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file.
5
6package s2
7
8import (
9 "errors"
10 "fmt"
11 "io"
12 "io/ioutil"
13 "math"
14 "runtime"
15 "sync"
16)
17
// ErrCantSeek is returned if the stream cannot be seeked.
type ErrCantSeek struct {
	// Reason is a human-readable description of why seeking failed.
	Reason string
}

// Error returns the error as string.
func (e ErrCantSeek) Error() string {
	msg := fmt.Sprintf("s2: Can't seek because %s", e.Reason)
	return msg
}
27
28// NewReader returns a new Reader that decompresses from r, using the framing
29// format described at
30// https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
31func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
32 nr := Reader{
33 r: r,
34 maxBlock: maxBlockSize,
35 }
36 for _, opt := range opts {
37 if err := opt(&nr); err != nil {
38 nr.err = err
39 return &nr
40 }
41 }
42 nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
43 if nr.lazyBuf > 0 {
44 nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
45 } else {
46 nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
47 }
48 nr.readHeader = nr.ignoreStreamID
49 nr.paramsOK = true
50 return &nr
51}
52
// ReaderOption is an option for creating a decoder.
// Options are applied in order by NewReader; a non-nil error from an
// option aborts construction and is returned by subsequent reads.
type ReaderOption func(*Reader) error
55
56// ReaderMaxBlockSize allows to control allocations if the stream
57// has been compressed with a smaller WriterBlockSize, or with the default 1MB.
58// Blocks must be this size or smaller to decompress,
59// otherwise the decoder will return ErrUnsupported.
60//
61// For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
62//
63// Default is the maximum limit of 4MB.
64func ReaderMaxBlockSize(blockSize int) ReaderOption {
65 return func(r *Reader) error {
66 if blockSize > maxBlockSize || blockSize <= 0 {
67 return errors.New("s2: block size too large. Must be <= 4MB and > 0")
68 }
69 if r.lazyBuf == 0 && blockSize < defaultBlockSize {
70 r.lazyBuf = blockSize
71 }
72 r.maxBlock = blockSize
73 return nil
74 }
75}
76
77// ReaderAllocBlock allows to control upfront stream allocations
78// and not allocate for frames bigger than this initially.
79// If frames bigger than this is seen a bigger buffer will be allocated.
80//
81// Default is 1MB, which is default output size.
82func ReaderAllocBlock(blockSize int) ReaderOption {
83 return func(r *Reader) error {
84 if blockSize > maxBlockSize || blockSize < 1024 {
85 return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
86 }
87 r.lazyBuf = blockSize
88 return nil
89 }
90}
91
92// ReaderIgnoreStreamIdentifier will make the reader skip the expected
93// stream identifier at the beginning of the stream.
94// This can be used when serving a stream that has been forwarded to a specific point.
95func ReaderIgnoreStreamIdentifier() ReaderOption {
96 return func(r *Reader) error {
97 r.ignoreStreamID = true
98 return nil
99 }
100}
101
102// ReaderSkippableCB will register a callback for chuncks with the specified ID.
103// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
104// For each chunk with the ID, the callback is called with the content.
105// Any returned non-nil error will abort decompression.
106// Only one callback per ID is supported, latest sent will be used.
107func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
108 return func(r *Reader) error {
109 if id < 0x80 || id > 0xfd {
110 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
111 }
112 r.skippableCB[id] = fn
113 return nil
114 }
115}
116
117// ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
118func ReaderIgnoreCRC() ReaderOption {
119 return func(r *Reader) error {
120 r.ignoreCRC = true
121 return nil
122 }
123}
124
// Reader is an io.Reader that can read Snappy-compressed bytes.
type Reader struct {
	r           io.Reader                     // underlying compressed input stream
	err         error                         // sticky error; once set, all further calls fail with it
	decoded     []byte                        // decompressed contents of the current block
	buf         []byte                        // scratch buffer for raw (compressed) chunk data
	skippableCB [0x80]func(r io.Reader) error // callbacks for skippable chunks, indexed by id-0x80
	blockStart  int64                         // Uncompressed offset at start of current.
	index       *Index                        // optional seek index; loaded by ReadSeeker
	// decoded[i:j] contains decoded bytes that have not yet been passed on.
	i, j int
	// maximum block size allowed.
	maxBlock int
	// maximum expected buffer size.
	maxBufSize int
	// alloc a buffer this size if > 0.
	lazyBuf        int
	readHeader     bool // true once the stream identifier has been consumed (or is ignored)
	paramsOK       bool // set by NewReader on success; guards Reset against invalid readers
	snappyFrame    bool // current frame used the Snappy magic; enforces 64K block limit
	ignoreStreamID bool // skip the stream identifier check (ReaderIgnoreStreamIdentifier)
	ignoreCRC      bool // skip CRC verification (ReaderIgnoreCRC)
}
149
150// GetBufferCapacity returns the capacity of the internal buffer.
151// This might be useful to know when reusing the same reader in combination
152// with the lazy buffer option.
153func (r *Reader) GetBufferCapacity() int {
154 return cap(r.buf)
155}
156
157// ensureBufferSize will ensure that the buffer can take at least n bytes.
158// If false is returned the buffer exceeds maximum allowed size.
159func (r *Reader) ensureBufferSize(n int) bool {
160 if n > r.maxBufSize {
161 r.err = ErrCorrupt
162 return false
163 }
164 if cap(r.buf) >= n {
165 return true
166 }
167 // Realloc buffer.
168 r.buf = make([]byte, n)
169 return true
170}
171
172// Reset discards any buffered data, resets all state, and switches the Snappy
173// reader to read from r. This permits reusing a Reader rather than allocating
174// a new one.
175func (r *Reader) Reset(reader io.Reader) {
176 if !r.paramsOK {
177 return
178 }
179 r.index = nil
180 r.r = reader
181 r.err = nil
182 r.i = 0
183 r.j = 0
184 r.blockStart = 0
185 r.readHeader = r.ignoreStreamID
186}
187
188func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
189 if _, r.err = io.ReadFull(r.r, p); r.err != nil {
190 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
191 r.err = ErrCorrupt
192 }
193 return false
194 }
195 return true
196}
197
198// skippable will skip n bytes.
199// If the supplied reader supports seeking that is used.
200// tmp is used as a temporary buffer for reading.
201// The supplied slice does not need to be the size of the read.
202func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
203 if id < 0x80 {
204 r.err = fmt.Errorf("interbal error: skippable id < 0x80")
205 return false
206 }
207 if fn := r.skippableCB[id-0x80]; fn != nil {
208 rd := io.LimitReader(r.r, int64(n))
209 r.err = fn(rd)
210 if r.err != nil {
211 return false
212 }
213 _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
214 return r.err == nil
215 }
216 if rs, ok := r.r.(io.ReadSeeker); ok {
217 _, err := rs.Seek(int64(n), io.SeekCurrent)
218 if err == nil {
219 return true
220 }
221 if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
222 r.err = ErrCorrupt
223 return false
224 }
225 }
226 for n > 0 {
227 if n < len(tmp) {
228 tmp = tmp[:n]
229 }
230 if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
231 if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
232 r.err = ErrCorrupt
233 }
234 return false
235 }
236 n -= len(tmp)
237 }
238 return true
239}
240
// Read satisfies the io.Reader interface.
//
// It serves bytes from the currently decoded block; when that is exhausted
// it reads the next chunk header, decodes (or copies) the block, verifies
// its CRC (unless ignoreCRC is set), and buffers it in r.decoded.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Serve buffered decoded bytes first.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Read the 4-byte chunk header: 1 type byte + 24-bit LE length.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The first chunk of a stream must be the stream identifier.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// First 4 bytes are the little-endian CRC of the decoded data.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			// Snappy frames are limited to 64K uncompressed blocks.
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeUncompressedData:
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue

		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept both the S2 and the Snappy magic; remember which,
			// so block size limits can be enforced per frame.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
399
400// DecodeConcurrent will decode the full stream to w.
401// This function should not be combined with reading, seeking or other operations.
402// Up to 'concurrent' goroutines will be used.
403// If <= 0, runtime.NumCPU will be used.
404// On success the number of bytes decompressed nil and is returned.
405// This is mainly intended for bigger streams.
406func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
407 if r.i > 0 || r.j > 0 || r.blockStart > 0 {
408 return 0, errors.New("DecodeConcurrent called after ")
409 }
410 if concurrent <= 0 {
411 concurrent = runtime.NumCPU()
412 }
413
414 // Write to output
415 var errMu sync.Mutex
416 var aErr error
417 setErr := func(e error) (ok bool) {
418 errMu.Lock()
419 defer errMu.Unlock()
420 if e == nil {
421 return aErr == nil
422 }
423 if aErr == nil {
424 aErr = e
425 }
426 return false
427 }
428 hasErr := func() (ok bool) {
429 errMu.Lock()
430 v := aErr != nil
431 errMu.Unlock()
432 return v
433 }
434
435 var aWritten int64
436 toRead := make(chan []byte, concurrent)
437 writtenBlocks := make(chan []byte, concurrent)
438 queue := make(chan chan []byte, concurrent)
439 reUse := make(chan chan []byte, concurrent)
440 for i := 0; i < concurrent; i++ {
441 toRead <- make([]byte, 0, r.maxBufSize)
442 writtenBlocks <- make([]byte, 0, r.maxBufSize)
443 reUse <- make(chan []byte, 1)
444 }
445 // Writer
446 var wg sync.WaitGroup
447 wg.Add(1)
448 go func() {
449 defer wg.Done()
450 for toWrite := range queue {
451 entry := <-toWrite
452 reUse <- toWrite
453 if hasErr() {
454 writtenBlocks <- entry
455 continue
456 }
457 n, err := w.Write(entry)
458 want := len(entry)
459 writtenBlocks <- entry
460 if err != nil {
461 setErr(err)
462 continue
463 }
464 if n != want {
465 setErr(io.ErrShortWrite)
466 continue
467 }
468 aWritten += int64(n)
469 }
470 }()
471
472 // Reader
473 defer func() {
474 close(queue)
475 if r.err != nil {
476 err = r.err
477 setErr(r.err)
478 }
479 wg.Wait()
480 if err == nil {
481 err = aErr
482 }
483 written = aWritten
484 }()
485
486 for !hasErr() {
487 if !r.readFull(r.buf[:4], true) {
488 if r.err == io.EOF {
489 r.err = nil
490 }
491 return 0, r.err
492 }
493 chunkType := r.buf[0]
494 if !r.readHeader {
495 if chunkType != chunkTypeStreamIdentifier {
496 r.err = ErrCorrupt
497 return 0, r.err
498 }
499 r.readHeader = true
500 }
501 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
502
503 // The chunk types are specified at
504 // https://github.com/google/snappy/blob/master/framing_format.txt
505 switch chunkType {
506 case chunkTypeCompressedData:
507 r.blockStart += int64(r.j)
508 // Section 4.2. Compressed data (chunk type 0x00).
509 if chunkLen < checksumSize {
510 r.err = ErrCorrupt
511 return 0, r.err
512 }
513 if chunkLen > r.maxBufSize {
514 r.err = ErrCorrupt
515 return 0, r.err
516 }
517 orgBuf := <-toRead
518 buf := orgBuf[:chunkLen]
519
520 if !r.readFull(buf, false) {
521 return 0, r.err
522 }
523
524 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
525 buf = buf[checksumSize:]
526
527 n, err := DecodedLen(buf)
528 if err != nil {
529 r.err = err
530 return 0, r.err
531 }
532 if r.snappyFrame && n > maxSnappyBlockSize {
533 r.err = ErrCorrupt
534 return 0, r.err
535 }
536
537 if n > r.maxBlock {
538 r.err = ErrCorrupt
539 return 0, r.err
540 }
541 wg.Add(1)
542
543 decoded := <-writtenBlocks
544 entry := <-reUse
545 queue <- entry
546 go func() {
547 defer wg.Done()
548 decoded = decoded[:n]
549 _, err := Decode(decoded, buf)
550 toRead <- orgBuf
551 if err != nil {
552 writtenBlocks <- decoded
553 setErr(err)
554 return
555 }
556 if !r.ignoreCRC && crc(decoded) != checksum {
557 writtenBlocks <- decoded
558 setErr(ErrCRC)
559 return
560 }
561 entry <- decoded
562 }()
563 continue
564
565 case chunkTypeUncompressedData:
566
567 // Section 4.3. Uncompressed data (chunk type 0x01).
568 if chunkLen < checksumSize {
569 r.err = ErrCorrupt
570 return 0, r.err
571 }
572 if chunkLen > r.maxBufSize {
573 r.err = ErrCorrupt
574 return 0, r.err
575 }
576 // Grab write buffer
577 orgBuf := <-writtenBlocks
578 buf := orgBuf[:checksumSize]
579 if !r.readFull(buf, false) {
580 return 0, r.err
581 }
582 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
583 // Read content.
584 n := chunkLen - checksumSize
585
586 if r.snappyFrame && n > maxSnappyBlockSize {
587 r.err = ErrCorrupt
588 return 0, r.err
589 }
590 if n > r.maxBlock {
591 r.err = ErrCorrupt
592 return 0, r.err
593 }
594 // Read uncompressed
595 buf = orgBuf[:n]
596 if !r.readFull(buf, false) {
597 return 0, r.err
598 }
599
600 if !r.ignoreCRC && crc(buf) != checksum {
601 r.err = ErrCRC
602 return 0, r.err
603 }
604 entry := <-reUse
605 queue <- entry
606 entry <- buf
607 continue
608
609 case chunkTypeStreamIdentifier:
610 // Section 4.1. Stream identifier (chunk type 0xff).
611 if chunkLen != len(magicBody) {
612 r.err = ErrCorrupt
613 return 0, r.err
614 }
615 if !r.readFull(r.buf[:len(magicBody)], false) {
616 return 0, r.err
617 }
618 if string(r.buf[:len(magicBody)]) != magicBody {
619 if string(r.buf[:len(magicBody)]) != magicBodySnappy {
620 r.err = ErrCorrupt
621 return 0, r.err
622 } else {
623 r.snappyFrame = true
624 }
625 } else {
626 r.snappyFrame = false
627 }
628 continue
629 }
630
631 if chunkType <= 0x7f {
632 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
633 // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
634 r.err = ErrUnsupported
635 return 0, r.err
636 }
637 // Section 4.4 Padding (chunk type 0xfe).
638 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
639 if chunkLen > maxChunkSize {
640 // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
641 r.err = ErrUnsupported
642 return 0, r.err
643 }
644
645 // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
646 if !r.skippable(r.buf, chunkLen, false, chunkType) {
647 return 0, r.err
648 }
649 }
650 return 0, r.err
651}
652
653// Skip will skip n bytes forward in the decompressed output.
654// For larger skips this consumes less CPU and is faster than reading output and discarding it.
655// CRC is not checked on skipped blocks.
656// io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
657// If a decoding error is encountered subsequent calls to Read will also fail.
658func (r *Reader) Skip(n int64) error {
659 if n < 0 {
660 return errors.New("attempted negative skip")
661 }
662 if r.err != nil {
663 return r.err
664 }
665
666 for n > 0 {
667 if r.i < r.j {
668 // Skip in buffer.
669 // decoded[i:j] contains decoded bytes that have not yet been passed on.
670 left := int64(r.j - r.i)
671 if left >= n {
672 tmp := int64(r.i) + n
673 if tmp > math.MaxInt32 {
674 return errors.New("s2: internal overflow in skip")
675 }
676 r.i = int(tmp)
677 return nil
678 }
679 n -= int64(r.j - r.i)
680 r.i = r.j
681 }
682
683 // Buffer empty; read blocks until we have content.
684 if !r.readFull(r.buf[:4], true) {
685 if r.err == io.EOF {
686 r.err = io.ErrUnexpectedEOF
687 }
688 return r.err
689 }
690 chunkType := r.buf[0]
691 if !r.readHeader {
692 if chunkType != chunkTypeStreamIdentifier {
693 r.err = ErrCorrupt
694 return r.err
695 }
696 r.readHeader = true
697 }
698 chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
699
700 // The chunk types are specified at
701 // https://github.com/google/snappy/blob/master/framing_format.txt
702 switch chunkType {
703 case chunkTypeCompressedData:
704 r.blockStart += int64(r.j)
705 // Section 4.2. Compressed data (chunk type 0x00).
706 if chunkLen < checksumSize {
707 r.err = ErrCorrupt
708 return r.err
709 }
710 if !r.ensureBufferSize(chunkLen) {
711 if r.err == nil {
712 r.err = ErrUnsupported
713 }
714 return r.err
715 }
716 buf := r.buf[:chunkLen]
717 if !r.readFull(buf, false) {
718 return r.err
719 }
720 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
721 buf = buf[checksumSize:]
722
723 dLen, err := DecodedLen(buf)
724 if err != nil {
725 r.err = err
726 return r.err
727 }
728 if dLen > r.maxBlock {
729 r.err = ErrCorrupt
730 return r.err
731 }
732 // Check if destination is within this block
733 if int64(dLen) > n {
734 if len(r.decoded) < dLen {
735 r.decoded = make([]byte, dLen)
736 }
737 if _, err := Decode(r.decoded, buf); err != nil {
738 r.err = err
739 return r.err
740 }
741 if crc(r.decoded[:dLen]) != checksum {
742 r.err = ErrCorrupt
743 return r.err
744 }
745 } else {
746 // Skip block completely
747 n -= int64(dLen)
748 r.blockStart += int64(dLen)
749 dLen = 0
750 }
751 r.i, r.j = 0, dLen
752 continue
753 case chunkTypeUncompressedData:
754 r.blockStart += int64(r.j)
755 // Section 4.3. Uncompressed data (chunk type 0x01).
756 if chunkLen < checksumSize {
757 r.err = ErrCorrupt
758 return r.err
759 }
760 if !r.ensureBufferSize(chunkLen) {
761 if r.err != nil {
762 r.err = ErrUnsupported
763 }
764 return r.err
765 }
766 buf := r.buf[:checksumSize]
767 if !r.readFull(buf, false) {
768 return r.err
769 }
770 checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
771 // Read directly into r.decoded instead of via r.buf.
772 n2 := chunkLen - checksumSize
773 if n2 > len(r.decoded) {
774 if n2 > r.maxBlock {
775 r.err = ErrCorrupt
776 return r.err
777 }
778 r.decoded = make([]byte, n2)
779 }
780 if !r.readFull(r.decoded[:n2], false) {
781 return r.err
782 }
783 if int64(n2) < n {
784 if crc(r.decoded[:n2]) != checksum {
785 r.err = ErrCorrupt
786 return r.err
787 }
788 }
789 r.i, r.j = 0, n2
790 continue
791 case chunkTypeStreamIdentifier:
792 // Section 4.1. Stream identifier (chunk type 0xff).
793 if chunkLen != len(magicBody) {
794 r.err = ErrCorrupt
795 return r.err
796 }
797 if !r.readFull(r.buf[:len(magicBody)], false) {
798 return r.err
799 }
800 if string(r.buf[:len(magicBody)]) != magicBody {
801 if string(r.buf[:len(magicBody)]) != magicBodySnappy {
802 r.err = ErrCorrupt
803 return r.err
804 }
805 }
806
807 continue
808 }
809
810 if chunkType <= 0x7f {
811 // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
812 r.err = ErrUnsupported
813 return r.err
814 }
815 if chunkLen > maxChunkSize {
816 r.err = ErrUnsupported
817 return r.err
818 }
819 // Section 4.4 Padding (chunk type 0xfe).
820 // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
821 if !r.skippable(r.buf, chunkLen, false, chunkType) {
822 return r.err
823 }
824 }
825 return nil
826}
827
// ReadSeeker provides random or forward seeking in compressed content.
// See Reader.ReadSeeker
type ReadSeeker struct {
	*Reader
	// readAtMu serializes ReadAt calls, which share the embedded
	// Reader's position state.
	readAtMu sync.Mutex
}
834
// ReadSeeker will return an io.ReadSeeker and io.ReaderAt
// compatible version of the reader.
// If 'random' is specified the returned io.Seeker can be used for
// random seeking, otherwise only forward seeking is supported.
// Enabling random seeking requires the original input to support
// the io.Seeker interface.
// A custom index can be specified which will be used if supplied.
// When using a custom index, it will not be read from the input stream.
// The ReadAt position will affect regular reads and the current position of Seek.
// So using Read after ReadAt will continue from where the ReadAt stopped.
// No functions should be used concurrently.
// The returned ReadSeeker contains a shallow reference to the existing Reader,
// meaning changes performed to one is reflected in the other.
func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
	// Read index if provided.
	if len(index) != 0 {
		if r.index == nil {
			r.index = &Index{}
		}
		if _, err := r.index.Load(index); err != nil {
			return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
		}
	}

	// Check if input is seekable
	rs, ok := r.r.(io.ReadSeeker)
	if !ok {
		if !random {
			// Forward-only seeking works on any reader (via Skip).
			return &ReadSeeker{Reader: r}, nil
		}
		return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
	}

	if r.index != nil {
		// Seekable and index, ok...
		return &ReadSeeker{Reader: r}, nil
	}

	// Load from stream.
	r.index = &Index{}

	// Read current position so it can be restored after index loading.
	pos, err := rs.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
	}
	err = r.index.LoadStream(rs)
	if err != nil {
		if err == ErrUnsupported {
			// If we don't require random seeking, reset input and return.
			if !random {
				_, err = rs.Seek(pos, io.SeekStart)
				if err != nil {
					return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
				}
				r.index = nil
				return &ReadSeeker{Reader: r}, nil
			}
			return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
		}
		return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
	}

	// reset position.
	_, err = rs.Seek(pos, io.SeekStart)
	if err != nil {
		return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
	}
	return &ReadSeeker{Reader: r}, nil
}
905
// Seek allows seeking in compressed data.
//
// Offsets are in uncompressed bytes. Seeking inside the currently decoded
// block is free; forward seeks fall back to Skip, and arbitrary seeks use
// the loaded index plus the underlying io.ReadSeeker.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
	if r.err != nil {
		if !errors.Is(r.err, io.EOF) {
			return 0, r.err
		}
		// Reset on EOF
		r.err = nil
	}

	// Calculate absolute offset.
	absOffset := offset

	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		absOffset = r.blockStart + int64(r.i) + offset
	case io.SeekEnd:
		// End-relative seeks need the total size, which only an index has.
		if r.index == nil {
			return 0, ErrUnsupported
		}
		absOffset = r.index.TotalUncompressed + offset
	default:
		r.err = ErrUnsupported
		return 0, r.err
	}

	if absOffset < 0 {
		return 0, errors.New("seek before start of file")
	}

	if !r.readHeader {
		// Make sure we read the header.
		_, r.err = r.Read([]byte{})
		if r.err != nil {
			return 0, r.err
		}
	}

	// If we are inside current block no need to seek.
	// This includes no offset changes.
	if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
		r.i = int(absOffset - r.blockStart)
		return r.blockStart + int64(r.i), nil
	}

	rs, ok := r.r.(io.ReadSeeker)
	if r.index == nil || !ok {
		// No index or unseekable input: only forward skips are possible.
		currOffset := r.blockStart + int64(r.i)
		if absOffset >= currOffset {
			err := r.Skip(absOffset - currOffset)
			return r.blockStart + int64(r.i), err
		}
		return 0, ErrUnsupported
	}

	// We can seek and we have an index.
	c, u, err := r.index.Find(absOffset)
	if err != nil {
		return r.blockStart + int64(r.i), err
	}

	// Seek to next block
	_, err = rs.Seek(c, io.SeekStart)
	if err != nil {
		return 0, err
	}

	r.i = r.j                     // Remove rest of current block.
	r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
	if u < absOffset {
		// Forward inside block
		return absOffset, r.Skip(absOffset - u)
	}
	if u > absOffset {
		return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
	}
	return absOffset, nil
}
985
986// ReadAt reads len(p) bytes into p starting at offset off in the
987// underlying input source. It returns the number of bytes
988// read (0 <= n <= len(p)) and any error encountered.
989//
990// When ReadAt returns n < len(p), it returns a non-nil error
991// explaining why more bytes were not returned. In this respect,
992// ReadAt is stricter than Read.
993//
994// Even if ReadAt returns n < len(p), it may use all of p as scratch
995// space during the call. If some data is available but not len(p) bytes,
996// ReadAt blocks until either all the data is available or an error occurs.
997// In this respect ReadAt is different from Read.
998//
999// If the n = len(p) bytes returned by ReadAt are at the end of the
1000// input source, ReadAt may return either err == EOF or err == nil.
1001//
1002// If ReadAt is reading from an input source with a seek offset,
1003// ReadAt should not affect nor be affected by the underlying
1004// seek offset.
1005//
1006// Clients of ReadAt can execute parallel ReadAt calls on the
1007// same input source. This is however not recommended.
1008func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
1009 r.readAtMu.Lock()
1010 defer r.readAtMu.Unlock()
1011 _, err := r.Seek(offset, io.SeekStart)
1012 if err != nil {
1013 return 0, err
1014 }
1015 n := 0
1016 for n < len(p) {
1017 n2, err := r.Read(p[n:])
1018 if err != nil {
1019 // This will include io.EOF
1020 return n + n2, err
1021 }
1022 n += n2
1023 }
1024 return n, nil
1025}
1026
1027// ReadByte satisfies the io.ByteReader interface.
1028func (r *Reader) ReadByte() (byte, error) {
1029 if r.err != nil {
1030 return 0, r.err
1031 }
1032 if r.i < r.j {
1033 c := r.decoded[r.i]
1034 r.i++
1035 return c, nil
1036 }
1037 var tmp [1]byte
1038 for i := 0; i < 10; i++ {
1039 n, err := r.Read(tmp[:])
1040 if err != nil {
1041 return 0, err
1042 }
1043 if n == 1 {
1044 return tmp[0], nil
1045 }
1046 }
1047 return 0, io.ErrNoProgress
1048}
1049
1050// SkippableCB will register a callback for chunks with the specified ID.
1051// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive).
1052// For each chunk with the ID, the callback is called with the content.
1053// Any returned non-nil error will abort decompression.
1054// Only one callback per ID is supported, latest sent will be used.
1055// Sending a nil function will disable previous callbacks.
1056func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
1057 if id < 0x80 || id > chunkTypePadding {
1058 return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
1059 }
1060 r.skippableCB[id] = fn
1061 return nil
1062}