From 8db41da676ac8368ef7c2549d56239a5ff5eedde Mon Sep 17 00:00:00 2001
From: Rutger Broekhoff
Date: Tue, 2 Jan 2024 18:56:31 +0100
Subject: Delete vendor directory

---
 .../minio/minio-go/v7/api-put-object-streaming.go  | 809 ---------------------
 1 file changed, 809 deletions(-)
 delete mode 100644 vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go

diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
deleted file mode 100644
index 9182d4e..0000000
--- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go
+++ /dev/null
@@ -1,809 +0,0 @@
-/*
- * MinIO Go Library for Amazon S3 Compatible Cloud Storage
- * Copyright 2017 MinIO, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package minio
-
-import (
-    "bytes"
-    "context"
-    "encoding/base64"
-    "fmt"
-    "hash/crc32"
-    "io"
-    "net/http"
-    "net/url"
-    "sort"
-    "strings"
-    "sync"
-
-    "github.com/google/uuid"
-    "github.com/minio/minio-go/v7/pkg/s3utils"
-)
-
-// putObjectMultipartStream - upload a large object using
-// multipart upload and streaming signature for signing payload.
-// Comprehensive put object operation involving multipart uploads.
-//
-// The following code handles these types of readers:
-//
-//   - *minio.Object
-//   - Any reader which has a method 'ReadAt()'
-func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
-    reader io.Reader, size int64, opts PutObjectOptions,
-) (info UploadInfo, err error) {
-    if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
-        info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
-    } else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
-        // If the reader implements ReadAt and is not a *minio.Object, use the parallel uploader.
-        info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
-    } else {
-        info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
-    }
-    if err != nil {
-        errResp := ToErrorResponse(err)
-        // If multipart functionality is not available, fall back to a
-        // single PutObject operation.
-        if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
-            // Verify that the size of the reader is not greater than '5GiB'.
-            if size > maxSinglePutObjectSize {
-                return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
-            }
-            // Fall back to uploading as a single PutObject operation.
-            return c.putObject(ctx, bucketName, objectName, reader, size, opts)
-        }
-    }
-    return info, err
-}
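The dispatch above is driven purely by the reader's capabilities. The isObject and isReadAt helpers are internal to the library; the type assertion below is an assumption of how such detection can look in isolation, not the library's exact code:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// pickUploader mimics capability-based dispatch: a reader that also
// implements io.ReaderAt can be read in parallel at arbitrary offsets,
// while a plain io.Reader must be consumed sequentially.
func pickUploader(r io.Reader) string {
	if _, ok := r.(io.ReaderAt); ok {
		return "parallel (ReadAt-capable)"
	}
	return "sequential (plain Reader)"
}

func main() {
	fmt.Println(pickUploader(bytes.NewReader([]byte("hello")))) // parallel (ReadAt-capable)
	fmt.Println(pickUploader(strings.NewReader("hello")))       // parallel (ReadAt-capable)
	fmt.Println(pickUploader(io.LimitReader(nil, 0)))           // sequential (plain Reader)
}
```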
-
-// uploadedPartRes - the response received from a part upload.
-type uploadedPartRes struct {
-    Error   error // Any error encountered while uploading the part.
-    PartNum int   // Number of the part uploaded.
-    Size    int64 // Size of the part uploaded.
-    Part    ObjectPart
-}
-
-type uploadPartReq struct {
-    PartNum int        // Number of the part to upload.
-    Part    ObjectPart // Part metadata once the part is uploaded.
-}
-
-// putObjectMultipartStreamFromReadAt - Uploads files bigger than 128MiB.
-// Supports all readers which implement the io.ReaderAt interface
-// (ReadAt method).
-//
-// NOTE: This function is meant to be used for all readers which
-// implement io.ReaderAt, which allows us to resume multipart uploads
-// by reading at an offset, avoiding re-reading data that was already
-// uploaded. Internally this function uses temporary files for staging
-// all the data; these temporary files are cleaned automatically when
-// the caller, i.e. the http client, closes the stream after uploading
-// all the contents successfully.
-func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
-    reader io.ReaderAt, size int64, opts PutObjectOptions,
-) (info UploadInfo, err error) {
-    // Input validation.
-    if err = s3utils.CheckValidBucketName(bucketName); err != nil {
-        return UploadInfo{}, err
-    }
-    if err = s3utils.CheckValidObjectName(objectName); err != nil {
-        return UploadInfo{}, err
-    }
-
-    // Calculate the optimal parts info for a given size.
-    totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-
-    withChecksum := c.trailingHeaderSupport
-    if withChecksum {
-        if opts.UserMetadata == nil {
-            opts.UserMetadata = make(map[string]string, 1)
-        }
-        opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
-    }
-    // Initiate a new multipart upload.
-    uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-    delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
-
-    // Abort the multipart upload in progress if the function returns
-    // any error. Since we do not resume, we should purge the parts
-    // which have been uploaded to relinquish storage space.
-    defer func() {
-        if err != nil {
-            c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
-        }
-    }()
-
-    // Total data read and written to server; should be equal to 'size' at the end of the call.
-    var totalUploadedSize int64
-
-    // Complete multipart upload.
-    var complMultipartUpload completeMultipartUpload
-
-    // Declare a channel that sends the next part number to be uploaded.
-    uploadPartsCh := make(chan uploadPartReq)
-
-    // Declare a channel that sends back the response of a part upload.
-    uploadedPartsCh := make(chan uploadedPartRes)
-
-    // Used for readability, lastPartNumber is always totalPartsCount.
-    lastPartNumber := totalPartsCount
-
-    partitionCtx, partitionCancel := context.WithCancel(ctx)
-    defer partitionCancel()
-    // Send each part number to the channel to be processed.
-    go func() {
-        defer close(uploadPartsCh)
-
-        for p := 1; p <= totalPartsCount; p++ {
-            select {
-            case <-partitionCtx.Done():
-                return
-            case uploadPartsCh <- uploadPartReq{PartNum: p}:
-            }
-        }
-    }()
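The workers that consume this channel derive each read offset from the part number alone, which is what lets parts upload in any order. A minimal sketch of the same arithmetic, with toy sizes:

```go
package main

import "fmt"

// partRange returns the read offset and length of a 1-based part number,
// given the object size and the regular part size. The last part is
// whatever remains.
func partRange(partNum int, size, partSize int64) (offset, length int64) {
	offset = int64(partNum-1) * partSize
	length = partSize
	if remaining := size - offset; remaining < partSize {
		length = remaining
	}
	return offset, length
}

func main() {
	const size, partSize = 130, 50 // 3 parts: 50 + 50 + 30
	for p := 1; p <= 3; p++ {
		off, n := partRange(p, size, partSize)
		fmt.Printf("part %d: offset=%d length=%d\n", p, off, n)
	}
}
```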
-
-    // Receive each part number from the channel, allowing
-    // opts.getNumThreads() parallel uploads.
-    for w := 1; w <= opts.getNumThreads(); w++ {
-        go func(partSize int64) {
-            for {
-                var uploadReq uploadPartReq
-                var ok bool
-                select {
-                case <-ctx.Done():
-                    return
-                case uploadReq, ok = <-uploadPartsCh:
-                    if !ok {
-                        return
-                    }
-                    // Each worker will draw from the part channel and upload in parallel.
-                }
-
-                // For all part numbers except the last, the read offset
-                // is a multiple of partSize.
-                readOffset := int64(uploadReq.PartNum-1) * partSize
-
-                // As a special case, if partNumber is lastPartNumber we
-                // calculate the offset based on the last part size.
-                if uploadReq.PartNum == lastPartNumber {
-                    readOffset = size - lastPartSize
-                    partSize = lastPartSize
-                }
-
-                sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
-                trailer := make(http.Header, 1)
-                if withChecksum {
-                    crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-                    trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
-                    sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
-                        trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
-                    })
-                }
-
-                // Proceed to upload the part.
-                p := uploadPartParams{
-                    bucketName:   bucketName,
-                    objectName:   objectName,
-                    uploadID:     uploadID,
-                    reader:       sectionReader,
-                    partNumber:   uploadReq.PartNum,
-                    size:         partSize,
-                    sse:          opts.ServerSideEncryption,
-                    streamSha256: !opts.DisableContentSha256,
-                    sha256Hex:    "",
-                    trailer:      trailer,
-                }
-                objPart, err := c.uploadPart(ctx, p)
-                if err != nil {
-                    uploadedPartsCh <- uploadedPartRes{
-                        Error: err,
-                    }
-                    // Exit the goroutine.
-                    return
-                }
-
-                // Save successfully uploaded part metadata.
-                uploadReq.Part = objPart
-
-                // Send successful part info through the channel.
-                uploadedPartsCh <- uploadedPartRes{
-                    Size:    objPart.Size,
-                    PartNum: uploadReq.PartNum,
-                    Part:    uploadReq.Part,
-                }
-            }
-        }(partSize)
-    }
-
-    // Gather the responses as they occur and update any
-    // progress bar.
-    for u := 1; u <= totalPartsCount; u++ {
-        select {
-        case <-ctx.Done():
-            return UploadInfo{}, ctx.Err()
-        case uploadRes := <-uploadedPartsCh:
-            if uploadRes.Error != nil {
-                return UploadInfo{}, uploadRes.Error
-            }
-
-            // Update the totalUploadedSize.
-            totalUploadedSize += uploadRes.Size
-            complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
-                ETag:           uploadRes.Part.ETag,
-                PartNumber:     uploadRes.Part.PartNumber,
-                ChecksumCRC32:  uploadRes.Part.ChecksumCRC32,
-                ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
-                ChecksumSHA1:   uploadRes.Part.ChecksumSHA1,
-                ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
-            })
-        }
-    }
-
-    // Verify that we uploaded all the data.
-    if totalUploadedSize != size {
-        return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
-    }
-
-    // Sort all completed parts.
-    sort.Sort(completedParts(complMultipartUpload.Parts))
-
-    opts = PutObjectOptions{
-        ServerSideEncryption: opts.ServerSideEncryption,
-    }
-    if withChecksum {
-        // Add hash of hashes.
-        crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-        for _, part := range complMultipartUpload.Parts {
-            cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
-            if err == nil {
-                crc.Write(cs)
-            }
-        }
-        opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
-    }
-
-    uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-
-    uploadInfo.Size = totalUploadedSize
-    return uploadInfo, nil
-}
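The completion step sends a "hash of hashes": the raw CRC32C of every part is concatenated and CRC32C'd again, matching how S3 derives a multipart object's composite checksum. A standalone sketch of that aggregation:

```go
package main

import (
	"encoding/base64"
	"fmt"
	"hash/crc32"
)

func main() {
	parts := [][]byte{[]byte("part-1-data"), []byte("part-2-data")}
	table := crc32.MakeTable(crc32.Castagnoli)

	// CRC32C each part individually, then feed the raw 4-byte sums
	// into a second CRC32C: the composite "hash of hashes".
	composite := crc32.New(table)
	for _, p := range parts {
		part := crc32.New(table)
		part.Write(p)
		composite.Write(part.Sum(nil))
	}
	fmt.Println(base64.StdEncoding.EncodeToString(composite.Sum(nil)))
}
```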
-
-func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
-    reader io.Reader, size int64, opts PutObjectOptions,
-) (info UploadInfo, err error) {
-    // Input validation.
-    if err = s3utils.CheckValidBucketName(bucketName); err != nil {
-        return UploadInfo{}, err
-    }
-    if err = s3utils.CheckValidObjectName(objectName); err != nil {
-        return UploadInfo{}, err
-    }
-
-    if !opts.SendContentMd5 {
-        if opts.UserMetadata == nil {
-            opts.UserMetadata = make(map[string]string, 1)
-        }
-        opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
-    }
-
-    // Calculate the optimal parts info for a given size.
-    totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-    // Initiate a new multipart upload.
-    uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-    delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
-
-    // Abort the multipart upload if the function returns any error.
-    // Since we do not resume, we should purge the parts which have
-    // been uploaded to relinquish storage space.
-    defer func() {
-        if err != nil {
-            c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
-        }
-    }()
-
-    // Create checksums.
-    // CRC32C is ~50% faster on AMD64 @ 30GB/s.
-    var crcBytes []byte
-    customHeader := make(http.Header)
-    crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-    md5Hash := c.md5Hasher()
-    defer md5Hash.Close()
-
-    // Total data read and written to server; should be equal to 'size' at the end of the call.
-    var totalUploadedSize int64
-
-    // Initialize parts uploaded map.
-    partsInfo := make(map[int]ObjectPart)
-
-    // Create a buffer.
-    buf := make([]byte, partSize)
-
-    // Avoid declaring variables in the for loop.
-    var md5Base64 string
-
-    // Part number always starts with '1'.
-    var partNumber int
-    for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
-        // Proceed to upload the part.
-        if partNumber == totalPartsCount {
-            partSize = lastPartSize
-        }
-
-        length, rerr := readFull(reader, buf)
-        if rerr == io.EOF && partNumber > 1 {
-            break
-        }
-
-        if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
-            return UploadInfo{}, rerr
-        }
-
-        // Calculate md5sum.
-        if opts.SendContentMd5 {
-            md5Hash.Reset()
-            md5Hash.Write(buf[:length])
-            md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
-        } else {
-            // Add CRC32C instead.
-            crc.Reset()
-            crc.Write(buf[:length])
-            cSum := crc.Sum(nil)
-            customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
-            crcBytes = append(crcBytes, cSum...)
-        }
-
-        // Update the progress reader appropriately to the latest offset
-        // as we read from the source.
-        hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)
-        p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
-        objPart, uerr := c.uploadPart(ctx, p)
-        if uerr != nil {
-            return UploadInfo{}, uerr
-        }
-
-        // Save successfully uploaded part metadata.
-        partsInfo[partNumber] = objPart
-
-        // Save successfully uploaded size.
-        totalUploadedSize += partSize
-    }
-
-    // Verify that we uploaded all the data.
-    if size > 0 {
-        if totalUploadedSize != size {
-            return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
-        }
-    }
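The read loop above cuts a stream of unknown length into fixed-size parts. readFull is an internal helper, but io.ReadFull has the semantics the loop relies on: a short final read surfaces as io.ErrUnexpectedEOF and is still a valid last part, while io.EOF means nothing was left. A minimal sketch using the standard library directly:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	src := strings.NewReader("abcdefghij") // 10 bytes
	buf := make([]byte, 4)                 // part size of 4

	for part := 1; ; part++ {
		n, err := io.ReadFull(src, buf)
		if err == io.EOF {
			break // no data left at all
		}
		if err != nil && err != io.ErrUnexpectedEOF {
			panic(err)
		}
		fmt.Printf("part %d: %q\n", part, buf[:n])
		if err == io.ErrUnexpectedEOF {
			break // short read: this was the final part
		}
	}
}
```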
-
-    // Complete multipart upload.
-    var complMultipartUpload completeMultipartUpload
-
-    // Loop over uploaded parts to save them in a Parts array
-    // before completing the multipart request.
-    for i := 1; i < partNumber; i++ {
-        part, ok := partsInfo[i]
-        if !ok {
-            return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
-        }
-        complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
-            ETag:           part.ETag,
-            PartNumber:     part.PartNumber,
-            ChecksumCRC32:  part.ChecksumCRC32,
-            ChecksumCRC32C: part.ChecksumCRC32C,
-            ChecksumSHA1:   part.ChecksumSHA1,
-            ChecksumSHA256: part.ChecksumSHA256,
-        })
-    }
-
-    // Sort all completed parts.
-    sort.Sort(completedParts(complMultipartUpload.Parts))
-
-    opts = PutObjectOptions{
-        ServerSideEncryption: opts.ServerSideEncryption,
-    }
-    if len(crcBytes) > 0 {
-        // Add hash of hashes.
-        crc.Reset()
-        crc.Write(crcBytes)
-        opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
-    }
-    uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-
-    uploadInfo.Size = totalUploadedSize
-    return uploadInfo, nil
-}
-
-// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
-// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
-func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
-    reader io.Reader, opts PutObjectOptions,
-) (info UploadInfo, err error) {
-    // Input validation.
-    if err = s3utils.CheckValidBucketName(bucketName); err != nil {
-        return UploadInfo{}, err
-    }
-
-    if err = s3utils.CheckValidObjectName(objectName); err != nil {
-        return UploadInfo{}, err
-    }
-
-    if !opts.SendContentMd5 {
-        if opts.UserMetadata == nil {
-            opts.UserMetadata = make(map[string]string, 1)
-        }
-        opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
-    }
-
-    // Cancel all when an error occurs.
-    ctx, cancel := context.WithCancel(ctx)
-    defer cancel()
-
-    // Calculate the optimal parts info for a given size.
-    totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-
-    // Initiate a new multipart upload.
-    uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-    delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
-
-    // Abort the multipart upload if the function returns any error.
-    // Since we do not resume, we should purge the parts which have
-    // been uploaded to relinquish storage space.
-    defer func() {
-        if err != nil {
-            c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
-        }
-    }()
-
-    // Create checksums.
-    // CRC32C is ~50% faster on AMD64 @ 30GB/s.
-    var crcBytes []byte
-    crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-
-    // Total data read and written to server; should be equal to 'size' at the end of the call.
-    var totalUploadedSize int64
-
-    // Initialize parts uploaded map.
-    partsInfo := make(map[int]ObjectPart)
-
-    // Create a buffer.
-    nBuffers := int64(opts.NumThreads)
-    bufs := make(chan []byte, nBuffers)
-    all := make([]byte, nBuffers*partSize)
-    for i := int64(0); i < nBuffers; i++ {
-        bufs <- all[i*partSize : i*partSize+partSize]
-    }
-
-    var wg sync.WaitGroup
-    var mu sync.Mutex
-    errCh := make(chan error, opts.NumThreads)
-
-    reader = newHook(reader, opts.Progress)
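The bufs channel just created forms a fixed-capacity buffer pool: one backing allocation sliced into NumThreads part-sized buffers and recycled through the channel, so at most NumThreads parts sit in memory at once. The pattern in isolation:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const nBuffers, bufSize = 3, 8

	// One backing allocation, sliced into reusable buffers.
	pool := make(chan []byte, nBuffers)
	all := make([]byte, nBuffers*bufSize)
	for i := 0; i < nBuffers; i++ {
		pool <- all[i*bufSize : (i+1)*bufSize]
	}

	var wg sync.WaitGroup
	for job := 0; job < 10; job++ {
		buf := <-pool // blocks when all buffers are in flight
		wg.Add(1)
		go func(job int, buf []byte) {
			defer wg.Done()
			copy(buf, fmt.Sprintf("job %d", job)) // pretend upload
			pool <- buf                           // recycle
		}(job, buf)
	}
	wg.Wait()
	fmt.Println("done; peak buffer memory stayed at", nBuffers*bufSize, "bytes")
}
```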
-
-    // Part number always starts with '1'.
-    var partNumber int
-    for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
-        // Proceed to upload the part.
-        var buf []byte
-        select {
-        case buf = <-bufs:
-        case err = <-errCh:
-            cancel()
-            wg.Wait()
-            return UploadInfo{}, err
-        }
-
-        if int64(len(buf)) != partSize {
-            return UploadInfo{}, fmt.Errorf("read buffer length %d does not match expected partSize %d", len(buf), partSize)
-        }
-
-        length, rerr := readFull(reader, buf)
-        if rerr == io.EOF && partNumber > 1 {
-            // Done
-            break
-        }
-
-        if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
-            cancel()
-            wg.Wait()
-            return UploadInfo{}, rerr
-        }
-
-        // Calculate md5sum.
-        customHeader := make(http.Header)
-        if !opts.SendContentMd5 {
-            // Add CRC32C instead.
-            crc.Reset()
-            crc.Write(buf[:length])
-            cSum := crc.Sum(nil)
-            customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
-            crcBytes = append(crcBytes, cSum...)
-        }
-
-        wg.Add(1)
-        go func(partNumber int) {
-            // Avoid declaring variables in the for loop.
-            var md5Base64 string
-
-            if opts.SendContentMd5 {
-                md5Hash := c.md5Hasher()
-                md5Hash.Write(buf[:length])
-                md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
-                md5Hash.Close()
-            }
-
-            defer wg.Done()
-            p := uploadPartParams{
-                bucketName:   bucketName,
-                objectName:   objectName,
-                uploadID:     uploadID,
-                reader:       bytes.NewReader(buf[:length]),
-                partNumber:   partNumber,
-                md5Base64:    md5Base64,
-                size:         int64(length),
-                sse:          opts.ServerSideEncryption,
-                streamSha256: !opts.DisableContentSha256,
-                customHeader: customHeader,
-            }
-            objPart, uerr := c.uploadPart(ctx, p)
-            if uerr != nil {
-                errCh <- uerr
-                return
-            }
-
-            // Save successfully uploaded part metadata.
-            mu.Lock()
-            partsInfo[partNumber] = objPart
-            mu.Unlock()
-
-            // Send the buffer back so it can be reused.
-            bufs <- buf
-        }(partNumber)
-
-        // Save successfully uploaded size.
-        totalUploadedSize += int64(length)
-    }
-    wg.Wait()
-
-    // Collect any error.
-    select {
-    case err = <-errCh:
-        return UploadInfo{}, err
-    default:
-    }
-
-    // Complete multipart upload.
-    var complMultipartUpload completeMultipartUpload
-
-    // Loop over uploaded parts to save them in a Parts array
-    // before completing the multipart request.
-    for i := 1; i < partNumber; i++ {
-        part, ok := partsInfo[i]
-        if !ok {
-            return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
-        }
-        complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
-            ETag:           part.ETag,
-            PartNumber:     part.PartNumber,
-            ChecksumCRC32:  part.ChecksumCRC32,
-            ChecksumCRC32C: part.ChecksumCRC32C,
-            ChecksumSHA1:   part.ChecksumSHA1,
-            ChecksumSHA256: part.ChecksumSHA256,
-        })
-    }
-
-    // Sort all completed parts.
-    sort.Sort(completedParts(complMultipartUpload.Parts))
-
-    opts = PutObjectOptions{}
-    if len(crcBytes) > 0 {
-        // Add hash of hashes.
-        crc.Reset()
-        crc.Write(crcBytes)
-        opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
-    }
-    uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-
-    uploadInfo.Size = totalUploadedSize
-    return uploadInfo, nil
-}
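Worker errors in the parallel path travel over an error channel buffered to the worker count, so a failing uploadPart never blocks on send; the feed loop and a final non-blocking select both drain it. A condensed sketch of that shape:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

func main() {
	const workers = 4
	errCh := make(chan error, workers) // buffered: workers never block on send
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i == 2 { // simulate one failed part upload
				errCh <- errors.New("part 3 failed")
			}
		}(i)
	}
	wg.Wait()

	// Collect any error without blocking when all workers succeeded.
	select {
	case err := <-errCh:
		fmt.Println("upload aborted:", err)
	default:
		fmt.Println("all parts uploaded")
	}
}
```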
-
-// putObject is a special function used for Google Cloud Storage, since
-// Google's multipart API is not S3 compatible.
-func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
-    // Input validation.
-    if err := s3utils.CheckValidBucketName(bucketName); err != nil {
-        return UploadInfo{}, err
-    }
-    if err := s3utils.CheckValidObjectName(objectName); err != nil {
-        return UploadInfo{}, err
-    }
-
-    // Size -1 is only supported on Google Cloud Storage; we error
-    // out in all other situations.
-    if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
-        return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
-    }
-
-    if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
-        return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
-    }
-
-    var readSeeker io.Seeker
-    if size > 0 {
-        if isReadAt(reader) && !isObject(reader) {
-            seeker, ok := reader.(io.Seeker)
-            if ok {
-                offset, err := seeker.Seek(0, io.SeekCurrent)
-                if err != nil {
-                    return UploadInfo{}, errInvalidArgument(err.Error())
-                }
-                reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
-                readSeeker = reader.(io.Seeker)
-            }
-        }
-    }
-
-    var md5Base64 string
-    if opts.SendContentMd5 {
-        // Calculate md5sum.
-        hash := c.md5Hasher()
-
-        if readSeeker != nil {
-            if _, err := io.Copy(hash, reader); err != nil {
-                return UploadInfo{}, err
-            }
-            // Seek back to the beginning of the io.NewSectionReader's offset.
-            _, err = readSeeker.Seek(0, io.SeekStart)
-            if err != nil {
-                return UploadInfo{}, errInvalidArgument(err.Error())
-            }
-        } else {
-            // Create a buffer.
-            buf := make([]byte, size)
-
-            length, err := readFull(reader, buf)
-            if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
-                return UploadInfo{}, err
-            }
-
-            hash.Write(buf[:length])
-            reader = bytes.NewReader(buf[:length])
-        }
-
-        md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
-        hash.Close()
-    }
-
-    // Update the progress reader appropriately to the latest offset as we
-    // read from the source.
-    progressReader := newHook(reader, opts.Progress)
-
-    // This function does not calculate sha256 and md5sum for payload.
-    // Execute put object.
-    return c.putObjectDo(ctx, bucketName, objectName, progressReader, md5Base64, "", size, opts)
-}
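For seekable sources, putObject hashes the body in one pass and rewinds rather than buffering it in memory. The same rewind trick with the standard library (crypto/md5 here stands in for the client's internal md5Hasher):

```go
package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"io"
	"strings"
)

func main() {
	body := strings.NewReader("payload to upload")

	// First pass: hash the body without keeping a copy in memory.
	h := md5.New()
	if _, err := io.Copy(h, body); err != nil {
		panic(err)
	}
	md5Base64 := base64.StdEncoding.EncodeToString(h.Sum(nil))

	// Rewind so the actual upload re-reads from the start.
	if _, err := body.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	fmt.Println("Content-MD5:", md5Base64)
}
```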
-
-// putObjectDo - executes the put object http operation.
-// NOTE: You must have WRITE permissions on a bucket to add an object to it.
-func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
-    // Input validation.
-    if err := s3utils.CheckValidBucketName(bucketName); err != nil {
-        return UploadInfo{}, err
-    }
-    if err := s3utils.CheckValidObjectName(objectName); err != nil {
-        return UploadInfo{}, err
-    }
-    // Set headers.
-    customHeader := opts.Header()
-
-    // Add CRC when the client supports it, MD5 is not set, the endpoint
-    // is not Google, and we don't add SHA256 to chunks.
-    addCrc := c.trailingHeaderSupport && md5Base64 == "" && !s3utils.IsGoogleEndpoint(*c.endpointURL) && (opts.DisableContentSha256 || c.secure)
-
-    if addCrc {
-        // If the user has added checksums, don't add them ourselves.
-        for k := range opts.UserMetadata {
-            if strings.HasPrefix(strings.ToLower(k), "x-amz-checksum-") {
-                addCrc = false
-            }
-        }
-    }
-    // Populate request metadata.
-    reqMetadata := requestMetadata{
-        bucketName:       bucketName,
-        objectName:       objectName,
-        customHeader:     customHeader,
-        contentBody:      reader,
-        contentLength:    size,
-        contentMD5Base64: md5Base64,
-        contentSHA256Hex: sha256Hex,
-        streamSha256:     !opts.DisableContentSha256,
-        addCrc:           addCrc,
-    }
-    if opts.Internal.SourceVersionID != "" {
-        if opts.Internal.SourceVersionID != nullVersionID {
-            if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
-                return UploadInfo{}, errInvalidArgument(err.Error())
-            }
-        }
-        urlValues := make(url.Values)
-        urlValues.Set("versionId", opts.Internal.SourceVersionID)
-        reqMetadata.queryValues = urlValues
-    }
-
-    // Execute PUT on objectName.
-    resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
-    defer closeResponse(resp)
-    if err != nil {
-        return UploadInfo{}, err
-    }
-    if resp != nil {
-        if resp.StatusCode != http.StatusOK {
-            return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
-        }
-    }
-
-    // Extract the lifecycle expiry date and rule ID.
-    expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
-    h := resp.Header
-    return UploadInfo{
-        Bucket:           bucketName,
-        Key:              objectName,
-        ETag:             trimEtag(h.Get("ETag")),
-        VersionID:        h.Get(amzVersionID),
-        Size:             size,
-        Expiration:       expTime,
-        ExpirationRuleID: ruleID,
-
-        // Checksum values
-        ChecksumCRC32:  h.Get("x-amz-checksum-crc32"),
-        ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
-        ChecksumSHA1:   h.Get("x-amz-checksum-sha1"),
-        ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
-    }, nil
-}
-- 
cgit v1.2.3
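putObjectDo assembles UploadInfo directly from the PUT response headers. A standalone illustration of reading the same headers from an http.Response (all header values below are fabricated):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Fabricated response standing in for a successful PUT.
	resp := &http.Response{Header: make(http.Header)}
	resp.Header.Set("ETag", `"9b2cf535f27731c974343645a3985328"`)
	resp.Header.Set("x-amz-version-id", "3sL4kqtJlcpXroDTDmJ+rmSpXd3dIbrH")
	resp.Header.Set("x-amz-checksum-crc32c", "yZRlqg==")

	h := resp.Header
	fmt.Println("etag:     ", h.Get("ETag"))
	fmt.Println("versionID:", h.Get("x-amz-version-id"))
	fmt.Println("crc32c:   ", h.Get("x-amz-checksum-crc32c"))
}
```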