diff options
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/pkg/replication')
-rw-r--r-- | vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go | 971 |
1 files changed, 971 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go new file mode 100644 index 0000000..0abbf6e --- /dev/null +++ b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go | |||
@@ -0,0 +1,971 @@ | |||
1 | /* | ||
2 | * MinIO Client (C) 2020 MinIO, Inc. | ||
3 | * | ||
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | * you may not use this file except in compliance with the License. | ||
6 | * You may obtain a copy of the License at | ||
7 | * | ||
8 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | * | ||
10 | * Unless required by applicable law or agreed to in writing, software | ||
11 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | * See the License for the specific language governing permissions and | ||
14 | * limitations under the License. | ||
15 | */ | ||
16 | |||
17 | package replication | ||
18 | |||
19 | import ( | ||
20 | "bytes" | ||
21 | "encoding/xml" | ||
22 | "fmt" | ||
23 | "math" | ||
24 | "strconv" | ||
25 | "strings" | ||
26 | "time" | ||
27 | "unicode/utf8" | ||
28 | |||
29 | "github.com/rs/xid" | ||
30 | ) | ||
31 | |||
32 | var errInvalidFilter = fmt.Errorf("invalid filter") | ||
33 | |||
34 | // OptionType specifies operation to be performed on config | ||
35 | type OptionType string | ||
36 | |||
37 | const ( | ||
38 | // AddOption specifies addition of rule to config | ||
39 | AddOption OptionType = "Add" | ||
40 | // SetOption specifies modification of existing rule to config | ||
41 | SetOption OptionType = "Set" | ||
42 | |||
43 | // RemoveOption specifies rule options are for removing a rule | ||
44 | RemoveOption OptionType = "Remove" | ||
45 | // ImportOption is for getting current config | ||
46 | ImportOption OptionType = "Import" | ||
47 | ) | ||
48 | |||
49 | // Options represents options to set a replication configuration rule | ||
50 | type Options struct { | ||
51 | Op OptionType | ||
52 | RoleArn string | ||
53 | ID string | ||
54 | Prefix string | ||
55 | RuleStatus string | ||
56 | Priority string | ||
57 | TagString string | ||
58 | StorageClass string | ||
59 | DestBucket string | ||
60 | IsTagSet bool | ||
61 | IsSCSet bool | ||
62 | ReplicateDeletes string // replicate versioned deletes | ||
63 | ReplicateDeleteMarkers string // replicate soft deletes | ||
64 | ReplicaSync string // replicate replica metadata modifications | ||
65 | ExistingObjectReplicate string | ||
66 | } | ||
67 | |||
68 | // Tags returns a slice of tags for a rule | ||
69 | func (opts Options) Tags() ([]Tag, error) { | ||
70 | var tagList []Tag | ||
71 | tagTokens := strings.Split(opts.TagString, "&") | ||
72 | for _, tok := range tagTokens { | ||
73 | if tok == "" { | ||
74 | break | ||
75 | } | ||
76 | kv := strings.SplitN(tok, "=", 2) | ||
77 | if len(kv) != 2 { | ||
78 | return []Tag{}, fmt.Errorf("tags should be entered as comma separated k=v pairs") | ||
79 | } | ||
80 | tagList = append(tagList, Tag{ | ||
81 | Key: kv[0], | ||
82 | Value: kv[1], | ||
83 | }) | ||
84 | } | ||
85 | return tagList, nil | ||
86 | } | ||
87 | |||
88 | // Config - replication configuration specified in | ||
89 | // https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html | ||
90 | type Config struct { | ||
91 | XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"` | ||
92 | Rules []Rule `xml:"Rule" json:"Rules"` | ||
93 | Role string `xml:"Role" json:"Role"` | ||
94 | } | ||
95 | |||
96 | // Empty returns true if config is not set | ||
97 | func (c *Config) Empty() bool { | ||
98 | return len(c.Rules) == 0 | ||
99 | } | ||
100 | |||
101 | // AddRule adds a new rule to existing replication config. If a rule exists with the | ||
102 | // same ID, then the rule is replaced. | ||
103 | func (c *Config) AddRule(opts Options) error { | ||
104 | priority, err := strconv.Atoi(opts.Priority) | ||
105 | if err != nil { | ||
106 | return err | ||
107 | } | ||
108 | var compatSw bool // true if RoleArn is used with new mc client and older minio version prior to multisite | ||
109 | if opts.RoleArn != "" { | ||
110 | tokens := strings.Split(opts.RoleArn, ":") | ||
111 | if len(tokens) != 6 { | ||
112 | return fmt.Errorf("invalid format for replication Role Arn: %v", opts.RoleArn) | ||
113 | } | ||
114 | switch { | ||
115 | case strings.HasPrefix(opts.RoleArn, "arn:minio:replication") && len(c.Rules) == 0: | ||
116 | c.Role = opts.RoleArn | ||
117 | compatSw = true | ||
118 | case strings.HasPrefix(opts.RoleArn, "arn:aws:iam"): | ||
119 | c.Role = opts.RoleArn | ||
120 | default: | ||
121 | return fmt.Errorf("RoleArn invalid for AWS replication configuration: %v", opts.RoleArn) | ||
122 | } | ||
123 | } | ||
124 | |||
125 | var status Status | ||
126 | // toggle rule status for edit option | ||
127 | switch opts.RuleStatus { | ||
128 | case "enable": | ||
129 | status = Enabled | ||
130 | case "disable": | ||
131 | status = Disabled | ||
132 | default: | ||
133 | return fmt.Errorf("rule state should be either [enable|disable]") | ||
134 | } | ||
135 | |||
136 | tags, err := opts.Tags() | ||
137 | if err != nil { | ||
138 | return err | ||
139 | } | ||
140 | andVal := And{ | ||
141 | Tags: tags, | ||
142 | } | ||
143 | filter := Filter{Prefix: opts.Prefix} | ||
144 | // only a single tag is set. | ||
145 | if opts.Prefix == "" && len(tags) == 1 { | ||
146 | filter.Tag = tags[0] | ||
147 | } | ||
148 | // both prefix and tag are present | ||
149 | if len(andVal.Tags) > 1 || opts.Prefix != "" { | ||
150 | filter.And = andVal | ||
151 | filter.And.Prefix = opts.Prefix | ||
152 | filter.Prefix = "" | ||
153 | filter.Tag = Tag{} | ||
154 | } | ||
155 | if opts.ID == "" { | ||
156 | opts.ID = xid.New().String() | ||
157 | } | ||
158 | |||
159 | destBucket := opts.DestBucket | ||
160 | // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html | ||
161 | if btokens := strings.Split(destBucket, ":"); len(btokens) != 6 { | ||
162 | if len(btokens) == 1 && compatSw { | ||
163 | destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket) | ||
164 | } else { | ||
165 | return fmt.Errorf("destination bucket needs to be in Arn format") | ||
166 | } | ||
167 | } | ||
168 | dmStatus := Disabled | ||
169 | if opts.ReplicateDeleteMarkers != "" { | ||
170 | switch opts.ReplicateDeleteMarkers { | ||
171 | case "enable": | ||
172 | dmStatus = Enabled | ||
173 | case "disable": | ||
174 | dmStatus = Disabled | ||
175 | default: | ||
176 | return fmt.Errorf("ReplicateDeleteMarkers should be either enable|disable") | ||
177 | } | ||
178 | } | ||
179 | |||
180 | vDeleteStatus := Disabled | ||
181 | if opts.ReplicateDeletes != "" { | ||
182 | switch opts.ReplicateDeletes { | ||
183 | case "enable": | ||
184 | vDeleteStatus = Enabled | ||
185 | case "disable": | ||
186 | vDeleteStatus = Disabled | ||
187 | default: | ||
188 | return fmt.Errorf("ReplicateDeletes should be either enable|disable") | ||
189 | } | ||
190 | } | ||
191 | var replicaSync Status | ||
192 | // replica sync is by default Enabled, unless specified. | ||
193 | switch opts.ReplicaSync { | ||
194 | case "enable", "": | ||
195 | replicaSync = Enabled | ||
196 | case "disable": | ||
197 | replicaSync = Disabled | ||
198 | default: | ||
199 | return fmt.Errorf("replica metadata sync should be either [enable|disable]") | ||
200 | } | ||
201 | |||
202 | var existingStatus Status | ||
203 | if opts.ExistingObjectReplicate != "" { | ||
204 | switch opts.ExistingObjectReplicate { | ||
205 | case "enable": | ||
206 | existingStatus = Enabled | ||
207 | case "disable", "": | ||
208 | existingStatus = Disabled | ||
209 | default: | ||
210 | return fmt.Errorf("existingObjectReplicate should be either enable|disable") | ||
211 | } | ||
212 | } | ||
213 | newRule := Rule{ | ||
214 | ID: opts.ID, | ||
215 | Priority: priority, | ||
216 | Status: status, | ||
217 | Filter: filter, | ||
218 | Destination: Destination{ | ||
219 | Bucket: destBucket, | ||
220 | StorageClass: opts.StorageClass, | ||
221 | }, | ||
222 | DeleteMarkerReplication: DeleteMarkerReplication{Status: dmStatus}, | ||
223 | DeleteReplication: DeleteReplication{Status: vDeleteStatus}, | ||
224 | // MinIO enables replica metadata syncing by default in the case of bi-directional replication to allow | ||
225 | // automatic failover as the expectation in this case is that replica and source should be identical. | ||
226 | // However AWS leaves this configurable https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-for-metadata-changes.html | ||
227 | SourceSelectionCriteria: SourceSelectionCriteria{ | ||
228 | ReplicaModifications: ReplicaModifications{ | ||
229 | Status: replicaSync, | ||
230 | }, | ||
231 | }, | ||
232 | // By default disable existing object replication unless selected | ||
233 | ExistingObjectReplication: ExistingObjectReplication{ | ||
234 | Status: existingStatus, | ||
235 | }, | ||
236 | } | ||
237 | |||
238 | // validate rule after overlaying priority for pre-existing rule being disabled. | ||
239 | if err := newRule.Validate(); err != nil { | ||
240 | return err | ||
241 | } | ||
242 | // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for MinIO configuration | ||
243 | if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && !compatSw { | ||
244 | for i := range c.Rules { | ||
245 | c.Rules[i].Destination.Bucket = c.Role | ||
246 | } | ||
247 | c.Role = "" | ||
248 | } | ||
249 | |||
250 | for _, rule := range c.Rules { | ||
251 | if rule.Priority == newRule.Priority { | ||
252 | return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority") | ||
253 | } | ||
254 | if rule.ID == newRule.ID { | ||
255 | return fmt.Errorf("a rule exists with this ID") | ||
256 | } | ||
257 | } | ||
258 | |||
259 | c.Rules = append(c.Rules, newRule) | ||
260 | return nil | ||
261 | } | ||
262 | |||
263 | // EditRule modifies an existing rule in replication config | ||
264 | func (c *Config) EditRule(opts Options) error { | ||
265 | if opts.ID == "" { | ||
266 | return fmt.Errorf("rule ID missing") | ||
267 | } | ||
268 | // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for non AWS. | ||
269 | if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && len(c.Rules) > 1 { | ||
270 | for i := range c.Rules { | ||
271 | c.Rules[i].Destination.Bucket = c.Role | ||
272 | } | ||
273 | c.Role = "" | ||
274 | } | ||
275 | |||
276 | rIdx := -1 | ||
277 | var newRule Rule | ||
278 | for i, rule := range c.Rules { | ||
279 | if rule.ID == opts.ID { | ||
280 | rIdx = i | ||
281 | newRule = rule | ||
282 | break | ||
283 | } | ||
284 | } | ||
285 | if rIdx < 0 { | ||
286 | return fmt.Errorf("rule with ID %s not found in replication configuration", opts.ID) | ||
287 | } | ||
288 | prefixChg := opts.Prefix != newRule.Prefix() | ||
289 | if opts.IsTagSet || prefixChg { | ||
290 | prefix := newRule.Prefix() | ||
291 | if prefix != opts.Prefix { | ||
292 | prefix = opts.Prefix | ||
293 | } | ||
294 | tags := []Tag{newRule.Filter.Tag} | ||
295 | if len(newRule.Filter.And.Tags) != 0 { | ||
296 | tags = newRule.Filter.And.Tags | ||
297 | } | ||
298 | var err error | ||
299 | if opts.IsTagSet { | ||
300 | tags, err = opts.Tags() | ||
301 | if err != nil { | ||
302 | return err | ||
303 | } | ||
304 | } | ||
305 | andVal := And{ | ||
306 | Tags: tags, | ||
307 | } | ||
308 | |||
309 | filter := Filter{Prefix: prefix} | ||
310 | // only a single tag is set. | ||
311 | if prefix == "" && len(tags) == 1 { | ||
312 | filter.Tag = tags[0] | ||
313 | } | ||
314 | // both prefix and tag are present | ||
315 | if len(andVal.Tags) > 1 || prefix != "" { | ||
316 | filter.And = andVal | ||
317 | filter.And.Prefix = prefix | ||
318 | filter.Prefix = "" | ||
319 | filter.Tag = Tag{} | ||
320 | } | ||
321 | newRule.Filter = filter | ||
322 | } | ||
323 | |||
324 | // toggle rule status for edit option | ||
325 | if opts.RuleStatus != "" { | ||
326 | switch opts.RuleStatus { | ||
327 | case "enable": | ||
328 | newRule.Status = Enabled | ||
329 | case "disable": | ||
330 | newRule.Status = Disabled | ||
331 | default: | ||
332 | return fmt.Errorf("rule state should be either [enable|disable]") | ||
333 | } | ||
334 | } | ||
335 | // set DeleteMarkerReplication rule status for edit option | ||
336 | if opts.ReplicateDeleteMarkers != "" { | ||
337 | switch opts.ReplicateDeleteMarkers { | ||
338 | case "enable": | ||
339 | newRule.DeleteMarkerReplication.Status = Enabled | ||
340 | case "disable": | ||
341 | newRule.DeleteMarkerReplication.Status = Disabled | ||
342 | default: | ||
343 | return fmt.Errorf("ReplicateDeleteMarkers state should be either [enable|disable]") | ||
344 | } | ||
345 | } | ||
346 | |||
347 | // set DeleteReplication rule status for edit option. This is a MinIO specific | ||
348 | // option to replicate versioned deletes | ||
349 | if opts.ReplicateDeletes != "" { | ||
350 | switch opts.ReplicateDeletes { | ||
351 | case "enable": | ||
352 | newRule.DeleteReplication.Status = Enabled | ||
353 | case "disable": | ||
354 | newRule.DeleteReplication.Status = Disabled | ||
355 | default: | ||
356 | return fmt.Errorf("ReplicateDeletes state should be either [enable|disable]") | ||
357 | } | ||
358 | } | ||
359 | |||
360 | if opts.ReplicaSync != "" { | ||
361 | switch opts.ReplicaSync { | ||
362 | case "enable", "": | ||
363 | newRule.SourceSelectionCriteria.ReplicaModifications.Status = Enabled | ||
364 | case "disable": | ||
365 | newRule.SourceSelectionCriteria.ReplicaModifications.Status = Disabled | ||
366 | default: | ||
367 | return fmt.Errorf("replica metadata sync should be either [enable|disable]") | ||
368 | } | ||
369 | } | ||
370 | |||
371 | if opts.ExistingObjectReplicate != "" { | ||
372 | switch opts.ExistingObjectReplicate { | ||
373 | case "enable": | ||
374 | newRule.ExistingObjectReplication.Status = Enabled | ||
375 | case "disable": | ||
376 | newRule.ExistingObjectReplication.Status = Disabled | ||
377 | default: | ||
378 | return fmt.Errorf("existingObjectsReplication state should be either [enable|disable]") | ||
379 | } | ||
380 | } | ||
381 | if opts.IsSCSet { | ||
382 | newRule.Destination.StorageClass = opts.StorageClass | ||
383 | } | ||
384 | if opts.Priority != "" { | ||
385 | priority, err := strconv.Atoi(opts.Priority) | ||
386 | if err != nil { | ||
387 | return err | ||
388 | } | ||
389 | newRule.Priority = priority | ||
390 | } | ||
391 | if opts.DestBucket != "" { | ||
392 | destBucket := opts.DestBucket | ||
393 | // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html | ||
394 | if btokens := strings.Split(opts.DestBucket, ":"); len(btokens) != 6 { | ||
395 | return fmt.Errorf("destination bucket needs to be in Arn format") | ||
396 | } | ||
397 | newRule.Destination.Bucket = destBucket | ||
398 | } | ||
399 | // validate rule | ||
400 | if err := newRule.Validate(); err != nil { | ||
401 | return err | ||
402 | } | ||
403 | // ensure priority and destination bucket restrictions are not violated | ||
404 | for idx, rule := range c.Rules { | ||
405 | if rule.Priority == newRule.Priority && rIdx != idx { | ||
406 | return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority") | ||
407 | } | ||
408 | if rule.Destination.Bucket != newRule.Destination.Bucket && rule.ID == newRule.ID { | ||
409 | return fmt.Errorf("invalid destination bucket for this rule") | ||
410 | } | ||
411 | } | ||
412 | |||
413 | c.Rules[rIdx] = newRule | ||
414 | return nil | ||
415 | } | ||
416 | |||
417 | // RemoveRule removes a rule from replication config. | ||
418 | func (c *Config) RemoveRule(opts Options) error { | ||
419 | var newRules []Rule | ||
420 | ruleFound := false | ||
421 | for _, rule := range c.Rules { | ||
422 | if rule.ID != opts.ID { | ||
423 | newRules = append(newRules, rule) | ||
424 | continue | ||
425 | } | ||
426 | ruleFound = true | ||
427 | } | ||
428 | if !ruleFound { | ||
429 | return fmt.Errorf("Rule with ID %s not found", opts.ID) | ||
430 | } | ||
431 | if len(newRules) == 0 { | ||
432 | return fmt.Errorf("replication configuration should have at least one rule") | ||
433 | } | ||
434 | c.Rules = newRules | ||
435 | return nil | ||
436 | } | ||
437 | |||
438 | // Rule - a rule for replication configuration. | ||
439 | type Rule struct { | ||
440 | XMLName xml.Name `xml:"Rule" json:"-"` | ||
441 | ID string `xml:"ID,omitempty"` | ||
442 | Status Status `xml:"Status"` | ||
443 | Priority int `xml:"Priority"` | ||
444 | DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication"` | ||
445 | DeleteReplication DeleteReplication `xml:"DeleteReplication"` | ||
446 | Destination Destination `xml:"Destination"` | ||
447 | Filter Filter `xml:"Filter" json:"Filter"` | ||
448 | SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"` | ||
449 | ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"` | ||
450 | } | ||
451 | |||
452 | // Validate validates the rule for correctness | ||
453 | func (r Rule) Validate() error { | ||
454 | if err := r.validateID(); err != nil { | ||
455 | return err | ||
456 | } | ||
457 | if err := r.validateStatus(); err != nil { | ||
458 | return err | ||
459 | } | ||
460 | if err := r.validateFilter(); err != nil { | ||
461 | return err | ||
462 | } | ||
463 | |||
464 | if r.Priority < 0 && r.Status == Enabled { | ||
465 | return fmt.Errorf("priority must be set for the rule") | ||
466 | } | ||
467 | |||
468 | if err := r.validateStatus(); err != nil { | ||
469 | return err | ||
470 | } | ||
471 | return r.ExistingObjectReplication.Validate() | ||
472 | } | ||
473 | |||
474 | // validateID - checks if ID is valid or not. | ||
475 | func (r Rule) validateID() error { | ||
476 | // cannot be longer than 255 characters | ||
477 | if len(r.ID) > 255 { | ||
478 | return fmt.Errorf("ID must be less than 255 characters") | ||
479 | } | ||
480 | return nil | ||
481 | } | ||
482 | |||
483 | // validateStatus - checks if status is valid or not. | ||
484 | func (r Rule) validateStatus() error { | ||
485 | // Status can't be empty | ||
486 | if len(r.Status) == 0 { | ||
487 | return fmt.Errorf("status cannot be empty") | ||
488 | } | ||
489 | |||
490 | // Status must be one of Enabled or Disabled | ||
491 | if r.Status != Enabled && r.Status != Disabled { | ||
492 | return fmt.Errorf("status must be set to either Enabled or Disabled") | ||
493 | } | ||
494 | return nil | ||
495 | } | ||
496 | |||
497 | func (r Rule) validateFilter() error { | ||
498 | return r.Filter.Validate() | ||
499 | } | ||
500 | |||
501 | // Prefix - a rule can either have prefix under <filter></filter> or under | ||
502 | // <filter><and></and></filter>. This method returns the prefix from the | ||
503 | // location where it is available | ||
504 | func (r Rule) Prefix() string { | ||
505 | if r.Filter.Prefix != "" { | ||
506 | return r.Filter.Prefix | ||
507 | } | ||
508 | return r.Filter.And.Prefix | ||
509 | } | ||
510 | |||
511 | // Tags - a rule can either have tag under <filter></filter> or under | ||
512 | // <filter><and></and></filter>. This method returns all the tags from the | ||
513 | // rule in the format tag1=value1&tag2=value2 | ||
514 | func (r Rule) Tags() string { | ||
515 | ts := []Tag{r.Filter.Tag} | ||
516 | if len(r.Filter.And.Tags) != 0 { | ||
517 | ts = r.Filter.And.Tags | ||
518 | } | ||
519 | |||
520 | var buf bytes.Buffer | ||
521 | for _, t := range ts { | ||
522 | if buf.Len() > 0 { | ||
523 | buf.WriteString("&") | ||
524 | } | ||
525 | buf.WriteString(t.String()) | ||
526 | } | ||
527 | return buf.String() | ||
528 | } | ||
529 | |||
530 | // Filter - a filter for a replication configuration Rule. | ||
531 | type Filter struct { | ||
532 | XMLName xml.Name `xml:"Filter" json:"-"` | ||
533 | Prefix string `json:"Prefix,omitempty"` | ||
534 | And And `xml:"And,omitempty" json:"And,omitempty"` | ||
535 | Tag Tag `xml:"Tag,omitempty" json:"Tag,omitempty"` | ||
536 | } | ||
537 | |||
538 | // Validate - validates the filter element | ||
539 | func (f Filter) Validate() error { | ||
540 | // A Filter must have exactly one of Prefix, Tag, or And specified. | ||
541 | if !f.And.isEmpty() { | ||
542 | if f.Prefix != "" { | ||
543 | return errInvalidFilter | ||
544 | } | ||
545 | if !f.Tag.IsEmpty() { | ||
546 | return errInvalidFilter | ||
547 | } | ||
548 | } | ||
549 | if f.Prefix != "" { | ||
550 | if !f.Tag.IsEmpty() { | ||
551 | return errInvalidFilter | ||
552 | } | ||
553 | } | ||
554 | if !f.Tag.IsEmpty() { | ||
555 | if err := f.Tag.Validate(); err != nil { | ||
556 | return err | ||
557 | } | ||
558 | } | ||
559 | return nil | ||
560 | } | ||
561 | |||
562 | // Tag - a tag for a replication configuration Rule filter. | ||
563 | type Tag struct { | ||
564 | XMLName xml.Name `json:"-"` | ||
565 | Key string `xml:"Key,omitempty" json:"Key,omitempty"` | ||
566 | Value string `xml:"Value,omitempty" json:"Value,omitempty"` | ||
567 | } | ||
568 | |||
569 | func (tag Tag) String() string { | ||
570 | if tag.IsEmpty() { | ||
571 | return "" | ||
572 | } | ||
573 | return tag.Key + "=" + tag.Value | ||
574 | } | ||
575 | |||
576 | // IsEmpty returns whether this tag is empty or not. | ||
577 | func (tag Tag) IsEmpty() bool { | ||
578 | return tag.Key == "" | ||
579 | } | ||
580 | |||
581 | // Validate checks this tag. | ||
582 | func (tag Tag) Validate() error { | ||
583 | if len(tag.Key) == 0 || utf8.RuneCountInString(tag.Key) > 128 { | ||
584 | return fmt.Errorf("invalid Tag Key") | ||
585 | } | ||
586 | |||
587 | if utf8.RuneCountInString(tag.Value) > 256 { | ||
588 | return fmt.Errorf("invalid Tag Value") | ||
589 | } | ||
590 | return nil | ||
591 | } | ||
592 | |||
593 | // Destination - destination in ReplicationConfiguration. | ||
594 | type Destination struct { | ||
595 | XMLName xml.Name `xml:"Destination" json:"-"` | ||
596 | Bucket string `xml:"Bucket" json:"Bucket"` | ||
597 | StorageClass string `xml:"StorageClass,omitempty" json:"StorageClass,omitempty"` | ||
598 | } | ||
599 | |||
600 | // And - a tag to combine a prefix and multiple tags for replication configuration rule. | ||
601 | type And struct { | ||
602 | XMLName xml.Name `xml:"And,omitempty" json:"-"` | ||
603 | Prefix string `xml:"Prefix,omitempty" json:"Prefix,omitempty"` | ||
604 | Tags []Tag `xml:"Tag,omitempty" json:"Tag,omitempty"` | ||
605 | } | ||
606 | |||
607 | // isEmpty returns true if Tags field is null | ||
608 | func (a And) isEmpty() bool { | ||
609 | return len(a.Tags) == 0 && a.Prefix == "" | ||
610 | } | ||
611 | |||
612 | // Status represents Enabled/Disabled status | ||
613 | type Status string | ||
614 | |||
615 | // Supported status types | ||
616 | const ( | ||
617 | Enabled Status = "Enabled" | ||
618 | Disabled Status = "Disabled" | ||
619 | ) | ||
620 | |||
621 | // DeleteMarkerReplication - whether delete markers are replicated - https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html | ||
622 | type DeleteMarkerReplication struct { | ||
623 | Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default | ||
624 | } | ||
625 | |||
626 | // IsEmpty returns true if DeleteMarkerReplication is not set | ||
627 | func (d DeleteMarkerReplication) IsEmpty() bool { | ||
628 | return len(d.Status) == 0 | ||
629 | } | ||
630 | |||
631 | // DeleteReplication - whether versioned deletes are replicated - this | ||
632 | // is a MinIO specific extension | ||
633 | type DeleteReplication struct { | ||
634 | Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default | ||
635 | } | ||
636 | |||
637 | // IsEmpty returns true if DeleteReplication is not set | ||
638 | func (d DeleteReplication) IsEmpty() bool { | ||
639 | return len(d.Status) == 0 | ||
640 | } | ||
641 | |||
642 | // ReplicaModifications specifies if replica modification sync is enabled | ||
643 | type ReplicaModifications struct { | ||
644 | Status Status `xml:"Status" json:"Status"` // should be set to "Enabled" by default | ||
645 | } | ||
646 | |||
647 | // SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration. | ||
648 | type SourceSelectionCriteria struct { | ||
649 | ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"` | ||
650 | } | ||
651 | |||
652 | // IsValid - checks whether SourceSelectionCriteria is valid or not. | ||
653 | func (s SourceSelectionCriteria) IsValid() bool { | ||
654 | return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled | ||
655 | } | ||
656 | |||
657 | // Validate source selection criteria | ||
658 | func (s SourceSelectionCriteria) Validate() error { | ||
659 | if (s == SourceSelectionCriteria{}) { | ||
660 | return nil | ||
661 | } | ||
662 | if !s.IsValid() { | ||
663 | return fmt.Errorf("invalid ReplicaModification status") | ||
664 | } | ||
665 | return nil | ||
666 | } | ||
667 | |||
668 | // ExistingObjectReplication - whether existing object replication is enabled | ||
669 | type ExistingObjectReplication struct { | ||
670 | Status Status `xml:"Status"` // should be set to "Disabled" by default | ||
671 | } | ||
672 | |||
673 | // IsEmpty returns true if DeleteMarkerReplication is not set | ||
674 | func (e ExistingObjectReplication) IsEmpty() bool { | ||
675 | return len(e.Status) == 0 | ||
676 | } | ||
677 | |||
678 | // Validate validates whether the status is disabled. | ||
679 | func (e ExistingObjectReplication) Validate() error { | ||
680 | if e.IsEmpty() { | ||
681 | return nil | ||
682 | } | ||
683 | if e.Status != Disabled && e.Status != Enabled { | ||
684 | return fmt.Errorf("invalid ExistingObjectReplication status") | ||
685 | } | ||
686 | return nil | ||
687 | } | ||
688 | |||
689 | // TargetMetrics represents inline replication metrics | ||
690 | // such as pending, failed and completed bytes in total for a bucket remote target | ||
691 | type TargetMetrics struct { | ||
692 | // Completed count | ||
693 | ReplicatedCount uint64 `json:"replicationCount,omitempty"` | ||
694 | // Completed size in bytes | ||
695 | ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` | ||
696 | // Bandwidth limit in bytes/sec for this target | ||
697 | BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"` | ||
698 | // Current bandwidth used in bytes/sec for this target | ||
699 | CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"` | ||
700 | // errors seen in replication in last minute, hour and total | ||
701 | Failed TimedErrStats `json:"failed,omitempty"` | ||
702 | // Deprecated fields | ||
703 | // Pending size in bytes | ||
704 | PendingSize uint64 `json:"pendingReplicationSize,omitempty"` | ||
705 | // Total Replica size in bytes | ||
706 | ReplicaSize uint64 `json:"replicaSize,omitempty"` | ||
707 | // Failed size in bytes | ||
708 | FailedSize uint64 `json:"failedReplicationSize,omitempty"` | ||
709 | // Total number of pending operations including metadata updates | ||
710 | PendingCount uint64 `json:"pendingReplicationCount,omitempty"` | ||
711 | // Total number of failed operations including metadata updates | ||
712 | FailedCount uint64 `json:"failedReplicationCount,omitempty"` | ||
713 | } | ||
714 | |||
715 | // Metrics represents inline replication metrics for a bucket. | ||
716 | type Metrics struct { | ||
717 | Stats map[string]TargetMetrics | ||
718 | // Completed size in bytes across targets | ||
719 | ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` | ||
720 | // Total Replica size in bytes across targets | ||
721 | ReplicaSize uint64 `json:"replicaSize,omitempty"` | ||
722 | // Total Replica counts | ||
723 | ReplicaCount int64 `json:"replicaCount,omitempty"` | ||
724 | // Total Replicated count | ||
725 | ReplicatedCount int64 `json:"replicationCount,omitempty"` | ||
726 | // errors seen in replication in last minute, hour and total | ||
727 | Errors TimedErrStats `json:"failed,omitempty"` | ||
728 | // Total number of entries that are queued for replication | ||
729 | QStats InQueueMetric `json:"queued"` | ||
730 | // Deprecated fields | ||
731 | // Total Pending size in bytes across targets | ||
732 | PendingSize uint64 `json:"pendingReplicationSize,omitempty"` | ||
733 | // Failed size in bytes across targets | ||
734 | FailedSize uint64 `json:"failedReplicationSize,omitempty"` | ||
735 | // Total number of pending operations including metadata updates across targets | ||
736 | PendingCount uint64 `json:"pendingReplicationCount,omitempty"` | ||
737 | // Total number of failed operations including metadata updates across targets | ||
738 | FailedCount uint64 `json:"failedReplicationCount,omitempty"` | ||
739 | } | ||
740 | |||
741 | // RStat - has count and bytes for replication metrics | ||
742 | type RStat struct { | ||
743 | Count float64 `json:"count"` | ||
744 | Bytes int64 `json:"bytes"` | ||
745 | } | ||
746 | |||
747 | // Add two RStat | ||
748 | func (r RStat) Add(r1 RStat) RStat { | ||
749 | return RStat{ | ||
750 | Count: r.Count + r1.Count, | ||
751 | Bytes: r.Bytes + r1.Bytes, | ||
752 | } | ||
753 | } | ||
754 | |||
755 | // TimedErrStats holds error stats for a time period | ||
756 | type TimedErrStats struct { | ||
757 | LastMinute RStat `json:"lastMinute"` | ||
758 | LastHour RStat `json:"lastHour"` | ||
759 | Totals RStat `json:"totals"` | ||
760 | } | ||
761 | |||
762 | // Add two TimedErrStats | ||
763 | func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats { | ||
764 | return TimedErrStats{ | ||
765 | LastMinute: te.LastMinute.Add(o.LastMinute), | ||
766 | LastHour: te.LastHour.Add(o.LastHour), | ||
767 | Totals: te.Totals.Add(o.Totals), | ||
768 | } | ||
769 | } | ||
770 | |||
771 | // ResyncTargetsInfo provides replication target information to resync replicated data. | ||
772 | type ResyncTargetsInfo struct { | ||
773 | Targets []ResyncTarget `json:"target,omitempty"` | ||
774 | } | ||
775 | |||
776 | // ResyncTarget provides the replica resources and resetID to initiate resync replication. | ||
777 | type ResyncTarget struct { | ||
778 | Arn string `json:"arn"` | ||
779 | ResetID string `json:"resetid"` | ||
780 | StartTime time.Time `json:"startTime,omitempty"` | ||
781 | EndTime time.Time `json:"endTime,omitempty"` | ||
782 | // Status of resync operation | ||
783 | ResyncStatus string `json:"resyncStatus,omitempty"` | ||
784 | // Completed size in bytes | ||
785 | ReplicatedSize int64 `json:"completedReplicationSize,omitempty"` | ||
786 | // Failed size in bytes | ||
787 | FailedSize int64 `json:"failedReplicationSize,omitempty"` | ||
788 | // Total number of failed operations | ||
789 | FailedCount int64 `json:"failedReplicationCount,omitempty"` | ||
790 | // Total number of completed operations | ||
791 | ReplicatedCount int64 `json:"replicationCount,omitempty"` | ||
792 | // Last bucket/object replicated. | ||
793 | Bucket string `json:"bucket,omitempty"` | ||
794 | Object string `json:"object,omitempty"` | ||
795 | } | ||
796 | |||
797 | // XferStats holds transfer rate info for uploads/sec | ||
798 | type XferStats struct { | ||
799 | AvgRate float64 `json:"avgRate"` | ||
800 | PeakRate float64 `json:"peakRate"` | ||
801 | CurrRate float64 `json:"currRate"` | ||
802 | } | ||
803 | |||
804 | // Merge two XferStats | ||
805 | func (x *XferStats) Merge(x1 XferStats) { | ||
806 | x.AvgRate += x1.AvgRate | ||
807 | x.PeakRate += x1.PeakRate | ||
808 | x.CurrRate += x1.CurrRate | ||
809 | } | ||
810 | |||
811 | // QStat holds count and bytes for objects in replication queue | ||
812 | type QStat struct { | ||
813 | Count float64 `json:"count"` | ||
814 | Bytes float64 `json:"bytes"` | ||
815 | } | ||
816 | |||
817 | // Add 2 QStat entries | ||
818 | func (q *QStat) Add(q1 QStat) { | ||
819 | q.Count += q1.Count | ||
820 | q.Bytes += q1.Bytes | ||
821 | } | ||
822 | |||
823 | // InQueueMetric holds stats for objects in replication queue | ||
824 | type InQueueMetric struct { | ||
825 | Curr QStat `json:"curr" msg:"cq"` | ||
826 | Avg QStat `json:"avg" msg:"aq"` | ||
827 | Max QStat `json:"peak" msg:"pq"` | ||
828 | } | ||
829 | |||
830 | // MetricName name of replication metric | ||
831 | type MetricName string | ||
832 | |||
833 | const ( | ||
834 | // Large is a metric name for large objects >=128MiB | ||
835 | Large MetricName = "Large" | ||
836 | // Small is a metric name for objects <128MiB size | ||
837 | Small MetricName = "Small" | ||
838 | // Total is a metric name for total objects | ||
839 | Total MetricName = "Total" | ||
840 | ) | ||
841 | |||
842 | // WorkerStat has stats on number of replication workers | ||
843 | type WorkerStat struct { | ||
844 | Curr int32 `json:"curr"` | ||
845 | Avg float32 `json:"avg"` | ||
846 | Max int32 `json:"max"` | ||
847 | } | ||
848 | |||
849 | // ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes | ||
850 | // and number of entries that failed replication after 3 retries | ||
851 | type ReplMRFStats struct { | ||
852 | LastFailedCount uint64 `json:"failedCount_last5min"` | ||
853 | // Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start. | ||
854 | TotalDroppedCount uint64 `json:"droppedCount_since_uptime"` | ||
855 | // Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start. | ||
856 | TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"` | ||
857 | } | ||
858 | |||
859 | // ReplQNodeStats holds stats for a node in replication queue | ||
860 | type ReplQNodeStats struct { | ||
861 | NodeName string `json:"nodeName"` | ||
862 | Uptime int64 `json:"uptime"` | ||
863 | Workers WorkerStat `json:"activeWorkers"` | ||
864 | |||
865 | XferStats map[MetricName]XferStats `json:"transferSummary"` | ||
866 | TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"` | ||
867 | |||
868 | QStats InQueueMetric `json:"queueStats"` | ||
869 | MRFStats ReplMRFStats `json:"mrfStats"` | ||
870 | } | ||
871 | |||
872 | // ReplQueueStats holds stats for replication queue across nodes | ||
873 | type ReplQueueStats struct { | ||
874 | Nodes []ReplQNodeStats `json:"nodes"` | ||
875 | } | ||
876 | |||
877 | // Workers returns number of workers across all nodes | ||
878 | func (q ReplQueueStats) Workers() (tot WorkerStat) { | ||
879 | for _, node := range q.Nodes { | ||
880 | tot.Avg += node.Workers.Avg | ||
881 | tot.Curr += node.Workers.Curr | ||
882 | if tot.Max < node.Workers.Max { | ||
883 | tot.Max = node.Workers.Max | ||
884 | } | ||
885 | } | ||
886 | if len(q.Nodes) > 0 { | ||
887 | tot.Avg /= float32(len(q.Nodes)) | ||
888 | tot.Curr /= int32(len(q.Nodes)) | ||
889 | } | ||
890 | return tot | ||
891 | } | ||
892 | |||
893 | // qStatSummary returns cluster level stats for objects in replication queue | ||
894 | func (q ReplQueueStats) qStatSummary() InQueueMetric { | ||
895 | m := InQueueMetric{} | ||
896 | for _, v := range q.Nodes { | ||
897 | m.Avg.Add(v.QStats.Avg) | ||
898 | m.Curr.Add(v.QStats.Curr) | ||
899 | if m.Max.Count < v.QStats.Max.Count { | ||
900 | m.Max.Add(v.QStats.Max) | ||
901 | } | ||
902 | } | ||
903 | return m | ||
904 | } | ||
905 | |||
906 | // ReplQStats holds stats for objects in replication queue | ||
907 | type ReplQStats struct { | ||
908 | Uptime int64 `json:"uptime"` | ||
909 | Workers WorkerStat `json:"workers"` | ||
910 | |||
911 | XferStats map[MetricName]XferStats `json:"xferStats"` | ||
912 | TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"` | ||
913 | |||
914 | QStats InQueueMetric `json:"qStats"` | ||
915 | MRFStats ReplMRFStats `json:"mrfStats"` | ||
916 | } | ||
917 | |||
918 | // QStats returns cluster level stats for objects in replication queue | ||
919 | func (q ReplQueueStats) QStats() (r ReplQStats) { | ||
920 | r.QStats = q.qStatSummary() | ||
921 | r.XferStats = make(map[MetricName]XferStats) | ||
922 | r.TgtXferStats = make(map[string]map[MetricName]XferStats) | ||
923 | r.Workers = q.Workers() | ||
924 | |||
925 | for _, node := range q.Nodes { | ||
926 | for arn := range node.TgtXferStats { | ||
927 | xmap, ok := node.TgtXferStats[arn] | ||
928 | if !ok { | ||
929 | xmap = make(map[MetricName]XferStats) | ||
930 | } | ||
931 | for m, v := range xmap { | ||
932 | st, ok := r.XferStats[m] | ||
933 | if !ok { | ||
934 | st = XferStats{} | ||
935 | } | ||
936 | st.AvgRate += v.AvgRate | ||
937 | st.CurrRate += v.CurrRate | ||
938 | st.PeakRate = math.Max(st.PeakRate, v.PeakRate) | ||
939 | if _, ok := r.TgtXferStats[arn]; !ok { | ||
940 | r.TgtXferStats[arn] = make(map[MetricName]XferStats) | ||
941 | } | ||
942 | r.TgtXferStats[arn][m] = st | ||
943 | } | ||
944 | } | ||
945 | for k, v := range node.XferStats { | ||
946 | st, ok := r.XferStats[k] | ||
947 | if !ok { | ||
948 | st = XferStats{} | ||
949 | } | ||
950 | st.AvgRate += v.AvgRate | ||
951 | st.CurrRate += v.CurrRate | ||
952 | st.PeakRate = math.Max(st.PeakRate, v.PeakRate) | ||
953 | r.XferStats[k] = st | ||
954 | } | ||
955 | r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount | ||
956 | r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount | ||
957 | r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes | ||
958 | r.Uptime += node.Uptime | ||
959 | } | ||
960 | if len(q.Nodes) > 0 { | ||
961 | r.Uptime /= int64(len(q.Nodes)) // average uptime | ||
962 | } | ||
963 | return | ||
964 | } | ||
965 | |||
966 | // MetricsV2 represents replication metrics for a bucket. | ||
967 | type MetricsV2 struct { | ||
968 | Uptime int64 `json:"uptime"` | ||
969 | CurrentStats Metrics `json:"currStats"` | ||
970 | QueueStats ReplQueueStats `json:"queueStats"` | ||
971 | } | ||