diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go | 809 |
1 files changed, 809 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go new file mode 100644 index 0000000..9182d4e --- /dev/null +++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go | |||
@@ -0,0 +1,809 @@ | |||
1 | /* | ||
2 | * MinIO Go Library for Amazon S3 Compatible Cloud Storage | ||
3 | * Copyright 2017 MinIO, Inc. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | */ | ||
17 | |||
18 | package minio | ||
19 | |||
20 | import ( | ||
21 | "bytes" | ||
22 | "context" | ||
23 | "encoding/base64" | ||
24 | "fmt" | ||
25 | "hash/crc32" | ||
26 | "io" | ||
27 | "net/http" | ||
28 | "net/url" | ||
29 | "sort" | ||
30 | "strings" | ||
31 | "sync" | ||
32 | |||
33 | "github.com/google/uuid" | ||
34 | "github.com/minio/minio-go/v7/pkg/s3utils" | ||
35 | ) | ||
36 | |||
37 | // putObjectMultipartStream - upload a large object using | ||
38 | // multipart upload and streaming signature for signing payload. | ||
39 | // Comprehensive put object operation involving multipart uploads. | ||
40 | // | ||
41 | // Following code handles these types of readers. | ||
42 | // | ||
43 | // - *minio.Object | ||
44 | // - Any reader which has a method 'ReadAt()' | ||
45 | func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string, | ||
46 | reader io.Reader, size int64, opts PutObjectOptions, | ||
47 | ) (info UploadInfo, err error) { | ||
48 | if opts.ConcurrentStreamParts && opts.NumThreads > 1 { | ||
49 | info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts) | ||
50 | } else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 { | ||
51 | // Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader. | ||
52 | info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts) | ||
53 | } else { | ||
54 | info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts) | ||
55 | } | ||
56 | if err != nil { | ||
57 | errResp := ToErrorResponse(err) | ||
58 | // Verify if multipart functionality is not available, if not | ||
59 | // fall back to single PutObject operation. | ||
60 | if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") { | ||
61 | // Verify if size of reader is greater than '5GiB'. | ||
62 | if size > maxSinglePutObjectSize { | ||
63 | return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) | ||
64 | } | ||
65 | // Fall back to uploading as single PutObject operation. | ||
66 | return c.putObject(ctx, bucketName, objectName, reader, size, opts) | ||
67 | } | ||
68 | } | ||
69 | return info, err | ||
70 | } | ||
71 | |||
72 | // uploadedPartRes - the response received from a part upload. | ||
73 | type uploadedPartRes struct { | ||
74 | Error error // Any error encountered while uploading the part. | ||
75 | PartNum int // Number of the part uploaded. | ||
76 | Size int64 // Size of the part uploaded. | ||
77 | Part ObjectPart | ||
78 | } | ||
79 | |||
80 | type uploadPartReq struct { | ||
81 | PartNum int // Number of the part uploaded. | ||
82 | Part ObjectPart // Size of the part uploaded. | ||
83 | } | ||
84 | |||
85 | // putObjectMultipartFromReadAt - Uploads files bigger than 128MiB. | ||
86 | // Supports all readers which implements io.ReaderAt interface | ||
87 | // (ReadAt method). | ||
88 | // | ||
89 | // NOTE: This function is meant to be used for all readers which | ||
90 | // implement io.ReaderAt which allows us for resuming multipart | ||
91 | // uploads but reading at an offset, which would avoid re-read the | ||
92 | // data which was already uploaded. Internally this function uses | ||
93 | // temporary files for staging all the data, these temporary files are | ||
94 | // cleaned automatically when the caller i.e http client closes the | ||
95 | // stream after uploading all the contents successfully. | ||
96 | func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string, | ||
97 | reader io.ReaderAt, size int64, opts PutObjectOptions, | ||
98 | ) (info UploadInfo, err error) { | ||
99 | // Input validation. | ||
100 | if err = s3utils.CheckValidBucketName(bucketName); err != nil { | ||
101 | return UploadInfo{}, err | ||
102 | } | ||
103 | if err = s3utils.CheckValidObjectName(objectName); err != nil { | ||
104 | return UploadInfo{}, err | ||
105 | } | ||
106 | |||
107 | // Calculate the optimal parts info for a given size. | ||
108 | totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) | ||
109 | if err != nil { | ||
110 | return UploadInfo{}, err | ||
111 | } | ||
112 | |||
113 | withChecksum := c.trailingHeaderSupport | ||
114 | if withChecksum { | ||
115 | if opts.UserMetadata == nil { | ||
116 | opts.UserMetadata = make(map[string]string, 1) | ||
117 | } | ||
118 | opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" | ||
119 | } | ||
120 | // Initiate a new multipart upload. | ||
121 | uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) | ||
122 | if err != nil { | ||
123 | return UploadInfo{}, err | ||
124 | } | ||
125 | delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") | ||
126 | |||
127 | // Aborts the multipart upload in progress, if the | ||
128 | // function returns any error, since we do not resume | ||
129 | // we should purge the parts which have been uploaded | ||
130 | // to relinquish storage space. | ||
131 | defer func() { | ||
132 | if err != nil { | ||
133 | c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) | ||
134 | } | ||
135 | }() | ||
136 | |||
137 | // Total data read and written to server. should be equal to 'size' at the end of the call. | ||
138 | var totalUploadedSize int64 | ||
139 | |||
140 | // Complete multipart upload. | ||
141 | var complMultipartUpload completeMultipartUpload | ||
142 | |||
143 | // Declare a channel that sends the next part number to be uploaded. | ||
144 | uploadPartsCh := make(chan uploadPartReq) | ||
145 | |||
146 | // Declare a channel that sends back the response of a part upload. | ||
147 | uploadedPartsCh := make(chan uploadedPartRes) | ||
148 | |||
149 | // Used for readability, lastPartNumber is always totalPartsCount. | ||
150 | lastPartNumber := totalPartsCount | ||
151 | |||
152 | partitionCtx, partitionCancel := context.WithCancel(ctx) | ||
153 | defer partitionCancel() | ||
154 | // Send each part number to the channel to be processed. | ||
155 | go func() { | ||
156 | defer close(uploadPartsCh) | ||
157 | |||
158 | for p := 1; p <= totalPartsCount; p++ { | ||
159 | select { | ||
160 | case <-partitionCtx.Done(): | ||
161 | return | ||
162 | case uploadPartsCh <- uploadPartReq{PartNum: p}: | ||
163 | } | ||
164 | } | ||
165 | }() | ||
166 | |||
167 | // Receive each part number from the channel allowing three parallel uploads. | ||
168 | for w := 1; w <= opts.getNumThreads(); w++ { | ||
169 | go func(partSize int64) { | ||
170 | for { | ||
171 | var uploadReq uploadPartReq | ||
172 | var ok bool | ||
173 | select { | ||
174 | case <-ctx.Done(): | ||
175 | return | ||
176 | case uploadReq, ok = <-uploadPartsCh: | ||
177 | if !ok { | ||
178 | return | ||
179 | } | ||
180 | // Each worker will draw from the part channel and upload in parallel. | ||
181 | } | ||
182 | |||
183 | // If partNumber was not uploaded we calculate the missing | ||
184 | // part offset and size. For all other part numbers we | ||
185 | // calculate offset based on multiples of partSize. | ||
186 | readOffset := int64(uploadReq.PartNum-1) * partSize | ||
187 | |||
188 | // As a special case if partNumber is lastPartNumber, we | ||
189 | // calculate the offset based on the last part size. | ||
190 | if uploadReq.PartNum == lastPartNumber { | ||
191 | readOffset = size - lastPartSize | ||
192 | partSize = lastPartSize | ||
193 | } | ||
194 | |||
195 | sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) | ||
196 | trailer := make(http.Header, 1) | ||
197 | if withChecksum { | ||
198 | crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) | ||
199 | trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil))) | ||
200 | sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) { | ||
201 | trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash)) | ||
202 | }) | ||
203 | } | ||
204 | |||
205 | // Proceed to upload the part. | ||
206 | p := uploadPartParams{ | ||
207 | bucketName: bucketName, | ||
208 | objectName: objectName, | ||
209 | uploadID: uploadID, | ||
210 | reader: sectionReader, | ||
211 | partNumber: uploadReq.PartNum, | ||
212 | size: partSize, | ||
213 | sse: opts.ServerSideEncryption, | ||
214 | streamSha256: !opts.DisableContentSha256, | ||
215 | sha256Hex: "", | ||
216 | trailer: trailer, | ||
217 | } | ||
218 | objPart, err := c.uploadPart(ctx, p) | ||
219 | if err != nil { | ||
220 | uploadedPartsCh <- uploadedPartRes{ | ||
221 | Error: err, | ||
222 | } | ||
223 | // Exit the goroutine. | ||
224 | return | ||
225 | } | ||
226 | |||
227 | // Save successfully uploaded part metadata. | ||
228 | uploadReq.Part = objPart | ||
229 | |||
230 | // Send successful part info through the channel. | ||
231 | uploadedPartsCh <- uploadedPartRes{ | ||
232 | Size: objPart.Size, | ||
233 | PartNum: uploadReq.PartNum, | ||
234 | Part: uploadReq.Part, | ||
235 | } | ||
236 | } | ||
237 | }(partSize) | ||
238 | } | ||
239 | |||
240 | // Gather the responses as they occur and update any | ||
241 | // progress bar. | ||
242 | for u := 1; u <= totalPartsCount; u++ { | ||
243 | select { | ||
244 | case <-ctx.Done(): | ||
245 | return UploadInfo{}, ctx.Err() | ||
246 | case uploadRes := <-uploadedPartsCh: | ||
247 | if uploadRes.Error != nil { | ||
248 | return UploadInfo{}, uploadRes.Error | ||
249 | } | ||
250 | |||
251 | // Update the totalUploadedSize. | ||
252 | totalUploadedSize += uploadRes.Size | ||
253 | complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ | ||
254 | ETag: uploadRes.Part.ETag, | ||
255 | PartNumber: uploadRes.Part.PartNumber, | ||
256 | ChecksumCRC32: uploadRes.Part.ChecksumCRC32, | ||
257 | ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C, | ||
258 | ChecksumSHA1: uploadRes.Part.ChecksumSHA1, | ||
259 | ChecksumSHA256: uploadRes.Part.ChecksumSHA256, | ||
260 | }) | ||
261 | } | ||
262 | } | ||
263 | |||
264 | // Verify if we uploaded all the data. | ||
265 | if totalUploadedSize != size { | ||
266 | return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) | ||
267 | } | ||
268 | |||
269 | // Sort all completed parts. | ||
270 | sort.Sort(completedParts(complMultipartUpload.Parts)) | ||
271 | |||
272 | opts = PutObjectOptions{ | ||
273 | ServerSideEncryption: opts.ServerSideEncryption, | ||
274 | } | ||
275 | if withChecksum { | ||
276 | // Add hash of hashes. | ||
277 | crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) | ||
278 | for _, part := range complMultipartUpload.Parts { | ||
279 | cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C) | ||
280 | if err == nil { | ||
281 | crc.Write(cs) | ||
282 | } | ||
283 | } | ||
284 | opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} | ||
285 | } | ||
286 | |||
287 | uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) | ||
288 | if err != nil { | ||
289 | return UploadInfo{}, err | ||
290 | } | ||
291 | |||
292 | uploadInfo.Size = totalUploadedSize | ||
293 | return uploadInfo, nil | ||
294 | } | ||
295 | |||
296 | func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string, | ||
297 | reader io.Reader, size int64, opts PutObjectOptions, | ||
298 | ) (info UploadInfo, err error) { | ||
299 | // Input validation. | ||
300 | if err = s3utils.CheckValidBucketName(bucketName); err != nil { | ||
301 | return UploadInfo{}, err | ||
302 | } | ||
303 | if err = s3utils.CheckValidObjectName(objectName); err != nil { | ||
304 | return UploadInfo{}, err | ||
305 | } | ||
306 | |||
307 | if !opts.SendContentMd5 { | ||
308 | if opts.UserMetadata == nil { | ||
309 | opts.UserMetadata = make(map[string]string, 1) | ||
310 | } | ||
311 | opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" | ||
312 | } | ||
313 | |||
314 | // Calculate the optimal parts info for a given size. | ||
315 | totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) | ||
316 | if err != nil { | ||
317 | return UploadInfo{}, err | ||
318 | } | ||
319 | // Initiates a new multipart request | ||
320 | uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) | ||
321 | if err != nil { | ||
322 | return UploadInfo{}, err | ||
323 | } | ||
324 | delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") | ||
325 | |||
326 | // Aborts the multipart upload if the function returns | ||
327 | // any error, since we do not resume we should purge | ||
328 | // the parts which have been uploaded to relinquish | ||
329 | // storage space. | ||
330 | defer func() { | ||
331 | if err != nil { | ||
332 | c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) | ||
333 | } | ||
334 | }() | ||
335 | |||
336 | // Create checksums | ||
337 | // CRC32C is ~50% faster on AMD64 @ 30GB/s | ||
338 | var crcBytes []byte | ||
339 | customHeader := make(http.Header) | ||
340 | crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) | ||
341 | md5Hash := c.md5Hasher() | ||
342 | defer md5Hash.Close() | ||
343 | |||
344 | // Total data read and written to server. should be equal to 'size' at the end of the call. | ||
345 | var totalUploadedSize int64 | ||
346 | |||
347 | // Initialize parts uploaded map. | ||
348 | partsInfo := make(map[int]ObjectPart) | ||
349 | |||
350 | // Create a buffer. | ||
351 | buf := make([]byte, partSize) | ||
352 | |||
353 | // Avoid declaring variables in the for loop | ||
354 | var md5Base64 string | ||
355 | |||
356 | // Part number always starts with '1'. | ||
357 | var partNumber int | ||
358 | for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { | ||
359 | |||
360 | // Proceed to upload the part. | ||
361 | if partNumber == totalPartsCount { | ||
362 | partSize = lastPartSize | ||
363 | } | ||
364 | |||
365 | length, rerr := readFull(reader, buf) | ||
366 | if rerr == io.EOF && partNumber > 1 { | ||
367 | break | ||
368 | } | ||
369 | |||
370 | if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { | ||
371 | return UploadInfo{}, rerr | ||
372 | } | ||
373 | |||
374 | // Calculate md5sum. | ||
375 | if opts.SendContentMd5 { | ||
376 | md5Hash.Reset() | ||
377 | md5Hash.Write(buf[:length]) | ||
378 | md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)) | ||
379 | } else { | ||
380 | // Add CRC32C instead. | ||
381 | crc.Reset() | ||
382 | crc.Write(buf[:length]) | ||
383 | cSum := crc.Sum(nil) | ||
384 | customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) | ||
385 | crcBytes = append(crcBytes, cSum...) | ||
386 | } | ||
387 | |||
388 | // Update progress reader appropriately to the latest offset | ||
389 | // as we read from the source. | ||
390 | hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress) | ||
391 | p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader} | ||
392 | objPart, uerr := c.uploadPart(ctx, p) | ||
393 | if uerr != nil { | ||
394 | return UploadInfo{}, uerr | ||
395 | } | ||
396 | |||
397 | // Save successfully uploaded part metadata. | ||
398 | partsInfo[partNumber] = objPart | ||
399 | |||
400 | // Save successfully uploaded size. | ||
401 | totalUploadedSize += partSize | ||
402 | } | ||
403 | |||
404 | // Verify if we uploaded all the data. | ||
405 | if size > 0 { | ||
406 | if totalUploadedSize != size { | ||
407 | return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName) | ||
408 | } | ||
409 | } | ||
410 | |||
411 | // Complete multipart upload. | ||
412 | var complMultipartUpload completeMultipartUpload | ||
413 | |||
414 | // Loop over total uploaded parts to save them in | ||
415 | // Parts array before completing the multipart request. | ||
416 | for i := 1; i < partNumber; i++ { | ||
417 | part, ok := partsInfo[i] | ||
418 | if !ok { | ||
419 | return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) | ||
420 | } | ||
421 | complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ | ||
422 | ETag: part.ETag, | ||
423 | PartNumber: part.PartNumber, | ||
424 | ChecksumCRC32: part.ChecksumCRC32, | ||
425 | ChecksumCRC32C: part.ChecksumCRC32C, | ||
426 | ChecksumSHA1: part.ChecksumSHA1, | ||
427 | ChecksumSHA256: part.ChecksumSHA256, | ||
428 | }) | ||
429 | } | ||
430 | |||
431 | // Sort all completed parts. | ||
432 | sort.Sort(completedParts(complMultipartUpload.Parts)) | ||
433 | |||
434 | opts = PutObjectOptions{ | ||
435 | ServerSideEncryption: opts.ServerSideEncryption, | ||
436 | } | ||
437 | if len(crcBytes) > 0 { | ||
438 | // Add hash of hashes. | ||
439 | crc.Reset() | ||
440 | crc.Write(crcBytes) | ||
441 | opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} | ||
442 | } | ||
443 | uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) | ||
444 | if err != nil { | ||
445 | return UploadInfo{}, err | ||
446 | } | ||
447 | |||
448 | uploadInfo.Size = totalUploadedSize | ||
449 | return uploadInfo, nil | ||
450 | } | ||
451 | |||
452 | // putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel. | ||
453 | // This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer. | ||
454 | func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string, | ||
455 | reader io.Reader, opts PutObjectOptions, | ||
456 | ) (info UploadInfo, err error) { | ||
457 | // Input validation. | ||
458 | if err = s3utils.CheckValidBucketName(bucketName); err != nil { | ||
459 | return UploadInfo{}, err | ||
460 | } | ||
461 | |||
462 | if err = s3utils.CheckValidObjectName(objectName); err != nil { | ||
463 | return UploadInfo{}, err | ||
464 | } | ||
465 | |||
466 | if !opts.SendContentMd5 { | ||
467 | if opts.UserMetadata == nil { | ||
468 | opts.UserMetadata = make(map[string]string, 1) | ||
469 | } | ||
470 | opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C" | ||
471 | } | ||
472 | |||
473 | // Cancel all when an error occurs. | ||
474 | ctx, cancel := context.WithCancel(ctx) | ||
475 | defer cancel() | ||
476 | |||
477 | // Calculate the optimal parts info for a given size. | ||
478 | totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize) | ||
479 | if err != nil { | ||
480 | return UploadInfo{}, err | ||
481 | } | ||
482 | |||
483 | // Initiates a new multipart request | ||
484 | uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) | ||
485 | if err != nil { | ||
486 | return UploadInfo{}, err | ||
487 | } | ||
488 | delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm") | ||
489 | |||
490 | // Aborts the multipart upload if the function returns | ||
491 | // any error, since we do not resume we should purge | ||
492 | // the parts which have been uploaded to relinquish | ||
493 | // storage space. | ||
494 | defer func() { | ||
495 | if err != nil { | ||
496 | c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) | ||
497 | } | ||
498 | }() | ||
499 | |||
500 | // Create checksums | ||
501 | // CRC32C is ~50% faster on AMD64 @ 30GB/s | ||
502 | var crcBytes []byte | ||
503 | crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) | ||
504 | |||
505 | // Total data read and written to server. should be equal to 'size' at the end of the call. | ||
506 | var totalUploadedSize int64 | ||
507 | |||
508 | // Initialize parts uploaded map. | ||
509 | partsInfo := make(map[int]ObjectPart) | ||
510 | |||
511 | // Create a buffer. | ||
512 | nBuffers := int64(opts.NumThreads) | ||
513 | bufs := make(chan []byte, nBuffers) | ||
514 | all := make([]byte, nBuffers*partSize) | ||
515 | for i := int64(0); i < nBuffers; i++ { | ||
516 | bufs <- all[i*partSize : i*partSize+partSize] | ||
517 | } | ||
518 | |||
519 | var wg sync.WaitGroup | ||
520 | var mu sync.Mutex | ||
521 | errCh := make(chan error, opts.NumThreads) | ||
522 | |||
523 | reader = newHook(reader, opts.Progress) | ||
524 | |||
525 | // Part number always starts with '1'. | ||
526 | var partNumber int | ||
527 | for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { | ||
528 | // Proceed to upload the part. | ||
529 | var buf []byte | ||
530 | select { | ||
531 | case buf = <-bufs: | ||
532 | case err = <-errCh: | ||
533 | cancel() | ||
534 | wg.Wait() | ||
535 | return UploadInfo{}, err | ||
536 | } | ||
537 | |||
538 | if int64(len(buf)) != partSize { | ||
539 | return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize) | ||
540 | } | ||
541 | |||
542 | length, rerr := readFull(reader, buf) | ||
543 | if rerr == io.EOF && partNumber > 1 { | ||
544 | // Done | ||
545 | break | ||
546 | } | ||
547 | |||
548 | if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF { | ||
549 | cancel() | ||
550 | wg.Wait() | ||
551 | return UploadInfo{}, rerr | ||
552 | } | ||
553 | |||
554 | // Calculate md5sum. | ||
555 | customHeader := make(http.Header) | ||
556 | if !opts.SendContentMd5 { | ||
557 | // Add CRC32C instead. | ||
558 | crc.Reset() | ||
559 | crc.Write(buf[:length]) | ||
560 | cSum := crc.Sum(nil) | ||
561 | customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum)) | ||
562 | crcBytes = append(crcBytes, cSum...) | ||
563 | } | ||
564 | |||
565 | wg.Add(1) | ||
566 | go func(partNumber int) { | ||
567 | // Avoid declaring variables in the for loop | ||
568 | var md5Base64 string | ||
569 | |||
570 | if opts.SendContentMd5 { | ||
571 | md5Hash := c.md5Hasher() | ||
572 | md5Hash.Write(buf[:length]) | ||
573 | md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)) | ||
574 | md5Hash.Close() | ||
575 | } | ||
576 | |||
577 | defer wg.Done() | ||
578 | p := uploadPartParams{ | ||
579 | bucketName: bucketName, | ||
580 | objectName: objectName, | ||
581 | uploadID: uploadID, | ||
582 | reader: bytes.NewReader(buf[:length]), | ||
583 | partNumber: partNumber, | ||
584 | md5Base64: md5Base64, | ||
585 | size: int64(length), | ||
586 | sse: opts.ServerSideEncryption, | ||
587 | streamSha256: !opts.DisableContentSha256, | ||
588 | customHeader: customHeader, | ||
589 | } | ||
590 | objPart, uerr := c.uploadPart(ctx, p) | ||
591 | if uerr != nil { | ||
592 | errCh <- uerr | ||
593 | return | ||
594 | } | ||
595 | |||
596 | // Save successfully uploaded part metadata. | ||
597 | mu.Lock() | ||
598 | partsInfo[partNumber] = objPart | ||
599 | mu.Unlock() | ||
600 | |||
601 | // Send buffer back so it can be reused. | ||
602 | bufs <- buf | ||
603 | }(partNumber) | ||
604 | |||
605 | // Save successfully uploaded size. | ||
606 | totalUploadedSize += int64(length) | ||
607 | } | ||
608 | wg.Wait() | ||
609 | |||
610 | // Collect any error | ||
611 | select { | ||
612 | case err = <-errCh: | ||
613 | return UploadInfo{}, err | ||
614 | default: | ||
615 | } | ||
616 | |||
617 | // Complete multipart upload. | ||
618 | var complMultipartUpload completeMultipartUpload | ||
619 | |||
620 | // Loop over total uploaded parts to save them in | ||
621 | // Parts array before completing the multipart request. | ||
622 | for i := 1; i < partNumber; i++ { | ||
623 | part, ok := partsInfo[i] | ||
624 | if !ok { | ||
625 | return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) | ||
626 | } | ||
627 | complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ | ||
628 | ETag: part.ETag, | ||
629 | PartNumber: part.PartNumber, | ||
630 | ChecksumCRC32: part.ChecksumCRC32, | ||
631 | ChecksumCRC32C: part.ChecksumCRC32C, | ||
632 | ChecksumSHA1: part.ChecksumSHA1, | ||
633 | ChecksumSHA256: part.ChecksumSHA256, | ||
634 | }) | ||
635 | } | ||
636 | |||
637 | // Sort all completed parts. | ||
638 | sort.Sort(completedParts(complMultipartUpload.Parts)) | ||
639 | |||
640 | opts = PutObjectOptions{} | ||
641 | if len(crcBytes) > 0 { | ||
642 | // Add hash of hashes. | ||
643 | crc.Reset() | ||
644 | crc.Write(crcBytes) | ||
645 | opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))} | ||
646 | } | ||
647 | uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts) | ||
648 | if err != nil { | ||
649 | return UploadInfo{}, err | ||
650 | } | ||
651 | |||
652 | uploadInfo.Size = totalUploadedSize | ||
653 | return uploadInfo, nil | ||
654 | } | ||
655 | |||
656 | // putObject special function used Google Cloud Storage. This special function | ||
657 | // is used for Google Cloud Storage since Google's multipart API is not S3 compatible. | ||
658 | func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) { | ||
659 | // Input validation. | ||
660 | if err := s3utils.CheckValidBucketName(bucketName); err != nil { | ||
661 | return UploadInfo{}, err | ||
662 | } | ||
663 | if err := s3utils.CheckValidObjectName(objectName); err != nil { | ||
664 | return UploadInfo{}, err | ||
665 | } | ||
666 | |||
667 | // Size -1 is only supported on Google Cloud Storage, we error | ||
668 | // out in all other situations. | ||
669 | if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) { | ||
670 | return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName) | ||
671 | } | ||
672 | |||
673 | if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 { | ||
674 | return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'") | ||
675 | } | ||
676 | |||
677 | var readSeeker io.Seeker | ||
678 | if size > 0 { | ||
679 | if isReadAt(reader) && !isObject(reader) { | ||
680 | seeker, ok := reader.(io.Seeker) | ||
681 | if ok { | ||
682 | offset, err := seeker.Seek(0, io.SeekCurrent) | ||
683 | if err != nil { | ||
684 | return UploadInfo{}, errInvalidArgument(err.Error()) | ||
685 | } | ||
686 | reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size) | ||
687 | readSeeker = reader.(io.Seeker) | ||
688 | } | ||
689 | } | ||
690 | } | ||
691 | |||
692 | var md5Base64 string | ||
693 | if opts.SendContentMd5 { | ||
694 | // Calculate md5sum. | ||
695 | hash := c.md5Hasher() | ||
696 | |||
697 | if readSeeker != nil { | ||
698 | if _, err := io.Copy(hash, reader); err != nil { | ||
699 | return UploadInfo{}, err | ||
700 | } | ||
701 | // Seek back to beginning of io.NewSectionReader's offset. | ||
702 | _, err = readSeeker.Seek(0, io.SeekStart) | ||
703 | if err != nil { | ||
704 | return UploadInfo{}, errInvalidArgument(err.Error()) | ||
705 | } | ||
706 | } else { | ||
707 | // Create a buffer. | ||
708 | buf := make([]byte, size) | ||
709 | |||
710 | length, err := readFull(reader, buf) | ||
711 | if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { | ||
712 | return UploadInfo{}, err | ||
713 | } | ||
714 | |||
715 | hash.Write(buf[:length]) | ||
716 | reader = bytes.NewReader(buf[:length]) | ||
717 | } | ||
718 | |||
719 | md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil)) | ||
720 | hash.Close() | ||
721 | } | ||
722 | |||
723 | // Update progress reader appropriately to the latest offset as we | ||
724 | // read from the source. | ||
725 | progressReader := newHook(reader, opts.Progress) | ||
726 | |||
727 | // This function does not calculate sha256 and md5sum for payload. | ||
728 | // Execute put object. | ||
729 | return c.putObjectDo(ctx, bucketName, objectName, progressReader, md5Base64, "", size, opts) | ||
730 | } | ||
731 | |||
732 | // putObjectDo - executes the put object http operation. | ||
733 | // NOTE: You must have WRITE permissions on a bucket to add an object to it. | ||
734 | func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) { | ||
735 | // Input validation. | ||
736 | if err := s3utils.CheckValidBucketName(bucketName); err != nil { | ||
737 | return UploadInfo{}, err | ||
738 | } | ||
739 | if err := s3utils.CheckValidObjectName(objectName); err != nil { | ||
740 | return UploadInfo{}, err | ||
741 | } | ||
742 | // Set headers. | ||
743 | customHeader := opts.Header() | ||
744 | |||
745 | // Add CRC when client supports it, MD5 is not set, not Google and we don't add SHA256 to chunks. | ||
746 | addCrc := c.trailingHeaderSupport && md5Base64 == "" && !s3utils.IsGoogleEndpoint(*c.endpointURL) && (opts.DisableContentSha256 || c.secure) | ||
747 | |||
748 | if addCrc { | ||
749 | // If user has added checksums, don't add them ourselves. | ||
750 | for k := range opts.UserMetadata { | ||
751 | if strings.HasPrefix(strings.ToLower(k), "x-amz-checksum-") { | ||
752 | addCrc = false | ||
753 | } | ||
754 | } | ||
755 | } | ||
756 | // Populate request metadata. | ||
757 | reqMetadata := requestMetadata{ | ||
758 | bucketName: bucketName, | ||
759 | objectName: objectName, | ||
760 | customHeader: customHeader, | ||
761 | contentBody: reader, | ||
762 | contentLength: size, | ||
763 | contentMD5Base64: md5Base64, | ||
764 | contentSHA256Hex: sha256Hex, | ||
765 | streamSha256: !opts.DisableContentSha256, | ||
766 | addCrc: addCrc, | ||
767 | } | ||
768 | if opts.Internal.SourceVersionID != "" { | ||
769 | if opts.Internal.SourceVersionID != nullVersionID { | ||
770 | if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil { | ||
771 | return UploadInfo{}, errInvalidArgument(err.Error()) | ||
772 | } | ||
773 | } | ||
774 | urlValues := make(url.Values) | ||
775 | urlValues.Set("versionId", opts.Internal.SourceVersionID) | ||
776 | reqMetadata.queryValues = urlValues | ||
777 | } | ||
778 | |||
779 | // Execute PUT an objectName. | ||
780 | resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) | ||
781 | defer closeResponse(resp) | ||
782 | if err != nil { | ||
783 | return UploadInfo{}, err | ||
784 | } | ||
785 | if resp != nil { | ||
786 | if resp.StatusCode != http.StatusOK { | ||
787 | return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName) | ||
788 | } | ||
789 | } | ||
790 | |||
791 | // extract lifecycle expiry date and rule ID | ||
792 | expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration)) | ||
793 | h := resp.Header | ||
794 | return UploadInfo{ | ||
795 | Bucket: bucketName, | ||
796 | Key: objectName, | ||
797 | ETag: trimEtag(h.Get("ETag")), | ||
798 | VersionID: h.Get(amzVersionID), | ||
799 | Size: size, | ||
800 | Expiration: expTime, | ||
801 | ExpirationRuleID: ruleID, | ||
802 | |||
803 | // Checksum values | ||
804 | ChecksumCRC32: h.Get("x-amz-checksum-crc32"), | ||
805 | ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"), | ||
806 | ChecksumSHA1: h.Get("x-amz-checksum-sha1"), | ||
807 | ChecksumSHA256: h.Get("x-amz-checksum-sha256"), | ||
808 | }, nil | ||
809 | } | ||