aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-compose-object.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-compose-object.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-compose-object.go594
1 files changed, 594 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-compose-object.go b/vendor/github.com/minio/minio-go/v7/api-compose-object.go
new file mode 100644
index 0000000..e64a244
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-compose-object.go
@@ -0,0 +1,594 @@
1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2017, 2018 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21 "context"
22 "fmt"
23 "io"
24 "net/http"
25 "net/url"
26 "strconv"
27 "strings"
28 "time"
29
30 "github.com/google/uuid"
31 "github.com/minio/minio-go/v7/pkg/encrypt"
32 "github.com/minio/minio-go/v7/pkg/s3utils"
33)
34
35// CopyDestOptions represents options specified by user for CopyObject/ComposeObject APIs
36type CopyDestOptions struct {
37 Bucket string // points to destination bucket
38 Object string // points to destination object
39
40 // `Encryption` is the key info for server-side-encryption with customer
41 // provided key. If it is nil, no encryption is performed.
42 Encryption encrypt.ServerSide
43
44 // `userMeta` is the user-metadata key-value pairs to be set on the
45 // destination. The keys are automatically prefixed with `x-amz-meta-`
46 // if needed. If nil is passed, and if only a single source (of any
47 // size) is provided in the ComposeObject call, then metadata from the
48 // source is copied to the destination.
49 // if no user-metadata is provided, it is copied from source
50 // (when there is only once source object in the compose
51 // request)
52 UserMetadata map[string]string
53 // UserMetadata is only set to destination if ReplaceMetadata is true
54 // other value is UserMetadata is ignored and we preserve src.UserMetadata
55 // NOTE: if you set this value to true and now metadata is present
56 // in UserMetadata your destination object will not have any metadata
57 // set.
58 ReplaceMetadata bool
59
60 // `userTags` is the user defined object tags to be set on destination.
61 // This will be set only if the `replaceTags` field is set to true.
62 // Otherwise this field is ignored
63 UserTags map[string]string
64 ReplaceTags bool
65
66 // Specifies whether you want to apply a Legal Hold to the copied object.
67 LegalHold LegalHoldStatus
68
69 // Object Retention related fields
70 Mode RetentionMode
71 RetainUntilDate time.Time
72
73 Size int64 // Needs to be specified if progress bar is specified.
74 // Progress of the entire copy operation will be sent here.
75 Progress io.Reader
76}
77
78// Process custom-metadata to remove a `x-amz-meta-` prefix if
79// present and validate that keys are distinct (after this
80// prefix removal).
81func filterCustomMeta(userMeta map[string]string) map[string]string {
82 m := make(map[string]string)
83 for k, v := range userMeta {
84 if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
85 k = k[len("x-amz-meta-"):]
86 }
87 if _, ok := m[k]; ok {
88 continue
89 }
90 m[k] = v
91 }
92 return m
93}
94
95// Marshal converts all the CopyDestOptions into their
96// equivalent HTTP header representation
97func (opts CopyDestOptions) Marshal(header http.Header) {
98 const replaceDirective = "REPLACE"
99 if opts.ReplaceTags {
100 header.Set(amzTaggingHeaderDirective, replaceDirective)
101 if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
102 header.Set(amzTaggingHeader, tags)
103 }
104 }
105
106 if opts.LegalHold != LegalHoldStatus("") {
107 header.Set(amzLegalHoldHeader, opts.LegalHold.String())
108 }
109
110 if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
111 header.Set(amzLockMode, opts.Mode.String())
112 header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
113 }
114
115 if opts.Encryption != nil {
116 opts.Encryption.Marshal(header)
117 }
118
119 if opts.ReplaceMetadata {
120 header.Set("x-amz-metadata-directive", replaceDirective)
121 for k, v := range filterCustomMeta(opts.UserMetadata) {
122 if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
123 header.Set(k, v)
124 } else {
125 header.Set("x-amz-meta-"+k, v)
126 }
127 }
128 }
129}
130
131// toDestinationInfo returns a validated copyOptions object.
132func (opts CopyDestOptions) validate() (err error) {
133 // Input validation.
134 if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
135 return err
136 }
137 if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
138 return err
139 }
140 if opts.Progress != nil && opts.Size < 0 {
141 return errInvalidArgument("For progress bar effective size needs to be specified")
142 }
143 return nil
144}
145
146// CopySrcOptions represents a source object to be copied, using
147// server-side copying APIs.
148type CopySrcOptions struct {
149 Bucket, Object string
150 VersionID string
151 MatchETag string
152 NoMatchETag string
153 MatchModifiedSince time.Time
154 MatchUnmodifiedSince time.Time
155 MatchRange bool
156 Start, End int64
157 Encryption encrypt.ServerSide
158}
159
160// Marshal converts all the CopySrcOptions into their
161// equivalent HTTP header representation
162func (opts CopySrcOptions) Marshal(header http.Header) {
163 // Set the source header
164 header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
165 if opts.VersionID != "" {
166 header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
167 }
168
169 if opts.MatchETag != "" {
170 header.Set("x-amz-copy-source-if-match", opts.MatchETag)
171 }
172 if opts.NoMatchETag != "" {
173 header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
174 }
175
176 if !opts.MatchModifiedSince.IsZero() {
177 header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
178 }
179 if !opts.MatchUnmodifiedSince.IsZero() {
180 header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
181 }
182
183 if opts.Encryption != nil {
184 encrypt.SSECopy(opts.Encryption).Marshal(header)
185 }
186}
187
188func (opts CopySrcOptions) validate() (err error) {
189 // Input validation.
190 if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
191 return err
192 }
193 if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
194 return err
195 }
196 if opts.Start > opts.End || opts.Start < 0 {
197 return errInvalidArgument("start must be non-negative, and start must be at most end.")
198 }
199 return nil
200}
201
202// Low level implementation of CopyObject API, supports only upto 5GiB worth of copy.
203func (c *Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
204 metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions,
205) (ObjectInfo, error) {
206 // Build headers.
207 headers := make(http.Header)
208
209 // Set all the metadata headers.
210 for k, v := range metadata {
211 headers.Set(k, v)
212 }
213 if !dstOpts.Internal.ReplicationStatus.Empty() {
214 headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
215 }
216 if !dstOpts.Internal.SourceMTime.IsZero() {
217 headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
218 }
219 if dstOpts.Internal.SourceETag != "" {
220 headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
221 }
222 if dstOpts.Internal.ReplicationRequest {
223 headers.Set(minIOBucketReplicationRequest, "true")
224 }
225 if dstOpts.Internal.ReplicationValidityCheck {
226 headers.Set(minIOBucketReplicationCheck, "true")
227 }
228 if !dstOpts.Internal.LegalholdTimestamp.IsZero() {
229 headers.Set(minIOBucketReplicationObjectLegalHoldTimestamp, dstOpts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
230 }
231 if !dstOpts.Internal.RetentionTimestamp.IsZero() {
232 headers.Set(minIOBucketReplicationObjectRetentionTimestamp, dstOpts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
233 }
234 if !dstOpts.Internal.TaggingTimestamp.IsZero() {
235 headers.Set(minIOBucketReplicationTaggingTimestamp, dstOpts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
236 }
237
238 if len(dstOpts.UserTags) != 0 {
239 headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
240 }
241
242 reqMetadata := requestMetadata{
243 bucketName: destBucket,
244 objectName: destObject,
245 customHeader: headers,
246 }
247 if dstOpts.Internal.SourceVersionID != "" {
248 if dstOpts.Internal.SourceVersionID != nullVersionID {
249 if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
250 return ObjectInfo{}, errInvalidArgument(err.Error())
251 }
252 }
253 urlValues := make(url.Values)
254 urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
255 reqMetadata.queryValues = urlValues
256 }
257
258 // Set the source header
259 headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
260 if srcOpts.VersionID != "" {
261 headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
262 }
263 // Send upload-part-copy request
264 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
265 defer closeResponse(resp)
266 if err != nil {
267 return ObjectInfo{}, err
268 }
269
270 // Check if we got an error response.
271 if resp.StatusCode != http.StatusOK {
272 return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
273 }
274
275 cpObjRes := copyObjectResult{}
276 err = xmlDecoder(resp.Body, &cpObjRes)
277 if err != nil {
278 return ObjectInfo{}, err
279 }
280
281 objInfo := ObjectInfo{
282 Key: destObject,
283 ETag: strings.Trim(cpObjRes.ETag, "\""),
284 LastModified: cpObjRes.LastModified,
285 }
286 return objInfo, nil
287}
288
289func (c *Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
290 partID int, startOffset, length int64, metadata map[string]string,
291) (p CompletePart, err error) {
292 headers := make(http.Header)
293
294 // Set source
295 headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
296
297 if startOffset < 0 {
298 return p, errInvalidArgument("startOffset must be non-negative")
299 }
300
301 if length >= 0 {
302 headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
303 }
304
305 for k, v := range metadata {
306 headers.Set(k, v)
307 }
308
309 queryValues := make(url.Values)
310 queryValues.Set("partNumber", strconv.Itoa(partID))
311 queryValues.Set("uploadId", uploadID)
312
313 resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
314 bucketName: destBucket,
315 objectName: destObject,
316 customHeader: headers,
317 queryValues: queryValues,
318 })
319 defer closeResponse(resp)
320 if err != nil {
321 return
322 }
323
324 // Check if we got an error response.
325 if resp.StatusCode != http.StatusOK {
326 return p, httpRespToErrorResponse(resp, destBucket, destObject)
327 }
328
329 // Decode copy-part response on success.
330 cpObjRes := copyObjectResult{}
331 err = xmlDecoder(resp.Body, &cpObjRes)
332 if err != nil {
333 return p, err
334 }
335 p.PartNumber, p.ETag = partID, cpObjRes.ETag
336 return p, nil
337}
338
339// uploadPartCopy - helper function to create a part in a multipart
340// upload via an upload-part-copy request
341// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
342func (c *Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
343 headers http.Header,
344) (p CompletePart, err error) {
345 // Build query parameters
346 urlValues := make(url.Values)
347 urlValues.Set("partNumber", strconv.Itoa(partNumber))
348 urlValues.Set("uploadId", uploadID)
349
350 // Send upload-part-copy request
351 resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
352 bucketName: bucket,
353 objectName: object,
354 customHeader: headers,
355 queryValues: urlValues,
356 })
357 defer closeResponse(resp)
358 if err != nil {
359 return p, err
360 }
361
362 // Check if we got an error response.
363 if resp.StatusCode != http.StatusOK {
364 return p, httpRespToErrorResponse(resp, bucket, object)
365 }
366
367 // Decode copy-part response on success.
368 cpObjRes := copyObjectResult{}
369 err = xmlDecoder(resp.Body, &cpObjRes)
370 if err != nil {
371 return p, err
372 }
373 p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
374 return p, nil
375}
376
377// ComposeObject - creates an object using server-side copying
378// of existing objects. It takes a list of source objects (with optional offsets)
379// and concatenates them into a new object using only server-side copying
380// operations. Optionally takes progress reader hook for applications to
381// look at current progress.
382func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) {
383 if len(srcs) < 1 || len(srcs) > maxPartsCount {
384 return UploadInfo{}, errInvalidArgument("There must be as least one and up to 10000 source objects.")
385 }
386
387 for _, src := range srcs {
388 if err := src.validate(); err != nil {
389 return UploadInfo{}, err
390 }
391 }
392
393 if err := dst.validate(); err != nil {
394 return UploadInfo{}, err
395 }
396
397 srcObjectInfos := make([]ObjectInfo, len(srcs))
398 srcObjectSizes := make([]int64, len(srcs))
399 var totalSize, totalParts int64
400 var err error
401 for i, src := range srcs {
402 opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID}
403 srcObjectInfos[i], err = c.StatObject(context.Background(), src.Bucket, src.Object, opts)
404 if err != nil {
405 return UploadInfo{}, err
406 }
407
408 srcCopySize := srcObjectInfos[i].Size
409 // Check if a segment is specified, and if so, is the
410 // segment within object bounds?
411 if src.MatchRange {
412 // Since range is specified,
413 // 0 <= src.start <= src.end
414 // so only invalid case to check is:
415 if src.End >= srcCopySize || src.Start < 0 {
416 return UploadInfo{}, errInvalidArgument(
417 fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)",
418 i, src.Start, src.End, srcCopySize))
419 }
420 srcCopySize = src.End - src.Start + 1
421 }
422
423 // Only the last source may be less than `absMinPartSize`
424 if srcCopySize < absMinPartSize && i < len(srcs)-1 {
425 return UploadInfo{}, errInvalidArgument(
426 fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize))
427 }
428
429 // Is data to copy too large?
430 totalSize += srcCopySize
431 if totalSize > maxMultipartPutObjectSize {
432 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
433 }
434
435 // record source size
436 srcObjectSizes[i] = srcCopySize
437
438 // calculate parts needed for current source
439 totalParts += partsRequired(srcCopySize)
440 // Do we need more parts than we are allowed?
441 if totalParts > maxPartsCount {
442 return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
443 "Your proposed compose object requires more than %d parts", maxPartsCount))
444 }
445 }
446
447 // Single source object case (i.e. when only one source is
448 // involved, it is being copied wholly and at most 5GiB in
449 // size, emptyfiles are also supported).
450 if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
451 return c.CopyObject(ctx, dst, srcs[0])
452 }
453
454 // Now, handle multipart-copy cases.
455
456 // 1. Ensure that the object has not been changed while
457 // we are copying data.
458 for i, src := range srcs {
459 src.MatchETag = srcObjectInfos[i].ETag
460 }
461
462 // 2. Initiate a new multipart upload.
463
464 // Set user-metadata on the destination object. If no
465 // user-metadata is specified, and there is only one source,
466 // (only) then metadata from source is copied.
467 var userMeta map[string]string
468 if dst.ReplaceMetadata {
469 userMeta = dst.UserMetadata
470 } else {
471 userMeta = srcObjectInfos[0].UserMetadata
472 }
473
474 var userTags map[string]string
475 if dst.ReplaceTags {
476 userTags = dst.UserTags
477 } else {
478 userTags = srcObjectInfos[0].UserTags
479 }
480
481 uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{
482 ServerSideEncryption: dst.Encryption,
483 UserMetadata: userMeta,
484 UserTags: userTags,
485 Mode: dst.Mode,
486 RetainUntilDate: dst.RetainUntilDate,
487 LegalHold: dst.LegalHold,
488 })
489 if err != nil {
490 return UploadInfo{}, err
491 }
492
493 // 3. Perform copy part uploads
494 objParts := []CompletePart{}
495 partIndex := 1
496 for i, src := range srcs {
497 h := make(http.Header)
498 src.Marshal(h)
499 if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC {
500 dst.Encryption.Marshal(h)
501 }
502
503 // calculate start/end indices of parts after
504 // splitting.
505 startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
506 for j, start := range startIdx {
507 end := endIdx[j]
508
509 // Add (or reset) source range header for
510 // upload part copy request.
511 h.Set("x-amz-copy-source-range",
512 fmt.Sprintf("bytes=%d-%d", start, end))
513
514 // make upload-part-copy request
515 complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
516 dst.Object, uploadID, partIndex, h)
517 if err != nil {
518 return UploadInfo{}, err
519 }
520 if dst.Progress != nil {
521 io.CopyN(io.Discard, dst.Progress, end-start+1)
522 }
523 objParts = append(objParts, complPart)
524 partIndex++
525 }
526 }
527
528 // 4. Make final complete-multipart request.
529 uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
530 completeMultipartUpload{Parts: objParts}, PutObjectOptions{ServerSideEncryption: dst.Encryption})
531 if err != nil {
532 return UploadInfo{}, err
533 }
534
535 uploadInfo.Size = totalSize
536 return uploadInfo, nil
537}
538
539// partsRequired is maximum parts possible with
540// max part size of ceiling(maxMultipartPutObjectSize / (maxPartsCount - 1))
541func partsRequired(size int64) int64 {
542 maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
543 r := size / int64(maxPartSize)
544 if size%int64(maxPartSize) > 0 {
545 r++
546 }
547 return r
548}
549
550// calculateEvenSplits - computes splits for a source and returns
551// start and end index slices. Splits happen evenly to be sure that no
552// part is less than 5MiB, as that could fail the multipart request if
553// it is not the last part.
554func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
555 if size == 0 {
556 return
557 }
558
559 reqParts := partsRequired(size)
560 startIndex = make([]int64, reqParts)
561 endIndex = make([]int64, reqParts)
562 // Compute number of required parts `k`, as:
563 //
564 // k = ceiling(size / copyPartSize)
565 //
566 // Now, distribute the `size` bytes in the source into
567 // k parts as evenly as possible:
568 //
569 // r parts sized (q+1) bytes, and
570 // (k - r) parts sized q bytes, where
571 //
572 // size = q * k + r (by simple division of size by k,
573 // so that 0 <= r < k)
574 //
575 start := src.Start
576 if start == -1 {
577 start = 0
578 }
579 quot, rem := size/reqParts, size%reqParts
580 nextStart := start
581 for j := int64(0); j < reqParts; j++ {
582 curPartSize := quot
583 if j < rem {
584 curPartSize++
585 }
586
587 cStart := nextStart
588 cEnd := cStart + curPartSize - 1
589 nextStart = cEnd + 1
590
591 startIndex[j], endIndex[j] = cStart, cEnd
592 }
593 return
594}