From 8db41da676ac8368ef7c2549d56239a5ff5eedde Mon Sep 17 00:00:00 2001 From: Rutger Broekhoff Date: Tue, 2 Jan 2024 18:56:31 +0100 Subject: Delete vendor directory --- .../minio-go/v7/pkg/replication/replication.go | 971 --------------------- 1 file changed, 971 deletions(-) delete mode 100644 vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go (limited to 'vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go') diff --git a/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go b/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go deleted file mode 100644 index 0abbf6e..0000000 --- a/vendor/github.com/minio/minio-go/v7/pkg/replication/replication.go +++ /dev/null @@ -1,971 +0,0 @@ -/* - * MinIO Client (C) 2020 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package replication - -import ( - "bytes" - "encoding/xml" - "fmt" - "math" - "strconv" - "strings" - "time" - "unicode/utf8" - - "github.com/rs/xid" -) - -var errInvalidFilter = fmt.Errorf("invalid filter") - -// OptionType specifies operation to be performed on config -type OptionType string - -const ( - // AddOption specifies addition of rule to config - AddOption OptionType = "Add" - // SetOption specifies modification of existing rule to config - SetOption OptionType = "Set" - - // RemoveOption specifies rule options are for removing a rule - RemoveOption OptionType = "Remove" - // ImportOption is for getting current config - ImportOption OptionType = "Import" -) - -// Options represents options to set a replication configuration rule -type Options struct { - Op OptionType - RoleArn string - ID string - Prefix string - RuleStatus string - Priority string - TagString string - StorageClass string - DestBucket string - IsTagSet bool - IsSCSet bool - ReplicateDeletes string // replicate versioned deletes - ReplicateDeleteMarkers string // replicate soft deletes - ReplicaSync string // replicate replica metadata modifications - ExistingObjectReplicate string -} - -// Tags returns a slice of tags for a rule -func (opts Options) Tags() ([]Tag, error) { - var tagList []Tag - tagTokens := strings.Split(opts.TagString, "&") - for _, tok := range tagTokens { - if tok == "" { - break - } - kv := strings.SplitN(tok, "=", 2) - if len(kv) != 2 { - return []Tag{}, fmt.Errorf("tags should be entered as comma separated k=v pairs") - } - tagList = append(tagList, Tag{ - Key: kv[0], - Value: kv[1], - }) - } - return tagList, nil -} - -// Config - replication configuration specified in -// https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html -type Config struct { - XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"` - Rules []Rule `xml:"Rule" json:"Rules"` - Role string `xml:"Role" json:"Role"` -} - -// Empty returns true if config is not set -func (c *Config) Empty() bool { - return len(c.Rules) == 0 -} - -// AddRule adds a new rule to existing replication config. If a rule exists with the -// same ID, then the rule is replaced. -func (c *Config) AddRule(opts Options) error { - priority, err := strconv.Atoi(opts.Priority) - if err != nil { - return err - } - var compatSw bool // true if RoleArn is used with new mc client and older minio version prior to multisite - if opts.RoleArn != "" { - tokens := strings.Split(opts.RoleArn, ":") - if len(tokens) != 6 { - return fmt.Errorf("invalid format for replication Role Arn: %v", opts.RoleArn) - } - switch { - case strings.HasPrefix(opts.RoleArn, "arn:minio:replication") && len(c.Rules) == 0: - c.Role = opts.RoleArn - compatSw = true - case strings.HasPrefix(opts.RoleArn, "arn:aws:iam"): - c.Role = opts.RoleArn - default: - return fmt.Errorf("RoleArn invalid for AWS replication configuration: %v", opts.RoleArn) - } - } - - var status Status - // toggle rule status for edit option - switch opts.RuleStatus { - case "enable": - status = Enabled - case "disable": - status = Disabled - default: - return fmt.Errorf("rule state should be either [enable|disable]") - } - - tags, err := opts.Tags() - if err != nil { - return err - } - andVal := And{ - Tags: tags, - } - filter := Filter{Prefix: opts.Prefix} - // only a single tag is set. - if opts.Prefix == "" && len(tags) == 1 { - filter.Tag = tags[0] - } - // both prefix and tag are present - if len(andVal.Tags) > 1 || opts.Prefix != "" { - filter.And = andVal - filter.And.Prefix = opts.Prefix - filter.Prefix = "" - filter.Tag = Tag{} - } - if opts.ID == "" { - opts.ID = xid.New().String() - } - - destBucket := opts.DestBucket - // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html - if btokens := strings.Split(destBucket, ":"); len(btokens) != 6 { - if len(btokens) == 1 && compatSw { - destBucket = fmt.Sprintf("arn:aws:s3:::%s", destBucket) - } else { - return fmt.Errorf("destination bucket needs to be in Arn format") - } - } - dmStatus := Disabled - if opts.ReplicateDeleteMarkers != "" { - switch opts.ReplicateDeleteMarkers { - case "enable": - dmStatus = Enabled - case "disable": - dmStatus = Disabled - default: - return fmt.Errorf("ReplicateDeleteMarkers should be either enable|disable") - } - } - - vDeleteStatus := Disabled - if opts.ReplicateDeletes != "" { - switch opts.ReplicateDeletes { - case "enable": - vDeleteStatus = Enabled - case "disable": - vDeleteStatus = Disabled - default: - return fmt.Errorf("ReplicateDeletes should be either enable|disable") - } - } - var replicaSync Status - // replica sync is by default Enabled, unless specified. - switch opts.ReplicaSync { - case "enable", "": - replicaSync = Enabled - case "disable": - replicaSync = Disabled - default: - return fmt.Errorf("replica metadata sync should be either [enable|disable]") - } - - var existingStatus Status - if opts.ExistingObjectReplicate != "" { - switch opts.ExistingObjectReplicate { - case "enable": - existingStatus = Enabled - case "disable", "": - existingStatus = Disabled - default: - return fmt.Errorf("existingObjectReplicate should be either enable|disable") - } - } - newRule := Rule{ - ID: opts.ID, - Priority: priority, - Status: status, - Filter: filter, - Destination: Destination{ - Bucket: destBucket, - StorageClass: opts.StorageClass, - }, - DeleteMarkerReplication: DeleteMarkerReplication{Status: dmStatus}, - DeleteReplication: DeleteReplication{Status: vDeleteStatus}, - // MinIO enables replica metadata syncing by default in the case of bi-directional replication to allow - // automatic failover as the expectation in this case is that replica and source should be identical. - // However AWS leaves this configurable https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-for-metadata-changes.html - SourceSelectionCriteria: SourceSelectionCriteria{ - ReplicaModifications: ReplicaModifications{ - Status: replicaSync, - }, - }, - // By default disable existing object replication unless selected - ExistingObjectReplication: ExistingObjectReplication{ - Status: existingStatus, - }, - } - - // validate rule after overlaying priority for pre-existing rule being disabled. - if err := newRule.Validate(); err != nil { - return err - } - // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for MinIO configuration - if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && !compatSw { - for i := range c.Rules { - c.Rules[i].Destination.Bucket = c.Role - } - c.Role = "" - } - - for _, rule := range c.Rules { - if rule.Priority == newRule.Priority { - return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority") - } - if rule.ID == newRule.ID { - return fmt.Errorf("a rule exists with this ID") - } - } - - c.Rules = append(c.Rules, newRule) - return nil -} - -// EditRule modifies an existing rule in replication config -func (c *Config) EditRule(opts Options) error { - if opts.ID == "" { - return fmt.Errorf("rule ID missing") - } - // if replication config uses RoleArn, migrate this to the destination element as target ARN for remote bucket for non AWS. - if c.Role != "" && !strings.HasPrefix(c.Role, "arn:aws:iam") && len(c.Rules) > 1 { - for i := range c.Rules { - c.Rules[i].Destination.Bucket = c.Role - } - c.Role = "" - } - - rIdx := -1 - var newRule Rule - for i, rule := range c.Rules { - if rule.ID == opts.ID { - rIdx = i - newRule = rule - break - } - } - if rIdx < 0 { - return fmt.Errorf("rule with ID %s not found in replication configuration", opts.ID) - } - prefixChg := opts.Prefix != newRule.Prefix() - if opts.IsTagSet || prefixChg { - prefix := newRule.Prefix() - if prefix != opts.Prefix { - prefix = opts.Prefix - } - tags := []Tag{newRule.Filter.Tag} - if len(newRule.Filter.And.Tags) != 0 { - tags = newRule.Filter.And.Tags - } - var err error - if opts.IsTagSet { - tags, err = opts.Tags() - if err != nil { - return err - } - } - andVal := And{ - Tags: tags, - } - - filter := Filter{Prefix: prefix} - // only a single tag is set. - if prefix == "" && len(tags) == 1 { - filter.Tag = tags[0] - } - // both prefix and tag are present - if len(andVal.Tags) > 1 || prefix != "" { - filter.And = andVal - filter.And.Prefix = prefix - filter.Prefix = "" - filter.Tag = Tag{} - } - newRule.Filter = filter - } - - // toggle rule status for edit option - if opts.RuleStatus != "" { - switch opts.RuleStatus { - case "enable": - newRule.Status = Enabled - case "disable": - newRule.Status = Disabled - default: - return fmt.Errorf("rule state should be either [enable|disable]") - } - } - // set DeleteMarkerReplication rule status for edit option - if opts.ReplicateDeleteMarkers != "" { - switch opts.ReplicateDeleteMarkers { - case "enable": - newRule.DeleteMarkerReplication.Status = Enabled - case "disable": - newRule.DeleteMarkerReplication.Status = Disabled - default: - return fmt.Errorf("ReplicateDeleteMarkers state should be either [enable|disable]") - } - } - - // set DeleteReplication rule status for edit option. This is a MinIO specific - // option to replicate versioned deletes - if opts.ReplicateDeletes != "" { - switch opts.ReplicateDeletes { - case "enable": - newRule.DeleteReplication.Status = Enabled - case "disable": - newRule.DeleteReplication.Status = Disabled - default: - return fmt.Errorf("ReplicateDeletes state should be either [enable|disable]") - } - } - - if opts.ReplicaSync != "" { - switch opts.ReplicaSync { - case "enable", "": - newRule.SourceSelectionCriteria.ReplicaModifications.Status = Enabled - case "disable": - newRule.SourceSelectionCriteria.ReplicaModifications.Status = Disabled - default: - return fmt.Errorf("replica metadata sync should be either [enable|disable]") - } - } - - if opts.ExistingObjectReplicate != "" { - switch opts.ExistingObjectReplicate { - case "enable": - newRule.ExistingObjectReplication.Status = Enabled - case "disable": - newRule.ExistingObjectReplication.Status = Disabled - default: - return fmt.Errorf("existingObjectsReplication state should be either [enable|disable]") - } - } - if opts.IsSCSet { - newRule.Destination.StorageClass = opts.StorageClass - } - if opts.Priority != "" { - priority, err := strconv.Atoi(opts.Priority) - if err != nil { - return err - } - newRule.Priority = priority - } - if opts.DestBucket != "" { - destBucket := opts.DestBucket - // ref https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-arn-format.html - if btokens := strings.Split(opts.DestBucket, ":"); len(btokens) != 6 { - return fmt.Errorf("destination bucket needs to be in Arn format") - } - newRule.Destination.Bucket = destBucket - } - // validate rule - if err := newRule.Validate(); err != nil { - return err - } - // ensure priority and destination bucket restrictions are not violated - for idx, rule := range c.Rules { - if rule.Priority == newRule.Priority && rIdx != idx { - return fmt.Errorf("priority must be unique. Replication configuration already has a rule with this priority") - } - if rule.Destination.Bucket != newRule.Destination.Bucket && rule.ID == newRule.ID { - return fmt.Errorf("invalid destination bucket for this rule") - } - } - - c.Rules[rIdx] = newRule - return nil -} - -// RemoveRule removes a rule from replication config. -func (c *Config) RemoveRule(opts Options) error { - var newRules []Rule - ruleFound := false - for _, rule := range c.Rules { - if rule.ID != opts.ID { - newRules = append(newRules, rule) - continue - } - ruleFound = true - } - if !ruleFound { - return fmt.Errorf("Rule with ID %s not found", opts.ID) - } - if len(newRules) == 0 { - return fmt.Errorf("replication configuration should have at least one rule") - } - c.Rules = newRules - return nil -} - -// Rule - a rule for replication configuration. -type Rule struct { - XMLName xml.Name `xml:"Rule" json:"-"` - ID string `xml:"ID,omitempty"` - Status Status `xml:"Status"` - Priority int `xml:"Priority"` - DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication"` - DeleteReplication DeleteReplication `xml:"DeleteReplication"` - Destination Destination `xml:"Destination"` - Filter Filter `xml:"Filter" json:"Filter"` - SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"` - ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"` -} - -// Validate validates the rule for correctness -func (r Rule) Validate() error { - if err := r.validateID(); err != nil { - return err - } - if err := r.validateStatus(); err != nil { - return err - } - if err := r.validateFilter(); err != nil { - return err - } - - if r.Priority < 0 && r.Status == Enabled { - return fmt.Errorf("priority must be set for the rule") - } - - if err := r.validateStatus(); err != nil { - return err - } - return r.ExistingObjectReplication.Validate() -} - -// validateID - checks if ID is valid or not. -func (r Rule) validateID() error { - // cannot be longer than 255 characters - if len(r.ID) > 255 { - return fmt.Errorf("ID must be less than 255 characters") - } - return nil -} - -// validateStatus - checks if status is valid or not. -func (r Rule) validateStatus() error { - // Status can't be empty - if len(r.Status) == 0 { - return fmt.Errorf("status cannot be empty") - } - - // Status must be one of Enabled or Disabled - if r.Status != Enabled && r.Status != Disabled { - return fmt.Errorf("status must be set to either Enabled or Disabled") - } - return nil -} - -func (r Rule) validateFilter() error { - return r.Filter.Validate() -} - -// Prefix - a rule can either have prefix under or under -// . This method returns the prefix from the -// location where it is available -func (r Rule) Prefix() string { - if r.Filter.Prefix != "" { - return r.Filter.Prefix - } - return r.Filter.And.Prefix -} - -// Tags - a rule can either have tag under or under -// . This method returns all the tags from the -// rule in the format tag1=value1&tag2=value2 -func (r Rule) Tags() string { - ts := []Tag{r.Filter.Tag} - if len(r.Filter.And.Tags) != 0 { - ts = r.Filter.And.Tags - } - - var buf bytes.Buffer - for _, t := range ts { - if buf.Len() > 0 { - buf.WriteString("&") - } - buf.WriteString(t.String()) - } - return buf.String() -} - -// Filter - a filter for a replication configuration Rule. -type Filter struct { - XMLName xml.Name `xml:"Filter" json:"-"` - Prefix string `json:"Prefix,omitempty"` - And And `xml:"And,omitempty" json:"And,omitempty"` - Tag Tag `xml:"Tag,omitempty" json:"Tag,omitempty"` -} - -// Validate - validates the filter element -func (f Filter) Validate() error { - // A Filter must have exactly one of Prefix, Tag, or And specified. - if !f.And.isEmpty() { - if f.Prefix != "" { - return errInvalidFilter - } - if !f.Tag.IsEmpty() { - return errInvalidFilter - } - } - if f.Prefix != "" { - if !f.Tag.IsEmpty() { - return errInvalidFilter - } - } - if !f.Tag.IsEmpty() { - if err := f.Tag.Validate(); err != nil { - return err - } - } - return nil -} - -// Tag - a tag for a replication configuration Rule filter. -type Tag struct { - XMLName xml.Name `json:"-"` - Key string `xml:"Key,omitempty" json:"Key,omitempty"` - Value string `xml:"Value,omitempty" json:"Value,omitempty"` -} - -func (tag Tag) String() string { - if tag.IsEmpty() { - return "" - } - return tag.Key + "=" + tag.Value -} - -// IsEmpty returns whether this tag is empty or not. -func (tag Tag) IsEmpty() bool { - return tag.Key == "" -} - -// Validate checks this tag. -func (tag Tag) Validate() error { - if len(tag.Key) == 0 || utf8.RuneCountInString(tag.Key) > 128 { - return fmt.Errorf("invalid Tag Key") - } - - if utf8.RuneCountInString(tag.Value) > 256 { - return fmt.Errorf("invalid Tag Value") - } - return nil -} - -// Destination - destination in ReplicationConfiguration. -type Destination struct { - XMLName xml.Name `xml:"Destination" json:"-"` - Bucket string `xml:"Bucket" json:"Bucket"` - StorageClass string `xml:"StorageClass,omitempty" json:"StorageClass,omitempty"` -} - -// And - a tag to combine a prefix and multiple tags for replication configuration rule. -type And struct { - XMLName xml.Name `xml:"And,omitempty" json:"-"` - Prefix string `xml:"Prefix,omitempty" json:"Prefix,omitempty"` - Tags []Tag `xml:"Tag,omitempty" json:"Tag,omitempty"` -} - -// isEmpty returns true if Tags field is null -func (a And) isEmpty() bool { - return len(a.Tags) == 0 && a.Prefix == "" -} - -// Status represents Enabled/Disabled status -type Status string - -// Supported status types -const ( - Enabled Status = "Enabled" - Disabled Status = "Disabled" -) - -// DeleteMarkerReplication - whether delete markers are replicated - https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html -type DeleteMarkerReplication struct { - Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default -} - -// IsEmpty returns true if DeleteMarkerReplication is not set -func (d DeleteMarkerReplication) IsEmpty() bool { - return len(d.Status) == 0 -} - -// DeleteReplication - whether versioned deletes are replicated - this -// is a MinIO specific extension -type DeleteReplication struct { - Status Status `xml:"Status" json:"Status"` // should be set to "Disabled" by default -} - -// IsEmpty returns true if DeleteReplication is not set -func (d DeleteReplication) IsEmpty() bool { - return len(d.Status) == 0 -} - -// ReplicaModifications specifies if replica modification sync is enabled -type ReplicaModifications struct { - Status Status `xml:"Status" json:"Status"` // should be set to "Enabled" by default -} - -// SourceSelectionCriteria - specifies additional source selection criteria in ReplicationConfiguration. -type SourceSelectionCriteria struct { - ReplicaModifications ReplicaModifications `xml:"ReplicaModifications" json:"ReplicaModifications"` -} - -// IsValid - checks whether SourceSelectionCriteria is valid or not. -func (s SourceSelectionCriteria) IsValid() bool { - return s.ReplicaModifications.Status == Enabled || s.ReplicaModifications.Status == Disabled -} - -// Validate source selection criteria -func (s SourceSelectionCriteria) Validate() error { - if (s == SourceSelectionCriteria{}) { - return nil - } - if !s.IsValid() { - return fmt.Errorf("invalid ReplicaModification status") - } - return nil -} - -// ExistingObjectReplication - whether existing object replication is enabled -type ExistingObjectReplication struct { - Status Status `xml:"Status"` // should be set to "Disabled" by default -} - -// IsEmpty returns true if DeleteMarkerReplication is not set -func (e ExistingObjectReplication) IsEmpty() bool { - return len(e.Status) == 0 -} - -// Validate validates whether the status is disabled. -func (e ExistingObjectReplication) Validate() error { - if e.IsEmpty() { - return nil - } - if e.Status != Disabled && e.Status != Enabled { - return fmt.Errorf("invalid ExistingObjectReplication status") - } - return nil -} - -// TargetMetrics represents inline replication metrics -// such as pending, failed and completed bytes in total for a bucket remote target -type TargetMetrics struct { - // Completed count - ReplicatedCount uint64 `json:"replicationCount,omitempty"` - // Completed size in bytes - ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` - // Bandwidth limit in bytes/sec for this target - BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"` - // Current bandwidth used in bytes/sec for this target - CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"` - // errors seen in replication in last minute, hour and total - Failed TimedErrStats `json:"failed,omitempty"` - // Deprecated fields - // Pending size in bytes - PendingSize uint64 `json:"pendingReplicationSize,omitempty"` - // Total Replica size in bytes - ReplicaSize uint64 `json:"replicaSize,omitempty"` - // Failed size in bytes - FailedSize uint64 `json:"failedReplicationSize,omitempty"` - // Total number of pending operations including metadata updates - PendingCount uint64 `json:"pendingReplicationCount,omitempty"` - // Total number of failed operations including metadata updates - FailedCount uint64 `json:"failedReplicationCount,omitempty"` -} - -// Metrics represents inline replication metrics for a bucket. -type Metrics struct { - Stats map[string]TargetMetrics - // Completed size in bytes across targets - ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` - // Total Replica size in bytes across targets - ReplicaSize uint64 `json:"replicaSize,omitempty"` - // Total Replica counts - ReplicaCount int64 `json:"replicaCount,omitempty"` - // Total Replicated count - ReplicatedCount int64 `json:"replicationCount,omitempty"` - // errors seen in replication in last minute, hour and total - Errors TimedErrStats `json:"failed,omitempty"` - // Total number of entries that are queued for replication - QStats InQueueMetric `json:"queued"` - // Deprecated fields - // Total Pending size in bytes across targets - PendingSize uint64 `json:"pendingReplicationSize,omitempty"` - // Failed size in bytes across targets - FailedSize uint64 `json:"failedReplicationSize,omitempty"` - // Total number of pending operations including metadata updates across targets - PendingCount uint64 `json:"pendingReplicationCount,omitempty"` - // Total number of failed operations including metadata updates across targets - FailedCount uint64 `json:"failedReplicationCount,omitempty"` -} - -// RStat - has count and bytes for replication metrics -type RStat struct { - Count float64 `json:"count"` - Bytes int64 `json:"bytes"` -} - -// Add two RStat -func (r RStat) Add(r1 RStat) RStat { - return RStat{ - Count: r.Count + r1.Count, - Bytes: r.Bytes + r1.Bytes, - } -} - -// TimedErrStats holds error stats for a time period -type TimedErrStats struct { - LastMinute RStat `json:"lastMinute"` - LastHour RStat `json:"lastHour"` - Totals RStat `json:"totals"` -} - -// Add two TimedErrStats -func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats { - return TimedErrStats{ - LastMinute: te.LastMinute.Add(o.LastMinute), - LastHour: te.LastHour.Add(o.LastHour), - Totals: te.Totals.Add(o.Totals), - } -} - -// ResyncTargetsInfo provides replication target information to resync replicated data. -type ResyncTargetsInfo struct { - Targets []ResyncTarget `json:"target,omitempty"` -} - -// ResyncTarget provides the replica resources and resetID to initiate resync replication. -type ResyncTarget struct { - Arn string `json:"arn"` - ResetID string `json:"resetid"` - StartTime time.Time `json:"startTime,omitempty"` - EndTime time.Time `json:"endTime,omitempty"` - // Status of resync operation - ResyncStatus string `json:"resyncStatus,omitempty"` - // Completed size in bytes - ReplicatedSize int64 `json:"completedReplicationSize,omitempty"` - // Failed size in bytes - FailedSize int64 `json:"failedReplicationSize,omitempty"` - // Total number of failed operations - FailedCount int64 `json:"failedReplicationCount,omitempty"` - // Total number of completed operations - ReplicatedCount int64 `json:"replicationCount,omitempty"` - // Last bucket/object replicated. - Bucket string `json:"bucket,omitempty"` - Object string `json:"object,omitempty"` -} - -// XferStats holds transfer rate info for uploads/sec -type XferStats struct { - AvgRate float64 `json:"avgRate"` - PeakRate float64 `json:"peakRate"` - CurrRate float64 `json:"currRate"` -} - -// Merge two XferStats -func (x *XferStats) Merge(x1 XferStats) { - x.AvgRate += x1.AvgRate - x.PeakRate += x1.PeakRate - x.CurrRate += x1.CurrRate -} - -// QStat holds count and bytes for objects in replication queue -type QStat struct { - Count float64 `json:"count"` - Bytes float64 `json:"bytes"` -} - -// Add 2 QStat entries -func (q *QStat) Add(q1 QStat) { - q.Count += q1.Count - q.Bytes += q1.Bytes -} - -// InQueueMetric holds stats for objects in replication queue -type InQueueMetric struct { - Curr QStat `json:"curr" msg:"cq"` - Avg QStat `json:"avg" msg:"aq"` - Max QStat `json:"peak" msg:"pq"` -} - -// MetricName name of replication metric -type MetricName string - -const ( - // Large is a metric name for large objects >=128MiB - Large MetricName = "Large" - // Small is a metric name for objects <128MiB size - Small MetricName = "Small" - // Total is a metric name for total objects - Total MetricName = "Total" -) - -// WorkerStat has stats on number of replication workers -type WorkerStat struct { - Curr int32 `json:"curr"` - Avg float32 `json:"avg"` - Max int32 `json:"max"` -} - -// ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes -// and number of entries that failed replication after 3 retries -type ReplMRFStats struct { - LastFailedCount uint64 `json:"failedCount_last5min"` - // Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start. - TotalDroppedCount uint64 `json:"droppedCount_since_uptime"` - // Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start. - TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"` -} - -// ReplQNodeStats holds stats for a node in replication queue -type ReplQNodeStats struct { - NodeName string `json:"nodeName"` - Uptime int64 `json:"uptime"` - Workers WorkerStat `json:"activeWorkers"` - - XferStats map[MetricName]XferStats `json:"transferSummary"` - TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"` - - QStats InQueueMetric `json:"queueStats"` - MRFStats ReplMRFStats `json:"mrfStats"` -} - -// ReplQueueStats holds stats for replication queue across nodes -type ReplQueueStats struct { - Nodes []ReplQNodeStats `json:"nodes"` -} - -// Workers returns number of workers across all nodes -func (q ReplQueueStats) Workers() (tot WorkerStat) { - for _, node := range q.Nodes { - tot.Avg += node.Workers.Avg - tot.Curr += node.Workers.Curr - if tot.Max < node.Workers.Max { - tot.Max = node.Workers.Max - } - } - if len(q.Nodes) > 0 { - tot.Avg /= float32(len(q.Nodes)) - tot.Curr /= int32(len(q.Nodes)) - } - return tot -} - -// qStatSummary returns cluster level stats for objects in replication queue -func (q ReplQueueStats) qStatSummary() InQueueMetric { - m := InQueueMetric{} - for _, v := range q.Nodes { - m.Avg.Add(v.QStats.Avg) - m.Curr.Add(v.QStats.Curr) - if m.Max.Count < v.QStats.Max.Count { - m.Max.Add(v.QStats.Max) - } - } - return m -} - -// ReplQStats holds stats for objects in replication queue -type ReplQStats struct { - Uptime int64 `json:"uptime"` - Workers WorkerStat `json:"workers"` - - XferStats map[MetricName]XferStats `json:"xferStats"` - TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"` - - QStats InQueueMetric `json:"qStats"` - MRFStats ReplMRFStats `json:"mrfStats"` -} - -// QStats returns cluster level stats for objects in replication queue -func (q ReplQueueStats) QStats() (r ReplQStats) { - r.QStats = q.qStatSummary() - r.XferStats = make(map[MetricName]XferStats) - r.TgtXferStats = make(map[string]map[MetricName]XferStats) - r.Workers = q.Workers() - - for _, node := range q.Nodes { - for arn := range node.TgtXferStats { - xmap, ok := node.TgtXferStats[arn] - if !ok { - xmap = make(map[MetricName]XferStats) - } - for m, v := range xmap { - st, ok := r.XferStats[m] - if !ok { - st = XferStats{} - } - st.AvgRate += v.AvgRate - st.CurrRate += v.CurrRate - st.PeakRate = math.Max(st.PeakRate, v.PeakRate) - if _, ok := r.TgtXferStats[arn]; !ok { - r.TgtXferStats[arn] = make(map[MetricName]XferStats) - } - r.TgtXferStats[arn][m] = st - } - } - for k, v := range node.XferStats { - st, ok := r.XferStats[k] - if !ok { - st = XferStats{} - } - st.AvgRate += v.AvgRate - st.CurrRate += v.CurrRate - st.PeakRate = math.Max(st.PeakRate, v.PeakRate) - r.XferStats[k] = st - } - r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount - r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount - r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes - r.Uptime += node.Uptime - } - if len(q.Nodes) > 0 { - r.Uptime /= int64(len(q.Nodes)) // average uptime - } - return -} - -// MetricsV2 represents replication metrics for a bucket. -type MetricsV2 struct { - Uptime int64 `json:"uptime"` - CurrentStats Metrics `json:"currStats"` - QueueStats ReplQueueStats `json:"queueStats"` -} -- cgit v1.2.3