aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-bucket-notification.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-bucket-notification.go261
1 files changed, 261 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go b/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go
new file mode 100644
index 0000000..8de5c01
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go
@@ -0,0 +1,261 @@
1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2017-2020 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21 "bufio"
22 "bytes"
23 "context"
24 "encoding/xml"
25 "net/http"
26 "net/url"
27 "time"
28
29 jsoniter "github.com/json-iterator/go"
30 "github.com/minio/minio-go/v7/pkg/notification"
31 "github.com/minio/minio-go/v7/pkg/s3utils"
32)
33
34// SetBucketNotification saves a new bucket notification with a context to control cancellations and timeouts.
35func (c *Client) SetBucketNotification(ctx context.Context, bucketName string, config notification.Configuration) error {
36 // Input validation.
37 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
38 return err
39 }
40
41 // Get resources properly escaped and lined up before
42 // using them in http request.
43 urlValues := make(url.Values)
44 urlValues.Set("notification", "")
45
46 notifBytes, err := xml.Marshal(&config)
47 if err != nil {
48 return err
49 }
50
51 notifBuffer := bytes.NewReader(notifBytes)
52 reqMetadata := requestMetadata{
53 bucketName: bucketName,
54 queryValues: urlValues,
55 contentBody: notifBuffer,
56 contentLength: int64(len(notifBytes)),
57 contentMD5Base64: sumMD5Base64(notifBytes),
58 contentSHA256Hex: sum256Hex(notifBytes),
59 }
60
61 // Execute PUT to upload a new bucket notification.
62 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
63 defer closeResponse(resp)
64 if err != nil {
65 return err
66 }
67 if resp != nil {
68 if resp.StatusCode != http.StatusOK {
69 return httpRespToErrorResponse(resp, bucketName, "")
70 }
71 }
72 return nil
73}
74
75// RemoveAllBucketNotification - Remove bucket notification clears all previously specified config
76func (c *Client) RemoveAllBucketNotification(ctx context.Context, bucketName string) error {
77 return c.SetBucketNotification(ctx, bucketName, notification.Configuration{})
78}
79
80// GetBucketNotification returns current bucket notification configuration
81func (c *Client) GetBucketNotification(ctx context.Context, bucketName string) (bucketNotification notification.Configuration, err error) {
82 // Input validation.
83 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
84 return notification.Configuration{}, err
85 }
86 return c.getBucketNotification(ctx, bucketName)
87}
88
89// Request server for notification rules.
90func (c *Client) getBucketNotification(ctx context.Context, bucketName string) (notification.Configuration, error) {
91 urlValues := make(url.Values)
92 urlValues.Set("notification", "")
93
94 // Execute GET on bucket to list objects.
95 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
96 bucketName: bucketName,
97 queryValues: urlValues,
98 contentSHA256Hex: emptySHA256Hex,
99 })
100
101 defer closeResponse(resp)
102 if err != nil {
103 return notification.Configuration{}, err
104 }
105 return processBucketNotificationResponse(bucketName, resp)
106}
107
108// processes the GetNotification http response from the server.
109func processBucketNotificationResponse(bucketName string, resp *http.Response) (notification.Configuration, error) {
110 if resp.StatusCode != http.StatusOK {
111 errResponse := httpRespToErrorResponse(resp, bucketName, "")
112 return notification.Configuration{}, errResponse
113 }
114 var bucketNotification notification.Configuration
115 err := xmlDecoder(resp.Body, &bucketNotification)
116 if err != nil {
117 return notification.Configuration{}, err
118 }
119 return bucketNotification, nil
120}
121
122// ListenNotification listen for all events, this is a MinIO specific API
123func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info {
124 return c.ListenBucketNotification(ctx, "", prefix, suffix, events)
125}
126
127// ListenBucketNotification listen for bucket events, this is a MinIO specific API
128func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info {
129 notificationInfoCh := make(chan notification.Info, 1)
130 const notificationCapacity = 4 * 1024 * 1024
131 notificationEventBuffer := make([]byte, notificationCapacity)
132 // Only success, start a routine to start reading line by line.
133 go func(notificationInfoCh chan<- notification.Info) {
134 defer close(notificationInfoCh)
135
136 // Validate the bucket name.
137 if bucketName != "" {
138 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
139 select {
140 case notificationInfoCh <- notification.Info{
141 Err: err,
142 }:
143 case <-ctx.Done():
144 }
145 return
146 }
147 }
148
149 // Check ARN partition to verify if listening bucket is supported
150 if s3utils.IsAmazonEndpoint(*c.endpointURL) || s3utils.IsGoogleEndpoint(*c.endpointURL) {
151 select {
152 case notificationInfoCh <- notification.Info{
153 Err: errAPINotSupported("Listening for bucket notification is specific only to `minio` server endpoints"),
154 }:
155 case <-ctx.Done():
156 }
157 return
158 }
159
160 // Continuously run and listen on bucket notification.
161 // Create a done channel to control 'ListObjects' go routine.
162 retryDoneCh := make(chan struct{}, 1)
163
164 // Indicate to our routine to exit cleanly upon return.
165 defer close(retryDoneCh)
166
167 // Prepare urlValues to pass into the request on every loop
168 urlValues := make(url.Values)
169 urlValues.Set("ping", "10")
170 urlValues.Set("prefix", prefix)
171 urlValues.Set("suffix", suffix)
172 urlValues["events"] = events
173
174 // Wait on the jitter retry loop.
175 for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) {
176 // Execute GET on bucket to list objects.
177 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
178 bucketName: bucketName,
179 queryValues: urlValues,
180 contentSHA256Hex: emptySHA256Hex,
181 })
182 if err != nil {
183 select {
184 case notificationInfoCh <- notification.Info{
185 Err: err,
186 }:
187 case <-ctx.Done():
188 }
189 return
190 }
191
192 // Validate http response, upon error return quickly.
193 if resp.StatusCode != http.StatusOK {
194 errResponse := httpRespToErrorResponse(resp, bucketName, "")
195 select {
196 case notificationInfoCh <- notification.Info{
197 Err: errResponse,
198 }:
199 case <-ctx.Done():
200 }
201 return
202 }
203
204 // Initialize a new bufio scanner, to read line by line.
205 bio := bufio.NewScanner(resp.Body)
206
207 // Use a higher buffer to support unexpected
208 // caching done by proxies
209 bio.Buffer(notificationEventBuffer, notificationCapacity)
210 json := jsoniter.ConfigCompatibleWithStandardLibrary
211
212 // Unmarshal each line, returns marshaled values.
213 for bio.Scan() {
214 var notificationInfo notification.Info
215 if err = json.Unmarshal(bio.Bytes(), &notificationInfo); err != nil {
216 // Unexpected error during json unmarshal, send
217 // the error to caller for actionable as needed.
218 select {
219 case notificationInfoCh <- notification.Info{
220 Err: err,
221 }:
222 case <-ctx.Done():
223 return
224 }
225 closeResponse(resp)
226 continue
227 }
228
229 // Empty events pinged from the server
230 if len(notificationInfo.Records) == 0 && notificationInfo.Err == nil {
231 continue
232 }
233
234 // Send notificationInfo
235 select {
236 case notificationInfoCh <- notificationInfo:
237 case <-ctx.Done():
238 closeResponse(resp)
239 return
240 }
241 }
242
243 if err = bio.Err(); err != nil {
244 select {
245 case notificationInfoCh <- notification.Info{
246 Err: err,
247 }:
248 case <-ctx.Done():
249 return
250 }
251 }
252
253 // Close current connection before looping further.
254 closeResponse(resp)
255
256 }
257 }(notificationInfoCh)
258
259 // Returns the notification info channel, for caller to start reading from.
260 return notificationInfoCh
261}