aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go')
-rw-r--r--vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go971
1 files changed, 971 insertions, 0 deletions
diff --git a/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go
new file mode 100644
index 0000000..0abbf6e
--- /dev/null
+++ b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go
@@ -0,0 +1,971 @@
1/*
2 * MinIO Client (C) 2020 MinIO, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package replication
18
19import (
20 "bytes"
21 "encoding/xml"
22 "fmt"
23 "math"
24 "strconv"
25 "strings"
26 "time"
27 "unicode/utf8"
28
29 "github.com/rs/xid"
30)
31
32var errInvalidFilter = fmt.Errorf("invalid filter")
33
34// OptionType specifies operation to be performed on config
35type OptionType string
36
37const (
38 // AddOption specifies addition of rule to config
39 AddOption OptionType = "Add"
40 // SetOption specifies modification of existing rule to config
41 SetOption OptionType = "Set"
42
43 // RemoveOption specifies rule options are for removing a rule
44 RemoveOption OptionType = "Remove"
45 // ImportOption is for getting current config
46 ImportOption OptionType = "Import"
47)
48
49// Options represents options to set a replication configuration rule
50type Options struct {
51 Op OptionType
52 RoleArn string
53 ID string
54 Prefix string
55 RuleStatus string
56 Priority string
57 TagString string
58 StorageClass string
59 DestBucket string
60 IsTagSet bool
61 IsSCSet bool
62 ReplicateDeletes string // replicate versioned deletes
63 ReplicateDeleteMarkers string // replicate soft deletes
64 ReplicaSync string // replicate replica metadata modifications
65 ExistingObjectReplicate string
66}
67
68// Tags returns a slice of tags for a rule
69func (opts Options) Tags() ([]Tag, error) {
70 var tagList []Tag
71 tagTokens := strings.Split(opts.TagString, "&")
72 for _, tok := range tagTokens {
73 if tok == "" {
74 break
75 }
76 kv := strings.SplitN(tok, "=", 2)
77 if len(kv) != 2 {
78 return []Tag{}, fmt.Errorf("tags should be entered as comma separated k=v pairs")
79 }
80 tagList = append(tagList, Tag{
81 Key: kv[0],
82 Value: kv[1],
83 })
84 }
85 return tagList, nil
86}
87
88// Config - replication configuration specified in
89// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
90type Config struct {
91 XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
92 Rules []Rule `xml:"Rule" json:"Rules"`
93 Role string `xml:"Role" json:"Role"`
94}
95
96// Empty returns true if config is not set
97func (c *Config) Empty() bool {
98 return len(c.Rules) == 0
99}
100
101// AddRule adds a new rule to existing replication config. If a rule exists with the
102// same ID, then the rule is replaced.
103func (c *Config) AddRule(opts Options) error {
104 priority, err := strconv.Atoi(opts.Priority)
105 if err != nil {
106 return err
107 }
108 var compatSw bool // true if RoleArn is used with new mc client and older minio version prior to multisite
109 if opts.RoleArn != "" {
110 tokens := strings.Split(opts.RoleArn, ":")
111 if len(tokens) != 6 {
112 return fmt.Errorf("invalid format for replication Role Arn: %v", opts.RoleArn)
113 }
114 switch {
115 case strings.HasPrefix(opts.RoleArn, "arn:minio:replication") && len(c.Rules) == 0:
116 c.Role = opts.RoleArn
117 compatSw = true
118 case strings.HasPrefix(opts.RoleArn, "arn:aws:iam"):
119 c.Role = opts.RoleArn
120 default:
121 return fmt.Errorf("RoleArn invalid for AWS replication configuration: %v", opts.RoleArn)
122 }
123 }
124
125 var status Status
126 // toggle rule status for edit option
127 switch opts.RuleStatus {
128 case "enable":
129 status = Enabled
130 case "disable":
131 status = Disabled
132 default:
133 return fmt.Errorf("rule state should be either [enable|disable]")
134 }
135
136 tags, err := opts.Tags()
137 if err != nil {
138 return err
139 }
140 andVal := And{
141 Tags: tags,
142 }
143 filter := Filter{Prefix: opts.Prefix}
144 // only a single tag is set.
145 if opts.Prefix == "" && len(tags) == 1 {
146 filter.Tag = tags[0]
147 }
148 // both prefix and tag are present
149 if len(andVal.Tags) > 1 || opts.Prefix != "" {
150 filter.And = andVal
151 filter.And.Prefix = opts.Prefix
152 filter.Prefix = ""
153 filter.Tag = Tag{}
154 }
155 if opts.ID == "" {
156 opts.ID = xid.New().String()
157 }
158
159 destBucket := opts.DestBucket
160 // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html
161 if btokens := strings.Split(destBucket, ":"); len(btokens) != 6 {
162 if len(btokens) == 1 && compatSw {
163 destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket)
164 } else {
165 return fmt.Errorf("destination bucket needs to be in Arn format")
166 }
167 }
168 dmStatus := Disabled
169 if opts.ReplicateDeleteMarkers != "" {
170 switch opts.ReplicateDeleteMarkers {
171 case "enable":
172 dmStatus = Enabled
173 case "disable":
174 dmStatus = Disabled
175 default:
176 return fmt.Errorf("ReplicateDeleteMarkers should be either enable|disable")
177 }
178 }
179
180 vDeleteStatus := Disabled
181 if opts.ReplicateDeletes != "" {
182 switch opts.ReplicateDeletes {
183 case "enable":
184 vDeleteStatus = Enabled
185 case "disable":
186 vDeleteStatus = Disabled
187 default:
188 return fmt.Errorf("ReplicateDeletes should be either enable|disable")
189 }
190 }
191 var replicaSync Status
192 // replica sync is by default Enabled, unless specified.
193 switch opts.ReplicaSync {
194 case "enable", "":
195 replicaSync = Enabled
196 case "disable":
197 replicaSync = Disabled
198 default:
199 return fmt.Errorf("replica metadata sync should be either [enable|disable]")
200 }
201
202 var existingStatus Status
203 if opts.ExistingObjectReplicate != "" {
204 switch opts.ExistingObjectReplicate {
205 case "enable":
206 existingStatus = Enabled
207 case "disable", "":
208 existingStatus = Disabled
209 default:
210 return fmt.Errorf("existingObjectReplicate should be either enable|disable")
211 }
212 }
213 newRule := Rule{
214 ID: opts.ID,
215 Priority: priority,
216 Status: status,
217 Filter: filter,
218 Destination: Destination{
219 Bucket: destBucket,
220 StorageClass: opts.StorageClass,
221 },
222 DeleteMarkerReplication: DeleteMarkerReplication{Status: dmStatus},
223 DeleteReplication: DeleteReplication{Status: vDeleteStatus},
224 // MinIO enables replica metadata syncing by default in the case of bi-directional replication to allow
225 // automatic failover as the expectation in this case is that replica and source should be identical.
226 // However AWS leaves this configurable https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-for-metadata-changes.html
227 SourceSelectionCriteria: SourceSelectionCriteria{
228 ReplicaModifications: ReplicaModifications{
229 Status: replicaSync,
230 },
231 },
232 // By default disable existing object replication unless selected
233 ExistingObjectReplication: ExistingObjectReplication{
234 Status: existingStatus,
235 },
236 }
237
238 // validate rule after overlaying priority for pre-existing rule being disabled.
239 if err := newRule.Validate(); err != nil {
240 return err
241 }
242 // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for MinIO configuration
243 if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && !compatSw {
244 for i := range c.Rules {
245 c.Rules[i].Destination.Bucket = c.Role
246 }
247 c.Role = ""
248 }
249
250 for _, rule := range c.Rules {
251 if rule.Priority == newRule.Priority {
252 return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority")
253 }
254 if rule.ID == newRule.ID {
255 return fmt.Errorf("a rule exists with this ID")
256 }
257 }
258
259 c.Rules = append(c.Rules, newRule)
260 return nil
261}
262
263// EditRule modifies an existing rule in replication config
264func (c *Config) EditRule(opts Options) error {
265 if opts.ID == "" {
266 return fmt.Errorf("rule ID missing")
267 }
268 // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for non AWS.
269 if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && len(c.Rules) > 1 {
270 for i := range c.Rules {
271 c.Rules[i].Destination.Bucket = c.Role
272 }
273 c.Role = ""
274 }
275
276 rIdx := -1
277 var newRule Rule
278 for i, rule := range c.Rules {
279 if rule.ID == opts.ID {
280 rIdx = i
281 newRule = rule
282 break
283 }
284 }
285 if rIdx < 0 {
286 return fmt.Errorf("rule with ID %s not found in replication configuration", opts.ID)
287 }
288 prefixChg := opts.Prefix != newRule.Prefix()
289 if opts.IsTagSet || prefixChg {
290 prefix := newRule.Prefix()
291 if prefix != opts.Prefix {
292 prefix = opts.Prefix
293 }
294 tags := []Tag{newRule.Filter.Tag}
295 if len(newRule.Filter.And.Tags) != 0 {
296 tags = newRule.Filter.And.Tags
297 }
298 var err error
299 if opts.IsTagSet {
300 tags, err = opts.Tags()
301 if err != nil {
302 return err
303 }
304 }
305 andVal := And{
306 Tags: tags,
307 }
308
309 filter := Filter{Prefix: prefix}
310 // only a single tag is set.
311 if prefix == "" && len(tags) == 1 {
312 filter.Tag = tags[0]
313 }
314 // both prefix and tag are present
315 if len(andVal.Tags) > 1 || prefix != "" {
316 filter.And = andVal
317 filter.And.Prefix = prefix
318 filter.Prefix = ""
319 filter.Tag = Tag{}
320 }
321 newRule.Filter = filter
322 }
323
324 // toggle rule status for edit option
325 if opts.RuleStatus != "" {
326 switch opts.RuleStatus {
327 case "enable":
328 newRule.Status = Enabled
329 case "disable":
330 newRule.Status = Disabled
331 default:
332 return fmt.Errorf("rule state should be either [enable|disable]")
333 }
334 }
335 // set DeleteMarkerReplication rule status for edit option
336 if opts.ReplicateDeleteMarkers != "" {
337 switch opts.ReplicateDeleteMarkers {
338 case "enable":
339 newRule.DeleteMarkerReplication.Status = Enabled
340 case "disable":
341 newRule.DeleteMarkerReplication.Status = Disabled
342 default:
343 return fmt.Errorf("ReplicateDeleteMarkers state should be either [enable|disable]")
344 }
345 }
346
347 // set DeleteReplication rule status for edit option. This is a MinIO specific
348 // option to replicate versioned deletes
349 if opts.ReplicateDeletes != "" {
350 switch opts.ReplicateDeletes {
351 case "enable":
352 newRule.DeleteReplication.Status = Enabled
353 case "disable":
354 newRule.DeleteReplication.Status = Disabled
355 default:
356 return fmt.Errorf("ReplicateDeletes state should be either [enable|disable]")
357 }
358 }
359
360 if opts.ReplicaSync != "" {
361 switch opts.ReplicaSync {
362 case "enable", "":
363 newRule.SourceSelectionCriteria.ReplicaModifications.Status = Enabled
364 case "disable":
365 newRule.SourceSelectionCriteria.ReplicaModifications.Status = Disabled
366 default:
367 return fmt.Errorf("replica metadata sync should be either [enable|disable]")
368 }
369 }
370
371 if opts.ExistingObjectReplicate != "" {
372 switch opts.ExistingObjectReplicate {
373 case "enable":
374 newRule.ExistingObjectReplication.Status = Enabled
375 case "disable":
376 newRule.ExistingObjectReplication.Status = Disabled
377 default:
378 return fmt.Errorf("existingObjectsReplication state should be either [enable|disable]")
379 }
380 }
381 if opts.IsSCSet {
382 newRule.Destination.StorageClass = opts.StorageClass
383 }
384 if opts.Priority != "" {
385 priority, err := strconv.Atoi(opts.Priority)
386 if err != nil {
387 return err
388 }
389 newRule.Priority = priority
390 }
391 if opts.DestBucket != "" {
392 destBucket := opts.DestBucket
393 // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html
394 if btokens := strings.Split(opts.DestBucket, ":"); len(btokens) != 6 {
395 return fmt.Errorf("destination bucket needs to be in Arn format")
396 }
397 newRule.Destination.Bucket = destBucket
398 }
399 // validate rule
400 if err := newRule.Validate(); err != nil {
401 return err
402 }
403 // ensure priority and destination bucket restrictions are not violated
404 for idx, rule := range c.Rules {
405 if rule.Priority == newRule.Priority && rIdx != idx {
406 return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority")
407 }
408 if rule.Destination.Bucket != newRule.Destination.Bucket && rule.ID == newRule.ID {
409 return fmt.Errorf("invalid destination bucket for this rule")
410 }
411 }
412
413 c.Rules[rIdx] = newRule
414 return nil
415}
416
417// RemoveRule removes a rule from replication config.
418func (c *Config) RemoveRule(opts Options) error {
419 var newRules []Rule
420 ruleFound := false
421 for _, rule := range c.Rules {
422 if rule.ID != opts.ID {
423 newRules = append(newRules, rule)
424 continue
425 }
426 ruleFound = true
427 }
428 if !ruleFound {
429 return fmt.Errorf("Rule with ID %s not found", opts.ID)
430 }
431 if len(newRules) == 0 {
432 return fmt.Errorf("replication configuration should have at least one rule")
433 }
434 c.Rules = newRules
435 return nil
436}
437
438// Rule - a rule for replication configuration.
439type Rule struct {
440 XMLName xml.Name `xml:"Rule" json:"-"`
441 ID string `xml:"ID,omitempty"`
442 Status Status `xml:"Status"`
443 Priority int `xml:"Priority"`
444 DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication"`
445 DeleteReplication DeleteReplication `xml:"DeleteReplication"`
446 Destination Destination `xml:"Destination"`
447 Filter Filter `xml:"Filter" json:"Filter"`
448 SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"`
449 ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"`
450}
451
452// Validate validates the rule for correctness
453func (r Rule) Validate() error {
454 if err := r.validateID(); err != nil {
455 return err
456 }
457 if err := r.validateStatus(); err != nil {
458 return err
459 }
460 if err := r.validateFilter(); err != nil {
461 return err
462 }
463
464 if r.Priority < 0 && r.Status == Enabled {
465 return fmt.Errorf("priority must be set for the rule")
466 }
467
468 if err := r.validateStatus(); err != nil {
469 return err
470 }
471 return r.ExistingObjectReplication.Validate()
472}
473
474// validateID - checks if ID is valid or not.
475func (r Rule) validateID() error {
476 // cannot be longer than 255 characters
477 if len(r.ID) > 255 {
478 return fmt.Errorf("ID must be less than 255 characters")
479 }
480 return nil
481}
482
483// validateStatus - checks if status is valid or not.
484func (r Rule) validateStatus() error {
485 // Status can't be empty
486 if len(r.Status) == 0 {
487 return fmt.Errorf("status cannot be empty")
488 }
489
490 // Status must be one of Enabled or Disabled
491 if r.Status != Enabled && r.Status != Disabled {
492 return fmt.Errorf("status must be set to either Enabled or Disabled")
493 }
494 return nil
495}
496
497func (r Rule) validateFilter() error {
498 return r.Filter.Validate()
499}
500
501// Prefix - a rule can either have prefix under <filter></filter> or under
502// <filter><and></and></filter>. This method returns the prefix from the
503// location where it is available
504func (r Rule) Prefix() string {
505 if r.Filter.Prefix != "" {
506 return r.Filter.Prefix
507 }
508 return r.Filter.And.Prefix
509}
510
511// Tags - a rule can either have tag under <filter></filter> or under
512// <filter><and></and></filter>. This method returns all the tags from the
513// rule in the format tag1=value1&tag2=value2
514func (r Rule) Tags() string {
515 ts := []Tag{r.Filter.Tag}
516 if len(r.Filter.And.Tags) != 0 {
517 ts = r.Filter.And.Tags
518 }
519
520 var buf bytes.Buffer
521 for _, t := range ts {
522 if buf.Len() > 0 {
523 buf.WriteString("&")
524 }
525 buf.WriteString(t.String())
526 }
527 return buf.String()
528}
529
530// Filter - a filter for a replication configuration Rule.
531type Filter struct {
532 XMLName xml.Name `xml:"Filter" json:"-"`
533 Prefix string `json:"Prefix,omitempty"`
534 And And `xml:"And,omitempty" json:"And,omitempty"`
535 Tag Tag `xml:"Tag,omitempty" json:"Tag,omitempty"`
536}
537
538// Validate - validates the filter element
539func (f Filter) Validate() error {
540 // A Filter must have exactly one of Prefix, Tag, or And specified.
541 if !f.And.isEmpty() {
542 if f.Prefix != "" {
543 return errInvalidFilter
544 }
545 if !f.Tag.IsEmpty() {
546 return errInvalidFilter
547 }
548 }
549 if f.Prefix != "" {
550 if !f.Tag.IsEmpty() {
551 return errInvalidFilter
552 }
553 }
554 if !f.Tag.IsEmpty() {
555 if err := f.Tag.Validate(); err != nil {
556 return err
557 }
558 }
559 return nil
560}
561
562// Tag - a tag for a replication configuration Rule filter.
563type Tag struct {
564 XMLName xml.Name `json:"-"`
565 Key string `xml:"Key,omitempty" json:"Key,omitempty"`
566 Value string `xml:"Value,omitempty" json:"Value,omitempty"`
567}
568
569func (tag Tag) String() string {
570 if tag.IsEmpty() {
571 return ""
572 }
573 return tag.Key + "=" + tag.Value
574}
575
576// IsEmpty returns whether this tag is empty or not.
577func (tag Tag) IsEmpty() bool {
578 return tag.Key == ""
579}
580
581// Validate checks this tag.
582func (tag Tag) Validate() error {
583 if len(tag.Key) == 0 || utf8.RuneCountInString(tag.Key) > 128 {
584 return fmt.Errorf("invalid Tag Key")
585 }
586
587 if utf8.RuneCountInString(tag.Value) > 256 {
588 return fmt.Errorf("invalid Tag Value")
589 }
590 return nil
591}
592
593// Destination - destination in ReplicationConfiguration.
594type Destination struct {
595 XMLName xml.Name `xml:"Destination" json:"-"`
596 Bucket string `xml:"Bucket" json:"Bucket"`
597 StorageClass string `xml:"StorageClass,omitempty" json:"StorageClass,omitempty"`
598}
599
600// And - a tag to combine a prefix and multiple tags for replication configuration rule.
601type And struct {
602 XMLName xml.Name `xml:"And,omitempty" json:"-"`
603 Prefix string `xml:"Prefix,omitempty" json:"Prefix,omitempty"`
604 Tags []Tag `xml:"Tag,omitempty" json:"Tag,omitempty"`
605}
606
607// isEmpty returns true if Tags field is null
608func (a And) isEmpty() bool {
609 return len(a.Tags) == 0 && a.Prefix == ""
610}
611
612// Status represents Enabled/Disabled status
613type Status string
614
615// Supported status types
616const (
617 Enabled Status = "Enabled"
618 Disabled Status = "Disabled"
619)
620
621// DeleteMarkerReplication - whether delete markers are replicated - https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html
622type DeleteMarkerReplication struct {
623 Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default
624}
625
626// IsEmpty returns true if DeleteMarkerReplication is not set
627func (d DeleteMarkerReplication) IsEmpty() bool {
628 return len(d.Status) == 0
629}
630
631// DeleteReplication - whether versioned deletes are replicated - this
632// is a MinIO specific extension
633type DeleteReplication struct {
634 Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default
635}
636
637// IsEmpty returns true if DeleteReplication is not set
638func (d DeleteReplication) IsEmpty() bool {
639 return len(d.Status) == 0
640}
641
642// ReplicaModifications specifies if replica modification sync is enabled
643type ReplicaModifications struct {
644 Status Status `xml:"Status" json:"Status"` // should be set to "Enabled" by default
645}
646
647// SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration.
648type SourceSelectionCriteria struct {
649 ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"`
650}
651
652// IsValid - checks whether SourceSelectionCriteria is valid or not.
653func (s SourceSelectionCriteria) IsValid() bool {
654 return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled
655}
656
657// Validate source selection criteria
658func (s SourceSelectionCriteria) Validate() error {
659 if (s == SourceSelectionCriteria{}) {
660 return nil
661 }
662 if !s.IsValid() {
663 return fmt.Errorf("invalid ReplicaModification status")
664 }
665 return nil
666}
667
668// ExistingObjectReplication - whether existing object replication is enabled
669type ExistingObjectReplication struct {
670 Status Status `xml:"Status"` // should be set to "Disabled" by default
671}
672
673// IsEmpty returns true if DeleteMarkerReplication is not set
674func (e ExistingObjectReplication) IsEmpty() bool {
675 return len(e.Status) == 0
676}
677
678// Validate validates whether the status is disabled.
679func (e ExistingObjectReplication) Validate() error {
680 if e.IsEmpty() {
681 return nil
682 }
683 if e.Status != Disabled && e.Status != Enabled {
684 return fmt.Errorf("invalid ExistingObjectReplication status")
685 }
686 return nil
687}
688
689// TargetMetrics represents inline replication metrics
690// such as pending, failed and completed bytes in total for a bucket remote target
691type TargetMetrics struct {
692 // Completed count
693 ReplicatedCount uint64 `json:"replicationCount,omitempty"`
694 // Completed size in bytes
695 ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"`
696 // Bandwidth limit in bytes/sec for this target
697 BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"`
698 // Current bandwidth used in bytes/sec for this target
699 CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"`
700 // errors seen in replication in last minute, hour and total
701 Failed TimedErrStats `json:"failed,omitempty"`
702 // Deprecated fields
703 // Pending size in bytes
704 PendingSize uint64 `json:"pendingReplicationSize,omitempty"`
705 // Total Replica size in bytes
706 ReplicaSize uint64 `json:"replicaSize,omitempty"`
707 // Failed size in bytes
708 FailedSize uint64 `json:"failedReplicationSize,omitempty"`
709 // Total number of pending operations including metadata updates
710 PendingCount uint64 `json:"pendingReplicationCount,omitempty"`
711 // Total number of failed operations including metadata updates
712 FailedCount uint64 `json:"failedReplicationCount,omitempty"`
713}
714
715// Metrics represents inline replication metrics for a bucket.
716type Metrics struct {
717 Stats map[string]TargetMetrics
718 // Completed size in bytes across targets
719 ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"`
720 // Total Replica size in bytes across targets
721 ReplicaSize uint64 `json:"replicaSize,omitempty"`
722 // Total Replica counts
723 ReplicaCount int64 `json:"replicaCount,omitempty"`
724 // Total Replicated count
725 ReplicatedCount int64 `json:"replicationCount,omitempty"`
726 // errors seen in replication in last minute, hour and total
727 Errors TimedErrStats `json:"failed,omitempty"`
728 // Total number of entries that are queued for replication
729 QStats InQueueMetric `json:"queued"`
730 // Deprecated fields
731 // Total Pending size in bytes across targets
732 PendingSize uint64 `json:"pendingReplicationSize,omitempty"`
733 // Failed size in bytes across targets
734 FailedSize uint64 `json:"failedReplicationSize,omitempty"`
735 // Total number of pending operations including metadata updates across targets
736 PendingCount uint64 `json:"pendingReplicationCount,omitempty"`
737 // Total number of failed operations including metadata updates across targets
738 FailedCount uint64 `json:"failedReplicationCount,omitempty"`
739}
740
741// RStat - has count and bytes for replication metrics
742type RStat struct {
743 Count float64 `json:"count"`
744 Bytes int64 `json:"bytes"`
745}
746
747// Add two RStat
748func (r RStat) Add(r1 RStat) RStat {
749 return RStat{
750 Count: r.Count + r1.Count,
751 Bytes: r.Bytes + r1.Bytes,
752 }
753}
754
755// TimedErrStats holds error stats for a time period
756type TimedErrStats struct {
757 LastMinute RStat `json:"lastMinute"`
758 LastHour RStat `json:"lastHour"`
759 Totals RStat `json:"totals"`
760}
761
762// Add two TimedErrStats
763func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats {
764 return TimedErrStats{
765 LastMinute: te.LastMinute.Add(o.LastMinute),
766 LastHour: te.LastHour.Add(o.LastHour),
767 Totals: te.Totals.Add(o.Totals),
768 }
769}
770
771// ResyncTargetsInfo provides replication target information to resync replicated data.
772type ResyncTargetsInfo struct {
773 Targets []ResyncTarget `json:"target,omitempty"`
774}
775
776// ResyncTarget provides the replica resources and resetID to initiate resync replication.
777type ResyncTarget struct {
778 Arn string `json:"arn"`
779 ResetID string `json:"resetid"`
780 StartTime time.Time `json:"startTime,omitempty"`
781 EndTime time.Time `json:"endTime,omitempty"`
782 // Status of resync operation
783 ResyncStatus string `json:"resyncStatus,omitempty"`
784 // Completed size in bytes
785 ReplicatedSize int64 `json:"completedReplicationSize,omitempty"`
786 // Failed size in bytes
787 FailedSize int64 `json:"failedReplicationSize,omitempty"`
788 // Total number of failed operations
789 FailedCount int64 `json:"failedReplicationCount,omitempty"`
790 // Total number of completed operations
791 ReplicatedCount int64 `json:"replicationCount,omitempty"`
792 // Last bucket/object replicated.
793 Bucket string `json:"bucket,omitempty"`
794 Object string `json:"object,omitempty"`
795}
796
797// XferStats holds transfer rate info for uploads/sec
798type XferStats struct {
799 AvgRate float64 `json:"avgRate"`
800 PeakRate float64 `json:"peakRate"`
801 CurrRate float64 `json:"currRate"`
802}
803
804// Merge two XferStats
805func (x *XferStats) Merge(x1 XferStats) {
806 x.AvgRate += x1.AvgRate
807 x.PeakRate += x1.PeakRate
808 x.CurrRate += x1.CurrRate
809}
810
811// QStat holds count and bytes for objects in replication queue
812type QStat struct {
813 Count float64 `json:"count"`
814 Bytes float64 `json:"bytes"`
815}
816
817// Add 2 QStat entries
818func (q *QStat) Add(q1 QStat) {
819 q.Count += q1.Count
820 q.Bytes += q1.Bytes
821}
822
823// InQueueMetric holds stats for objects in replication queue
824type InQueueMetric struct {
825 Curr QStat `json:"curr" msg:"cq"`
826 Avg QStat `json:"avg" msg:"aq"`
827 Max QStat `json:"peak" msg:"pq"`
828}
829
830// MetricName name of replication metric
831type MetricName string
832
833const (
834 // Large is a metric name for large objects >=128MiB
835 Large MetricName = "Large"
836 // Small is a metric name for objects <128MiB size
837 Small MetricName = "Small"
838 // Total is a metric name for total objects
839 Total MetricName = "Total"
840)
841
842// WorkerStat has stats on number of replication workers
843type WorkerStat struct {
844 Curr int32 `json:"curr"`
845 Avg float32 `json:"avg"`
846 Max int32 `json:"max"`
847}
848
849// ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes
850// and number of entries that failed replication after 3 retries
851type ReplMRFStats struct {
852 LastFailedCount uint64 `json:"failedCount_last5min"`
853 // Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start.
854 TotalDroppedCount uint64 `json:"droppedCount_since_uptime"`
855 // Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start.
856 TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"`
857}
858
859// ReplQNodeStats holds stats for a node in replication queue
860type ReplQNodeStats struct {
861 NodeName string `json:"nodeName"`
862 Uptime int64 `json:"uptime"`
863 Workers WorkerStat `json:"activeWorkers"`
864
865 XferStats map[MetricName]XferStats `json:"transferSummary"`
866 TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"`
867
868 QStats InQueueMetric `json:"queueStats"`
869 MRFStats ReplMRFStats `json:"mrfStats"`
870}
871
872// ReplQueueStats holds stats for replication queue across nodes
873type ReplQueueStats struct {
874 Nodes []ReplQNodeStats `json:"nodes"`
875}
876
877// Workers returns number of workers across all nodes
878func (q ReplQueueStats) Workers() (tot WorkerStat) {
879 for _, node := range q.Nodes {
880 tot.Avg += node.Workers.Avg
881 tot.Curr += node.Workers.Curr
882 if tot.Max < node.Workers.Max {
883 tot.Max = node.Workers.Max
884 }
885 }
886 if len(q.Nodes) > 0 {
887 tot.Avg /= float32(len(q.Nodes))
888 tot.Curr /= int32(len(q.Nodes))
889 }
890 return tot
891}
892
893// qStatSummary returns cluster level stats for objects in replication queue
894func (q ReplQueueStats) qStatSummary() InQueueMetric {
895 m := InQueueMetric{}
896 for _, v := range q.Nodes {
897 m.Avg.Add(v.QStats.Avg)
898 m.Curr.Add(v.QStats.Curr)
899 if m.Max.Count < v.QStats.Max.Count {
900 m.Max.Add(v.QStats.Max)
901 }
902 }
903 return m
904}
905
906// ReplQStats holds stats for objects in replication queue
907type ReplQStats struct {
908 Uptime int64 `json:"uptime"`
909 Workers WorkerStat `json:"workers"`
910
911 XferStats map[MetricName]XferStats `json:"xferStats"`
912 TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"`
913
914 QStats InQueueMetric `json:"qStats"`
915 MRFStats ReplMRFStats `json:"mrfStats"`
916}
917
918// QStats returns cluster level stats for objects in replication queue
919func (q ReplQueueStats) QStats() (r ReplQStats) {
920 r.QStats = q.qStatSummary()
921 r.XferStats = make(map[MetricName]XferStats)
922 r.TgtXferStats = make(map[string]map[MetricName]XferStats)
923 r.Workers = q.Workers()
924
925 for _, node := range q.Nodes {
926 for arn := range node.TgtXferStats {
927 xmap, ok := node.TgtXferStats[arn]
928 if !ok {
929 xmap = make(map[MetricName]XferStats)
930 }
931 for m, v := range xmap {
932 st, ok := r.XferStats[m]
933 if !ok {
934 st = XferStats{}
935 }
936 st.AvgRate += v.AvgRate
937 st.CurrRate += v.CurrRate
938 st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
939 if _, ok := r.TgtXferStats[arn]; !ok {
940 r.TgtXferStats[arn] = make(map[MetricName]XferStats)
941 }
942 r.TgtXferStats[arn][m] = st
943 }
944 }
945 for k, v := range node.XferStats {
946 st, ok := r.XferStats[k]
947 if !ok {
948 st = XferStats{}
949 }
950 st.AvgRate += v.AvgRate
951 st.CurrRate += v.CurrRate
952 st.PeakRate = math.Max(st.PeakRate, v.PeakRate)
953 r.XferStats[k] = st
954 }
955 r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount
956 r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount
957 r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes
958 r.Uptime += node.Uptime
959 }
960 if len(q.Nodes) > 0 {
961 r.Uptime /= int64(len(q.Nodes)) // average uptime
962 }
963 return
964}
965
966// MetricsV2 represents replication metrics for a bucket.
967type MetricsV2 struct {
968 Uptime int64 `json:"uptime"`
969 CurrentStats Metrics `json:"currStats"`
970 QueueStats ReplQueueStats `json:"queueStats"`
971}