aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go465
1 files changed, 465 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go b/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go
new file mode 100644
index 0000000..5f117af
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object-multipart.go
@@ -0,0 +1,465 @@
1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2015-2017 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21 "bytes"
22 "context"
23 "encoding/base64"
24 "encoding/hex"
25 "encoding/xml"
26 "fmt"
27 "hash/crc32"
28 "io"
29 "net/http"
30 "net/url"
31 "sort"
32 "strconv"
33 "strings"
34
35 "github.com/google/uuid"
36 "github.com/minio/minio-go/v7/pkg/encrypt"
37 "github.com/minio/minio-go/v7/pkg/s3utils"
38)
39
40func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
41 opts PutObjectOptions,
42) (info UploadInfo, err error) {
43 info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
44 if err != nil {
45 errResp := ToErrorResponse(err)
46 // Verify if multipart functionality is not available, if not
47 // fall back to single PutObject operation.
48 if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
49 // Verify if size of reader is greater than '5GiB'.
50 if size > maxSinglePutObjectSize {
51 return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
52 }
53 // Fall back to uploading as single PutObject operation.
54 return c.putObject(ctx, bucketName, objectName, reader, size, opts)
55 }
56 }
57 return info, err
58}
59
60func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
61 // Input validation.
62 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
63 return UploadInfo{}, err
64 }
65 if err = s3utils.CheckValidObjectName(objectName); err != nil {
66 return UploadInfo{}, err
67 }
68
69 // Total data read and written to server. should be equal to
70 // 'size' at the end of the call.
71 var totalUploadedSize int64
72
73 // Complete multipart upload.
74 var complMultipartUpload completeMultipartUpload
75
76 // Calculate the optimal parts info for a given size.
77 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
78 if err != nil {
79 return UploadInfo{}, err
80 }
81
82 // Choose hash algorithms to be calculated by hashCopyN,
83 // avoid sha256 with non-v4 signature request or
84 // HTTPS connection.
85 hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256)
86 if len(hashSums) == 0 {
87 if opts.UserMetadata == nil {
88 opts.UserMetadata = make(map[string]string, 1)
89 }
90 opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
91 }
92
93 // Initiate a new multipart upload.
94 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
95 if err != nil {
96 return UploadInfo{}, err
97 }
98 delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
99
100 defer func() {
101 if err != nil {
102 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
103 }
104 }()
105
106 // Part number always starts with '1'.
107 partNumber := 1
108
109 // Initialize parts uploaded map.
110 partsInfo := make(map[int]ObjectPart)
111
112 // Create a buffer.
113 buf := make([]byte, partSize)
114
115 // Create checksums
116 // CRC32C is ~50% faster on AMD64 @ 30GB/s
117 var crcBytes []byte
118 customHeader := make(http.Header)
119 crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
120 for partNumber <= totalPartsCount {
121 length, rErr := readFull(reader, buf)
122 if rErr == io.EOF && partNumber > 1 {
123 break
124 }
125
126 if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
127 return UploadInfo{}, rErr
128 }
129
130 // Calculates hash sums while copying partSize bytes into cw.
131 for k, v := range hashAlgos {
132 v.Write(buf[:length])
133 hashSums[k] = v.Sum(nil)
134 v.Close()
135 }
136
137 // Update progress reader appropriately to the latest offset
138 // as we read from the source.
139 rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
140
141 // Checksums..
142 var (
143 md5Base64 string
144 sha256Hex string
145 )
146
147 if hashSums["md5"] != nil {
148 md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
149 }
150 if hashSums["sha256"] != nil {
151 sha256Hex = hex.EncodeToString(hashSums["sha256"])
152 }
153 if len(hashSums) == 0 {
154 crc.Reset()
155 crc.Write(buf[:length])
156 cSum := crc.Sum(nil)
157 customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
158 crcBytes = append(crcBytes, cSum...)
159 }
160
161 p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
162 // Proceed to upload the part.
163 objPart, uerr := c.uploadPart(ctx, p)
164 if uerr != nil {
165 return UploadInfo{}, uerr
166 }
167
168 // Save successfully uploaded part metadata.
169 partsInfo[partNumber] = objPart
170
171 // Save successfully uploaded size.
172 totalUploadedSize += int64(length)
173
174 // Increment part number.
175 partNumber++
176
177 // For unknown size, Read EOF we break away.
178 // We do not have to upload till totalPartsCount.
179 if rErr == io.EOF {
180 break
181 }
182 }
183
184 // Loop over total uploaded parts to save them in
185 // Parts array before completing the multipart request.
186 for i := 1; i < partNumber; i++ {
187 part, ok := partsInfo[i]
188 if !ok {
189 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
190 }
191 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
192 ETag: part.ETag,
193 PartNumber: part.PartNumber,
194 ChecksumCRC32: part.ChecksumCRC32,
195 ChecksumCRC32C: part.ChecksumCRC32C,
196 ChecksumSHA1: part.ChecksumSHA1,
197 ChecksumSHA256: part.ChecksumSHA256,
198 })
199 }
200
201 // Sort all completed parts.
202 sort.Sort(completedParts(complMultipartUpload.Parts))
203 opts = PutObjectOptions{
204 ServerSideEncryption: opts.ServerSideEncryption,
205 }
206 if len(crcBytes) > 0 {
207 // Add hash of hashes.
208 crc.Reset()
209 crc.Write(crcBytes)
210 opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
211 }
212 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
213 if err != nil {
214 return UploadInfo{}, err
215 }
216
217 uploadInfo.Size = totalUploadedSize
218 return uploadInfo, nil
219}
220
221// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
222func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
223 // Input validation.
224 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
225 return initiateMultipartUploadResult{}, err
226 }
227 if err := s3utils.CheckValidObjectName(objectName); err != nil {
228 return initiateMultipartUploadResult{}, err
229 }
230
231 // Initialize url queries.
232 urlValues := make(url.Values)
233 urlValues.Set("uploads", "")
234
235 if opts.Internal.SourceVersionID != "" {
236 if opts.Internal.SourceVersionID != nullVersionID {
237 if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
238 return initiateMultipartUploadResult{}, errInvalidArgument(err.Error())
239 }
240 }
241 urlValues.Set("versionId", opts.Internal.SourceVersionID)
242 }
243
244 // Set ContentType header.
245 customHeader := opts.Header()
246
247 reqMetadata := requestMetadata{
248 bucketName: bucketName,
249 objectName: objectName,
250 queryValues: urlValues,
251 customHeader: customHeader,
252 }
253
254 // Execute POST on an objectName to initiate multipart upload.
255 resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
256 defer closeResponse(resp)
257 if err != nil {
258 return initiateMultipartUploadResult{}, err
259 }
260 if resp != nil {
261 if resp.StatusCode != http.StatusOK {
262 return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
263 }
264 }
265 // Decode xml for new multipart upload.
266 initiateMultipartUploadResult := initiateMultipartUploadResult{}
267 err = xmlDecoder(resp.Body, &initiateMultipartUploadResult)
268 if err != nil {
269 return initiateMultipartUploadResult, err
270 }
271 return initiateMultipartUploadResult, nil
272}
273
274type uploadPartParams struct {
275 bucketName string
276 objectName string
277 uploadID string
278 reader io.Reader
279 partNumber int
280 md5Base64 string
281 sha256Hex string
282 size int64
283 sse encrypt.ServerSide
284 streamSha256 bool
285 customHeader http.Header
286 trailer http.Header
287}
288
289// uploadPart - Uploads a part in a multipart upload.
290func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart, error) {
291 // Input validation.
292 if err := s3utils.CheckValidBucketName(p.bucketName); err != nil {
293 return ObjectPart{}, err
294 }
295 if err := s3utils.CheckValidObjectName(p.objectName); err != nil {
296 return ObjectPart{}, err
297 }
298 if p.size > maxPartSize {
299 return ObjectPart{}, errEntityTooLarge(p.size, maxPartSize, p.bucketName, p.objectName)
300 }
301 if p.size <= -1 {
302 return ObjectPart{}, errEntityTooSmall(p.size, p.bucketName, p.objectName)
303 }
304 if p.partNumber <= 0 {
305 return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
306 }
307 if p.uploadID == "" {
308 return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
309 }
310
311 // Get resources properly escaped and lined up before using them in http request.
312 urlValues := make(url.Values)
313 // Set part number.
314 urlValues.Set("partNumber", strconv.Itoa(p.partNumber))
315 // Set upload id.
316 urlValues.Set("uploadId", p.uploadID)
317
318 // Set encryption headers, if any.
319 if p.customHeader == nil {
320 p.customHeader = make(http.Header)
321 }
322 // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
323 // Server-side encryption is supported by the S3 Multipart Upload actions.
324 // Unless you are using a customer-provided encryption key, you don't need
325 // to specify the encryption parameters in each UploadPart request.
326 if p.sse != nil && p.sse.Type() == encrypt.SSEC {
327 p.sse.Marshal(p.customHeader)
328 }
329
330 reqMetadata := requestMetadata{
331 bucketName: p.bucketName,
332 objectName: p.objectName,
333 queryValues: urlValues,
334 customHeader: p.customHeader,
335 contentBody: p.reader,
336 contentLength: p.size,
337 contentMD5Base64: p.md5Base64,
338 contentSHA256Hex: p.sha256Hex,
339 streamSha256: p.streamSha256,
340 trailer: p.trailer,
341 }
342
343 // Execute PUT on each part.
344 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
345 defer closeResponse(resp)
346 if err != nil {
347 return ObjectPart{}, err
348 }
349 if resp != nil {
350 if resp.StatusCode != http.StatusOK {
351 return ObjectPart{}, httpRespToErrorResponse(resp, p.bucketName, p.objectName)
352 }
353 }
354 // Once successfully uploaded, return completed part.
355 h := resp.Header
356 objPart := ObjectPart{
357 ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
358 ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
359 ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
360 ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
361 }
362 objPart.Size = p.size
363 objPart.PartNumber = p.partNumber
364 // Trim off the odd double quotes from ETag in the beginning and end.
365 objPart.ETag = trimEtag(h.Get("ETag"))
366 return objPart, nil
367}
368
369// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
370func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
371 complete completeMultipartUpload, opts PutObjectOptions,
372) (UploadInfo, error) {
373 // Input validation.
374 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
375 return UploadInfo{}, err
376 }
377 if err := s3utils.CheckValidObjectName(objectName); err != nil {
378 return UploadInfo{}, err
379 }
380
381 // Initialize url queries.
382 urlValues := make(url.Values)
383 urlValues.Set("uploadId", uploadID)
384 // Marshal complete multipart body.
385 completeMultipartUploadBytes, err := xml.Marshal(complete)
386 if err != nil {
387 return UploadInfo{}, err
388 }
389
390 headers := opts.Header()
391 if s3utils.IsAmazonEndpoint(*c.endpointURL) {
392 headers.Del(encrypt.SseKmsKeyID) // Remove X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id not supported in CompleteMultipartUpload
393 headers.Del(encrypt.SseGenericHeader) // Remove X-Amz-Server-Side-Encryption not supported in CompleteMultipartUpload
394 headers.Del(encrypt.SseEncryptionContext) // Remove X-Amz-Server-Side-Encryption-Context not supported in CompleteMultipartUpload
395 }
396
397 // Instantiate all the complete multipart buffer.
398 completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
399 reqMetadata := requestMetadata{
400 bucketName: bucketName,
401 objectName: objectName,
402 queryValues: urlValues,
403 contentBody: completeMultipartUploadBuffer,
404 contentLength: int64(len(completeMultipartUploadBytes)),
405 contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
406 customHeader: headers,
407 }
408
409 // Execute POST to complete multipart upload for an objectName.
410 resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
411 defer closeResponse(resp)
412 if err != nil {
413 return UploadInfo{}, err
414 }
415 if resp != nil {
416 if resp.StatusCode != http.StatusOK {
417 return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
418 }
419 }
420
421 // Read resp.Body into a []bytes to parse for Error response inside the body
422 var b []byte
423 b, err = io.ReadAll(resp.Body)
424 if err != nil {
425 return UploadInfo{}, err
426 }
427 // Decode completed multipart upload response on success.
428 completeMultipartUploadResult := completeMultipartUploadResult{}
429 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult)
430 if err != nil {
431 // xml parsing failure due to presence an ill-formed xml fragment
432 return UploadInfo{}, err
433 } else if completeMultipartUploadResult.Bucket == "" {
434 // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied.
435 // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values
436 // of the members.
437
438 // Decode completed multipart upload response on failure
439 completeMultipartUploadErr := ErrorResponse{}
440 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr)
441 if err != nil {
442 // xml parsing failure due to presence an ill-formed xml fragment
443 return UploadInfo{}, err
444 }
445 return UploadInfo{}, completeMultipartUploadErr
446 }
447
448 // extract lifecycle expiry date and rule ID
449 expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
450
451 return UploadInfo{
452 Bucket: completeMultipartUploadResult.Bucket,
453 Key: completeMultipartUploadResult.Key,
454 ETag: trimEtag(completeMultipartUploadResult.ETag),
455 VersionID: resp.Header.Get(amzVersionID),
456 Location: completeMultipartUploadResult.Location,
457 Expiration: expTime,
458 ExpirationRuleID: ruleID,
459
460 ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256,
461 ChecksumSHA1: completeMultipartUploadResult.ChecksumSHA1,
462 ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32,
463 ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
464 }, nil
465}