aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-get-object.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-get-object.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-get-object.go683
1 files changed, 683 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-get-object.go b/vendor/github.com/minio/minio-go/v7/api-get-object.go
new file mode 100644
index 0000000..9e6b154
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-get-object.go
@@ -0,0 +1,683 @@
1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2015-2020 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "net/http"
26 "sync"
27
28 "github.com/minio/minio-go/v7/pkg/s3utils"
29)
30
31// GetObject wrapper function that accepts a request context
32func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (*Object, error) {
33 // Input validation.
34 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
35 return nil, err
36 }
37 if err := s3utils.CheckValidObjectName(objectName); err != nil {
38 return nil, err
39 }
40
41 gctx, cancel := context.WithCancel(ctx)
42
43 // Detect if snowball is server location we are talking to.
44 var snowball bool
45 if location, ok := c.bucketLocCache.Get(bucketName); ok {
46 snowball = location == "snowball"
47 }
48
49 var (
50 err error
51 httpReader io.ReadCloser
52 objectInfo ObjectInfo
53 totalRead int
54 )
55
56 // Create request channel.
57 reqCh := make(chan getRequest)
58 // Create response channel.
59 resCh := make(chan getResponse)
60
61 // This routine feeds partial object data as and when the caller reads.
62 go func() {
63 defer close(resCh)
64 defer func() {
65 // Close the http response body before returning.
66 // This ends the connection with the server.
67 if httpReader != nil {
68 httpReader.Close()
69 }
70 }()
71 defer cancel()
72
73 // Used to verify if etag of object has changed since last read.
74 var etag string
75
76 for req := range reqCh {
77 // If this is the first request we may not need to do a getObject request yet.
78 if req.isFirstReq {
79 // First request is a Read/ReadAt.
80 if req.isReadOp {
81 // Differentiate between wanting the whole object and just a range.
82 if req.isReadAt {
83 // If this is a ReadAt request only get the specified range.
84 // Range is set with respect to the offset and length of the buffer requested.
85 // Do not set objectInfo from the first readAt request because it will not get
86 // the whole object.
87 opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
88 } else if req.Offset > 0 {
89 opts.SetRange(req.Offset, 0)
90 }
91 httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
92 if err != nil {
93 resCh <- getResponse{Error: err}
94 return
95 }
96 etag = objectInfo.ETag
97 // Read at least firstReq.Buffer bytes, if not we have
98 // reached our EOF.
99 size, err := readFull(httpReader, req.Buffer)
100 totalRead += size
101 if size > 0 && err == io.ErrUnexpectedEOF {
102 if int64(size) < objectInfo.Size {
103 // In situations when returned size
104 // is less than the expected content
105 // length set by the server, make sure
106 // we return io.ErrUnexpectedEOF
107 err = io.ErrUnexpectedEOF
108 } else {
109 // If an EOF happens after reading some but not
110 // all the bytes ReadFull returns ErrUnexpectedEOF
111 err = io.EOF
112 }
113 } else if size == 0 && err == io.EOF && objectInfo.Size > 0 {
114 // Special cases when server writes more data
115 // than the content-length, net/http response
116 // body returns an error, instead of converting
117 // it to io.EOF - return unexpected EOF.
118 err = io.ErrUnexpectedEOF
119 }
120 // Send back the first response.
121 resCh <- getResponse{
122 objectInfo: objectInfo,
123 Size: size,
124 Error: err,
125 didRead: true,
126 }
127 } else {
128 // First request is a Stat or Seek call.
129 // Only need to run a StatObject until an actual Read or ReadAt request comes through.
130
131 // Remove range header if already set, for stat Operations to get original file size.
132 delete(opts.headers, "Range")
133 objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
134 if err != nil {
135 resCh <- getResponse{
136 Error: err,
137 }
138 // Exit the go-routine.
139 return
140 }
141 etag = objectInfo.ETag
142 // Send back the first response.
143 resCh <- getResponse{
144 objectInfo: objectInfo,
145 }
146 }
147 } else if req.settingObjectInfo { // Request is just to get objectInfo.
148 // Remove range header if already set, for stat Operations to get original file size.
149 delete(opts.headers, "Range")
150 // Check whether this is snowball
151 // if yes do not use If-Match feature
152 // it doesn't work.
153 if etag != "" && !snowball {
154 opts.SetMatchETag(etag)
155 }
156 objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
157 if err != nil {
158 resCh <- getResponse{
159 Error: err,
160 }
161 // Exit the goroutine.
162 return
163 }
164 // Send back the objectInfo.
165 resCh <- getResponse{
166 objectInfo: objectInfo,
167 }
168 } else {
169 // Offset changes fetch the new object at an Offset.
170 // Because the httpReader may not be set by the first
171 // request if it was a stat or seek it must be checked
172 // if the object has been read or not to only initialize
173 // new ones when they haven't been already.
174 // All readAt requests are new requests.
175 if req.DidOffsetChange || !req.beenRead {
176 // Check whether this is snowball
177 // if yes do not use If-Match feature
178 // it doesn't work.
179 if etag != "" && !snowball {
180 opts.SetMatchETag(etag)
181 }
182 if httpReader != nil {
183 // Close previously opened http reader.
184 httpReader.Close()
185 }
186 // If this request is a readAt only get the specified range.
187 if req.isReadAt {
188 // Range is set with respect to the offset and length of the buffer requested.
189 opts.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
190 } else if req.Offset > 0 { // Range is set with respect to the offset.
191 opts.SetRange(req.Offset, 0)
192 } else {
193 // Remove range header if already set
194 delete(opts.headers, "Range")
195 }
196 httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
197 if err != nil {
198 resCh <- getResponse{
199 Error: err,
200 }
201 return
202 }
203 totalRead = 0
204 }
205
206 // Read at least req.Buffer bytes, if not we have
207 // reached our EOF.
208 size, err := readFull(httpReader, req.Buffer)
209 totalRead += size
210 if size > 0 && err == io.ErrUnexpectedEOF {
211 if int64(totalRead) < objectInfo.Size {
212 // In situations when returned size
213 // is less than the expected content
214 // length set by the server, make sure
215 // we return io.ErrUnexpectedEOF
216 err = io.ErrUnexpectedEOF
217 } else {
218 // If an EOF happens after reading some but not
219 // all the bytes ReadFull returns ErrUnexpectedEOF
220 err = io.EOF
221 }
222 } else if size == 0 && err == io.EOF && objectInfo.Size > 0 {
223 // Special cases when server writes more data
224 // than the content-length, net/http response
225 // body returns an error, instead of converting
226 // it to io.EOF - return unexpected EOF.
227 err = io.ErrUnexpectedEOF
228 }
229
230 // Reply back how much was read.
231 resCh <- getResponse{
232 Size: size,
233 Error: err,
234 didRead: true,
235 objectInfo: objectInfo,
236 }
237 }
238 }
239 }()
240
241 // Create a newObject through the information sent back by reqCh.
242 return newObject(gctx, cancel, reqCh, resCh), nil
243}
244
245// get request message container to communicate with internal
246// go-routine.
247type getRequest struct {
248 Buffer []byte
249 Offset int64 // readAt offset.
250 DidOffsetChange bool // Tracks the offset changes for Seek requests.
251 beenRead bool // Determines if this is the first time an object is being read.
252 isReadAt bool // Determines if this request is a request to a specific range
253 isReadOp bool // Determines if this request is a Read or Read/At request.
254 isFirstReq bool // Determines if this request is the first time an object is being accessed.
255 settingObjectInfo bool // Determines if this request is to set the objectInfo of an object.
256}
257
258// get response message container to reply back for the request.
259type getResponse struct {
260 Size int
261 Error error
262 didRead bool // Lets subsequent calls know whether or not httpReader has been initiated.
263 objectInfo ObjectInfo // Used for the first request.
264}
265
266// Object represents an open object. It implements
267// Reader, ReaderAt, Seeker, Closer for a HTTP stream.
268type Object struct {
269 // Mutex.
270 mutex *sync.Mutex
271
272 // User allocated and defined.
273 reqCh chan<- getRequest
274 resCh <-chan getResponse
275 ctx context.Context
276 cancel context.CancelFunc
277 currOffset int64
278 objectInfo ObjectInfo
279
280 // Ask lower level to initiate data fetching based on currOffset
281 seekData bool
282
283 // Keeps track of closed call.
284 isClosed bool
285
286 // Keeps track of if this is the first call.
287 isStarted bool
288
289 // Previous error saved for future calls.
290 prevErr error
291
292 // Keeps track of if this object has been read yet.
293 beenRead bool
294
295 // Keeps track of if objectInfo has been set yet.
296 objectInfoSet bool
297}
298
299// doGetRequest - sends and blocks on the firstReqCh and reqCh of an object.
300// Returns back the size of the buffer read, if anything was read, as well
301// as any error encountered. For all first requests sent on the object
302// it is also responsible for sending back the objectInfo.
303func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
304 select {
305 case <-o.ctx.Done():
306 return getResponse{}, o.ctx.Err()
307 case o.reqCh <- request:
308 }
309
310 response := <-o.resCh
311
312 // Return any error to the top level.
313 if response.Error != nil {
314 return response, response.Error
315 }
316
317 // This was the first request.
318 if !o.isStarted {
319 // The object has been operated on.
320 o.isStarted = true
321 }
322 // Set the objectInfo if the request was not readAt
323 // and it hasn't been set before.
324 if !o.objectInfoSet && !request.isReadAt {
325 o.objectInfo = response.objectInfo
326 o.objectInfoSet = true
327 }
328 // Set beenRead only if it has not been set before.
329 if !o.beenRead {
330 o.beenRead = response.didRead
331 }
332 // Data are ready on the wire, no need to reinitiate connection in lower level
333 o.seekData = false
334
335 return response, nil
336}
337
338// setOffset - handles the setting of offsets for
339// Read/ReadAt/Seek requests.
340func (o *Object) setOffset(bytesRead int64) error {
341 // Update the currentOffset.
342 o.currOffset += bytesRead
343
344 if o.objectInfo.Size > -1 && o.currOffset >= o.objectInfo.Size {
345 return io.EOF
346 }
347 return nil
348}
349
350// Read reads up to len(b) bytes into b. It returns the number of
351// bytes read (0 <= n <= len(b)) and any error encountered. Returns
352// io.EOF upon end of file.
353func (o *Object) Read(b []byte) (n int, err error) {
354 if o == nil {
355 return 0, errInvalidArgument("Object is nil")
356 }
357
358 // Locking.
359 o.mutex.Lock()
360 defer o.mutex.Unlock()
361
362 // prevErr is previous error saved from previous operation.
363 if o.prevErr != nil || o.isClosed {
364 return 0, o.prevErr
365 }
366
367 // Create a new request.
368 readReq := getRequest{
369 isReadOp: true,
370 beenRead: o.beenRead,
371 Buffer: b,
372 }
373
374 // Alert that this is the first request.
375 if !o.isStarted {
376 readReq.isFirstReq = true
377 }
378
379 // Ask to establish a new data fetch routine based on seekData flag
380 readReq.DidOffsetChange = o.seekData
381 readReq.Offset = o.currOffset
382
383 // Send and receive from the first request.
384 response, err := o.doGetRequest(readReq)
385 if err != nil && err != io.EOF {
386 // Save the error for future calls.
387 o.prevErr = err
388 return response.Size, err
389 }
390
391 // Bytes read.
392 bytesRead := int64(response.Size)
393
394 // Set the new offset.
395 oerr := o.setOffset(bytesRead)
396 if oerr != nil {
397 // Save the error for future calls.
398 o.prevErr = oerr
399 return response.Size, oerr
400 }
401
402 // Return the response.
403 return response.Size, err
404}
405
406// Stat returns the ObjectInfo structure describing Object.
407func (o *Object) Stat() (ObjectInfo, error) {
408 if o == nil {
409 return ObjectInfo{}, errInvalidArgument("Object is nil")
410 }
411 // Locking.
412 o.mutex.Lock()
413 defer o.mutex.Unlock()
414
415 if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
416 return ObjectInfo{}, o.prevErr
417 }
418
419 // This is the first request.
420 if !o.isStarted || !o.objectInfoSet {
421 // Send the request and get the response.
422 _, err := o.doGetRequest(getRequest{
423 isFirstReq: !o.isStarted,
424 settingObjectInfo: !o.objectInfoSet,
425 })
426 if err != nil {
427 o.prevErr = err
428 return ObjectInfo{}, err
429 }
430 }
431
432 return o.objectInfo, nil
433}
434
435// ReadAt reads len(b) bytes from the File starting at byte offset
436// off. It returns the number of bytes read and the error, if any.
437// ReadAt always returns a non-nil error when n < len(b). At end of
438// file, that error is io.EOF.
439func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
440 if o == nil {
441 return 0, errInvalidArgument("Object is nil")
442 }
443
444 // Locking.
445 o.mutex.Lock()
446 defer o.mutex.Unlock()
447
448 // prevErr is error which was saved in previous operation.
449 if o.prevErr != nil && o.prevErr != io.EOF || o.isClosed {
450 return 0, o.prevErr
451 }
452
453 // Set the current offset to ReadAt offset, because the current offset will be shifted at the end of this method.
454 o.currOffset = offset
455
456 // Can only compare offsets to size when size has been set.
457 if o.objectInfoSet {
458 // If offset is negative than we return io.EOF.
459 // If offset is greater than or equal to object size we return io.EOF.
460 if (o.objectInfo.Size > -1 && offset >= o.objectInfo.Size) || offset < 0 {
461 return 0, io.EOF
462 }
463 }
464
465 // Create the new readAt request.
466 readAtReq := getRequest{
467 isReadOp: true,
468 isReadAt: true,
469 DidOffsetChange: true, // Offset always changes.
470 beenRead: o.beenRead, // Set if this is the first request to try and read.
471 Offset: offset, // Set the offset.
472 Buffer: b,
473 }
474
475 // Alert that this is the first request.
476 if !o.isStarted {
477 readAtReq.isFirstReq = true
478 }
479
480 // Send and receive from the first request.
481 response, err := o.doGetRequest(readAtReq)
482 if err != nil && err != io.EOF {
483 // Save the error.
484 o.prevErr = err
485 return response.Size, err
486 }
487 // Bytes read.
488 bytesRead := int64(response.Size)
489 // There is no valid objectInfo yet
490 // to compare against for EOF.
491 if !o.objectInfoSet {
492 // Update the currentOffset.
493 o.currOffset += bytesRead
494 } else {
495 // If this was not the first request update
496 // the offsets and compare against objectInfo
497 // for EOF.
498 oerr := o.setOffset(bytesRead)
499 if oerr != nil {
500 o.prevErr = oerr
501 return response.Size, oerr
502 }
503 }
504 return response.Size, err
505}
506
507// Seek sets the offset for the next Read or Write to offset,
508// interpreted according to whence: 0 means relative to the
509// origin of the file, 1 means relative to the current offset,
510// and 2 means relative to the end.
511// Seek returns the new offset and an error, if any.
512//
513// Seeking to a negative offset is an error. Seeking to any positive
514// offset is legal, subsequent io operations succeed until the
515// underlying object is not closed.
516func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
517 if o == nil {
518 return 0, errInvalidArgument("Object is nil")
519 }
520
521 // Locking.
522 o.mutex.Lock()
523 defer o.mutex.Unlock()
524
525 // At EOF seeking is legal allow only io.EOF, for any other errors we return.
526 if o.prevErr != nil && o.prevErr != io.EOF {
527 return 0, o.prevErr
528 }
529
530 // Negative offset is valid for whence of '2'.
531 if offset < 0 && whence != 2 {
532 return 0, errInvalidArgument(fmt.Sprintf("Negative position not allowed for %d", whence))
533 }
534
535 // This is the first request. So before anything else
536 // get the ObjectInfo.
537 if !o.isStarted || !o.objectInfoSet {
538 // Create the new Seek request.
539 seekReq := getRequest{
540 isReadOp: false,
541 Offset: offset,
542 isFirstReq: true,
543 }
544 // Send and receive from the seek request.
545 _, err := o.doGetRequest(seekReq)
546 if err != nil {
547 // Save the error.
548 o.prevErr = err
549 return 0, err
550 }
551 }
552
553 newOffset := o.currOffset
554
555 // Switch through whence.
556 switch whence {
557 default:
558 return 0, errInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))
559 case 0:
560 if o.objectInfo.Size > -1 && offset > o.objectInfo.Size {
561 return 0, io.EOF
562 }
563 newOffset = offset
564 case 1:
565 if o.objectInfo.Size > -1 && o.currOffset+offset > o.objectInfo.Size {
566 return 0, io.EOF
567 }
568 newOffset += offset
569 case 2:
570 // If we don't know the object size return an error for io.SeekEnd
571 if o.objectInfo.Size < 0 {
572 return 0, errInvalidArgument("Whence END is not supported when the object size is unknown")
573 }
574 // Seeking to positive offset is valid for whence '2', but
575 // since we are backing a Reader we have reached 'EOF' if
576 // offset is positive.
577 if offset > 0 {
578 return 0, io.EOF
579 }
580 // Seeking to negative position not allowed for whence.
581 if o.objectInfo.Size+offset < 0 {
582 return 0, errInvalidArgument(fmt.Sprintf("Seeking at negative offset not allowed for %d", whence))
583 }
584 newOffset = o.objectInfo.Size + offset
585 }
586 // Reset the saved error since we successfully seeked, let the Read
587 // and ReadAt decide.
588 if o.prevErr == io.EOF {
589 o.prevErr = nil
590 }
591
592 // Ask lower level to fetch again from source when necessary
593 o.seekData = (newOffset != o.currOffset) || o.seekData
594 o.currOffset = newOffset
595
596 // Return the effective offset.
597 return o.currOffset, nil
598}
599
600// Close - The behavior of Close after the first call returns error
601// for subsequent Close() calls.
602func (o *Object) Close() (err error) {
603 if o == nil {
604 return errInvalidArgument("Object is nil")
605 }
606
607 // Locking.
608 o.mutex.Lock()
609 defer o.mutex.Unlock()
610
611 // if already closed return an error.
612 if o.isClosed {
613 return o.prevErr
614 }
615
616 // Close successfully.
617 o.cancel()
618
619 // Close the request channel to indicate the internal go-routine to exit.
620 close(o.reqCh)
621
622 // Save for future operations.
623 errMsg := "Object is already closed. Bad file descriptor."
624 o.prevErr = errors.New(errMsg)
625 // Save here that we closed done channel successfully.
626 o.isClosed = true
627 return nil
628}
629
630// newObject instantiates a new *minio.Object*
631// ObjectInfo will be set by setObjectInfo
632func newObject(ctx context.Context, cancel context.CancelFunc, reqCh chan<- getRequest, resCh <-chan getResponse) *Object {
633 return &Object{
634 ctx: ctx,
635 cancel: cancel,
636 mutex: &sync.Mutex{},
637 reqCh: reqCh,
638 resCh: resCh,
639 }
640}
641
642// getObject - retrieve object from Object Storage.
643//
644// Additionally this function also takes range arguments to download the specified
645// range bytes of an object. Setting offset and length = 0 will download the full object.
646//
647// For more information about the HTTP Range header.
648// go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.
649func (c *Client) getObject(ctx context.Context, bucketName, objectName string, opts GetObjectOptions) (io.ReadCloser, ObjectInfo, http.Header, error) {
650 // Validate input arguments.
651 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
652 return nil, ObjectInfo{}, nil, err
653 }
654 if err := s3utils.CheckValidObjectName(objectName); err != nil {
655 return nil, ObjectInfo{}, nil, err
656 }
657
658 // Execute GET on objectName.
659 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
660 bucketName: bucketName,
661 objectName: objectName,
662 queryValues: opts.toQueryValues(),
663 customHeader: opts.Header(),
664 contentSHA256Hex: emptySHA256Hex,
665 })
666 if err != nil {
667 return nil, ObjectInfo{}, nil, err
668 }
669 if resp != nil {
670 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
671 return nil, ObjectInfo{}, nil, httpRespToErrorResponse(resp, bucketName, objectName)
672 }
673 }
674
675 objectStat, err := ToObjectInfo(bucketName, objectName, resp.Header)
676 if err != nil {
677 closeResponse(resp)
678 return nil, ObjectInfo{}, nil, err
679 }
680
681 // do not close body here, caller will close
682 return resp.Body, objectStat, resp.Header, nil
683}