diff options
author | Rutger Broekhoff | 2023-12-29 21:31:53 +0100 |
---|---|---|
committer | Rutger Broekhoff | 2023-12-29 21:31:53 +0100 |
commit | 404aeae4545d2426c089a5f8d5e82dae56f5212b (patch) | |
tree | 2d84e00af272b39fc04f3795ae06bc48970e57b5 /vendor/github.com/klauspost/compress/s2/writer.go | |
parent | 209d8b0187ed025dec9ac149ebcced3462877bff (diff) | |
download | gitolfs3-404aeae4545d2426c089a5f8d5e82dae56f5212b.tar.gz gitolfs3-404aeae4545d2426c089a5f8d5e82dae56f5212b.zip |
Make Nix builds work
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/writer.go')
-rw-r--r-- | vendor/github.com/klauspost/compress/s2/writer.go | 1020 |
1 files changed, 1020 insertions, 0 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/writer.go b/vendor/github.com/klauspost/compress/s2/writer.go new file mode 100644 index 0000000..089cd36 --- /dev/null +++ b/vendor/github.com/klauspost/compress/s2/writer.go | |||
@@ -0,0 +1,1020 @@ | |||
1 | // Copyright 2011 The Snappy-Go Authors. All rights reserved. | ||
2 | // Copyright (c) 2019+ Klaus Post. All rights reserved. | ||
3 | // Use of this source code is governed by a BSD-style | ||
4 | // license that can be found in the LICENSE file. | ||
5 | |||
6 | package s2 | ||
7 | |||
8 | import ( | ||
9 | "crypto/rand" | ||
10 | "encoding/binary" | ||
11 | "errors" | ||
12 | "fmt" | ||
13 | "io" | ||
14 | "runtime" | ||
15 | "sync" | ||
16 | ) | ||
17 | |||
// Compression levels stored in Writer.level. Numbering starts at 1.
const (
	// levelUncompressed is selected by WriterUncompressed.
	levelUncompressed = 1 + iota
	// levelFast is the level NewWriter starts out with.
	levelFast
	// levelBetter is selected by WriterBetterCompression.
	levelBetter
	// levelBest is selected by WriterBestCompression.
	levelBest
)
24 | |||
25 | // NewWriter returns a new Writer that compresses to w, using the | ||
26 | // framing format described at | ||
27 | // https://github.com/google/snappy/blob/master/framing_format.txt | ||
28 | // | ||
29 | // Users must call Close to guarantee all data has been forwarded to | ||
30 | // the underlying io.Writer and that resources are released. | ||
31 | // They may also call Flush zero or more times before calling Close. | ||
32 | func NewWriter(w io.Writer, opts ...WriterOption) *Writer { | ||
33 | w2 := Writer{ | ||
34 | blockSize: defaultBlockSize, | ||
35 | concurrency: runtime.GOMAXPROCS(0), | ||
36 | randSrc: rand.Reader, | ||
37 | level: levelFast, | ||
38 | } | ||
39 | for _, opt := range opts { | ||
40 | if err := opt(&w2); err != nil { | ||
41 | w2.errState = err | ||
42 | return &w2 | ||
43 | } | ||
44 | } | ||
45 | w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize) | ||
46 | w2.paramsOK = true | ||
47 | w2.ibuf = make([]byte, 0, w2.blockSize) | ||
48 | w2.buffers.New = func() interface{} { | ||
49 | return make([]byte, w2.obufLen) | ||
50 | } | ||
51 | w2.Reset(w) | ||
52 | return &w2 | ||
53 | } | ||
54 | |||
55 | // Writer is an io.Writer that can write Snappy-compressed bytes. | ||
56 | type Writer struct { | ||
57 | errMu sync.Mutex | ||
58 | errState error | ||
59 | |||
60 | // ibuf is a buffer for the incoming (uncompressed) bytes. | ||
61 | ibuf []byte | ||
62 | |||
63 | blockSize int | ||
64 | obufLen int | ||
65 | concurrency int | ||
66 | written int64 | ||
67 | uncompWritten int64 // Bytes sent to compression | ||
68 | output chan chan result | ||
69 | buffers sync.Pool | ||
70 | pad int | ||
71 | |||
72 | writer io.Writer | ||
73 | randSrc io.Reader | ||
74 | writerWg sync.WaitGroup | ||
75 | index Index | ||
76 | customEnc func(dst, src []byte) int | ||
77 | |||
78 | // wroteStreamHeader is whether we have written the stream header. | ||
79 | wroteStreamHeader bool | ||
80 | paramsOK bool | ||
81 | snappy bool | ||
82 | flushOnWrite bool | ||
83 | appendIndex bool | ||
84 | level uint8 | ||
85 | } | ||
86 | |||
// result is one unit of output queued for the writer goroutine.
type result struct {
	// b is the encoded chunk to write. An empty b writes nothing.
	b []byte
	// startOffset is the uncompressed stream offset at which the chunk starts.
	startOffset int64
}
92 | |||
93 | // err returns the previously set error. | ||
94 | // If no error has been set it is set to err if not nil. | ||
95 | func (w *Writer) err(err error) error { | ||
96 | w.errMu.Lock() | ||
97 | errSet := w.errState | ||
98 | if errSet == nil && err != nil { | ||
99 | w.errState = err | ||
100 | errSet = err | ||
101 | } | ||
102 | w.errMu.Unlock() | ||
103 | return errSet | ||
104 | } | ||
105 | |||
106 | // Reset discards the writer's state and switches the Snappy writer to write to w. | ||
107 | // This permits reusing a Writer rather than allocating a new one. | ||
108 | func (w *Writer) Reset(writer io.Writer) { | ||
109 | if !w.paramsOK { | ||
110 | return | ||
111 | } | ||
112 | // Close previous writer, if any. | ||
113 | if w.output != nil { | ||
114 | close(w.output) | ||
115 | w.writerWg.Wait() | ||
116 | w.output = nil | ||
117 | } | ||
118 | w.errState = nil | ||
119 | w.ibuf = w.ibuf[:0] | ||
120 | w.wroteStreamHeader = false | ||
121 | w.written = 0 | ||
122 | w.writer = writer | ||
123 | w.uncompWritten = 0 | ||
124 | w.index.reset(w.blockSize) | ||
125 | |||
126 | // If we didn't get a writer, stop here. | ||
127 | if writer == nil { | ||
128 | return | ||
129 | } | ||
130 | // If no concurrency requested, don't spin up writer goroutine. | ||
131 | if w.concurrency == 1 { | ||
132 | return | ||
133 | } | ||
134 | |||
135 | toWrite := make(chan chan result, w.concurrency) | ||
136 | w.output = toWrite | ||
137 | w.writerWg.Add(1) | ||
138 | |||
139 | // Start a writer goroutine that will write all output in order. | ||
140 | go func() { | ||
141 | defer w.writerWg.Done() | ||
142 | |||
143 | // Get a queued write. | ||
144 | for write := range toWrite { | ||
145 | // Wait for the data to be available. | ||
146 | input := <-write | ||
147 | in := input.b | ||
148 | if len(in) > 0 { | ||
149 | if w.err(nil) == nil { | ||
150 | // Don't expose data from previous buffers. | ||
151 | toWrite := in[:len(in):len(in)] | ||
152 | // Write to output. | ||
153 | n, err := writer.Write(toWrite) | ||
154 | if err == nil && n != len(toWrite) { | ||
155 | err = io.ErrShortBuffer | ||
156 | } | ||
157 | _ = w.err(err) | ||
158 | w.err(w.index.add(w.written, input.startOffset)) | ||
159 | w.written += int64(n) | ||
160 | } | ||
161 | } | ||
162 | if cap(in) >= w.obufLen { | ||
163 | w.buffers.Put(in) | ||
164 | } | ||
165 | // close the incoming write request. | ||
166 | // This can be used for synchronizing flushes. | ||
167 | close(write) | ||
168 | } | ||
169 | }() | ||
170 | } | ||
171 | |||
172 | // Write satisfies the io.Writer interface. | ||
173 | func (w *Writer) Write(p []byte) (nRet int, errRet error) { | ||
174 | if err := w.err(nil); err != nil { | ||
175 | return 0, err | ||
176 | } | ||
177 | if w.flushOnWrite { | ||
178 | return w.write(p) | ||
179 | } | ||
180 | // If we exceed the input buffer size, start writing | ||
181 | for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil { | ||
182 | var n int | ||
183 | if len(w.ibuf) == 0 { | ||
184 | // Large write, empty buffer. | ||
185 | // Write directly from p to avoid copy. | ||
186 | n, _ = w.write(p) | ||
187 | } else { | ||
188 | n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p) | ||
189 | w.ibuf = w.ibuf[:len(w.ibuf)+n] | ||
190 | w.write(w.ibuf) | ||
191 | w.ibuf = w.ibuf[:0] | ||
192 | } | ||
193 | nRet += n | ||
194 | p = p[n:] | ||
195 | } | ||
196 | if err := w.err(nil); err != nil { | ||
197 | return nRet, err | ||
198 | } | ||
199 | // p should always be able to fit into w.ibuf now. | ||
200 | n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p) | ||
201 | w.ibuf = w.ibuf[:len(w.ibuf)+n] | ||
202 | nRet += n | ||
203 | return nRet, nil | ||
204 | } | ||
205 | |||
206 | // ReadFrom implements the io.ReaderFrom interface. | ||
207 | // Using this is typically more efficient since it avoids a memory copy. | ||
208 | // ReadFrom reads data from r until EOF or error. | ||
209 | // The return value n is the number of bytes read. | ||
210 | // Any error except io.EOF encountered during the read is also returned. | ||
211 | func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { | ||
212 | if err := w.err(nil); err != nil { | ||
213 | return 0, err | ||
214 | } | ||
215 | if len(w.ibuf) > 0 { | ||
216 | err := w.Flush() | ||
217 | if err != nil { | ||
218 | return 0, err | ||
219 | } | ||
220 | } | ||
221 | if br, ok := r.(byter); ok { | ||
222 | buf := br.Bytes() | ||
223 | if err := w.EncodeBuffer(buf); err != nil { | ||
224 | return 0, err | ||
225 | } | ||
226 | return int64(len(buf)), w.Flush() | ||
227 | } | ||
228 | for { | ||
229 | inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen] | ||
230 | n2, err := io.ReadFull(r, inbuf[obufHeaderLen:]) | ||
231 | if err != nil { | ||
232 | if err == io.ErrUnexpectedEOF { | ||
233 | err = io.EOF | ||
234 | } | ||
235 | if err != io.EOF { | ||
236 | return n, w.err(err) | ||
237 | } | ||
238 | } | ||
239 | if n2 == 0 { | ||
240 | break | ||
241 | } | ||
242 | n += int64(n2) | ||
243 | err2 := w.writeFull(inbuf[:n2+obufHeaderLen]) | ||
244 | if w.err(err2) != nil { | ||
245 | break | ||
246 | } | ||
247 | |||
248 | if err != nil { | ||
249 | // We got EOF and wrote everything | ||
250 | break | ||
251 | } | ||
252 | } | ||
253 | |||
254 | return n, w.err(nil) | ||
255 | } | ||
256 | |||
257 | // AddSkippableBlock will add a skippable block to the stream. | ||
258 | // The ID must be 0x80-0xfe (inclusive). | ||
259 | // Length of the skippable block must be <= 16777215 bytes. | ||
260 | func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { | ||
261 | if err := w.err(nil); err != nil { | ||
262 | return err | ||
263 | } | ||
264 | if len(data) == 0 { | ||
265 | return nil | ||
266 | } | ||
267 | if id < 0x80 || id > chunkTypePadding { | ||
268 | return fmt.Errorf("invalid skippable block id %x", id) | ||
269 | } | ||
270 | if len(data) > maxChunkSize { | ||
271 | return fmt.Errorf("skippable block excessed maximum size") | ||
272 | } | ||
273 | var header [4]byte | ||
274 | chunkLen := 4 + len(data) | ||
275 | header[0] = id | ||
276 | header[1] = uint8(chunkLen >> 0) | ||
277 | header[2] = uint8(chunkLen >> 8) | ||
278 | header[3] = uint8(chunkLen >> 16) | ||
279 | if w.concurrency == 1 { | ||
280 | write := func(b []byte) error { | ||
281 | n, err := w.writer.Write(b) | ||
282 | if err = w.err(err); err != nil { | ||
283 | return err | ||
284 | } | ||
285 | if n != len(data) { | ||
286 | return w.err(io.ErrShortWrite) | ||
287 | } | ||
288 | w.written += int64(n) | ||
289 | return w.err(nil) | ||
290 | } | ||
291 | if !w.wroteStreamHeader { | ||
292 | w.wroteStreamHeader = true | ||
293 | if w.snappy { | ||
294 | if err := write([]byte(magicChunkSnappy)); err != nil { | ||
295 | return err | ||
296 | } | ||
297 | } else { | ||
298 | if err := write([]byte(magicChunk)); err != nil { | ||
299 | return err | ||
300 | } | ||
301 | } | ||
302 | } | ||
303 | if err := write(header[:]); err != nil { | ||
304 | return err | ||
305 | } | ||
306 | if err := write(data); err != nil { | ||
307 | return err | ||
308 | } | ||
309 | } | ||
310 | |||
311 | // Create output... | ||
312 | if !w.wroteStreamHeader { | ||
313 | w.wroteStreamHeader = true | ||
314 | hWriter := make(chan result) | ||
315 | w.output <- hWriter | ||
316 | if w.snappy { | ||
317 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} | ||
318 | } else { | ||
319 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} | ||
320 | } | ||
321 | } | ||
322 | |||
323 | // Copy input. | ||
324 | inbuf := w.buffers.Get().([]byte)[:4] | ||
325 | copy(inbuf, header[:]) | ||
326 | inbuf = append(inbuf, data...) | ||
327 | |||
328 | output := make(chan result, 1) | ||
329 | // Queue output. | ||
330 | w.output <- output | ||
331 | output <- result{startOffset: w.uncompWritten, b: inbuf} | ||
332 | |||
333 | return nil | ||
334 | } | ||
335 | |||
336 | // EncodeBuffer will add a buffer to the stream. | ||
337 | // This is the fastest way to encode a stream, | ||
338 | // but the input buffer cannot be written to by the caller | ||
339 | // until Flush or Close has been called when concurrency != 1. | ||
340 | // | ||
341 | // If you cannot control that, use the regular Write function. | ||
342 | // | ||
343 | // Note that input is not buffered. | ||
344 | // This means that each write will result in discrete blocks being created. | ||
345 | // For buffered writes, use the regular Write function. | ||
346 | func (w *Writer) EncodeBuffer(buf []byte) (err error) { | ||
347 | if err := w.err(nil); err != nil { | ||
348 | return err | ||
349 | } | ||
350 | |||
351 | if w.flushOnWrite { | ||
352 | _, err := w.write(buf) | ||
353 | return err | ||
354 | } | ||
355 | // Flush queued data first. | ||
356 | if len(w.ibuf) > 0 { | ||
357 | err := w.Flush() | ||
358 | if err != nil { | ||
359 | return err | ||
360 | } | ||
361 | } | ||
362 | if w.concurrency == 1 { | ||
363 | _, err := w.writeSync(buf) | ||
364 | return err | ||
365 | } | ||
366 | |||
367 | // Spawn goroutine and write block to output channel. | ||
368 | if !w.wroteStreamHeader { | ||
369 | w.wroteStreamHeader = true | ||
370 | hWriter := make(chan result) | ||
371 | w.output <- hWriter | ||
372 | if w.snappy { | ||
373 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} | ||
374 | } else { | ||
375 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} | ||
376 | } | ||
377 | } | ||
378 | |||
379 | for len(buf) > 0 { | ||
380 | // Cut input. | ||
381 | uncompressed := buf | ||
382 | if len(uncompressed) > w.blockSize { | ||
383 | uncompressed = uncompressed[:w.blockSize] | ||
384 | } | ||
385 | buf = buf[len(uncompressed):] | ||
386 | // Get an output buffer. | ||
387 | obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen] | ||
388 | output := make(chan result) | ||
389 | // Queue output now, so we keep order. | ||
390 | w.output <- output | ||
391 | res := result{ | ||
392 | startOffset: w.uncompWritten, | ||
393 | } | ||
394 | w.uncompWritten += int64(len(uncompressed)) | ||
395 | go func() { | ||
396 | checksum := crc(uncompressed) | ||
397 | |||
398 | // Set to uncompressed. | ||
399 | chunkType := uint8(chunkTypeUncompressedData) | ||
400 | chunkLen := 4 + len(uncompressed) | ||
401 | |||
402 | // Attempt compressing. | ||
403 | n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) | ||
404 | n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed) | ||
405 | |||
406 | // Check if we should use this, or store as uncompressed instead. | ||
407 | if n2 > 0 { | ||
408 | chunkType = uint8(chunkTypeCompressedData) | ||
409 | chunkLen = 4 + n + n2 | ||
410 | obuf = obuf[:obufHeaderLen+n+n2] | ||
411 | } else { | ||
412 | // copy uncompressed | ||
413 | copy(obuf[obufHeaderLen:], uncompressed) | ||
414 | } | ||
415 | |||
416 | // Fill in the per-chunk header that comes before the body. | ||
417 | obuf[0] = chunkType | ||
418 | obuf[1] = uint8(chunkLen >> 0) | ||
419 | obuf[2] = uint8(chunkLen >> 8) | ||
420 | obuf[3] = uint8(chunkLen >> 16) | ||
421 | obuf[4] = uint8(checksum >> 0) | ||
422 | obuf[5] = uint8(checksum >> 8) | ||
423 | obuf[6] = uint8(checksum >> 16) | ||
424 | obuf[7] = uint8(checksum >> 24) | ||
425 | |||
426 | // Queue final output. | ||
427 | res.b = obuf | ||
428 | output <- res | ||
429 | }() | ||
430 | } | ||
431 | return nil | ||
432 | } | ||
433 | |||
434 | func (w *Writer) encodeBlock(obuf, uncompressed []byte) int { | ||
435 | if w.customEnc != nil { | ||
436 | if ret := w.customEnc(obuf, uncompressed); ret >= 0 { | ||
437 | return ret | ||
438 | } | ||
439 | } | ||
440 | if w.snappy { | ||
441 | switch w.level { | ||
442 | case levelFast: | ||
443 | return encodeBlockSnappy(obuf, uncompressed) | ||
444 | case levelBetter: | ||
445 | return encodeBlockBetterSnappy(obuf, uncompressed) | ||
446 | case levelBest: | ||
447 | return encodeBlockBestSnappy(obuf, uncompressed) | ||
448 | } | ||
449 | return 0 | ||
450 | } | ||
451 | switch w.level { | ||
452 | case levelFast: | ||
453 | return encodeBlock(obuf, uncompressed) | ||
454 | case levelBetter: | ||
455 | return encodeBlockBetter(obuf, uncompressed) | ||
456 | case levelBest: | ||
457 | return encodeBlockBest(obuf, uncompressed, nil) | ||
458 | } | ||
459 | return 0 | ||
460 | } | ||
461 | |||
462 | func (w *Writer) write(p []byte) (nRet int, errRet error) { | ||
463 | if err := w.err(nil); err != nil { | ||
464 | return 0, err | ||
465 | } | ||
466 | if w.concurrency == 1 { | ||
467 | return w.writeSync(p) | ||
468 | } | ||
469 | |||
470 | // Spawn goroutine and write block to output channel. | ||
471 | for len(p) > 0 { | ||
472 | if !w.wroteStreamHeader { | ||
473 | w.wroteStreamHeader = true | ||
474 | hWriter := make(chan result) | ||
475 | w.output <- hWriter | ||
476 | if w.snappy { | ||
477 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} | ||
478 | } else { | ||
479 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} | ||
480 | } | ||
481 | } | ||
482 | |||
483 | var uncompressed []byte | ||
484 | if len(p) > w.blockSize { | ||
485 | uncompressed, p = p[:w.blockSize], p[w.blockSize:] | ||
486 | } else { | ||
487 | uncompressed, p = p, nil | ||
488 | } | ||
489 | |||
490 | // Copy input. | ||
491 | // If the block is incompressible, this is used for the result. | ||
492 | inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen] | ||
493 | obuf := w.buffers.Get().([]byte)[:w.obufLen] | ||
494 | copy(inbuf[obufHeaderLen:], uncompressed) | ||
495 | uncompressed = inbuf[obufHeaderLen:] | ||
496 | |||
497 | output := make(chan result) | ||
498 | // Queue output now, so we keep order. | ||
499 | w.output <- output | ||
500 | res := result{ | ||
501 | startOffset: w.uncompWritten, | ||
502 | } | ||
503 | w.uncompWritten += int64(len(uncompressed)) | ||
504 | |||
505 | go func() { | ||
506 | checksum := crc(uncompressed) | ||
507 | |||
508 | // Set to uncompressed. | ||
509 | chunkType := uint8(chunkTypeUncompressedData) | ||
510 | chunkLen := 4 + len(uncompressed) | ||
511 | |||
512 | // Attempt compressing. | ||
513 | n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) | ||
514 | n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed) | ||
515 | |||
516 | // Check if we should use this, or store as uncompressed instead. | ||
517 | if n2 > 0 { | ||
518 | chunkType = uint8(chunkTypeCompressedData) | ||
519 | chunkLen = 4 + n + n2 | ||
520 | obuf = obuf[:obufHeaderLen+n+n2] | ||
521 | } else { | ||
522 | // Use input as output. | ||
523 | obuf, inbuf = inbuf, obuf | ||
524 | } | ||
525 | |||
526 | // Fill in the per-chunk header that comes before the body. | ||
527 | obuf[0] = chunkType | ||
528 | obuf[1] = uint8(chunkLen >> 0) | ||
529 | obuf[2] = uint8(chunkLen >> 8) | ||
530 | obuf[3] = uint8(chunkLen >> 16) | ||
531 | obuf[4] = uint8(checksum >> 0) | ||
532 | obuf[5] = uint8(checksum >> 8) | ||
533 | obuf[6] = uint8(checksum >> 16) | ||
534 | obuf[7] = uint8(checksum >> 24) | ||
535 | |||
536 | // Queue final output. | ||
537 | res.b = obuf | ||
538 | output <- res | ||
539 | |||
540 | // Put unused buffer back in pool. | ||
541 | w.buffers.Put(inbuf) | ||
542 | }() | ||
543 | nRet += len(uncompressed) | ||
544 | } | ||
545 | return nRet, nil | ||
546 | } | ||
547 | |||
548 | // writeFull is a special version of write that will always write the full buffer. | ||
549 | // Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer. | ||
550 | // The data will be written as a single block. | ||
551 | // The caller is not allowed to use inbuf after this function has been called. | ||
552 | func (w *Writer) writeFull(inbuf []byte) (errRet error) { | ||
553 | if err := w.err(nil); err != nil { | ||
554 | return err | ||
555 | } | ||
556 | |||
557 | if w.concurrency == 1 { | ||
558 | _, err := w.writeSync(inbuf[obufHeaderLen:]) | ||
559 | return err | ||
560 | } | ||
561 | |||
562 | // Spawn goroutine and write block to output channel. | ||
563 | if !w.wroteStreamHeader { | ||
564 | w.wroteStreamHeader = true | ||
565 | hWriter := make(chan result) | ||
566 | w.output <- hWriter | ||
567 | if w.snappy { | ||
568 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} | ||
569 | } else { | ||
570 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} | ||
571 | } | ||
572 | } | ||
573 | |||
574 | // Get an output buffer. | ||
575 | obuf := w.buffers.Get().([]byte)[:w.obufLen] | ||
576 | uncompressed := inbuf[obufHeaderLen:] | ||
577 | |||
578 | output := make(chan result) | ||
579 | // Queue output now, so we keep order. | ||
580 | w.output <- output | ||
581 | res := result{ | ||
582 | startOffset: w.uncompWritten, | ||
583 | } | ||
584 | w.uncompWritten += int64(len(uncompressed)) | ||
585 | |||
586 | go func() { | ||
587 | checksum := crc(uncompressed) | ||
588 | |||
589 | // Set to uncompressed. | ||
590 | chunkType := uint8(chunkTypeUncompressedData) | ||
591 | chunkLen := 4 + len(uncompressed) | ||
592 | |||
593 | // Attempt compressing. | ||
594 | n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) | ||
595 | n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed) | ||
596 | |||
597 | // Check if we should use this, or store as uncompressed instead. | ||
598 | if n2 > 0 { | ||
599 | chunkType = uint8(chunkTypeCompressedData) | ||
600 | chunkLen = 4 + n + n2 | ||
601 | obuf = obuf[:obufHeaderLen+n+n2] | ||
602 | } else { | ||
603 | // Use input as output. | ||
604 | obuf, inbuf = inbuf, obuf | ||
605 | } | ||
606 | |||
607 | // Fill in the per-chunk header that comes before the body. | ||
608 | obuf[0] = chunkType | ||
609 | obuf[1] = uint8(chunkLen >> 0) | ||
610 | obuf[2] = uint8(chunkLen >> 8) | ||
611 | obuf[3] = uint8(chunkLen >> 16) | ||
612 | obuf[4] = uint8(checksum >> 0) | ||
613 | obuf[5] = uint8(checksum >> 8) | ||
614 | obuf[6] = uint8(checksum >> 16) | ||
615 | obuf[7] = uint8(checksum >> 24) | ||
616 | |||
617 | // Queue final output. | ||
618 | res.b = obuf | ||
619 | output <- res | ||
620 | |||
621 | // Put unused buffer back in pool. | ||
622 | w.buffers.Put(inbuf) | ||
623 | }() | ||
624 | return nil | ||
625 | } | ||
626 | |||
627 | func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { | ||
628 | if err := w.err(nil); err != nil { | ||
629 | return 0, err | ||
630 | } | ||
631 | if !w.wroteStreamHeader { | ||
632 | w.wroteStreamHeader = true | ||
633 | var n int | ||
634 | var err error | ||
635 | if w.snappy { | ||
636 | n, err = w.writer.Write([]byte(magicChunkSnappy)) | ||
637 | } else { | ||
638 | n, err = w.writer.Write([]byte(magicChunk)) | ||
639 | } | ||
640 | if err != nil { | ||
641 | return 0, w.err(err) | ||
642 | } | ||
643 | if n != len(magicChunk) { | ||
644 | return 0, w.err(io.ErrShortWrite) | ||
645 | } | ||
646 | w.written += int64(n) | ||
647 | } | ||
648 | |||
649 | for len(p) > 0 { | ||
650 | var uncompressed []byte | ||
651 | if len(p) > w.blockSize { | ||
652 | uncompressed, p = p[:w.blockSize], p[w.blockSize:] | ||
653 | } else { | ||
654 | uncompressed, p = p, nil | ||
655 | } | ||
656 | |||
657 | obuf := w.buffers.Get().([]byte)[:w.obufLen] | ||
658 | checksum := crc(uncompressed) | ||
659 | |||
660 | // Set to uncompressed. | ||
661 | chunkType := uint8(chunkTypeUncompressedData) | ||
662 | chunkLen := 4 + len(uncompressed) | ||
663 | |||
664 | // Attempt compressing. | ||
665 | n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed))) | ||
666 | n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed) | ||
667 | |||
668 | if n2 > 0 { | ||
669 | chunkType = uint8(chunkTypeCompressedData) | ||
670 | chunkLen = 4 + n + n2 | ||
671 | obuf = obuf[:obufHeaderLen+n+n2] | ||
672 | } else { | ||
673 | obuf = obuf[:8] | ||
674 | } | ||
675 | |||
676 | // Fill in the per-chunk header that comes before the body. | ||
677 | obuf[0] = chunkType | ||
678 | obuf[1] = uint8(chunkLen >> 0) | ||
679 | obuf[2] = uint8(chunkLen >> 8) | ||
680 | obuf[3] = uint8(chunkLen >> 16) | ||
681 | obuf[4] = uint8(checksum >> 0) | ||
682 | obuf[5] = uint8(checksum >> 8) | ||
683 | obuf[6] = uint8(checksum >> 16) | ||
684 | obuf[7] = uint8(checksum >> 24) | ||
685 | |||
686 | n, err := w.writer.Write(obuf) | ||
687 | if err != nil { | ||
688 | return 0, w.err(err) | ||
689 | } | ||
690 | if n != len(obuf) { | ||
691 | return 0, w.err(io.ErrShortWrite) | ||
692 | } | ||
693 | w.err(w.index.add(w.written, w.uncompWritten)) | ||
694 | w.written += int64(n) | ||
695 | w.uncompWritten += int64(len(uncompressed)) | ||
696 | |||
697 | if chunkType == chunkTypeUncompressedData { | ||
698 | // Write uncompressed data. | ||
699 | n, err := w.writer.Write(uncompressed) | ||
700 | if err != nil { | ||
701 | return 0, w.err(err) | ||
702 | } | ||
703 | if n != len(uncompressed) { | ||
704 | return 0, w.err(io.ErrShortWrite) | ||
705 | } | ||
706 | w.written += int64(n) | ||
707 | } | ||
708 | w.buffers.Put(obuf) | ||
709 | // Queue final output. | ||
710 | nRet += len(uncompressed) | ||
711 | } | ||
712 | return nRet, nil | ||
713 | } | ||
714 | |||
715 | // Flush flushes the Writer to its underlying io.Writer. | ||
716 | // This does not apply padding. | ||
717 | func (w *Writer) Flush() error { | ||
718 | if err := w.err(nil); err != nil { | ||
719 | return err | ||
720 | } | ||
721 | |||
722 | // Queue any data still in input buffer. | ||
723 | if len(w.ibuf) != 0 { | ||
724 | if !w.wroteStreamHeader { | ||
725 | _, err := w.writeSync(w.ibuf) | ||
726 | w.ibuf = w.ibuf[:0] | ||
727 | return w.err(err) | ||
728 | } else { | ||
729 | _, err := w.write(w.ibuf) | ||
730 | w.ibuf = w.ibuf[:0] | ||
731 | err = w.err(err) | ||
732 | if err != nil { | ||
733 | return err | ||
734 | } | ||
735 | } | ||
736 | } | ||
737 | if w.output == nil { | ||
738 | return w.err(nil) | ||
739 | } | ||
740 | |||
741 | // Send empty buffer | ||
742 | res := make(chan result) | ||
743 | w.output <- res | ||
744 | // Block until this has been picked up. | ||
745 | res <- result{b: nil, startOffset: w.uncompWritten} | ||
746 | // When it is closed, we have flushed. | ||
747 | <-res | ||
748 | return w.err(nil) | ||
749 | } | ||
750 | |||
751 | // Close calls Flush and then closes the Writer. | ||
752 | // Calling Close multiple times is ok, | ||
753 | // but calling CloseIndex after this will make it not return the index. | ||
754 | func (w *Writer) Close() error { | ||
755 | _, err := w.closeIndex(w.appendIndex) | ||
756 | return err | ||
757 | } | ||
758 | |||
759 | // CloseIndex calls Close and returns an index on first call. | ||
760 | // This is not required if you are only adding index to a stream. | ||
761 | func (w *Writer) CloseIndex() ([]byte, error) { | ||
762 | return w.closeIndex(true) | ||
763 | } | ||
764 | |||
765 | func (w *Writer) closeIndex(idx bool) ([]byte, error) { | ||
766 | err := w.Flush() | ||
767 | if w.output != nil { | ||
768 | close(w.output) | ||
769 | w.writerWg.Wait() | ||
770 | w.output = nil | ||
771 | } | ||
772 | |||
773 | var index []byte | ||
774 | if w.err(err) == nil && w.writer != nil { | ||
775 | // Create index. | ||
776 | if idx { | ||
777 | compSize := int64(-1) | ||
778 | if w.pad <= 1 { | ||
779 | compSize = w.written | ||
780 | } | ||
781 | index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize) | ||
782 | // Count as written for padding. | ||
783 | if w.appendIndex { | ||
784 | w.written += int64(len(index)) | ||
785 | } | ||
786 | } | ||
787 | |||
788 | if w.pad > 1 { | ||
789 | tmp := w.ibuf[:0] | ||
790 | if len(index) > 0 { | ||
791 | // Allocate another buffer. | ||
792 | tmp = w.buffers.Get().([]byte)[:0] | ||
793 | defer w.buffers.Put(tmp) | ||
794 | } | ||
795 | add := calcSkippableFrame(w.written, int64(w.pad)) | ||
796 | frame, err := skippableFrame(tmp, add, w.randSrc) | ||
797 | if err = w.err(err); err != nil { | ||
798 | return nil, err | ||
799 | } | ||
800 | n, err2 := w.writer.Write(frame) | ||
801 | if err2 == nil && n != len(frame) { | ||
802 | err2 = io.ErrShortWrite | ||
803 | } | ||
804 | _ = w.err(err2) | ||
805 | } | ||
806 | if len(index) > 0 && w.appendIndex { | ||
807 | n, err2 := w.writer.Write(index) | ||
808 | if err2 == nil && n != len(index) { | ||
809 | err2 = io.ErrShortWrite | ||
810 | } | ||
811 | _ = w.err(err2) | ||
812 | } | ||
813 | } | ||
814 | err = w.err(errClosed) | ||
815 | if err == errClosed { | ||
816 | return index, nil | ||
817 | } | ||
818 | return nil, err | ||
819 | } | ||
820 | |||
821 | // calcSkippableFrame will return a total size to be added for written | ||
822 | // to be divisible by multiple. | ||
823 | // The value will always be > skippableFrameHeader. | ||
824 | // The function will panic if written < 0 or wantMultiple <= 0. | ||
825 | func calcSkippableFrame(written, wantMultiple int64) int { | ||
826 | if wantMultiple <= 0 { | ||
827 | panic("wantMultiple <= 0") | ||
828 | } | ||
829 | if written < 0 { | ||
830 | panic("written < 0") | ||
831 | } | ||
832 | leftOver := written % wantMultiple | ||
833 | if leftOver == 0 { | ||
834 | return 0 | ||
835 | } | ||
836 | toAdd := wantMultiple - leftOver | ||
837 | for toAdd < skippableFrameHeader { | ||
838 | toAdd += wantMultiple | ||
839 | } | ||
840 | return int(toAdd) | ||
841 | } | ||
842 | |||
843 | // skippableFrame will add a skippable frame with a total size of bytes. | ||
844 | // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader | ||
845 | func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) { | ||
846 | if total == 0 { | ||
847 | return dst, nil | ||
848 | } | ||
849 | if total < skippableFrameHeader { | ||
850 | return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total) | ||
851 | } | ||
852 | if int64(total) >= maxBlockSize+skippableFrameHeader { | ||
853 | return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total) | ||
854 | } | ||
855 | // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)" | ||
856 | dst = append(dst, chunkTypePadding) | ||
857 | f := uint32(total - skippableFrameHeader) | ||
858 | // Add chunk length. | ||
859 | dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16)) | ||
860 | // Add data | ||
861 | start := len(dst) | ||
862 | dst = append(dst, make([]byte, f)...) | ||
863 | _, err := io.ReadFull(r, dst[start:]) | ||
864 | return dst, err | ||
865 | } | ||
866 | |||
// errClosed is recorded as the Writer's sticky error once it has been closed.
var errClosed = errors.New("s2: Writer is closed")
868 | |||
869 | // WriterOption is an option for creating a encoder. | ||
870 | type WriterOption func(*Writer) error | ||
871 | |||
872 | // WriterConcurrency will set the concurrency, | ||
873 | // meaning the maximum number of decoders to run concurrently. | ||
874 | // The value supplied must be at least 1. | ||
875 | // By default this will be set to GOMAXPROCS. | ||
876 | func WriterConcurrency(n int) WriterOption { | ||
877 | return func(w *Writer) error { | ||
878 | if n <= 0 { | ||
879 | return errors.New("concurrency must be at least 1") | ||
880 | } | ||
881 | w.concurrency = n | ||
882 | return nil | ||
883 | } | ||
884 | } | ||
885 | |||
886 | // WriterAddIndex will append an index to the end of a stream | ||
887 | // when it is closed. | ||
888 | func WriterAddIndex() WriterOption { | ||
889 | return func(w *Writer) error { | ||
890 | w.appendIndex = true | ||
891 | return nil | ||
892 | } | ||
893 | } | ||
894 | |||
895 | // WriterBetterCompression will enable better compression. | ||
896 | // EncodeBetter compresses better than Encode but typically with a | ||
897 | // 10-40% speed decrease on both compression and decompression. | ||
898 | func WriterBetterCompression() WriterOption { | ||
899 | return func(w *Writer) error { | ||
900 | w.level = levelBetter | ||
901 | return nil | ||
902 | } | ||
903 | } | ||
904 | |||
905 | // WriterBestCompression will enable better compression. | ||
906 | // EncodeBetter compresses better than Encode but typically with a | ||
907 | // big speed decrease on compression. | ||
908 | func WriterBestCompression() WriterOption { | ||
909 | return func(w *Writer) error { | ||
910 | w.level = levelBest | ||
911 | return nil | ||
912 | } | ||
913 | } | ||
914 | |||
915 | // WriterUncompressed will bypass compression. | ||
916 | // The stream will be written as uncompressed blocks only. | ||
917 | // If concurrency is > 1 CRC and output will still be done async. | ||
918 | func WriterUncompressed() WriterOption { | ||
919 | return func(w *Writer) error { | ||
920 | w.level = levelUncompressed | ||
921 | return nil | ||
922 | } | ||
923 | } | ||
924 | |||
925 | // WriterBlockSize allows to override the default block size. | ||
926 | // Blocks will be this size or smaller. | ||
927 | // Minimum size is 4KB and and maximum size is 4MB. | ||
928 | // | ||
929 | // Bigger blocks may give bigger throughput on systems with many cores, | ||
930 | // and will increase compression slightly, but it will limit the possible | ||
931 | // concurrency for smaller payloads for both encoding and decoding. | ||
932 | // Default block size is 1MB. | ||
933 | // | ||
934 | // When writing Snappy compatible output using WriterSnappyCompat, | ||
935 | // the maximum block size is 64KB. | ||
936 | func WriterBlockSize(n int) WriterOption { | ||
937 | return func(w *Writer) error { | ||
938 | if w.snappy && n > maxSnappyBlockSize || n < minBlockSize { | ||
939 | return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output") | ||
940 | } | ||
941 | if n > maxBlockSize || n < minBlockSize { | ||
942 | return errors.New("s2: block size too large. Must be <= 4MB and >=4KB") | ||
943 | } | ||
944 | w.blockSize = n | ||
945 | return nil | ||
946 | } | ||
947 | } | ||
948 | |||
949 | // WriterPadding will add padding to all output so the size will be a multiple of n. | ||
950 | // This can be used to obfuscate the exact output size or make blocks of a certain size. | ||
951 | // The contents will be a skippable frame, so it will be invisible by the decoder. | ||
952 | // n must be > 0 and <= 4MB. | ||
953 | // The padded area will be filled with data from crypto/rand.Reader. | ||
954 | // The padding will be applied whenever Close is called on the writer. | ||
955 | func WriterPadding(n int) WriterOption { | ||
956 | return func(w *Writer) error { | ||
957 | if n <= 0 { | ||
958 | return fmt.Errorf("s2: padding must be at least 1") | ||
959 | } | ||
960 | // No need to waste our time. | ||
961 | if n == 1 { | ||
962 | w.pad = 0 | ||
963 | } | ||
964 | if n > maxBlockSize { | ||
965 | return fmt.Errorf("s2: padding must less than 4MB") | ||
966 | } | ||
967 | w.pad = n | ||
968 | return nil | ||
969 | } | ||
970 | } | ||
971 | |||
972 | // WriterPaddingSrc will get random data for padding from the supplied source. | ||
973 | // By default crypto/rand is used. | ||
974 | func WriterPaddingSrc(reader io.Reader) WriterOption { | ||
975 | return func(w *Writer) error { | ||
976 | w.randSrc = reader | ||
977 | return nil | ||
978 | } | ||
979 | } | ||
980 | |||
981 | // WriterSnappyCompat will write snappy compatible output. | ||
982 | // The output can be decompressed using either snappy or s2. | ||
983 | // If block size is more than 64KB it is set to that. | ||
984 | func WriterSnappyCompat() WriterOption { | ||
985 | return func(w *Writer) error { | ||
986 | w.snappy = true | ||
987 | if w.blockSize > 64<<10 { | ||
988 | // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective. | ||
989 | // And allows us to skip some size checks. | ||
990 | w.blockSize = (64 << 10) - 8 | ||
991 | } | ||
992 | return nil | ||
993 | } | ||
994 | } | ||
995 | |||
996 | // WriterFlushOnWrite will compress blocks on each call to the Write function. | ||
997 | // | ||
998 | // This is quite inefficient as blocks size will depend on the write size. | ||
999 | // | ||
1000 | // Use WriterConcurrency(1) to also make sure that output is flushed. | ||
1001 | // When Write calls return, otherwise they will be written when compression is done. | ||
1002 | func WriterFlushOnWrite() WriterOption { | ||
1003 | return func(w *Writer) error { | ||
1004 | w.flushOnWrite = true | ||
1005 | return nil | ||
1006 | } | ||
1007 | } | ||
1008 | |||
1009 | // WriterCustomEncoder allows to override the encoder for blocks on the stream. | ||
1010 | // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer. | ||
1011 | // Block size (initial varint) should not be added by the encoder. | ||
1012 | // Returning value 0 indicates the block could not be compressed. | ||
1013 | // Returning a negative value indicates that compression should be attempted. | ||
1014 | // The function should expect to be called concurrently. | ||
1015 | func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption { | ||
1016 | return func(w *Writer) error { | ||
1017 | w.customEnc = fn | ||
1018 | return nil | ||
1019 | } | ||
1020 | } | ||