diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-bucket-notification.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/v7/api-bucket-notification.go | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go b/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go new file mode 100644 index 0000000..8de5c01 --- /dev/null +++ b/vendor/github.com/minio/minio-go/v7/api-bucket-notification.go | |||
@@ -0,0 +1,261 @@ | |||
1 | /* | ||
2 | * MinIO Go Library for Amazon S3 Compatible Cloud Storage | ||
3 | * Copyright 2017-2020 MinIO, Inc. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | */ | ||
17 | |||
18 | package minio | ||
19 | |||
20 | import ( | ||
21 | "bufio" | ||
22 | "bytes" | ||
23 | "context" | ||
24 | "encoding/xml" | ||
25 | "net/http" | ||
26 | "net/url" | ||
27 | "time" | ||
28 | |||
29 | jsoniter "github.com/json-iterator/go" | ||
30 | "github.com/minio/minio-go/v7/pkg/notification" | ||
31 | "github.com/minio/minio-go/v7/pkg/s3utils" | ||
32 | ) | ||
33 | |||
34 | // SetBucketNotification saves a new bucket notification with a context to control cancellations and timeouts. | ||
35 | func (c *Client) SetBucketNotification(ctx context.Context, bucketName string, config notification.Configuration) error { | ||
36 | // Input validation. | ||
37 | if err := s3utils.CheckValidBucketName(bucketName); err != nil { | ||
38 | return err | ||
39 | } | ||
40 | |||
41 | // Get resources properly escaped and lined up before | ||
42 | // using them in http request. | ||
43 | urlValues := make(url.Values) | ||
44 | urlValues.Set("notification", "") | ||
45 | |||
46 | notifBytes, err := xml.Marshal(&config) | ||
47 | if err != nil { | ||
48 | return err | ||
49 | } | ||
50 | |||
51 | notifBuffer := bytes.NewReader(notifBytes) | ||
52 | reqMetadata := requestMetadata{ | ||
53 | bucketName: bucketName, | ||
54 | queryValues: urlValues, | ||
55 | contentBody: notifBuffer, | ||
56 | contentLength: int64(len(notifBytes)), | ||
57 | contentMD5Base64: sumMD5Base64(notifBytes), | ||
58 | contentSHA256Hex: sum256Hex(notifBytes), | ||
59 | } | ||
60 | |||
61 | // Execute PUT to upload a new bucket notification. | ||
62 | resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) | ||
63 | defer closeResponse(resp) | ||
64 | if err != nil { | ||
65 | return err | ||
66 | } | ||
67 | if resp != nil { | ||
68 | if resp.StatusCode != http.StatusOK { | ||
69 | return httpRespToErrorResponse(resp, bucketName, "") | ||
70 | } | ||
71 | } | ||
72 | return nil | ||
73 | } | ||
74 | |||
75 | // RemoveAllBucketNotification - Remove bucket notification clears all previously specified config | ||
76 | func (c *Client) RemoveAllBucketNotification(ctx context.Context, bucketName string) error { | ||
77 | return c.SetBucketNotification(ctx, bucketName, notification.Configuration{}) | ||
78 | } | ||
79 | |||
80 | // GetBucketNotification returns current bucket notification configuration | ||
81 | func (c *Client) GetBucketNotification(ctx context.Context, bucketName string) (bucketNotification notification.Configuration, err error) { | ||
82 | // Input validation. | ||
83 | if err := s3utils.CheckValidBucketName(bucketName); err != nil { | ||
84 | return notification.Configuration{}, err | ||
85 | } | ||
86 | return c.getBucketNotification(ctx, bucketName) | ||
87 | } | ||
88 | |||
89 | // Request server for notification rules. | ||
90 | func (c *Client) getBucketNotification(ctx context.Context, bucketName string) (notification.Configuration, error) { | ||
91 | urlValues := make(url.Values) | ||
92 | urlValues.Set("notification", "") | ||
93 | |||
94 | // Execute GET on bucket to list objects. | ||
95 | resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ | ||
96 | bucketName: bucketName, | ||
97 | queryValues: urlValues, | ||
98 | contentSHA256Hex: emptySHA256Hex, | ||
99 | }) | ||
100 | |||
101 | defer closeResponse(resp) | ||
102 | if err != nil { | ||
103 | return notification.Configuration{}, err | ||
104 | } | ||
105 | return processBucketNotificationResponse(bucketName, resp) | ||
106 | } | ||
107 | |||
108 | // processes the GetNotification http response from the server. | ||
109 | func processBucketNotificationResponse(bucketName string, resp *http.Response) (notification.Configuration, error) { | ||
110 | if resp.StatusCode != http.StatusOK { | ||
111 | errResponse := httpRespToErrorResponse(resp, bucketName, "") | ||
112 | return notification.Configuration{}, errResponse | ||
113 | } | ||
114 | var bucketNotification notification.Configuration | ||
115 | err := xmlDecoder(resp.Body, &bucketNotification) | ||
116 | if err != nil { | ||
117 | return notification.Configuration{}, err | ||
118 | } | ||
119 | return bucketNotification, nil | ||
120 | } | ||
121 | |||
122 | // ListenNotification listen for all events, this is a MinIO specific API | ||
123 | func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info { | ||
124 | return c.ListenBucketNotification(ctx, "", prefix, suffix, events) | ||
125 | } | ||
126 | |||
127 | // ListenBucketNotification listen for bucket events, this is a MinIO specific API | ||
128 | func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info { | ||
129 | notificationInfoCh := make(chan notification.Info, 1) | ||
130 | const notificationCapacity = 4 * 1024 * 1024 | ||
131 | notificationEventBuffer := make([]byte, notificationCapacity) | ||
132 | // Only success, start a routine to start reading line by line. | ||
133 | go func(notificationInfoCh chan<- notification.Info) { | ||
134 | defer close(notificationInfoCh) | ||
135 | |||
136 | // Validate the bucket name. | ||
137 | if bucketName != "" { | ||
138 | if err := s3utils.CheckValidBucketName(bucketName); err != nil { | ||
139 | select { | ||
140 | case notificationInfoCh <- notification.Info{ | ||
141 | Err: err, | ||
142 | }: | ||
143 | case <-ctx.Done(): | ||
144 | } | ||
145 | return | ||
146 | } | ||
147 | } | ||
148 | |||
149 | // Check ARN partition to verify if listening bucket is supported | ||
150 | if s3utils.IsAmazonEndpoint(*c.endpointURL) || s3utils.IsGoogleEndpoint(*c.endpointURL) { | ||
151 | select { | ||
152 | case notificationInfoCh <- notification.Info{ | ||
153 | Err: errAPINotSupported("Listening for bucket notification is specific only to `minio` server endpoints"), | ||
154 | }: | ||
155 | case <-ctx.Done(): | ||
156 | } | ||
157 | return | ||
158 | } | ||
159 | |||
160 | // Continuously run and listen on bucket notification. | ||
161 | // Create a done channel to control 'ListObjects' go routine. | ||
162 | retryDoneCh := make(chan struct{}, 1) | ||
163 | |||
164 | // Indicate to our routine to exit cleanly upon return. | ||
165 | defer close(retryDoneCh) | ||
166 | |||
167 | // Prepare urlValues to pass into the request on every loop | ||
168 | urlValues := make(url.Values) | ||
169 | urlValues.Set("ping", "10") | ||
170 | urlValues.Set("prefix", prefix) | ||
171 | urlValues.Set("suffix", suffix) | ||
172 | urlValues["events"] = events | ||
173 | |||
174 | // Wait on the jitter retry loop. | ||
175 | for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, retryDoneCh) { | ||
176 | // Execute GET on bucket to list objects. | ||
177 | resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{ | ||
178 | bucketName: bucketName, | ||
179 | queryValues: urlValues, | ||
180 | contentSHA256Hex: emptySHA256Hex, | ||
181 | }) | ||
182 | if err != nil { | ||
183 | select { | ||
184 | case notificationInfoCh <- notification.Info{ | ||
185 | Err: err, | ||
186 | }: | ||
187 | case <-ctx.Done(): | ||
188 | } | ||
189 | return | ||
190 | } | ||
191 | |||
192 | // Validate http response, upon error return quickly. | ||
193 | if resp.StatusCode != http.StatusOK { | ||
194 | errResponse := httpRespToErrorResponse(resp, bucketName, "") | ||
195 | select { | ||
196 | case notificationInfoCh <- notification.Info{ | ||
197 | Err: errResponse, | ||
198 | }: | ||
199 | case <-ctx.Done(): | ||
200 | } | ||
201 | return | ||
202 | } | ||
203 | |||
204 | // Initialize a new bufio scanner, to read line by line. | ||
205 | bio := bufio.NewScanner(resp.Body) | ||
206 | |||
207 | // Use a higher buffer to support unexpected | ||
208 | // caching done by proxies | ||
209 | bio.Buffer(notificationEventBuffer, notificationCapacity) | ||
210 | json := jsoniter.ConfigCompatibleWithStandardLibrary | ||
211 | |||
212 | // Unmarshal each line, returns marshaled values. | ||
213 | for bio.Scan() { | ||
214 | var notificationInfo notification.Info | ||
215 | if err = json.Unmarshal(bio.Bytes(), ¬ificationInfo); err != nil { | ||
216 | // Unexpected error during json unmarshal, send | ||
217 | // the error to caller for actionable as needed. | ||
218 | select { | ||
219 | case notificationInfoCh <- notification.Info{ | ||
220 | Err: err, | ||
221 | }: | ||
222 | case <-ctx.Done(): | ||
223 | return | ||
224 | } | ||
225 | closeResponse(resp) | ||
226 | continue | ||
227 | } | ||
228 | |||
229 | // Empty events pinged from the server | ||
230 | if len(notificationInfo.Records) == 0 && notificationInfo.Err == nil { | ||
231 | continue | ||
232 | } | ||
233 | |||
234 | // Send notificationInfo | ||
235 | select { | ||
236 | case notificationInfoCh <- notificationInfo: | ||
237 | case <-ctx.Done(): | ||
238 | closeResponse(resp) | ||
239 | return | ||
240 | } | ||
241 | } | ||
242 | |||
243 | if err = bio.Err(); err != nil { | ||
244 | select { | ||
245 | case notificationInfoCh <- notification.Info{ | ||
246 | Err: err, | ||
247 | }: | ||
248 | case <-ctx.Done(): | ||
249 | return | ||
250 | } | ||
251 | } | ||
252 | |||
253 | // Close current connection before looping further. | ||
254 | closeResponse(resp) | ||
255 | |||
256 | } | ||
257 | }(notificationInfoCh) | ||
258 | |||
259 | // Returns the notification info channel, for caller to start reading from. | ||
260 | return notificationInfoCh | ||
261 | } | ||