Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-put-object.go')
-rw-r--r--  vendor/github.com/minio/minio-go/v7/api-put-object.go  473
1 file changed, 473 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object.go b/vendor/github.com/minio/minio-go/v7/api-put-object.go
new file mode 100644
index 0000000..bbd8924
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-put-object.go
@@ -0,0 +1,473 @@
/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2015-2017 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"bytes"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"net/http"
	"sort"
	"time"

	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/s3utils"
	"golang.org/x/net/http/httpguts"
)

// ReplicationStatus represents replication status of object
type ReplicationStatus string

const (
	// ReplicationStatusPending indicates replication is pending
	ReplicationStatusPending ReplicationStatus = "PENDING"
	// ReplicationStatusComplete indicates replication completed ok
	ReplicationStatusComplete ReplicationStatus = "COMPLETED"
	// ReplicationStatusFailed indicates replication failed
	ReplicationStatusFailed ReplicationStatus = "FAILED"
	// ReplicationStatusReplica indicates object is a replica of a source
	ReplicationStatusReplica ReplicationStatus = "REPLICA"
)

// Empty returns true if no replication status set.
func (r ReplicationStatus) Empty() bool {
	return r == ""
}

// AdvancedPutOptions for internal use - to be utilized by replication, ILM transition
// implementation on MinIO server
type AdvancedPutOptions struct {
	SourceVersionID          string
	SourceETag               string
	ReplicationStatus        ReplicationStatus
	SourceMTime              time.Time
	ReplicationRequest       bool
	RetentionTimestamp       time.Time
	TaggingTimestamp         time.Time
	LegalholdTimestamp       time.Time
	ReplicationValidityCheck bool
}

// PutObjectOptions represents options specified by user for PutObject call
type PutObjectOptions struct {
	UserMetadata            map[string]string
	UserTags                map[string]string
	Progress                io.Reader
	ContentType             string
	ContentEncoding         string
	ContentDisposition      string
	ContentLanguage         string
	CacheControl            string
	Expires                 time.Time
	Mode                    RetentionMode
	RetainUntilDate         time.Time
	ServerSideEncryption    encrypt.ServerSide
	NumThreads              uint
	StorageClass            string
	WebsiteRedirectLocation string
	PartSize                uint64
	LegalHold               LegalHoldStatus
	SendContentMd5          bool
	DisableContentSha256    bool
	DisableMultipart        bool

	// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
	// fill them serially and upload them in parallel.
	// This can be used for faster uploads on non-seekable or slow-to-seek input.
	ConcurrentStreamParts bool
	Internal              AdvancedPutOptions

	customHeaders http.Header
}
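
// Illustrative sketch (not part of the upstream file): a caller might fill
// PutObjectOptions like this for a typical upload; the content type, metadata
// key and part size below are placeholder values.
//
//	opts := PutObjectOptions{
//		ContentType:  "application/octet-stream",
//		UserMetadata: map[string]string{"x-origin": "example"},
//		PartSize:     16 * 1024 * 1024, // 16 MiB buffers for multipart uploads
//	}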

// SetMatchETag sets an If-Match precondition on the PUT: the upload succeeds
// only if the object's current ETag matches the given etag, otherwise MinIO
// returns an error. This is a MinIO-specific extension to support optimistic
// locking semantics.
func (opts *PutObjectOptions) SetMatchETag(etag string) {
	if opts.customHeaders == nil {
		opts.customHeaders = http.Header{}
	}
	opts.customHeaders.Set("If-Match", "\""+etag+"\"")
}

// SetMatchETagExcept sets an If-None-Match precondition on the PUT: the upload
// succeeds only if the object's current ETag does not match the given etag,
// otherwise MinIO returns an error. This is a MinIO-specific extension to
// support optimistic locking semantics.
func (opts *PutObjectOptions) SetMatchETagExcept(etag string) {
	if opts.customHeaders == nil {
		opts.customHeaders = http.Header{}
	}
	opts.customHeaders.Set("If-None-Match", "\""+etag+"\"")
}
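
// Illustrative sketch (not part of the upstream file): combining the two
// conditional-PUT helpers above for optimistic locking; etag is a placeholder
// value taken from an earlier upload or StatObject call.
//
//	var opts PutObjectOptions
//	opts.SetMatchETag(etag) // accept the PUT only if the stored ETag matches (If-Match)
//	// or instead:
//	opts.SetMatchETagExcept(etag) // accept the PUT only if the stored ETag differs (If-None-Match)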

// getNumThreads - gets the number of threads to be used in the multipart
// put object operation
func (opts PutObjectOptions) getNumThreads() (numThreads int) {
	if opts.NumThreads > 0 {
		numThreads = int(opts.NumThreads)
	} else {
		numThreads = totalWorkers
	}
	return
}

// Header - constructs the headers from metadata entered by user in
// PutObjectOptions struct
func (opts PutObjectOptions) Header() (header http.Header) {
	header = make(http.Header)

	contentType := opts.ContentType
	if contentType == "" {
		contentType = "application/octet-stream"
	}
	header.Set("Content-Type", contentType)

	if opts.ContentEncoding != "" {
		header.Set("Content-Encoding", opts.ContentEncoding)
	}
	if opts.ContentDisposition != "" {
		header.Set("Content-Disposition", opts.ContentDisposition)
	}
	if opts.ContentLanguage != "" {
		header.Set("Content-Language", opts.ContentLanguage)
	}
	if opts.CacheControl != "" {
		header.Set("Cache-Control", opts.CacheControl)
	}

	if !opts.Expires.IsZero() {
		header.Set("Expires", opts.Expires.UTC().Format(http.TimeFormat))
	}

	if opts.Mode != "" {
		header.Set(amzLockMode, opts.Mode.String())
	}

	if !opts.RetainUntilDate.IsZero() {
		header.Set("X-Amz-Object-Lock-Retain-Until-Date", opts.RetainUntilDate.Format(time.RFC3339))
	}

	if opts.LegalHold != "" {
		header.Set(amzLegalHoldHeader, opts.LegalHold.String())
	}

	if opts.ServerSideEncryption != nil {
		opts.ServerSideEncryption.Marshal(header)
	}

	if opts.StorageClass != "" {
		header.Set(amzStorageClass, opts.StorageClass)
	}

	if opts.WebsiteRedirectLocation != "" {
		header.Set(amzWebsiteRedirectLocation, opts.WebsiteRedirectLocation)
	}

	if !opts.Internal.ReplicationStatus.Empty() {
		header.Set(amzBucketReplicationStatus, string(opts.Internal.ReplicationStatus))
	}
	if !opts.Internal.SourceMTime.IsZero() {
		header.Set(minIOBucketSourceMTime, opts.Internal.SourceMTime.Format(time.RFC3339Nano))
	}
	if opts.Internal.SourceETag != "" {
		header.Set(minIOBucketSourceETag, opts.Internal.SourceETag)
	}
	if opts.Internal.ReplicationRequest {
		header.Set(minIOBucketReplicationRequest, "true")
	}
	if opts.Internal.ReplicationValidityCheck {
		header.Set(minIOBucketReplicationCheck, "true")
	}
	if !opts.Internal.LegalholdTimestamp.IsZero() {
		header.Set(minIOBucketReplicationObjectLegalHoldTimestamp, opts.Internal.LegalholdTimestamp.Format(time.RFC3339Nano))
	}
	if !opts.Internal.RetentionTimestamp.IsZero() {
		header.Set(minIOBucketReplicationObjectRetentionTimestamp, opts.Internal.RetentionTimestamp.Format(time.RFC3339Nano))
	}
	if !opts.Internal.TaggingTimestamp.IsZero() {
		header.Set(minIOBucketReplicationTaggingTimestamp, opts.Internal.TaggingTimestamp.Format(time.RFC3339Nano))
	}

	if len(opts.UserTags) != 0 {
		header.Set(amzTaggingHeader, s3utils.TagEncode(opts.UserTags))
	}

	for k, v := range opts.UserMetadata {
		if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
			header.Set(k, v)
		} else {
			header.Set("x-amz-meta-"+k, v)
		}
	}

	// set any other additional custom headers.
	for k, v := range opts.customHeaders {
		header[k] = v
	}

	return
}
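
// Illustrative sketch (not part of the upstream file): Header() folds the
// options above into request headers; the values here are placeholders.
//
//	opts := PutObjectOptions{ContentType: "text/plain", CacheControl: "max-age=600"}
//	hdr := opts.Header()
//	// hdr.Get("Content-Type")  == "text/plain"
//	// hdr.Get("Cache-Control") == "max-age=600"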

// validate() checks if the UserMetadata map has standard headers and raises an error if so.
func (opts PutObjectOptions) validate() (err error) {
	for k, v := range opts.UserMetadata {
		if !httpguts.ValidHeaderFieldName(k) || isStandardHeader(k) || isSSEHeader(k) || isStorageClassHeader(k) {
			return errInvalidArgument(k + " unsupported user defined metadata name")
		}
		if !httpguts.ValidHeaderFieldValue(v) {
			return errInvalidArgument(v + " unsupported user defined metadata value")
		}
	}
	if opts.Mode != "" && !opts.Mode.IsValid() {
		return errInvalidArgument(opts.Mode.String() + " unsupported retention mode")
	}
	if opts.LegalHold != "" && !opts.LegalHold.IsValid() {
		return errInvalidArgument(opts.LegalHold.String() + " unsupported legal-hold status")
	}
	return nil
}
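
// Illustrative sketch (not part of the upstream file): validate() rejects
// standard headers smuggled in as user-defined metadata, while plain custom
// keys pass; the key names below are placeholders.
//
//	bad := PutObjectOptions{UserMetadata: map[string]string{"Content-Encoding": "gzip"}}
//	err := bad.validate() // non-nil: standard headers are not valid user metadata
//
//	good := PutObjectOptions{UserMetadata: map[string]string{"x-origin": "example"}}
//	err = good.validate() // nil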

// completedParts is a collection of parts sortable by their part numbers.
// used for sorting the uploaded parts before completing the multipart request.
type completedParts []CompletePart

func (a completedParts) Len() int           { return len(a) }
func (a completedParts) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }

// PutObject creates an object in a bucket.
//
// You must have WRITE permissions on a bucket to create an object.
//
// - For sizes smaller than 16MiB PutObject automatically does a
//   single atomic PUT operation.
//
// - For sizes larger than 16MiB PutObject automatically does a
//   multipart upload operation.
//
// - For a size of -1 PutObject does a multipart Put operation
//   until the input stream reaches EOF. The maximum object size that can
//   be uploaded through this operation is 5TiB.
//
// WARNING: Passing '-1' buffers parts in memory and those buffers cannot
// be reused; for best results with PutObject(), always pass the size.
//
// NOTE: Upon errors during upload, the multipart operation is entirely aborted.
func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
	opts PutObjectOptions,
) (info UploadInfo, err error) {
	if objectSize < 0 && opts.DisableMultipart {
		return UploadInfo{}, errors.New("object size must be provided with disable multipart upload")
	}

	err = opts.validate()
	if err != nil {
		return UploadInfo{}, err
	}

	return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
}
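
// Illustrative sketch (not part of the upstream file): a typical PutObject
// call with a known size; the bucket and object names are placeholders and c
// is an initialized *Client.
//
//	payload := []byte("hello world")
//	info, err := c.PutObject(ctx, "example-bucket", "example-object",
//		bytes.NewReader(payload), int64(len(payload)),
//		PutObjectOptions{ContentType: "text/plain"})
//	if err != nil {
//		return err
//	}
//	fmt.Println("uploaded", info.Key, "with etag", info.ETag)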

func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Check for largest object size allowed.
	if size > int64(maxMultipartPutObjectSize) {
		return UploadInfo{}, errEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
	}

	// NOTE: Streaming signature is not supported by GCS.
	if s3utils.IsGoogleEndpoint(*c.endpointURL) {
		return c.putObject(ctx, bucketName, objectName, reader, size, opts)
	}

	partSize := opts.PartSize
	if opts.PartSize == 0 {
		partSize = minPartSize
	}

	if c.overrideSignerType.IsV2() {
		if size >= 0 && size < int64(partSize) || opts.DisableMultipart {
			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
		}
		return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
	}

	if size < 0 {
		if opts.DisableMultipart {
			return UploadInfo{}, errors.New("no length provided and multipart disabled")
		}
		if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
			return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
		}
		return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
	}

	if size < int64(partSize) || opts.DisableMultipart {
		return c.putObject(ctx, bucketName, objectName, reader, size, opts)
	}

	return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
}
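
// Illustrative sketch (not part of the upstream file): when the total size is
// unknown, pass -1 and the routing above picks a multipart stream; setting
// ConcurrentStreamParts with NumThreads > 1 selects the parallel buffered
// path. The reader and tuning values are placeholders.
//
//	info, err := c.PutObject(ctx, "example-bucket", "example-stream", srcReader, -1,
//		PutObjectOptions{
//			ConcurrentStreamParts: true,
//			NumThreads:            4,
//			PartSize:              32 * 1024 * 1024, // each buffer is filled serially, uploaded in parallel
//		})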

func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Total data read and written to server. Should be equal to
	// the returned UploadInfo.Size at the end of the call.
	var totalUploadedSize int64

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	if !opts.SendContentMd5 {
		if opts.UserMetadata == nil {
			opts.UserMetadata = make(map[string]string, 1)
		}
		// Announce the per-part checksum algorithm to the server by passing it
		// through user metadata on the initiate-multipart request.
		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
	}

	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}
	// The checksum-algorithm entry was only needed for the initiate request,
	// so drop it before it can leak into later requests as object metadata.
	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Part number always starts with '1'.
	partNumber := 1

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Create a buffer.
	buf := make([]byte, partSize)

	// Create checksums
	// CRC32C is ~50% faster on AMD64 @ 30GB/s
	var crcBytes []byte
	customHeader := make(http.Header)
	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	for partNumber <= totalPartsCount {
		length, rerr := readFull(reader, buf)
		if rerr == io.EOF && partNumber > 1 {
			break
		}

		if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
			return UploadInfo{}, rerr
		}

		var md5Base64 string
		if opts.SendContentMd5 {
			// Calculate md5sum.
			hash := c.md5Hasher()
			hash.Write(buf[:length])
			md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
			hash.Close()
		} else {
			crc.Reset()
			crc.Write(buf[:length])
			cSum := crc.Sum(nil)
			customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
			crcBytes = append(crcBytes, cSum...)
		}

		// Update progress reader appropriately to the latest offset
		// as we read from the source.
		rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)

		// Proceed to upload the part.
		p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
		objPart, uerr := c.uploadPart(ctx, p)
		if uerr != nil {
			return UploadInfo{}, uerr
		}

		// Save successfully uploaded part metadata.
		partsInfo[partNumber] = objPart

		// Save successfully uploaded size.
		totalUploadedSize += int64(length)

		// Increment part number.
		partNumber++

		// For unknown size, we break out of the loop on read EOF;
		// we do not have to upload all totalPartsCount parts.
		if rerr == io.EOF {
			break
		}
	}

	// Loop over total uploaded parts to save them in
	// Parts array before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:           part.ETag,
			PartNumber:     part.PartNumber,
			ChecksumCRC32:  part.ChecksumCRC32,
			ChecksumCRC32C: part.ChecksumCRC32C,
			ChecksumSHA1:   part.ChecksumSHA1,
			ChecksumSHA256: part.ChecksumSHA256,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	opts = PutObjectOptions{}
	if len(crcBytes) > 0 {
		// Add hash of hashes.
		crc.Reset()
		crc.Write(crcBytes)
		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
	}
	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
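
// Illustrative sketch (not part of the upstream file): the trailing
// "hash of hashes" above concatenates the raw CRC32C of every uploaded part
// and then CRC32C-hashes that concatenation; partPayloads is a placeholder
// [][]byte standing in for the per-part buffers.
//
//	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
//	var crcBytes []byte
//	for _, part := range partPayloads {
//		crc.Reset()
//		crc.Write(part)
//		crcBytes = append(crcBytes, crc.Sum(nil)...)
//	}
//	crc.Reset()
//	crc.Write(crcBytes)
//	composite := base64.StdEncoding.EncodeToString(crc.Sum(nil)) // sent as X-Amz-Checksum-Crc32c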