diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/writer.go')
| -rw-r--r-- | vendor/github.com/klauspost/compress/s2/writer.go | 1020 |
1 file changed, 1020 insertions, 0 deletions
diff --git a/vendor/github.com/klauspost/compress/s2/writer.go b/vendor/github.com/klauspost/compress/s2/writer.go new file mode 100644 index 0000000..089cd36 --- /dev/null +++ b/vendor/github.com/klauspost/compress/s2/writer.go | |||
| @@ -0,0 +1,1020 @@ | |||
| 1 | // Copyright 2011 The Snappy-Go Authors. All rights reserved. | ||
| 2 | // Copyright (c) 2019+ Klaus Post. All rights reserved. | ||
| 3 | // Use of this source code is governed by a BSD-style | ||
| 4 | // license that can be found in the LICENSE file. | ||
| 5 | |||
| 6 | package s2 | ||
| 7 | |||
| 8 | import ( | ||
| 9 | "crypto/rand" | ||
| 10 | "encoding/binary" | ||
| 11 | "errors" | ||
| 12 | "fmt" | ||
| 13 | "io" | ||
| 14 | "runtime" | ||
| 15 | "sync" | ||
| 16 | ) | ||
| 17 | |||
// Compression levels selected by writer options.
// Zero is reserved so an unset level can be detected.
const (
	levelUncompressed = iota + 1 // store blocks without compression
	levelFast                    // default: fastest compression
	levelBetter                  // better ratio, moderately slower
	levelBest                    // best ratio, slowest
)
| 24 | |||
// NewWriter returns a new Writer that compresses to w, using the
// framing format described at
// https://github.com/google/snappy/blob/master/framing_format.txt
//
// Users must call Close to guarantee all data has been forwarded to
// the underlying io.Writer and that resources are released.
// They may also call Flush zero or more times before calling Close.
func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
	w2 := Writer{
		blockSize:   defaultBlockSize,
		concurrency: runtime.GOMAXPROCS(0),
		randSrc:     rand.Reader,
		level:       levelFast,
	}
	for _, opt := range opts {
		if err := opt(&w2); err != nil {
			// Surface the option error through the sticky error state;
			// every subsequent call on the writer will return it.
			w2.errState = err
			return &w2
		}
	}
	// Worst-case size of one encoded block, including the chunk header.
	w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
	w2.paramsOK = true
	w2.ibuf = make([]byte, 0, w2.blockSize)
	// Pool of reusable output buffers, all of obufLen capacity.
	w2.buffers.New = func() interface{} {
		return make([]byte, w2.obufLen)
	}
	w2.Reset(w)
	return &w2
}
| 54 | |||
// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
	errMu    sync.Mutex // guards errState
	errState error      // sticky first error; once set, all calls fail with it

	// ibuf is a buffer for the incoming (uncompressed) bytes.
	ibuf []byte

	blockSize     int   // maximum uncompressed bytes per block
	obufLen       int   // capacity of pooled buffers: header + worst-case block
	concurrency   int   // number of concurrent block encoders; 1 means fully synchronous
	written       int64 // compressed bytes written to the underlying writer
	uncompWritten int64 // Bytes sent to compression
	output        chan chan result // ordered queue of pending results; nil when concurrency == 1
	buffers       sync.Pool        // recycled []byte scratch/output buffers
	pad           int              // pad stream to a multiple of this; <= 1 disables padding

	writer    io.Writer // destination stream
	randSrc   io.Reader // source of padding bytes (defaults to crypto/rand)
	writerWg  sync.WaitGroup // waits for the ordered-writer goroutine to exit
	index     Index          // seek index accumulated as blocks are written
	customEnc func(dst, src []byte) int // optional user block encoder; negative result falls back

	// wroteStreamHeader is whether we have written the stream header.
	wroteStreamHeader bool
	paramsOK          bool  // set once NewWriter validated all options
	snappy            bool  // emit Snappy-compatible frames instead of S2
	flushOnWrite      bool  // compress on every Write call without buffering
	appendIndex       bool  // append the index to the stream on Close
	level             uint8 // one of the level* constants
}
| 86 | |||
// result carries one encoded chunk from an encoder goroutine to the
// ordered-writer goroutine started by Reset.
type result struct {
	b []byte // encoded chunk, ready to be written verbatim
	// Uncompressed start offset
	startOffset int64
}
| 92 | |||
| 93 | // err returns the previously set error. | ||
| 94 | // If no error has been set it is set to err if not nil. | ||
| 95 | func (w *Writer) err(err error) error { | ||
| 96 | w.errMu.Lock() | ||
| 97 | errSet := w.errState | ||
| 98 | if errSet == nil && err != nil { | ||
| 99 | w.errState = err | ||
| 100 | errSet = err | ||
| 101 | } | ||
| 102 | w.errMu.Unlock() | ||
| 103 | return errSet | ||
| 104 | } | ||
| 105 | |||
// Reset discards the writer's state and switches the Snappy writer to write to w.
// This permits reusing a Writer rather than allocating a new one.
func (w *Writer) Reset(writer io.Writer) {
	if !w.paramsOK {
		// Not created via NewWriter, or an option failed; nothing usable here.
		return
	}
	// Close previous writer, if any.
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}
	w.errState = nil
	w.ibuf = w.ibuf[:0]
	w.wroteStreamHeader = false
	w.written = 0
	w.writer = writer
	w.uncompWritten = 0
	w.index.reset(w.blockSize)

	// If we didn't get a writer, stop here.
	if writer == nil {
		return
	}
	// If no concurrency requested, don't spin up writer goroutine.
	if w.concurrency == 1 {
		return
	}

	// Bounded queue: at most w.concurrency blocks may be in flight.
	toWrite := make(chan chan result, w.concurrency)
	w.output = toWrite
	w.writerWg.Add(1)

	// Start a writer goroutine that will write all output in order.
	go func() {
		defer w.writerWg.Done()

		// Get a queued write.
		for write := range toWrite {
			// Wait for the data to be available.
			input := <-write
			in := input.b
			if len(in) > 0 {
				if w.err(nil) == nil {
					// Don't expose data from previous buffers.
					toWrite := in[:len(in):len(in)]
					// Write to output.
					n, err := writer.Write(toWrite)
					if err == nil && n != len(toWrite) {
						err = io.ErrShortBuffer
					}
					_ = w.err(err)
					// Record block position in the seek index.
					w.err(w.index.add(w.written, input.startOffset))
					w.written += int64(n)
				}
			}
			if cap(in) >= w.obufLen {
				// Full-sized pooled buffer; recycle it.
				w.buffers.Put(in)
			}
			// close the incoming write request.
			// This can be used for synchronizing flushes.
			close(write)
		}
	}()
}
| 171 | |||
// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.flushOnWrite {
		// Unbuffered mode: compress and queue immediately.
		return w.write(p)
	}
	// If we exceed the input buffer size, start writing
	for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
		var n int
		if len(w.ibuf) == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			n, _ = w.write(p)
		} else {
			// Top up the buffer to a full block, then flush it.
			n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
			w.ibuf = w.ibuf[:len(w.ibuf)+n]
			w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
		}
		nRet += n
		p = p[n:]
	}
	if err := w.err(nil); err != nil {
		return nRet, err
	}
	// p should always be able to fit into w.ibuf now.
	n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
	w.ibuf = w.ibuf[:len(w.ibuf)+n]
	nRet += n
	return nRet, nil
}
| 205 | |||
// ReadFrom implements the io.ReaderFrom interface.
// Using this is typically more efficient since it avoids a memory copy.
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if len(w.ibuf) > 0 {
		// Flush buffered writes first so block order is preserved.
		err := w.Flush()
		if err != nil {
			return 0, err
		}
	}
	if br, ok := r.(byter); ok {
		// The source exposes its backing bytes; encode directly, no copy.
		buf := br.Bytes()
		if err := w.EncodeBuffer(buf); err != nil {
			return 0, err
		}
		return int64(len(buf)), w.Flush()
	}
	for {
		// Read one block into a pooled buffer, leaving room for the chunk header.
		inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
		n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
		if err != nil {
			if err == io.ErrUnexpectedEOF {
				// A short final block is fine; treat it as end of stream.
				err = io.EOF
			}
			if err != io.EOF {
				return n, w.err(err)
			}
		}
		if n2 == 0 {
			break
		}
		n += int64(n2)
		// writeFull takes ownership of inbuf.
		err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
		if w.err(err2) != nil {
			break
		}

		if err != nil {
			// We got EOF and wrote everything
			break
		}
	}

	return n, w.err(nil)
}
| 256 | |||
| 257 | // AddSkippableBlock will add a skippable block to the stream. | ||
| 258 | // The ID must be 0x80-0xfe (inclusive). | ||
| 259 | // Length of the skippable block must be <= 16777215 bytes. | ||
| 260 | func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { | ||
| 261 | if err := w.err(nil); err != nil { | ||
| 262 | return err | ||
| 263 | } | ||
| 264 | if len(data) == 0 { | ||
| 265 | return nil | ||
| 266 | } | ||
| 267 | if id < 0x80 || id > chunkTypePadding { | ||
| 268 | return fmt.Errorf("invalid skippable block id %x", id) | ||
| 269 | } | ||
| 270 | if len(data) > maxChunkSize { | ||
| 271 | return fmt.Errorf("skippable block excessed maximum size") | ||
| 272 | } | ||
| 273 | var header [4]byte | ||
| 274 | chunkLen := 4 + len(data) | ||
| 275 | header[0] = id | ||
| 276 | header[1] = uint8(chunkLen >> 0) | ||
| 277 | header[2] = uint8(chunkLen >> 8) | ||
| 278 | header[3] = uint8(chunkLen >> 16) | ||
| 279 | if w.concurrency == 1 { | ||
| 280 | write := func(b []byte) error { | ||
| 281 | n, err := w.writer.Write(b) | ||
| 282 | if err = w.err(err); err != nil { | ||
| 283 | return err | ||
| 284 | } | ||
| 285 | if n != len(data) { | ||
| 286 | return w.err(io.ErrShortWrite) | ||
| 287 | } | ||
| 288 | w.written += int64(n) | ||
| 289 | return w.err(nil) | ||
| 290 | } | ||
| 291 | if !w.wroteStreamHeader { | ||
| 292 | w.wroteStreamHeader = true | ||
| 293 | if w.snappy { | ||
| 294 | if err := write([]byte(magicChunkSnappy)); err != nil { | ||
| 295 | return err | ||
| 296 | } | ||
| 297 | } else { | ||
| 298 | if err := write([]byte(magicChunk)); err != nil { | ||
| 299 | return err | ||
| 300 | } | ||
| 301 | } | ||
| 302 | } | ||
| 303 | if err := write(header[:]); err != nil { | ||
| 304 | return err | ||
| 305 | } | ||
| 306 | if err := write(data); err != nil { | ||
| 307 | return err | ||
| 308 | } | ||
| 309 | } | ||
| 310 | |||
| 311 | // Create output... | ||
| 312 | if !w.wroteStreamHeader { | ||
| 313 | w.wroteStreamHeader = true | ||
| 314 | hWriter := make(chan result) | ||
| 315 | w.output <- hWriter | ||
| 316 | if w.snappy { | ||
| 317 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} | ||
| 318 | } else { | ||
| 319 | hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} | ||
| 320 | } | ||
| 321 | } | ||
| 322 | |||
| 323 | // Copy input. | ||
| 324 | inbuf := w.buffers.Get().([]byte)[:4] | ||
| 325 | copy(inbuf, header[:]) | ||
| 326 | inbuf = append(inbuf, data...) | ||
| 327 | |||
| 328 | output := make(chan result, 1) | ||
| 329 | // Queue output. | ||
| 330 | w.output <- output | ||
| 331 | output <- result{startOffset: w.uncompWritten, b: inbuf} | ||
| 332 | |||
| 333 | return nil | ||
| 334 | } | ||
| 335 | |||
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// If you cannot control that, use the regular Write function.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.flushOnWrite {
		_, err := w.write(buf)
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.Flush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(buf)
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
		}
	}

	for len(buf) > 0 {
		// Cut input.
		// uncompressed and obuf are declared per iteration, so each
		// goroutine below captures its own copies.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				copy(obuf[obufHeaderLen:], uncompressed)
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res
		}()
	}
	return nil
}
| 433 | |||
| 434 | func (w *Writer) encodeBlock(obuf, uncompressed []byte) int { | ||
| 435 | if w.customEnc != nil { | ||
| 436 | if ret := w.customEnc(obuf, uncompressed); ret >= 0 { | ||
| 437 | return ret | ||
| 438 | } | ||
| 439 | } | ||
| 440 | if w.snappy { | ||
| 441 | switch w.level { | ||
| 442 | case levelFast: | ||
| 443 | return encodeBlockSnappy(obuf, uncompressed) | ||
| 444 | case levelBetter: | ||
| 445 | return encodeBlockBetterSnappy(obuf, uncompressed) | ||
| 446 | case levelBest: | ||
| 447 | return encodeBlockBestSnappy(obuf, uncompressed) | ||
| 448 | } | ||
| 449 | return 0 | ||
| 450 | } | ||
| 451 | switch w.level { | ||
| 452 | case levelFast: | ||
| 453 | return encodeBlock(obuf, uncompressed) | ||
| 454 | case levelBetter: | ||
| 455 | return encodeBlockBetter(obuf, uncompressed) | ||
| 456 | case levelBest: | ||
| 457 | return encodeBlockBest(obuf, uncompressed, nil) | ||
| 458 | } | ||
| 459 | return 0 | ||
| 460 | } | ||
| 461 | |||
// write splits p into blocks, compresses each on its own goroutine and
// queues the results in order. The caller may reuse p once write returns,
// since each block is copied into a pooled buffer first.
func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}

	// Spawn goroutine and write block to output channel.
	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			hWriter := make(chan result)
			w.output <- hWriter
			if w.snappy {
				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
			} else {
				hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
			}
		}

		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		// Copy input.
		// If the block is incompressible, this is used for the result.
		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		copy(inbuf[obufHeaderLen:], uncompressed)
		uncompressed = inbuf[obufHeaderLen:]

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))

		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// Use input as output.
				// inbuf already holds the raw data after its header space,
				// so swap the roles of the two buffers.
				obuf, inbuf = inbuf, obuf
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res

			// Put unused buffer back in pool.
			w.buffers.Put(inbuf)
		}()
		nRet += len(uncompressed)
	}
	return nRet, nil
}
| 547 | |||
// writeFull is a special version of write that will always write the full buffer.
// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
// The data will be written as a single block.
// The caller is not allowed to use inbuf after this function has been called.
func (w *Writer) writeFull(inbuf []byte) (errRet error) {
	if err := w.err(nil); err != nil {
		return err
	}

	if w.concurrency == 1 {
		_, err := w.writeSync(inbuf[obufHeaderLen:])
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
		}
	}

	// Get an output buffer.
	obuf := w.buffers.Get().([]byte)[:w.obufLen]
	uncompressed := inbuf[obufHeaderLen:]

	output := make(chan result)
	// Queue output now, so we keep order.
	w.output <- output
	res := result{
		startOffset: w.uncompWritten,
	}
	w.uncompWritten += int64(len(uncompressed))

	go func() {
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		// Check if we should use this, or store as uncompressed instead.
		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Use input as output.
			// inbuf already carries the raw data after its header space.
			obuf, inbuf = inbuf, obuf
		}

		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		// Queue final output.
		res.b = obuf
		output <- res

		// Put unused buffer back in pool.
		w.buffers.Put(inbuf)
	}()
	return nil
}
| 626 | |||
// writeSync compresses and writes p block-by-block directly to the
// underlying writer, without goroutines. Used when concurrency == 1 and
// for the first flush before the stream header has been written.
func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		var n int
		var err error
		if w.snappy {
			n, err = w.writer.Write([]byte(magicChunkSnappy))
		} else {
			n, err = w.writer.Write([]byte(magicChunk))
		}
		if err != nil {
			return 0, w.err(err)
		}
		// NOTE(review): relies on magicChunk and magicChunkSnappy having
		// equal length.
		if n != len(magicChunk) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.written += int64(n)
	}

	for len(p) > 0 {
		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Incompressible: only the 8-byte header goes in obuf;
			// the raw data is written separately below.
			obuf = obuf[:8]
		}

		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		n, err := w.writer.Write(obuf)
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(obuf) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.err(w.index.add(w.written, w.uncompWritten))
		w.written += int64(n)
		w.uncompWritten += int64(len(uncompressed))

		if chunkType == chunkTypeUncompressedData {
			// Write uncompressed data.
			n, err := w.writer.Write(uncompressed)
			if err != nil {
				return 0, w.err(err)
			}
			if n != len(uncompressed) {
				return 0, w.err(io.ErrShortWrite)
			}
			w.written += int64(n)
		}
		w.buffers.Put(obuf)
		// Queue final output.
		nRet += len(uncompressed)
	}
	return nRet, nil
}
| 714 | |||
// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
	if err := w.err(nil); err != nil {
		return err
	}

	// Queue any data still in input buffer.
	if len(w.ibuf) != 0 {
		if !w.wroteStreamHeader {
			// Nothing queued yet: write synchronously so header and data
			// reach the underlying writer immediately.
			_, err := w.writeSync(w.ibuf)
			w.ibuf = w.ibuf[:0]
			return w.err(err)
		} else {
			_, err := w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
			err = w.err(err)
			if err != nil {
				return err
			}
		}
	}
	if w.output == nil {
		// Synchronous mode: everything is already written.
		return w.err(nil)
	}

	// Send empty buffer
	res := make(chan result)
	w.output <- res
	// Block until this has been picked up.
	res <- result{b: nil, startOffset: w.uncompWritten}
	// When it is closed, we have flushed.
	<-res
	return w.err(nil)
}
| 750 | |||
| 751 | // Close calls Flush and then closes the Writer. | ||
| 752 | // Calling Close multiple times is ok, | ||
| 753 | // but calling CloseIndex after this will make it not return the index. | ||
| 754 | func (w *Writer) Close() error { | ||
| 755 | _, err := w.closeIndex(w.appendIndex) | ||
| 756 | return err | ||
| 757 | } | ||
| 758 | |||
| 759 | // CloseIndex calls Close and returns an index on first call. | ||
| 760 | // This is not required if you are only adding index to a stream. | ||
| 761 | func (w *Writer) CloseIndex() ([]byte, error) { | ||
| 762 | return w.closeIndex(true) | ||
| 763 | } | ||
| 764 | |||
// closeIndex flushes, stops the writer goroutine, optionally emits padding
// and the index, and marks the writer closed via the sticky errClosed state.
// On the first call it returns the index (when idx is true); later calls
// find errClosed already set and return (nil, nil) for Close semantics.
func (w *Writer) closeIndex(idx bool) ([]byte, error) {
	err := w.Flush()
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}

	var index []byte
	if w.err(err) == nil && w.writer != nil {
		// Create index.
		if idx {
			compSize := int64(-1)
			if w.pad <= 1 {
				// No padding: the compressed size is known exactly.
				compSize = w.written
			}
			// ibuf is empty after Flush; reuse its storage for the index.
			index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
			// Count as written for padding.
			if w.appendIndex {
				w.written += int64(len(index))
			}
		}

		if w.pad > 1 {
			tmp := w.ibuf[:0]
			if len(index) > 0 {
				// Allocate another buffer.
				// (index aliases ibuf's storage, so it cannot be reused here.)
				tmp = w.buffers.Get().([]byte)[:0]
				defer w.buffers.Put(tmp)
			}
			add := calcSkippableFrame(w.written, int64(w.pad))
			frame, err := skippableFrame(tmp, add, w.randSrc)
			if err = w.err(err); err != nil {
				return nil, err
			}
			n, err2 := w.writer.Write(frame)
			if err2 == nil && n != len(frame) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
		if len(index) > 0 && w.appendIndex {
			n, err2 := w.writer.Write(index)
			if err2 == nil && n != len(index) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
	}
	// Mark closed; if errClosed comes back, this call did the closing
	// (or a previous one did) and the result is success.
	err = w.err(errClosed)
	if err == errClosed {
		return index, nil
	}
	return nil, err
}
| 820 | |||
| 821 | // calcSkippableFrame will return a total size to be added for written | ||
| 822 | // to be divisible by multiple. | ||
| 823 | // The value will always be > skippableFrameHeader. | ||
| 824 | // The function will panic if written < 0 or wantMultiple <= 0. | ||
| 825 | func calcSkippableFrame(written, wantMultiple int64) int { | ||
| 826 | if wantMultiple <= 0 { | ||
| 827 | panic("wantMultiple <= 0") | ||
| 828 | } | ||
| 829 | if written < 0 { | ||
| 830 | panic("written < 0") | ||
| 831 | } | ||
| 832 | leftOver := written % wantMultiple | ||
| 833 | if leftOver == 0 { | ||
| 834 | return 0 | ||
| 835 | } | ||
| 836 | toAdd := wantMultiple - leftOver | ||
| 837 | for toAdd < skippableFrameHeader { | ||
| 838 | toAdd += wantMultiple | ||
| 839 | } | ||
| 840 | return int(toAdd) | ||
| 841 | } | ||
| 842 | |||
| 843 | // skippableFrame will add a skippable frame with a total size of bytes. | ||
| 844 | // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader | ||
| 845 | func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) { | ||
| 846 | if total == 0 { | ||
| 847 | return dst, nil | ||
| 848 | } | ||
| 849 | if total < skippableFrameHeader { | ||
| 850 | return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total) | ||
| 851 | } | ||
| 852 | if int64(total) >= maxBlockSize+skippableFrameHeader { | ||
| 853 | return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total) | ||
| 854 | } | ||
| 855 | // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)" | ||
| 856 | dst = append(dst, chunkTypePadding) | ||
| 857 | f := uint32(total - skippableFrameHeader) | ||
| 858 | // Add chunk length. | ||
| 859 | dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16)) | ||
| 860 | // Add data | ||
| 861 | start := len(dst) | ||
| 862 | dst = append(dst, make([]byte, f)...) | ||
| 863 | _, err := io.ReadFull(r, dst[start:]) | ||
| 864 | return dst, err | ||
| 865 | } | ||
| 866 | |||
// errClosed is set as the writer's sticky error once it has been closed.
var errClosed = errors.New("s2: Writer is closed")
| 868 | |||
// WriterOption is an option for creating an encoder.
// A non-nil error aborts NewWriter and becomes its sticky error state.
type WriterOption func(*Writer) error
| 871 | |||
// WriterConcurrency will set the concurrency,
// meaning the maximum number of encoders to run concurrently.
// The value supplied must be at least 1.
// By default this will be set to GOMAXPROCS.
func WriterConcurrency(n int) WriterOption {
	return func(w *Writer) error {
		if n <= 0 {
			return errors.New("concurrency must be at least 1")
		}
		w.concurrency = n
		return nil
	}
}
| 885 | |||
| 886 | // WriterAddIndex will append an index to the end of a stream | ||
| 887 | // when it is closed. | ||
| 888 | func WriterAddIndex() WriterOption { | ||
| 889 | return func(w *Writer) error { | ||
| 890 | w.appendIndex = true | ||
| 891 | return nil | ||
| 892 | } | ||
| 893 | } | ||
| 894 | |||
| 895 | // WriterBetterCompression will enable better compression. | ||
| 896 | // EncodeBetter compresses better than Encode but typically with a | ||
| 897 | // 10-40% speed decrease on both compression and decompression. | ||
| 898 | func WriterBetterCompression() WriterOption { | ||
| 899 | return func(w *Writer) error { | ||
| 900 | w.level = levelBetter | ||
| 901 | return nil | ||
| 902 | } | ||
| 903 | } | ||
| 904 | |||
// WriterBestCompression will enable best compression.
// EncodeBest compresses better than EncodeBetter but typically with a
// big speed decrease on compression.
func WriterBestCompression() WriterOption {
	return func(w *Writer) error {
		w.level = levelBest
		return nil
	}
}
| 914 | |||
| 915 | // WriterUncompressed will bypass compression. | ||
| 916 | // The stream will be written as uncompressed blocks only. | ||
| 917 | // If concurrency is > 1 CRC and output will still be done async. | ||
| 918 | func WriterUncompressed() WriterOption { | ||
| 919 | return func(w *Writer) error { | ||
| 920 | w.level = levelUncompressed | ||
| 921 | return nil | ||
| 922 | } | ||
| 923 | } | ||
| 924 | |||
| 925 | // WriterBlockSize allows to override the default block size. | ||
| 926 | // Blocks will be this size or smaller. | ||
| 927 | // Minimum size is 4KB and and maximum size is 4MB. | ||
| 928 | // | ||
| 929 | // Bigger blocks may give bigger throughput on systems with many cores, | ||
| 930 | // and will increase compression slightly, but it will limit the possible | ||
| 931 | // concurrency for smaller payloads for both encoding and decoding. | ||
| 932 | // Default block size is 1MB. | ||
| 933 | // | ||
| 934 | // When writing Snappy compatible output using WriterSnappyCompat, | ||
| 935 | // the maximum block size is 64KB. | ||
| 936 | func WriterBlockSize(n int) WriterOption { | ||
| 937 | return func(w *Writer) error { | ||
| 938 | if w.snappy && n > maxSnappyBlockSize || n < minBlockSize { | ||
| 939 | return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output") | ||
| 940 | } | ||
| 941 | if n > maxBlockSize || n < minBlockSize { | ||
| 942 | return errors.New("s2: block size too large. Must be <= 4MB and >=4KB") | ||
| 943 | } | ||
| 944 | w.blockSize = n | ||
| 945 | return nil | ||
| 946 | } | ||
| 947 | } | ||
| 948 | |||
| 949 | // WriterPadding will add padding to all output so the size will be a multiple of n. | ||
| 950 | // This can be used to obfuscate the exact output size or make blocks of a certain size. | ||
| 951 | // The contents will be a skippable frame, so it will be invisible by the decoder. | ||
| 952 | // n must be > 0 and <= 4MB. | ||
| 953 | // The padded area will be filled with data from crypto/rand.Reader. | ||
| 954 | // The padding will be applied whenever Close is called on the writer. | ||
| 955 | func WriterPadding(n int) WriterOption { | ||
| 956 | return func(w *Writer) error { | ||
| 957 | if n <= 0 { | ||
| 958 | return fmt.Errorf("s2: padding must be at least 1") | ||
| 959 | } | ||
| 960 | // No need to waste our time. | ||
| 961 | if n == 1 { | ||
| 962 | w.pad = 0 | ||
| 963 | } | ||
| 964 | if n > maxBlockSize { | ||
| 965 | return fmt.Errorf("s2: padding must less than 4MB") | ||
| 966 | } | ||
| 967 | w.pad = n | ||
| 968 | return nil | ||
| 969 | } | ||
| 970 | } | ||
| 971 | |||
| 972 | // WriterPaddingSrc will get random data for padding from the supplied source. | ||
| 973 | // By default crypto/rand is used. | ||
| 974 | func WriterPaddingSrc(reader io.Reader) WriterOption { | ||
| 975 | return func(w *Writer) error { | ||
| 976 | w.randSrc = reader | ||
| 977 | return nil | ||
| 978 | } | ||
| 979 | } | ||
| 980 | |||
| 981 | // WriterSnappyCompat will write snappy compatible output. | ||
| 982 | // The output can be decompressed using either snappy or s2. | ||
| 983 | // If block size is more than 64KB it is set to that. | ||
| 984 | func WriterSnappyCompat() WriterOption { | ||
| 985 | return func(w *Writer) error { | ||
| 986 | w.snappy = true | ||
| 987 | if w.blockSize > 64<<10 { | ||
| 988 | // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective. | ||
| 989 | // And allows us to skip some size checks. | ||
| 990 | w.blockSize = (64 << 10) - 8 | ||
| 991 | } | ||
| 992 | return nil | ||
| 993 | } | ||
| 994 | } | ||
| 995 | |||
| 996 | // WriterFlushOnWrite will compress blocks on each call to the Write function. | ||
| 997 | // | ||
| 998 | // This is quite inefficient as blocks size will depend on the write size. | ||
| 999 | // | ||
| 1000 | // Use WriterConcurrency(1) to also make sure that output is flushed. | ||
| 1001 | // When Write calls return, otherwise they will be written when compression is done. | ||
| 1002 | func WriterFlushOnWrite() WriterOption { | ||
| 1003 | return func(w *Writer) error { | ||
| 1004 | w.flushOnWrite = true | ||
| 1005 | return nil | ||
| 1006 | } | ||
| 1007 | } | ||
| 1008 | |||
| 1009 | // WriterCustomEncoder allows to override the encoder for blocks on the stream. | ||
| 1010 | // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer. | ||
| 1011 | // Block size (initial varint) should not be added by the encoder. | ||
| 1012 | // Returning value 0 indicates the block could not be compressed. | ||
| 1013 | // Returning a negative value indicates that compression should be attempted. | ||
| 1014 | // The function should expect to be called concurrently. | ||
| 1015 | func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption { | ||
| 1016 | return func(w *Writer) error { | ||
| 1017 | w.customEnc = fn | ||
| 1018 | return nil | ||
| 1019 | } | ||
| 1020 | } | ||