aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go809
1 files changed, 809 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
new file mode 100644
index 0000000..9182d4e
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
@@ -0,0 +1,809 @@
1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2017 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21 "bytes"
22 "context"
23 "encoding/base64"
24 "fmt"
25 "hash/crc32"
26 "io"
27 "net/http"
28 "net/url"
29 "sort"
30 "strings"
31 "sync"
32
33 "github.com/google/uuid"
34 "github.com/minio/minio-go/v7/pkg/s3utils"
35)
36
37// putObjectMultipartStream - upload a large object using
38// multipart upload and streaming signature for signing payload.
39// Comprehensive put object operation involving multipart uploads.
40//
41// Following code handles these types of readers.
42//
43// - *minio.Object
44// - Any reader which has a method 'ReadAt()'
45func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
46 reader io.Reader, size int64, opts PutObjectOptions,
47) (info UploadInfo, err error) {
48 if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
49 info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
50 } else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
51 // Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
52 info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
53 } else {
54 info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
55 }
56 if err != nil {
57 errResp := ToErrorResponse(err)
58 // Verify if multipart functionality is not available, if not
59 // fall back to single PutObject operation.
60 if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
61 // Verify if size of reader is greater than '5GiB'.
62 if size > maxSinglePutObjectSize {
63 return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
64 }
65 // Fall back to uploading as single PutObject operation.
66 return c.putObject(ctx, bucketName, objectName, reader, size, opts)
67 }
68 }
69 return info, err
70}
71
72// uploadedPartRes - the response received from a part upload.
73type uploadedPartRes struct {
74 Error error // Any error encountered while uploading the part.
75 PartNum int // Number of the part uploaded.
76 Size int64 // Size of the part uploaded.
77 Part ObjectPart
78}
79
80type uploadPartReq struct {
81 PartNum int // Number of the part uploaded.
82 Part ObjectPart // Size of the part uploaded.
83}
84
85// putObjectMultipartFromReadAt - Uploads files bigger than 128MiB.
86// Supports all readers which implements io.ReaderAt interface
87// (ReadAt method).
88//
89// NOTE: This function is meant to be used for all readers which
90// implement io.ReaderAt which allows us for resuming multipart
91// uploads but reading at an offset, which would avoid re-read the
92// data which was already uploaded. Internally this function uses
93// temporary files for staging all the data, these temporary files are
94// cleaned automatically when the caller i.e http client closes the
95// stream after uploading all the contents successfully.
96func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
97 reader io.ReaderAt, size int64, opts PutObjectOptions,
98) (info UploadInfo, err error) {
99 // Input validation.
100 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
101 return UploadInfo{}, err
102 }
103 if err = s3utils.CheckValidObjectName(objectName); err != nil {
104 return UploadInfo{}, err
105 }
106
107 // Calculate the optimal parts info for a given size.
108 totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
109 if err != nil {
110 return UploadInfo{}, err
111 }
112
113 withChecksum := c.trailingHeaderSupport
114 if withChecksum {
115 if opts.UserMetadata == nil {
116 opts.UserMetadata = make(map[string]string, 1)
117 }
118 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
119 }
120 // Initiate a new multipart upload.
121 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
122 if err != nil {
123 return UploadInfo{}, err
124 }
125 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
126
127 // Aborts the multipart upload in progress, if the
128 // function returns any error, since we do not resume
129 // we should purge the parts which have been uploaded
130 // to relinquish storage space.
131 defer func() {
132 if err != nil {
133 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
134 }
135 }()
136
137 // Total data read and written to server. should be equal to 'size' at the end of the call.
138 var totalUploadedSize int64
139
140 // Complete multipart upload.
141 var complMultipartUpload completeMultipartUpload
142
143 // Declare a channel that sends the next part number to be uploaded.
144 uploadPartsCh := make(chan uploadPartReq)
145
146 // Declare a channel that sends back the response of a part upload.
147 uploadedPartsCh := make(chan uploadedPartRes)
148
149 // Used for readability, lastPartNumber is always totalPartsCount.
150 lastPartNumber := totalPartsCount
151
152 partitionCtx, partitionCancel := context.WithCancel(ctx)
153 defer partitionCancel()
154 // Send each part number to the channel to be processed.
155 go func() {
156 defer close(uploadPartsCh)
157
158 for p := 1; p <= totalPartsCount; p++ {
159 select {
160 case <-partitionCtx.Done():
161 return
162 case uploadPartsCh <- uploadPartReq{PartNum: p}:
163 }
164 }
165 }()
166
167 // Receive each part number from the channel allowing three parallel uploads.
168 for w := 1; w <= opts.getNumThreads(); w++ {
169 go func(partSize int64) {
170 for {
171 var uploadReq uploadPartReq
172 var ok bool
173 select {
174 case <-ctx.Done():
175 return
176 case uploadReq, ok = <-uploadPartsCh:
177 if !ok {
178 return
179 }
180 // Each worker will draw from the part channel and upload in parallel.
181 }
182
183 // If partNumber was not uploaded we calculate the missing
184 // part offset and size. For all other part numbers we
185 // calculate offset based on multiples of partSize.
186 readOffset := int64(uploadReq.PartNum-1) * partSize
187
188 // As a special case if partNumber is lastPartNumber, we
189 // calculate the offset based on the last part size.
190 if uploadReq.PartNum == lastPartNumber {
191 readOffset = size - lastPartSize
192 partSize = lastPartSize
193 }
194
195 sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
196 trailer := make(http.Header, 1)
197 if withChecksum {
198 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
199 trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
200 sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
201 trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
202 })
203 }
204
205 // Proceed to upload the part.
206 p := uploadPartParams{
207 bucketName: bucketName,
208 objectName: objectName,
209 uploadID: uploadID,
210 reader: sectionReader,
211 partNumber: uploadReq.PartNum,
212 size: partSize,
213 sse: opts.ServerSideEncryption,
214 streamSha256: !opts.DisableContentSha256,
215 sha256Hex: "",
216 trailer: trailer,
217 }
218 objPart, err := c.uploadPart(ctx, p)
219 if err != nil {
220 uploadedPartsCh <- uploadedPartRes{
221 Error: err,
222 }
223 // Exit the goroutine.
224 return
225 }
226
227 // Save successfully uploaded part metadata.
228 uploadReq.Part = objPart
229
230 // Send successful part info through the channel.
231 uploadedPartsCh <- uploadedPartRes{
232 Size: objPart.Size,
233 PartNum: uploadReq.PartNum,
234 Part: uploadReq.Part,
235 }
236 }
237 }(partSize)
238 }
239
240 // Gather the responses as they occur and update any
241 // progress bar.
242 for u := 1; u <= totalPartsCount; u++ {
243 select {
244 case <-ctx.Done():
245 return UploadInfo{}, ctx.Err()
246 case uploadRes := <-uploadedPartsCh:
247 if uploadRes.Error != nil {
248 return UploadInfo{}, uploadRes.Error
249 }
250
251 // Update the totalUploadedSize.
252 totalUploadedSize += uploadRes.Size
253 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
254 ETag: uploadRes.Part.ETag,
255 PartNumber: uploadRes.Part.PartNumber,
256 ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
257 ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
258 ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
259 ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
260 })
261 }
262 }
263
264 // Verify if we uploaded all the data.
265 if totalUploadedSize != size {
266 return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
267 }
268
269 // Sort all completed parts.
270 sort.Sort(completedParts(complMultipartUpload.Parts))
271
272 opts = PutObjectOptions{
273 ServerSideEncryption: opts.ServerSideEncryption,
274 }
275 if withChecksum {
276 // Add hash of hashes.
277 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
278 for _, part := range complMultipartUpload.Parts {
279 cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
280 if err == nil {
281 crc.Write(cs)
282 }
283 }
284 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
285 }
286
287 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
288 if err != nil {
289 return UploadInfo{}, err
290 }
291
292 uploadInfo.Size = totalUploadedSize
293 return uploadInfo, nil
294}
295
296func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
297 reader io.Reader, size int64, opts PutObjectOptions,
298) (info UploadInfo, err error) {
299 // Input validation.
300 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
301 return UploadInfo{}, err
302 }
303 if err = s3utils.CheckValidObjectName(objectName); err != nil {
304 return UploadInfo{}, err
305 }
306
307 if !opts.SendContentMd5 {
308 if opts.UserMetadata == nil {
309 opts.UserMetadata = make(map[string]string, 1)
310 }
311 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
312 }
313
314 // Calculate the optimal parts info for a given size.
315 totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
316 if err != nil {
317 return UploadInfo{}, err
318 }
319 // Initiates a new multipart request
320 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
321 if err != nil {
322 return UploadInfo{}, err
323 }
324 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
325
326 // Aborts the multipart upload if the function returns
327 // any error, since we do not resume we should purge
328 // the parts which have been uploaded to relinquish
329 // storage space.
330 defer func() {
331 if err != nil {
332 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
333 }
334 }()
335
336 // Create checksums
337 // CRC32C is ~50% faster on AMD64 @ 30GB/s
338 var crcBytes []byte
339 customHeader := make(http.Header)
340 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
341 md5Hash := c.md5Hasher()
342 defer md5Hash.Close()
343
344 // Total data read and written to server. should be equal to 'size' at the end of the call.
345 var totalUploadedSize int64
346
347 // Initialize parts uploaded map.
348 partsInfo := make(map[int]ObjectPart)
349
350 // Create a buffer.
351 buf := make([]byte, partSize)
352
353 // Avoid declaring variables in the for loop
354 var md5Base64 string
355
356 // Part number always starts with '1'.
357 var partNumber int
358 for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
359
360 // Proceed to upload the part.
361 if partNumber == totalPartsCount {
362 partSize = lastPartSize
363 }
364
365 length, rerr := readFull(reader, buf)
366 if rerr == io.EOF && partNumber > 1 {
367 break
368 }
369
370 if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
371 return UploadInfo{}, rerr
372 }
373
374 // Calculate md5sum.
375 if opts.SendContentMd5 {
376 md5Hash.Reset()
377 md5Hash.Write(buf[:length])
378 md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
379 } else {
380 // Add CRC32C instead.
381 crc.Reset()
382 crc.Write(buf[:length])
383 cSum := crc.Sum(nil)
384 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
385 crcBytes = append(crcBytes, cSum...)
386 }
387
388 // Update progress reader appropriately to the latest offset
389 // as we read from the source.
390 hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)
391 p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
392 objPart, uerr := c.uploadPart(ctx, p)
393 if uerr != nil {
394 return UploadInfo{}, uerr
395 }
396
397 // Save successfully uploaded part metadata.
398 partsInfo[partNumber] = objPart
399
400 // Save successfully uploaded size.
401 totalUploadedSize += partSize
402 }
403
404 // Verify if we uploaded all the data.
405 if size > 0 {
406 if totalUploadedSize != size {
407 return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
408 }
409 }
410
411 // Complete multipart upload.
412 var complMultipartUpload completeMultipartUpload
413
414 // Loop over total uploaded parts to save them in
415 // Parts array before completing the multipart request.
416 for i := 1; i < partNumber; i++ {
417 part, ok := partsInfo[i]
418 if !ok {
419 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
420 }
421 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
422 ETag: part.ETag,
423 PartNumber: part.PartNumber,
424 ChecksumCRC32: part.ChecksumCRC32,
425 ChecksumCRC32C: part.ChecksumCRC32C,
426 ChecksumSHA1: part.ChecksumSHA1,
427 ChecksumSHA256: part.ChecksumSHA256,
428 })
429 }
430
431 // Sort all completed parts.
432 sort.Sort(completedParts(complMultipartUpload.Parts))
433
434 opts = PutObjectOptions{
435 ServerSideEncryption: opts.ServerSideEncryption,
436 }
437 if len(crcBytes) > 0 {
438 // Add hash of hashes.
439 crc.Reset()
440 crc.Write(crcBytes)
441 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
442 }
443 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
444 if err != nil {
445 return UploadInfo{}, err
446 }
447
448 uploadInfo.Size = totalUploadedSize
449 return uploadInfo, nil
450}
451
452// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
453// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
454func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
455 reader io.Reader, opts PutObjectOptions,
456) (info UploadInfo, err error) {
457 // Input validation.
458 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
459 return UploadInfo{}, err
460 }
461
462 if err = s3utils.CheckValidObjectName(objectName); err != nil {
463 return UploadInfo{}, err
464 }
465
466 if !opts.SendContentMd5 {
467 if opts.UserMetadata == nil {
468 opts.UserMetadata = make(map[string]string, 1)
469 }
470 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
471 }
472
473 // Cancel all when an error occurs.
474 ctx, cancel := context.WithCancel(ctx)
475 defer cancel()
476
477 // Calculate the optimal parts info for a given size.
478 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
479 if err != nil {
480 return UploadInfo{}, err
481 }
482
483 // Initiates a new multipart request
484 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
485 if err != nil {
486 return UploadInfo{}, err
487 }
488 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
489
490 // Aborts the multipart upload if the function returns
491 // any error, since we do not resume we should purge
492 // the parts which have been uploaded to relinquish
493 // storage space.
494 defer func() {
495 if err != nil {
496 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
497 }
498 }()
499
500 // Create checksums
501 // CRC32C is ~50% faster on AMD64 @ 30GB/s
502 var crcBytes []byte
503 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
504
505 // Total data read and written to server. should be equal to 'size' at the end of the call.
506 var totalUploadedSize int64
507
508 // Initialize parts uploaded map.
509 partsInfo := make(map[int]ObjectPart)
510
511 // Create a buffer.
512 nBuffers := int64(opts.NumThreads)
513 bufs := make(chan []byte, nBuffers)
514 all := make([]byte, nBuffers*partSize)
515 for i := int64(0); i < nBuffers; i++ {
516 bufs <- all[i*partSize : i*partSize+partSize]
517 }
518
519 var wg sync.WaitGroup
520 var mu sync.Mutex
521 errCh := make(chan error, opts.NumThreads)
522
523 reader = newHook(reader, opts.Progress)
524
525 // Part number always starts with '1'.
526 var partNumber int
527 for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
528 // Proceed to upload the part.
529 var buf []byte
530 select {
531 case buf = <-bufs:
532 case err = <-errCh:
533 cancel()
534 wg.Wait()
535 return UploadInfo{}, err
536 }
537
538 if int64(len(buf)) != partSize {
539 return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize)
540 }
541
542 length, rerr := readFull(reader, buf)
543 if rerr == io.EOF && partNumber > 1 {
544 // Done
545 break
546 }
547
548 if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
549 cancel()
550 wg.Wait()
551 return UploadInfo{}, rerr
552 }
553
554 // Calculate md5sum.
555 customHeader := make(http.Header)
556 if !opts.SendContentMd5 {
557 // Add CRC32C instead.
558 crc.Reset()
559 crc.Write(buf[:length])
560 cSum := crc.Sum(nil)
561 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
562 crcBytes = append(crcBytes, cSum...)
563 }
564
565 wg.Add(1)
566 go func(partNumber int) {
567 // Avoid declaring variables in the for loop
568 var md5Base64 string
569
570 if opts.SendContentMd5 {
571 md5Hash := c.md5Hasher()
572 md5Hash.Write(buf[:length])
573 md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
574 md5Hash.Close()
575 }
576
577 defer wg.Done()
578 p := uploadPartParams{
579 bucketName: bucketName,
580 objectName: objectName,
581 uploadID: uploadID,
582 reader: bytes.NewReader(buf[:length]),
583 partNumber: partNumber,
584 md5Base64: md5Base64,
585 size: int64(length),
586 sse: opts.ServerSideEncryption,
587 streamSha256: !opts.DisableContentSha256,
588 customHeader: customHeader,
589 }
590 objPart, uerr := c.uploadPart(ctx, p)
591 if uerr != nil {
592 errCh <- uerr
593 return
594 }
595
596 // Save successfully uploaded part metadata.
597 mu.Lock()
598 partsInfo[partNumber] = objPart
599 mu.Unlock()
600
601 // Send buffer back so it can be reused.
602 bufs <- buf
603 }(partNumber)
604
605 // Save successfully uploaded size.
606 totalUploadedSize += int64(length)
607 }
608 wg.Wait()
609
610 // Collect any error
611 select {
612 case err = <-errCh:
613 return UploadInfo{}, err
614 default:
615 }
616
617 // Complete multipart upload.
618 var complMultipartUpload completeMultipartUpload
619
620 // Loop over total uploaded parts to save them in
621 // Parts array before completing the multipart request.
622 for i := 1; i < partNumber; i++ {
623 part, ok := partsInfo[i]
624 if !ok {
625 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
626 }
627 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
628 ETag: part.ETag,
629 PartNumber: part.PartNumber,
630 ChecksumCRC32: part.ChecksumCRC32,
631 ChecksumCRC32C: part.ChecksumCRC32C,
632 ChecksumSHA1: part.ChecksumSHA1,
633 ChecksumSHA256: part.ChecksumSHA256,
634 })
635 }
636
637 // Sort all completed parts.
638 sort.Sort(completedParts(complMultipartUpload.Parts))
639
640 opts = PutObjectOptions{}
641 if len(crcBytes) > 0 {
642 // Add hash of hashes.
643 crc.Reset()
644 crc.Write(crcBytes)
645 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
646 }
647 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
648 if err != nil {
649 return UploadInfo{}, err
650 }
651
652 uploadInfo.Size = totalUploadedSize
653 return uploadInfo, nil
654}
655
656// putObject special function used Google Cloud Storage. This special function
657// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
658func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
659 // Input validation.
660 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
661 return UploadInfo{}, err
662 }
663 if err := s3utils.CheckValidObjectName(objectName); err != nil {
664 return UploadInfo{}, err
665 }
666
667 // Size -1 is only supported on Google Cloud Storage, we error
668 // out in all other situations.
669 if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
670 return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
671 }
672
673 if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
674 return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
675 }
676
677 var readSeeker io.Seeker
678 if size > 0 {
679 if isReadAt(reader) && !isObject(reader) {
680 seeker, ok := reader.(io.Seeker)
681 if ok {
682 offset, err := seeker.Seek(0, io.SeekCurrent)
683 if err != nil {
684 return UploadInfo{}, errInvalidArgument(err.Error())
685 }
686 reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
687 readSeeker = reader.(io.Seeker)
688 }
689 }
690 }
691
692 var md5Base64 string
693 if opts.SendContentMd5 {
694 // Calculate md5sum.
695 hash := c.md5Hasher()
696
697 if readSeeker != nil {
698 if _, err := io.Copy(hash, reader); err != nil {
699 return UploadInfo{}, err
700 }
701 // Seek back to beginning of io.NewSectionReader's offset.
702 _, err = readSeeker.Seek(0, io.SeekStart)
703 if err != nil {
704 return UploadInfo{}, errInvalidArgument(err.Error())
705 }
706 } else {
707 // Create a buffer.
708 buf := make([]byte, size)
709
710 length, err := readFull(reader, buf)
711 if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
712 return UploadInfo{}, err
713 }
714
715 hash.Write(buf[:length])
716 reader = bytes.NewReader(buf[:length])
717 }
718
719 md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
720 hash.Close()
721 }
722
723 // Update progress reader appropriately to the latest offset as we
724 // read from the source.
725 progressReader := newHook(reader, opts.Progress)
726
727 // This function does not calculate sha256 and md5sum for payload.
728 // Execute put object.
729 return c.putObjectDo(ctx, bucketName, objectName, progressReader, md5Base64, "", size, opts)
730}
731
732// putObjectDo - executes the put object http operation.
733// NOTE: You must have WRITE permissions on a bucket to add an object to it.
734func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
735 // Input validation.
736 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
737 return UploadInfo{}, err
738 }
739 if err := s3utils.CheckValidObjectName(objectName); err != nil {
740 return UploadInfo{}, err
741 }
742 // Set headers.
743 customHeader := opts.Header()
744
745 // Add CRC when client supports it, MD5 is not set, not Google and we don't add SHA256 to chunks.
746 addCrc := c.trailingHeaderSupport && md5Base64 == "" && !s3utils.IsGoogleEndpoint(*c.endpointURL) && (opts.DisableContentSha256 || c.secure)
747
748 if addCrc {
749 // If user has added checksums, don't add them ourselves.
750 for k := range opts.UserMetadata {
751 if strings.HasPrefix(strings.ToLower(k), "x-amz-checksum-") {
752 addCrc = false
753 }
754 }
755 }
756 // Populate request metadata.
757 reqMetadata := requestMetadata{
758 bucketName: bucketName,
759 objectName: objectName,
760 customHeader: customHeader,
761 contentBody: reader,
762 contentLength: size,
763 contentMD5Base64: md5Base64,
764 contentSHA256Hex: sha256Hex,
765 streamSha256: !opts.DisableContentSha256,
766 addCrc: addCrc,
767 }
768 if opts.Internal.SourceVersionID != "" {
769 if opts.Internal.SourceVersionID != nullVersionID {
770 if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
771 return UploadInfo{}, errInvalidArgument(err.Error())
772 }
773 }
774 urlValues := make(url.Values)
775 urlValues.Set("versionId", opts.Internal.SourceVersionID)
776 reqMetadata.queryValues = urlValues
777 }
778
779 // Execute PUT an objectName.
780 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
781 defer closeResponse(resp)
782 if err != nil {
783 return UploadInfo{}, err
784 }
785 if resp != nil {
786 if resp.StatusCode != http.StatusOK {
787 return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
788 }
789 }
790
791 // extract lifecycle expiry date and rule ID
792 expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
793 h := resp.Header
794 return UploadInfo{
795 Bucket: bucketName,
796 Key: objectName,
797 ETag: trimEtag(h.Get("ETag")),
798 VersionID: h.Get(amzVersionID),
799 Size: size,
800 Expiration: expTime,
801 ExpirationRuleID: ruleID,
802
803 // Checksum values
804 ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
805 ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
806 ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
807 ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
808 }, nil
809}