Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/writer.go')
-rw-r--r--  vendor/github.com/klauspost/compress/s2/writer.go | 1020
1 file changed, 1020 insertions(+), 0 deletions(-)
diff --git a/vendor/github.com/klauspost/compress/s2/writer.go b/vendor/github.com/klauspost/compress/s2/writer.go
new file mode 100644
index 0000000..089cd36
--- /dev/null
+++ b/vendor/github.com/klauspost/compress/s2/writer.go
@@ -0,0 +1,1020 @@
1// Copyright 2011 The Snappy-Go Authors. All rights reserved.
2// Copyright (c) 2019+ Klaus Post. All rights reserved.
3// Use of this source code is governed by a BSD-style
4// license that can be found in the LICENSE file.
5
6package s2
7
8import (
9 "crypto/rand"
10 "encoding/binary"
11 "errors"
12 "fmt"
13 "io"
14 "runtime"
15 "sync"
16)
17
18const (
19 levelUncompressed = iota + 1
20 levelFast
21 levelBetter
22 levelBest
23)
24
25// NewWriter returns a new Writer that compresses to w, using the
26// framing format described at
27// https://github.com/google/snappy/blob/master/framing_format.txt
28//
29// Users must call Close to guarantee all data has been forwarded to
30// the underlying io.Writer and that resources are released.
31// They may also call Flush zero or more times before calling Close.
32func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
33 w2 := Writer{
34 blockSize: defaultBlockSize,
35 concurrency: runtime.GOMAXPROCS(0),
36 randSrc: rand.Reader,
37 level: levelFast,
38 }
39 for _, opt := range opts {
40 if err := opt(&w2); err != nil {
41 w2.errState = err
42 return &w2
43 }
44 }
45 w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
46 w2.paramsOK = true
47 w2.ibuf = make([]byte, 0, w2.blockSize)
48 w2.buffers.New = func() interface{} {
49 return make([]byte, w2.obufLen)
50 }
51 w2.Reset(w)
52 return &w2
53}
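// Hedged caller-side sketch (not part of the vendored file): the intended
// lifecycle when the package is imported as "github.com/klauspost/compress/s2";
// dst and data are placeholders.
//
//	enc := s2.NewWriter(dst, s2.WriterBetterCompression())
//	defer enc.Close() // Close may be called more than once.
//	if _, err := enc.Write(data); err != nil {
//		return err
//	}
//	// Close flushes all pending blocks and releases resources.
//	return enc.Close()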
54
55// Writer is an io.Writer that can write Snappy-compressed bytes.
56type Writer struct {
57 errMu sync.Mutex
58 errState error
59
60 // ibuf is a buffer for the incoming (uncompressed) bytes.
61 ibuf []byte
62
63 blockSize int
64 obufLen int
65 concurrency int
66 written int64
67 uncompWritten int64 // Bytes sent to compression
68 output chan chan result
69 buffers sync.Pool
70 pad int
71
72 writer io.Writer
73 randSrc io.Reader
74 writerWg sync.WaitGroup
75 index Index
76 customEnc func(dst, src []byte) int
77
78 // wroteStreamHeader is whether we have written the stream header.
79 wroteStreamHeader bool
80 paramsOK bool
81 snappy bool
82 flushOnWrite bool
83 appendIndex bool
84 level uint8
85}
86
87type result struct {
88 b []byte
89 // Uncompressed start offset
90 startOffset int64
91}
92
93// err returns the previously set error.
94// If no error has been set and err is not nil, the error state is set to err.
95func (w *Writer) err(err error) error {
96 w.errMu.Lock()
97 errSet := w.errState
98 if errSet == nil && err != nil {
99 w.errState = err
100 errSet = err
101 }
102 w.errMu.Unlock()
103 return errSet
104}
105
106// Reset discards the writer's state and switches the Snappy writer to write to w.
107// This permits reusing a Writer rather than allocating a new one.
108func (w *Writer) Reset(writer io.Writer) {
109 if !w.paramsOK {
110 return
111 }
112 // Close previous writer, if any.
113 if w.output != nil {
114 close(w.output)
115 w.writerWg.Wait()
116 w.output = nil
117 }
118 w.errState = nil
119 w.ibuf = w.ibuf[:0]
120 w.wroteStreamHeader = false
121 w.written = 0
122 w.writer = writer
123 w.uncompWritten = 0
124 w.index.reset(w.blockSize)
125
126 // If we didn't get a writer, stop here.
127 if writer == nil {
128 return
129 }
130 // If no concurrency requested, don't spin up writer goroutine.
131 if w.concurrency == 1 {
132 return
133 }
134
135 toWrite := make(chan chan result, w.concurrency)
136 w.output = toWrite
137 w.writerWg.Add(1)
138
139 // Start a writer goroutine that will write all output in order.
140 go func() {
141 defer w.writerWg.Done()
142
143 // Get a queued write.
144 for write := range toWrite {
145 // Wait for the data to be available.
146 input := <-write
147 in := input.b
148 if len(in) > 0 {
149 if w.err(nil) == nil {
150 // Don't expose data from previous buffers.
151 toWrite := in[:len(in):len(in)]
152 // Write to output.
153 n, err := writer.Write(toWrite)
154 if err == nil && n != len(toWrite) {
155 err = io.ErrShortBuffer
156 }
157 _ = w.err(err)
158 w.err(w.index.add(w.written, input.startOffset))
159 w.written += int64(n)
160 }
161 }
162 if cap(in) >= w.obufLen {
163 w.buffers.Put(in)
164 }
165 // close the incoming write request.
166 // This can be used for synchronizing flushes.
167 close(write)
168 }
169 }()
170}
171
172// Write satisfies the io.Writer interface.
173func (w *Writer) Write(p []byte) (nRet int, errRet error) {
174 if err := w.err(nil); err != nil {
175 return 0, err
176 }
177 if w.flushOnWrite {
178 return w.write(p)
179 }
180 // If we exceed the input buffer size, start writing
181 for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
182 var n int
183 if len(w.ibuf) == 0 {
184 // Large write, empty buffer.
185 // Write directly from p to avoid copy.
186 n, _ = w.write(p)
187 } else {
188 n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
189 w.ibuf = w.ibuf[:len(w.ibuf)+n]
190 w.write(w.ibuf)
191 w.ibuf = w.ibuf[:0]
192 }
193 nRet += n
194 p = p[n:]
195 }
196 if err := w.err(nil); err != nil {
197 return nRet, err
198 }
199 // p should always be able to fit into w.ibuf now.
200 n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
201 w.ibuf = w.ibuf[:len(w.ibuf)+n]
202 nRet += n
203 return nRet, nil
204}
205
206// ReadFrom implements the io.ReaderFrom interface.
207// Using this is typically more efficient since it avoids a memory copy.
208// ReadFrom reads data from r until EOF or error.
209// The return value n is the number of bytes read.
210// Any error except io.EOF encountered during the read is also returned.
211func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
212 if err := w.err(nil); err != nil {
213 return 0, err
214 }
215 if len(w.ibuf) > 0 {
216 err := w.Flush()
217 if err != nil {
218 return 0, err
219 }
220 }
221 if br, ok := r.(byter); ok {
222 buf := br.Bytes()
223 if err := w.EncodeBuffer(buf); err != nil {
224 return 0, err
225 }
226 return int64(len(buf)), w.Flush()
227 }
228 for {
229 inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
230 n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
231 if err != nil {
232 if err == io.ErrUnexpectedEOF {
233 err = io.EOF
234 }
235 if err != io.EOF {
236 return n, w.err(err)
237 }
238 }
239 if n2 == 0 {
240 break
241 }
242 n += int64(n2)
243 err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
244 if w.err(err2) != nil {
245 break
246 }
247
248 if err != nil {
249 // We got EOF and wrote everything
250 break
251 }
252 }
253
254 return n, w.err(nil)
255}
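// Illustrative sketch (an assumption, not upstream documentation): streaming
// an entire io.Reader through ReadFrom, which avoids the extra copy done by
// Write; w and r are placeholders.
//
//	func compressStream(w io.Writer, r io.Reader) (int64, error) {
//		enc := s2.NewWriter(w, s2.WriterConcurrency(4))
//		n, err := enc.ReadFrom(r)
//		if err != nil {
//			enc.Close()
//			return n, err
//		}
//		return n, enc.Close()
//	}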
256
257// AddSkippableBlock will add a skippable block to the stream.
258// The ID must be 0x80-0xfe (inclusive).
259// Length of the skippable block must be <= 16777215 bytes.
260func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
261 if err := w.err(nil); err != nil {
262 return err
263 }
264 if len(data) == 0 {
265 return nil
266 }
267 if id < 0x80 || id > chunkTypePadding {
268 return fmt.Errorf("invalid skippable block id %x", id)
269 }
270 if len(data) > maxChunkSize {
271 return fmt.Errorf("skippable block exceeded maximum size")
272 }
273 var header [4]byte
274 chunkLen := 4 + len(data)
275 header[0] = id
276 header[1] = uint8(chunkLen >> 0)
277 header[2] = uint8(chunkLen >> 8)
278 header[3] = uint8(chunkLen >> 16)
279 if w.concurrency == 1 {
280 write := func(b []byte) error {
281 n, err := w.writer.Write(b)
282 if err = w.err(err); err != nil {
283 return err
284 }
285 if n != len(b) {
286 return w.err(io.ErrShortWrite)
287 }
288 w.written += int64(n)
289 return w.err(nil)
290 }
291 if !w.wroteStreamHeader {
292 w.wroteStreamHeader = true
293 if w.snappy {
294 if err := write([]byte(magicChunkSnappy)); err != nil {
295 return err
296 }
297 } else {
298 if err := write([]byte(magicChunk)); err != nil {
299 return err
300 }
301 }
302 }
303 if err := write(header[:]); err != nil {
304 return err
305 }
306 // The synchronous path is complete; return here so we do not
307 // fall through to the concurrent path below.
308 return write(data)
309 }
310
311 // Create output...
312 if !w.wroteStreamHeader {
313 w.wroteStreamHeader = true
314 hWriter := make(chan result)
315 w.output <- hWriter
316 if w.snappy {
317 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
318 } else {
319 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
320 }
321 }
322
323 // Copy input.
324 inbuf := w.buffers.Get().([]byte)[:4]
325 copy(inbuf, header[:])
326 inbuf = append(inbuf, data...)
327
328 output := make(chan result, 1)
329 // Queue output.
330 w.output <- output
331 output <- result{startOffset: w.uncompWritten, b: inbuf}
332
333 return nil
334}
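// Hedged example (illustrative only): embedding application-defined metadata
// as a skippable block; the id 0x80 and the payload are arbitrary, and enc is
// assumed to be a *Writer created with NewWriter. Decoders that do not know
// the id simply skip the chunk.
//
//	meta := []byte(`{"schema":1}`)
//	if err := enc.AddSkippableBlock(0x80, meta); err != nil {
//		return err
//	}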
335
336// EncodeBuffer will add a buffer to the stream.
337// This is the fastest way to encode a stream,
338// but the input buffer cannot be written to by the caller
339// until Flush or Close has been called when concurrency != 1.
340//
341// If you cannot control that, use the regular Write function.
342//
343// Note that input is not buffered.
344// This means that each write will result in discrete blocks being created.
345// For buffered writes, use the regular Write function.
346func (w *Writer) EncodeBuffer(buf []byte) (err error) {
347 if err := w.err(nil); err != nil {
348 return err
349 }
350
351 if w.flushOnWrite {
352 _, err := w.write(buf)
353 return err
354 }
355 // Flush queued data first.
356 if len(w.ibuf) > 0 {
357 err := w.Flush()
358 if err != nil {
359 return err
360 }
361 }
362 if w.concurrency == 1 {
363 _, err := w.writeSync(buf)
364 return err
365 }
366
367 // Spawn goroutine and write block to output channel.
368 if !w.wroteStreamHeader {
369 w.wroteStreamHeader = true
370 hWriter := make(chan result)
371 w.output <- hWriter
372 if w.snappy {
373 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
374 } else {
375 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
376 }
377 }
378
379 for len(buf) > 0 {
380 // Cut input.
381 uncompressed := buf
382 if len(uncompressed) > w.blockSize {
383 uncompressed = uncompressed[:w.blockSize]
384 }
385 buf = buf[len(uncompressed):]
386 // Get an output buffer.
387 obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
388 output := make(chan result)
389 // Queue output now, so we keep order.
390 w.output <- output
391 res := result{
392 startOffset: w.uncompWritten,
393 }
394 w.uncompWritten += int64(len(uncompressed))
395 go func() {
396 checksum := crc(uncompressed)
397
398 // Set to uncompressed.
399 chunkType := uint8(chunkTypeUncompressedData)
400 chunkLen := 4 + len(uncompressed)
401
402 // Attempt compressing.
403 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
404 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
405
406 // Check if we should use this, or store as uncompressed instead.
407 if n2 > 0 {
408 chunkType = uint8(chunkTypeCompressedData)
409 chunkLen = 4 + n + n2
410 obuf = obuf[:obufHeaderLen+n+n2]
411 } else {
412 // copy uncompressed
413 copy(obuf[obufHeaderLen:], uncompressed)
414 }
415
416 // Fill in the per-chunk header that comes before the body.
417 obuf[0] = chunkType
418 obuf[1] = uint8(chunkLen >> 0)
419 obuf[2] = uint8(chunkLen >> 8)
420 obuf[3] = uint8(chunkLen >> 16)
421 obuf[4] = uint8(checksum >> 0)
422 obuf[5] = uint8(checksum >> 8)
423 obuf[6] = uint8(checksum >> 16)
424 obuf[7] = uint8(checksum >> 24)
425
426 // Queue final output.
427 res.b = obuf
428 output <- res
429 }()
430 }
431 return nil
432}
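// Hedged sketch (illustrative): with concurrency != 1 the caller must not
// touch buf again until Flush or Close has returned; enc and buf are
// placeholders.
//
//	if err := enc.EncodeBuffer(buf); err != nil {
//		return err
//	}
//	if err := enc.Flush(); err != nil {
//		return err
//	}
//	// buf may now be reused or modified by the caller.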
433
434func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
435 if w.customEnc != nil {
436 if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
437 return ret
438 }
439 }
440 if w.snappy {
441 switch w.level {
442 case levelFast:
443 return encodeBlockSnappy(obuf, uncompressed)
444 case levelBetter:
445 return encodeBlockBetterSnappy(obuf, uncompressed)
446 case levelBest:
447 return encodeBlockBestSnappy(obuf, uncompressed)
448 }
449 return 0
450 }
451 switch w.level {
452 case levelFast:
453 return encodeBlock(obuf, uncompressed)
454 case levelBetter:
455 return encodeBlockBetter(obuf, uncompressed)
456 case levelBest:
457 return encodeBlockBest(obuf, uncompressed, nil)
458 }
459 return 0
460}
461
462func (w *Writer) write(p []byte) (nRet int, errRet error) {
463 if err := w.err(nil); err != nil {
464 return 0, err
465 }
466 if w.concurrency == 1 {
467 return w.writeSync(p)
468 }
469
470 // Spawn goroutine and write block to output channel.
471 for len(p) > 0 {
472 if !w.wroteStreamHeader {
473 w.wroteStreamHeader = true
474 hWriter := make(chan result)
475 w.output <- hWriter
476 if w.snappy {
477 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
478 } else {
479 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
480 }
481 }
482
483 var uncompressed []byte
484 if len(p) > w.blockSize {
485 uncompressed, p = p[:w.blockSize], p[w.blockSize:]
486 } else {
487 uncompressed, p = p, nil
488 }
489
490 // Copy input.
491 // If the block is incompressible, this is used for the result.
492 inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
493 obuf := w.buffers.Get().([]byte)[:w.obufLen]
494 copy(inbuf[obufHeaderLen:], uncompressed)
495 uncompressed = inbuf[obufHeaderLen:]
496
497 output := make(chan result)
498 // Queue output now, so we keep order.
499 w.output <- output
500 res := result{
501 startOffset: w.uncompWritten,
502 }
503 w.uncompWritten += int64(len(uncompressed))
504
505 go func() {
506 checksum := crc(uncompressed)
507
508 // Set to uncompressed.
509 chunkType := uint8(chunkTypeUncompressedData)
510 chunkLen := 4 + len(uncompressed)
511
512 // Attempt compressing.
513 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
514 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
515
516 // Check if we should use this, or store as uncompressed instead.
517 if n2 > 0 {
518 chunkType = uint8(chunkTypeCompressedData)
519 chunkLen = 4 + n + n2
520 obuf = obuf[:obufHeaderLen+n+n2]
521 } else {
522 // Use input as output.
523 obuf, inbuf = inbuf, obuf
524 }
525
526 // Fill in the per-chunk header that comes before the body.
527 obuf[0] = chunkType
528 obuf[1] = uint8(chunkLen >> 0)
529 obuf[2] = uint8(chunkLen >> 8)
530 obuf[3] = uint8(chunkLen >> 16)
531 obuf[4] = uint8(checksum >> 0)
532 obuf[5] = uint8(checksum >> 8)
533 obuf[6] = uint8(checksum >> 16)
534 obuf[7] = uint8(checksum >> 24)
535
536 // Queue final output.
537 res.b = obuf
538 output <- res
539
540 // Put unused buffer back in pool.
541 w.buffers.Put(inbuf)
542 }()
543 nRet += len(uncompressed)
544 }
545 return nRet, nil
546}
547
548// writeFull is a special version of write that will always write the full buffer.
549// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
550// The data will be written as a single block.
551// The caller is not allowed to use inbuf after this function has been called.
552func (w *Writer) writeFull(inbuf []byte) (errRet error) {
553 if err := w.err(nil); err != nil {
554 return err
555 }
556
557 if w.concurrency == 1 {
558 _, err := w.writeSync(inbuf[obufHeaderLen:])
559 return err
560 }
561
562 // Spawn goroutine and write block to output channel.
563 if !w.wroteStreamHeader {
564 w.wroteStreamHeader = true
565 hWriter := make(chan result)
566 w.output <- hWriter
567 if w.snappy {
568 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
569 } else {
570 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
571 }
572 }
573
574 // Get an output buffer.
575 obuf := w.buffers.Get().([]byte)[:w.obufLen]
576 uncompressed := inbuf[obufHeaderLen:]
577
578 output := make(chan result)
579 // Queue output now, so we keep order.
580 w.output <- output
581 res := result{
582 startOffset: w.uncompWritten,
583 }
584 w.uncompWritten += int64(len(uncompressed))
585
586 go func() {
587 checksum := crc(uncompressed)
588
589 // Set to uncompressed.
590 chunkType := uint8(chunkTypeUncompressedData)
591 chunkLen := 4 + len(uncompressed)
592
593 // Attempt compressing.
594 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
595 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
596
597 // Check if we should use this, or store as uncompressed instead.
598 if n2 > 0 {
599 chunkType = uint8(chunkTypeCompressedData)
600 chunkLen = 4 + n + n2
601 obuf = obuf[:obufHeaderLen+n+n2]
602 } else {
603 // Use input as output.
604 obuf, inbuf = inbuf, obuf
605 }
606
607 // Fill in the per-chunk header that comes before the body.
608 obuf[0] = chunkType
609 obuf[1] = uint8(chunkLen >> 0)
610 obuf[2] = uint8(chunkLen >> 8)
611 obuf[3] = uint8(chunkLen >> 16)
612 obuf[4] = uint8(checksum >> 0)
613 obuf[5] = uint8(checksum >> 8)
614 obuf[6] = uint8(checksum >> 16)
615 obuf[7] = uint8(checksum >> 24)
616
617 // Queue final output.
618 res.b = obuf
619 output <- res
620
621 // Put unused buffer back in pool.
622 w.buffers.Put(inbuf)
623 }()
624 return nil
625}
626
627func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
628 if err := w.err(nil); err != nil {
629 return 0, err
630 }
631 if !w.wroteStreamHeader {
632 w.wroteStreamHeader = true
633 var n int
634 var err error
635 if w.snappy {
636 n, err = w.writer.Write([]byte(magicChunkSnappy))
637 } else {
638 n, err = w.writer.Write([]byte(magicChunk))
639 }
640 if err != nil {
641 return 0, w.err(err)
642 }
643 if n != len(magicChunk) {
644 return 0, w.err(io.ErrShortWrite)
645 }
646 w.written += int64(n)
647 }
648
649 for len(p) > 0 {
650 var uncompressed []byte
651 if len(p) > w.blockSize {
652 uncompressed, p = p[:w.blockSize], p[w.blockSize:]
653 } else {
654 uncompressed, p = p, nil
655 }
656
657 obuf := w.buffers.Get().([]byte)[:w.obufLen]
658 checksum := crc(uncompressed)
659
660 // Set to uncompressed.
661 chunkType := uint8(chunkTypeUncompressedData)
662 chunkLen := 4 + len(uncompressed)
663
664 // Attempt compressing.
665 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
666 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
667
668 if n2 > 0 {
669 chunkType = uint8(chunkTypeCompressedData)
670 chunkLen = 4 + n + n2
671 obuf = obuf[:obufHeaderLen+n+n2]
672 } else {
673 obuf = obuf[:8]
674 }
675
676 // Fill in the per-chunk header that comes before the body.
677 obuf[0] = chunkType
678 obuf[1] = uint8(chunkLen >> 0)
679 obuf[2] = uint8(chunkLen >> 8)
680 obuf[3] = uint8(chunkLen >> 16)
681 obuf[4] = uint8(checksum >> 0)
682 obuf[5] = uint8(checksum >> 8)
683 obuf[6] = uint8(checksum >> 16)
684 obuf[7] = uint8(checksum >> 24)
685
686 n, err := w.writer.Write(obuf)
687 if err != nil {
688 return 0, w.err(err)
689 }
690 if n != len(obuf) {
691 return 0, w.err(io.ErrShortWrite)
692 }
693 w.err(w.index.add(w.written, w.uncompWritten))
694 w.written += int64(n)
695 w.uncompWritten += int64(len(uncompressed))
696
697 if chunkType == chunkTypeUncompressedData {
698 // Write uncompressed data.
699 n, err := w.writer.Write(uncompressed)
700 if err != nil {
701 return 0, w.err(err)
702 }
703 if n != len(uncompressed) {
704 return 0, w.err(io.ErrShortWrite)
705 }
706 w.written += int64(n)
707 }
708 w.buffers.Put(obuf)
709 // Queue final output.
710 nRet += len(uncompressed)
711 }
712 return nRet, nil
713}
714
715// Flush flushes the Writer to its underlying io.Writer.
716// This does not apply padding.
717func (w *Writer) Flush() error {
718 if err := w.err(nil); err != nil {
719 return err
720 }
721
722 // Queue any data still in input buffer.
723 if len(w.ibuf) != 0 {
724 if !w.wroteStreamHeader {
725 _, err := w.writeSync(w.ibuf)
726 w.ibuf = w.ibuf[:0]
727 return w.err(err)
728 } else {
729 _, err := w.write(w.ibuf)
730 w.ibuf = w.ibuf[:0]
731 err = w.err(err)
732 if err != nil {
733 return err
734 }
735 }
736 }
737 if w.output == nil {
738 return w.err(nil)
739 }
740
741 // Send empty buffer
742 res := make(chan result)
743 w.output <- res
744 // Block until this has been picked up.
745 res <- result{b: nil, startOffset: w.uncompWritten}
746 // When it is closed, we have flushed.
747 <-res
748 return w.err(nil)
749}
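// Illustrative sketch (assumption: the consumer should see data promptly
// without ending the stream); enc and msg are placeholders.
//
//	if _, err := enc.Write(msg); err != nil {
//		return err
//	}
//	// Flush forwards everything buffered so far to the underlying writer.
//	if err := enc.Flush(); err != nil {
//		return err
//	}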
750
751// Close calls Flush and then closes the Writer.
752// Calling Close multiple times is ok,
753// but once Close has been called, CloseIndex will no longer return the index.
754func (w *Writer) Close() error {
755 _, err := w.closeIndex(w.appendIndex)
756 return err
757}
758
759// CloseIndex calls Close and returns an index on first call.
760// This is not required if you only want the index appended to the stream.
761func (w *Writer) CloseIndex() ([]byte, error) {
762 return w.closeIndex(true)
763}
764
765func (w *Writer) closeIndex(idx bool) ([]byte, error) {
766 err := w.Flush()
767 if w.output != nil {
768 close(w.output)
769 w.writerWg.Wait()
770 w.output = nil
771 }
772
773 var index []byte
774 if w.err(err) == nil && w.writer != nil {
775 // Create index.
776 if idx {
777 compSize := int64(-1)
778 if w.pad <= 1 {
779 compSize = w.written
780 }
781 index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
782 // Count as written for padding.
783 if w.appendIndex {
784 w.written += int64(len(index))
785 }
786 }
787
788 if w.pad > 1 {
789 tmp := w.ibuf[:0]
790 if len(index) > 0 {
791 // Allocate another buffer.
792 tmp = w.buffers.Get().([]byte)[:0]
793 defer w.buffers.Put(tmp)
794 }
795 add := calcSkippableFrame(w.written, int64(w.pad))
796 frame, err := skippableFrame(tmp, add, w.randSrc)
797 if err = w.err(err); err != nil {
798 return nil, err
799 }
800 n, err2 := w.writer.Write(frame)
801 if err2 == nil && n != len(frame) {
802 err2 = io.ErrShortWrite
803 }
804 _ = w.err(err2)
805 }
806 if len(index) > 0 && w.appendIndex {
807 n, err2 := w.writer.Write(index)
808 if err2 == nil && n != len(index) {
809 err2 = io.ErrShortWrite
810 }
811 _ = w.err(err2)
812 }
813 }
814 err = w.err(errClosed)
815 if err == errClosed {
816 return index, nil
817 }
818 return nil, err
819}
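// Hedged sketch (illustrative): capturing the seek index separately instead
// of appending it with WriterAddIndex; dst, data and storeIndex are
// placeholders, and storeIndex is a hypothetical persistence step.
//
//	enc := s2.NewWriter(dst)
//	if _, err := enc.Write(data); err != nil {
//		return err
//	}
//	index, err := enc.CloseIndex() // only the first call returns the index
//	if err != nil {
//		return err
//	}
//	storeIndex(index)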
820
821// calcSkippableFrame will return a total size to be added for written
822// to be divisible by wantMultiple.
823// The returned value is either 0 or greater than skippableFrameHeader.
824// The function will panic if written < 0 or wantMultiple <= 0.
825func calcSkippableFrame(written, wantMultiple int64) int {
826 if wantMultiple <= 0 {
827 panic("wantMultiple <= 0")
828 }
829 if written < 0 {
830 panic("written < 0")
831 }
832 leftOver := written % wantMultiple
833 if leftOver == 0 {
834 return 0
835 }
836 toAdd := wantMultiple - leftOver
837 for toAdd < skippableFrameHeader {
838 toAdd += wantMultiple
839 }
840 return int(toAdd)
841}
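// Worked example (added for illustration): with written = 1000 and
// wantMultiple = 4096, leftOver = 1000, so toAdd = 4096 - 1000 = 3096, which
// already exceeds skippableFrameHeader (4) and is returned as-is. With
// written = 4095, toAdd starts at 1 and is bumped by one more multiple to
// 4097 so the 4-byte skippable frame header still fits.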
842
843// skippableFrame will add a skippable frame with a total size of total bytes.
844// total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader
845func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
846 if total == 0 {
847 return dst, nil
848 }
849 if total < skippableFrameHeader {
850 return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
851 }
852 if int64(total) >= maxBlockSize+skippableFrameHeader {
853 return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
854 }
855 // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
856 dst = append(dst, chunkTypePadding)
857 f := uint32(total - skippableFrameHeader)
858 // Add chunk length.
859 dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
860 // Add data
861 start := len(dst)
862 dst = append(dst, make([]byte, f)...)
863 _, err := io.ReadFull(r, dst[start:])
864 return dst, err
865}
866
867var errClosed = errors.New("s2: Writer is closed")
868
869// WriterOption is an option for creating an encoder.
870type WriterOption func(*Writer) error
871
872// WriterConcurrency will set the concurrency,
873// meaning the maximum number of blocks to encode concurrently.
874// The value supplied must be at least 1.
875// By default this will be set to GOMAXPROCS.
876func WriterConcurrency(n int) WriterOption {
877 return func(w *Writer) error {
878 if n <= 0 {
879 return errors.New("concurrency must be at least 1")
880 }
881 w.concurrency = n
882 return nil
883 }
884}
885
886// WriterAddIndex will append an index to the end of a stream
887// when it is closed.
888func WriterAddIndex() WriterOption {
889 return func(w *Writer) error {
890 w.appendIndex = true
891 return nil
892 }
893}
894
895// WriterBetterCompression will enable better compression.
896// EncodeBetter compresses better than Encode but typically with a
897// 10-40% speed decrease on both compression and decompression.
898func WriterBetterCompression() WriterOption {
899 return func(w *Writer) error {
900 w.level = levelBetter
901 return nil
902 }
903}
904
905// WriterBestCompression will enable best compression.
906// EncodeBest compresses better than EncodeBetter but typically with a
907// big speed decrease on compression.
908func WriterBestCompression() WriterOption {
909 return func(w *Writer) error {
910 w.level = levelBest
911 return nil
912 }
913}
914
915// WriterUncompressed will bypass compression.
916// The stream will be written as uncompressed blocks only.
917// If concurrency is > 1, CRC and output will still be done asynchronously.
918func WriterUncompressed() WriterOption {
919 return func(w *Writer) error {
920 w.level = levelUncompressed
921 return nil
922 }
923}
924
925// WriterBlockSize allows overriding the default block size.
926// Blocks will be this size or smaller.
927// Minimum size is 4KB and maximum size is 4MB.
928//
929// Bigger blocks may give bigger throughput on systems with many cores,
930// and will increase compression slightly, but it will limit the possible
931// concurrency for smaller payloads for both encoding and decoding.
932// Default block size is 1MB.
933//
934// When writing Snappy compatible output using WriterSnappyCompat,
935// the maximum block size is 64KB.
936func WriterBlockSize(n int) WriterOption {
937 return func(w *Writer) error {
938 if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
939 return errors.New("s2: block size too large. Must be <= 64K and >= 4KB for snappy compatible output")
940 }
941 if n > maxBlockSize || n < minBlockSize {
942 return errors.New("s2: block size out of range. Must be <= 4MB and >= 4KB")
943 }
944 w.blockSize = n
945 return nil
946 }
947}
948
949// WriterPadding will add padding to all output so the size will be a multiple of n.
950// This can be used to obfuscate the exact output size or make blocks of a certain size.
951// The contents will be a skippable frame, so it will be invisible by the decoder.
952// n must be > 0 and <= 4MB.
953// The padded area will be filled with data from crypto/rand.Reader.
954// The padding will be applied whenever Close is called on the writer.
955func WriterPadding(n int) WriterOption {
956 return func(w *Writer) error {
957 if n <= 0 {
958 return fmt.Errorf("s2: padding must be at least 1")
959 }
960 // No need to waste our time.
961 if n == 1 {
962 w.pad = 0
963 }
964 if n > maxBlockSize {
965 return fmt.Errorf("s2: padding must be less than 4MB")
966 }
967 w.pad = n
968 return nil
969 }
970}
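// Hedged usage sketch (illustrative): padding the finished stream to a
// multiple of 4KiB, e.g. to mask the exact compressed size; dst is a
// placeholder.
//
//	enc := s2.NewWriter(dst, s2.WriterPadding(4<<10))
//	// ... write data ...
//	err := enc.Close() // the random-filled skippable frame is added here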
971
972// WriterPaddingSrc will get random data for padding from the supplied source.
973// By default crypto/rand is used.
974func WriterPaddingSrc(reader io.Reader) WriterOption {
975 return func(w *Writer) error {
976 w.randSrc = reader
977 return nil
978 }
979}
980
981// WriterSnappyCompat will write snappy compatible output.
982// The output can be decompressed using either snappy or s2.
983// If the block size is more than 64KB, it is reduced to fit within 64KB.
984func WriterSnappyCompat() WriterOption {
985 return func(w *Writer) error {
986 w.snappy = true
987 if w.blockSize > 64<<10 {
988 // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
989 // And allows us to skip some size checks.
990 w.blockSize = (64 << 10) - 8
991 }
992 return nil
993 }
994}
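// Hedged sketch (illustrative): producing output that a standard Snappy
// stream decoder can also read; dst is a placeholder.
//
//	enc := s2.NewWriter(dst, s2.WriterSnappyCompat())
//	// ... write and Close as usual; the block size is capped for compatibility.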
995
996// WriterFlushOnWrite will compress blocks on each call to the Write function.
997//
998// This is quite inefficient as block sizes will depend on the write size.
999//
1000// Use WriterConcurrency(1) to also make sure that output is flushed
1001// when Write calls return; otherwise it will be written when compression is done.
1002func WriterFlushOnWrite() WriterOption {
1003 return func(w *Writer) error {
1004 w.flushOnWrite = true
1005 return nil
1006 }
1007}
1008
1009// WriterCustomEncoder allows overriding the encoder for blocks on the stream.
1010// The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
1011// Block size (initial varint) should not be added by the encoder.
1012// Returning value 0 indicates the block could not be compressed.
1013// Returning a negative value indicates that compression should be attempted.
1014// The function should expect to be called concurrently.
1015func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
1016 return func(w *Writer) error {
1017 w.customEnc = fn
1018 return nil
1019 }
1020}
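// Hedged sketch of a custom encoder (illustrative, not an upstream example):
// this one always defers to the built-in encoders by returning a negative
// value. A real encoder would compress src into dst without the leading size
// varint and return the bytes used, or 0 for an incompressible block; dst is
// a placeholder for the destination writer.
//
//	enc := s2.NewWriter(dst, s2.WriterCustomEncoder(func(dst, src []byte) int {
//		return -1 // fall back to the default block encoder
//	}))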