aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/api-bucket-replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/api-bucket-replication.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/api-bucket-replication.go355
1 files changed, 355 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/api-bucket-replication.go b/vendor/github.com/minio/minio-go/v7/api-bucket-replication.go
new file mode 100644
index 0000000..b12bb13
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/api-bucket-replication.go
@@ -0,0 +1,355 @@
1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2020 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21 "bytes"
22 "context"
23 "encoding/json"
24 "encoding/xml"
25 "io"
26 "net/http"
27 "net/url"
28 "time"
29
30 "github.com/google/uuid"
31 "github.com/minio/minio-go/v7/pkg/replication"
32 "github.com/minio/minio-go/v7/pkg/s3utils"
33)
34
35// RemoveBucketReplication removes a replication config on an existing bucket.
36func (c *Client) RemoveBucketReplication(ctx context.Context, bucketName string) error {
37 return c.removeBucketReplication(ctx, bucketName)
38}
39
40// SetBucketReplication sets a replication config on an existing bucket.
41func (c *Client) SetBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error {
42 // Input validation.
43 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
44 return err
45 }
46
47 // If replication is empty then delete it.
48 if cfg.Empty() {
49 return c.removeBucketReplication(ctx, bucketName)
50 }
51 // Save the updated replication.
52 return c.putBucketReplication(ctx, bucketName, cfg)
53}
54
55// Saves a new bucket replication.
56func (c *Client) putBucketReplication(ctx context.Context, bucketName string, cfg replication.Config) error {
57 // Get resources properly escaped and lined up before
58 // using them in http request.
59 urlValues := make(url.Values)
60 urlValues.Set("replication", "")
61 replication, err := xml.Marshal(cfg)
62 if err != nil {
63 return err
64 }
65
66 reqMetadata := requestMetadata{
67 bucketName: bucketName,
68 queryValues: urlValues,
69 contentBody: bytes.NewReader(replication),
70 contentLength: int64(len(replication)),
71 contentMD5Base64: sumMD5Base64(replication),
72 }
73
74 // Execute PUT to upload a new bucket replication config.
75 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
76 defer closeResponse(resp)
77 if err != nil {
78 return err
79 }
80
81 if resp.StatusCode != http.StatusOK {
82 return httpRespToErrorResponse(resp, bucketName, "")
83 }
84
85 return nil
86}
87
88// Remove replication from a bucket.
89func (c *Client) removeBucketReplication(ctx context.Context, bucketName string) error {
90 // Get resources properly escaped and lined up before
91 // using them in http request.
92 urlValues := make(url.Values)
93 urlValues.Set("replication", "")
94
95 // Execute DELETE on objectName.
96 resp, err := c.executeMethod(ctx, http.MethodDelete, requestMetadata{
97 bucketName: bucketName,
98 queryValues: urlValues,
99 contentSHA256Hex: emptySHA256Hex,
100 })
101 defer closeResponse(resp)
102 if err != nil {
103 return err
104 }
105 if resp.StatusCode != http.StatusOK {
106 return httpRespToErrorResponse(resp, bucketName, "")
107 }
108 return nil
109}
110
111// GetBucketReplication fetches bucket replication configuration.If config is not
112// found, returns empty config with nil error.
113func (c *Client) GetBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) {
114 // Input validation.
115 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
116 return cfg, err
117 }
118 bucketReplicationCfg, err := c.getBucketReplication(ctx, bucketName)
119 if err != nil {
120 errResponse := ToErrorResponse(err)
121 if errResponse.Code == "ReplicationConfigurationNotFoundError" {
122 return cfg, nil
123 }
124 return cfg, err
125 }
126 return bucketReplicationCfg, nil
127}
128
129// Request server for current bucket replication config.
130func (c *Client) getBucketReplication(ctx context.Context, bucketName string) (cfg replication.Config, err error) {
131 // Get resources properly escaped and lined up before
132 // using them in http request.
133 urlValues := make(url.Values)
134 urlValues.Set("replication", "")
135
136 // Execute GET on bucket to get replication config.
137 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
138 bucketName: bucketName,
139 queryValues: urlValues,
140 })
141
142 defer closeResponse(resp)
143 if err != nil {
144 return cfg, err
145 }
146
147 if resp.StatusCode != http.StatusOK {
148 return cfg, httpRespToErrorResponse(resp, bucketName, "")
149 }
150
151 if err = xmlDecoder(resp.Body, &cfg); err != nil {
152 return cfg, err
153 }
154
155 return cfg, nil
156}
157
158// GetBucketReplicationMetrics fetches bucket replication status metrics
159func (c *Client) GetBucketReplicationMetrics(ctx context.Context, bucketName string) (s replication.Metrics, err error) {
160 // Input validation.
161 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
162 return s, err
163 }
164 // Get resources properly escaped and lined up before
165 // using them in http request.
166 urlValues := make(url.Values)
167 urlValues.Set("replication-metrics", "")
168
169 // Execute GET on bucket to get replication config.
170 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
171 bucketName: bucketName,
172 queryValues: urlValues,
173 })
174
175 defer closeResponse(resp)
176 if err != nil {
177 return s, err
178 }
179
180 if resp.StatusCode != http.StatusOK {
181 return s, httpRespToErrorResponse(resp, bucketName, "")
182 }
183 respBytes, err := io.ReadAll(resp.Body)
184 if err != nil {
185 return s, err
186 }
187
188 if err := json.Unmarshal(respBytes, &s); err != nil {
189 return s, err
190 }
191 return s, nil
192}
193
194// mustGetUUID - get a random UUID.
195func mustGetUUID() string {
196 u, err := uuid.NewRandom()
197 if err != nil {
198 return ""
199 }
200 return u.String()
201}
202
203// ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
204// is enabled in the replication config
205func (c *Client) ResetBucketReplication(ctx context.Context, bucketName string, olderThan time.Duration) (rID string, err error) {
206 rID = mustGetUUID()
207 _, err = c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, "", rID)
208 if err != nil {
209 return rID, err
210 }
211 return rID, nil
212}
213
214// ResetBucketReplicationOnTarget kicks off replication of previously replicated objects if
215// ExistingObjectReplication is enabled in the replication config
216func (c *Client) ResetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn string) (replication.ResyncTargetsInfo, error) {
217 return c.resetBucketReplicationOnTarget(ctx, bucketName, olderThan, tgtArn, mustGetUUID())
218}
219
220// ResetBucketReplication kicks off replication of previously replicated objects if ExistingObjectReplication
221// is enabled in the replication config
222func (c *Client) resetBucketReplicationOnTarget(ctx context.Context, bucketName string, olderThan time.Duration, tgtArn, resetID string) (rinfo replication.ResyncTargetsInfo, err error) {
223 // Input validation.
224 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
225 return
226 }
227 // Get resources properly escaped and lined up before
228 // using them in http request.
229 urlValues := make(url.Values)
230 urlValues.Set("replication-reset", "")
231 if olderThan > 0 {
232 urlValues.Set("older-than", olderThan.String())
233 }
234 if tgtArn != "" {
235 urlValues.Set("arn", tgtArn)
236 }
237 urlValues.Set("reset-id", resetID)
238 // Execute GET on bucket to get replication config.
239 resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
240 bucketName: bucketName,
241 queryValues: urlValues,
242 })
243
244 defer closeResponse(resp)
245 if err != nil {
246 return rinfo, err
247 }
248
249 if resp.StatusCode != http.StatusOK {
250 return rinfo, httpRespToErrorResponse(resp, bucketName, "")
251 }
252
253 if err = json.NewDecoder(resp.Body).Decode(&rinfo); err != nil {
254 return rinfo, err
255 }
256 return rinfo, nil
257}
258
259// GetBucketReplicationResyncStatus gets the status of replication resync
260func (c *Client) GetBucketReplicationResyncStatus(ctx context.Context, bucketName, arn string) (rinfo replication.ResyncTargetsInfo, err error) {
261 // Input validation.
262 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
263 return rinfo, err
264 }
265 // Get resources properly escaped and lined up before
266 // using them in http request.
267 urlValues := make(url.Values)
268 urlValues.Set("replication-reset-status", "")
269 if arn != "" {
270 urlValues.Set("arn", arn)
271 }
272 // Execute GET on bucket to get replication config.
273 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
274 bucketName: bucketName,
275 queryValues: urlValues,
276 })
277
278 defer closeResponse(resp)
279 if err != nil {
280 return rinfo, err
281 }
282
283 if resp.StatusCode != http.StatusOK {
284 return rinfo, httpRespToErrorResponse(resp, bucketName, "")
285 }
286
287 if err = json.NewDecoder(resp.Body).Decode(&rinfo); err != nil {
288 return rinfo, err
289 }
290 return rinfo, nil
291}
292
293// GetBucketReplicationMetricsV2 fetches bucket replication status metrics
294func (c *Client) GetBucketReplicationMetricsV2(ctx context.Context, bucketName string) (s replication.MetricsV2, err error) {
295 // Input validation.
296 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
297 return s, err
298 }
299 // Get resources properly escaped and lined up before
300 // using them in http request.
301 urlValues := make(url.Values)
302 urlValues.Set("replication-metrics", "2")
303
304 // Execute GET on bucket to get replication metrics.
305 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
306 bucketName: bucketName,
307 queryValues: urlValues,
308 })
309
310 defer closeResponse(resp)
311 if err != nil {
312 return s, err
313 }
314
315 if resp.StatusCode != http.StatusOK {
316 return s, httpRespToErrorResponse(resp, bucketName, "")
317 }
318 respBytes, err := io.ReadAll(resp.Body)
319 if err != nil {
320 return s, err
321 }
322
323 if err := json.Unmarshal(respBytes, &s); err != nil {
324 return s, err
325 }
326 return s, nil
327}
328
329// CheckBucketReplication validates if replication is set up properly for a bucket
330func (c *Client) CheckBucketReplication(ctx context.Context, bucketName string) (err error) {
331 // Input validation.
332 if err := s3utils.CheckValidBucketName(bucketName); err != nil {
333 return err
334 }
335 // Get resources properly escaped and lined up before
336 // using them in http request.
337 urlValues := make(url.Values)
338 urlValues.Set("replication-check", "")
339
340 // Execute GET on bucket to get replication config.
341 resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
342 bucketName: bucketName,
343 queryValues: urlValues,
344 })
345
346 defer closeResponse(resp)
347 if err != nil {
348 return err
349 }
350
351 if resp.StatusCode != http.StatusOK {
352 return httpRespToErrorResponse(resp, bucketName, "")
353 }
354 return nil
355}