Skip to content

Commit

Permalink
separate registry for cost attribution metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Oct 2, 2024
1 parent 788216a commit f1b4e74
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 61 deletions.
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -18161,6 +18161,16 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "custom_registry_path",
"required": false,
"desc": "",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "timeseries_unmarshal_caching_optimization_enabled",
Expand Down
1 change: 0 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "attrib"}),

receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
Expand Down
3 changes: 2 additions & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ type Config struct {
ContinuousTest continuoustest.Config `yaml:"-"`
OverridesExporter exporter.Config `yaml:"overrides_exporter"`

Common CommonConfig `yaml:"common"`
Common CommonConfig `yaml:"common"`
CustomRegistryPath string `yaml:"custom_registry_path" category:"advanced"`

TimeseriesUnmarshalCachingOptimizationEnabled bool `yaml:"timeseries_unmarshal_caching_optimization_enabled" category:"experimental"`
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/grafana/mimir/pkg/util/validation/exporter"
"github.com/grafana/mimir/pkg/util/version"
"github.com/grafana/mimir/pkg/vault"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// The various modules that make up Mimir.
Expand Down Expand Up @@ -473,10 +474,6 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) {
t.ActiveGroupsCleanup.Register(t.Distributor)
}

if t.CostAttributionCleanup != nil {
t.CostAttributionCleanup.Register(t.Distributor)
}

return t.Distributor, nil
}

Expand Down Expand Up @@ -651,7 +648,16 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) {
}

func (t *Mimir) initCostAttributionService() (services.Service, error) {
t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides)
if t.Cfg.CustomRegistryPath != "" {
customRegistry := prometheus.NewRegistry()
// Register the custom registry with the provided URL.
// This allows users to expose custom metrics on a separate endpoint.
// This is useful when users want to expose metrics that are not part of the default Mimir metrics.
http.Handle(t.Cfg.CustomRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry}))
t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, customRegistry)
return t.CostAttributionCleanup, nil
}
t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, t.Registerer)
return t.CostAttributionCleanup, nil
}

Expand All @@ -675,9 +681,6 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) {
t.ActiveGroupsCleanup.Register(t.Ingester)
}

if t.CostAttributionCleanup != nil {
t.CostAttributionCleanup.Register(t.Ingester)
}
return t.Ingester, nil
}

Expand Down
181 changes: 130 additions & 51 deletions pkg/util/costattribution/cost_attribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,96 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
)

// Tracker holds the per-user cost-attribution state for one tracked label:
// the metric vectors exposed for that label plus the bookkeeping used to
// expire inactive attribution values.
type Tracker struct {
	trackedLabel string // label whose values are attributed; also used as a metric label name
	activeSeriesPerUserAttribution *prometheus.GaugeVec // active series per (user, attribution)
	receivedSamplesAttribution *prometheus.CounterVec // received samples per (user, attribution)
	discardedSampleAttribution *prometheus.CounterVec // discarded samples per (user, attribution)
	attributionTimestamps map[string]*atomic.Int64 // last-seen timestamp (unix nanos) per attribution value
	coolDownDeadline *atomic.Int64 // unix nanos; while in the future, the attribution limit is treated as exceeded
}

// RemoveAttributionMetricsForUser deletes the received-samples,
// discarded-samples and active-series metric series recorded for the given
// user and attribution value.
func (t *Tracker) RemoveAttributionMetricsForUser(userID, attribution string) {
	t.receivedSamplesAttribution.DeleteLabelValues(userID, attribution)
	t.discardedSampleAttribution.DeleteLabelValues(userID, attribution)
	t.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution)
}

// NewCostAttributionTracker builds a Tracker for the given attribution label
// and registers its three metric vectors — discarded samples, received samples
// and active series, all partitioned by user and the tracked label — on reg.
func NewCostAttributionTracker(reg prometheus.Registerer, trackedLabel string) *Tracker {
	// All vectors share the same label dimensions.
	labelNames := []string{"user", trackedLabel}

	tracker := &Tracker{
		trackedLabel:          trackedLabel,
		attributionTimestamps: map[string]*atomic.Int64{},
		coolDownDeadline:      atomic.NewInt64(0),
	}
	tracker.discardedSampleAttribution = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_discarded_samples_attribution_total",
		Help: "The total number of samples that were discarded per attribution.",
	}, labelNames)
	tracker.receivedSamplesAttribution = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_received_samples_attribution_total",
		Help: "The total number of samples that were received per attribution.",
	}, labelNames)
	tracker.activeSeriesPerUserAttribution = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_ingester_active_series_attribution",
		Help: "The total number of active series per user and attribution.",
	}, labelNames)
	return tracker
}

type CostAttribution struct {
mu sync.RWMutex
timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp
coolDownDeadline map[string]*atomic.Int64
limits *validation.Overrides
mu sync.RWMutex
trackers map[string]*Tracker
limits *validation.Overrides
reg prometheus.Registerer
}

func NewCostAttribution(limits *validation.Overrides) *CostAttribution {
func NewCostAttribution(limits *validation.Overrides, reg prometheus.Registerer) *CostAttribution {
return &CostAttribution{
timestampsPerUser: map[string]map[string]*atomic.Int64{},
coolDownDeadline: map[string]*atomic.Int64{},
limits: limits,
trackers: map[string]*Tracker{},
limits: limits,
reg: reg,
}
}

// UpdateAttributionTimestampForUser function is only guaranteed to update to the
// timestamp provided even if it is smaller than the existing value
func (ag *CostAttribution) UpdateAttributionTimestampForUser(userID, attribution string, now time.Time) {
ts := now.UnixNano()
ag.mu.RLock()
if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil {
ag.mu.RUnlock()
groupTs.Store(ts)
// If the limit is set to 0, we don't need to track the attribution
if ag.limits.MaxCostAttributionPerUser(userID) <= 0 {
return
}
ag.mu.RUnlock()

ts := now.UnixNano()
ag.mu.Lock()
defer ag.mu.Unlock()

if ag.timestampsPerUser[userID] == nil {
ag.timestampsPerUser[userID] = map[string]*atomic.Int64{attribution: atomic.NewInt64(ts)}
return
// create new tracker if not exists
if _, exists := ag.trackers[userID]; !exists {
ag.trackers[userID] = NewCostAttributionTracker(ag.reg, ag.limits.CostAttributionLabel(userID))
}

if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil {
ag.mu.Unlock()
ag.mu.RLock()
if groupTs := ag.trackers[userID].attributionTimestamps[attribution]; groupTs != nil {
groupTs.Store(ts)
return
}

ag.timestampsPerUser[userID][attribution] = atomic.NewInt64(ts)
ag.mu.RUnlock()
ag.mu.Lock()
defer ag.mu.Unlock()
ag.trackers[userID].attributionTimestamps[attribution] = atomic.NewInt64(ts)
}

func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadline int64) []string {
ag.mu.RLock()
var inactiveAttributions []string
attributionTimestamps := ag.timestampsPerUser[userID]
if ag.trackers[userID] == nil || ag.trackers[userID].attributionTimestamps == nil {
return nil
}

attributionTimestamps := ag.trackers[userID].attributionTimestamps
for attr, ts := range attributionTimestamps {
if ts.Load() <= deadline {
inactiveAttributions = append(inactiveAttributions, attr)
Expand All @@ -80,9 +119,9 @@ func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadl

for i := 0; i < len(inactiveAttributions); {
inactiveAttribution := inactiveAttributions[i]
groupTs := ag.timestampsPerUser[userID][inactiveAttribution]
groupTs := ag.trackers[userID].attributionTimestamps[inactiveAttribution]
if groupTs != nil && groupTs.Load() <= deadline {
delete(ag.timestampsPerUser[userID], inactiveAttribution)
delete(ag.trackers[userID].attributionTimestamps, inactiveAttribution)
i++
} else {
inactiveAttributions[i] = inactiveAttributions[len(inactiveAttributions)-1]
Expand All @@ -93,10 +132,10 @@ func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadl
return inactiveAttributions
}

func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string)) {
func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration) {
ca.mu.RLock()
userIDs := make([]string, 0, len(ca.timestampsPerUser))
for userID := range ca.timestampsPerUser {
userIDs := make([]string, 0, len(ca.trackers))
for userID := range ca.trackers {
userIDs = append(userIDs, userID)
}
ca.mu.RUnlock()
Expand All @@ -105,32 +144,37 @@ func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Durati
for _, userID := range userIDs {
inactiveAttributions := ca.purgeInactiveAttributionsForUser(userID, currentTime.Add(-inactiveTimeout).UnixNano())
for _, attribution := range inactiveAttributions {
for _, cleanupFn := range cleanupFuncs {
cleanupFn(userID, attribution)
}
ca.trackers[userID].RemoveAttributionMetricsForUser(userID, attribution)
}
}
}

func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time) bool {
func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string) bool {
// if we are still at the cooldown period, we will consider the limit reached
ca.mu.RLock()
defer ca.mu.RUnlock()
// if the user is not exist, we don't need to check the limit
if ca.trackers[userID] == nil {
return false
}

if v, exists := ca.coolDownDeadline[userID]; exists && v.Load() > now.UnixNano() {
now := time.Now()
if v := ca.trackers[userID].coolDownDeadline; v != nil && v.Load() > now.UnixNano() {
return true
}

// if the user attribution is already exist and we are not in the cooldown period, we don't need to check the limit
_, exists := ca.timestampsPerUser[userID][attribution]
_, exists := ca.trackers[userID].attributionTimestamps[attribution]
if exists {
return false
}

// if the user has reached the limit, we will set the cooldown period which is 20 minutes
maxReached := len(ca.timestampsPerUser[userID]) >= ca.limits.MaxCostAttributionPerUser(userID)
maxReached := len(ca.trackers[userID].attributionTimestamps) >= ca.limits.MaxCostAttributionPerUser(userID)
if maxReached {
ca.coolDownDeadline[userID].Store(time.Now().Add(20 * time.Minute).UnixNano())
ca.mu.Lock()
ca.trackers[userID].coolDownDeadline.Store(now.Add(20 * time.Minute).UnixNano())
ca.mu.Unlock()
return true
}

Expand All @@ -141,7 +185,6 @@ type CostAttributionCleanupService struct {
services.Service
logger log.Logger
costAttribution *CostAttribution
cleanupFuncs []func(userID, attribution string)
inactiveTimeout time.Duration
invalidValue string
}
Expand All @@ -150,10 +193,9 @@ type CostAttributionMetricsCleaner interface {
RemoveAttributionMetricsForUser(userID, attribution string)
}

func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, cleanupFns ...func(string, string)) *CostAttributionCleanupService {
func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg prometheus.Registerer) *CostAttributionCleanupService {
s := &CostAttributionCleanupService{
costAttribution: NewCostAttribution(limits),
cleanupFuncs: cleanupFns,
costAttribution: NewCostAttribution(limits, reg),
inactiveTimeout: inactiveTimeout,
logger: logger,
invalidValue: "__unaccounted__",
Expand All @@ -163,12 +205,55 @@ func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Dura
return s
}

// IncrementReceivedSamples adds value to the received-samples counter for the
// given user and attribution. The attribution is normalized first (replaced by
// the overflow placeholder once the user's limit is exceeded); users with no
// active tracker are ignored.
func (s *CostAttributionCleanupService) IncrementReceivedSamples(userID, attribution string, value float64) {
	attrib := s.GetUserAttribution(userID, attribution)
	s.costAttribution.mu.RLock()
	defer s.costAttribution.mu.RUnlock()
	tracker, ok := s.costAttribution.trackers[userID]
	if !ok {
		return
	}
	tracker.receivedSamplesAttribution.WithLabelValues(userID, attrib).Add(value)
}

// IncrementDiscardedSamples adds value to the discarded-samples counter for
// the given user and attribution. The attribution is normalized first
// (replaced by the overflow placeholder once the user's limit is exceeded);
// users with no active tracker are ignored.
func (s *CostAttributionCleanupService) IncrementDiscardedSamples(userID, attribution string, value float64) {
	attrib := s.GetUserAttribution(userID, attribution)
	s.costAttribution.mu.RLock()
	defer s.costAttribution.mu.RUnlock()
	tracker, ok := s.costAttribution.trackers[userID]
	if !ok {
		return
	}
	tracker.discardedSampleAttribution.WithLabelValues(userID, attrib).Add(value)
}

// SetActiveSeries sets the active-series gauge for the given user and
// attribution. The attribution is normalized first (replaced by the overflow
// placeholder once the user's limit is exceeded); users with no active
// tracker are ignored.
func (s *CostAttributionCleanupService) SetActiveSeries(userID, attribution string, value float64) {
	attrib := s.GetUserAttribution(userID, attribution)
	s.costAttribution.mu.RLock()
	defer s.costAttribution.mu.RUnlock()
	tracker, ok := s.costAttribution.trackers[userID]
	if !ok {
		return
	}
	tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attrib).Set(value)
}

// GetUserAttribution returns the attribution value to record for userID: the
// input unchanged while the user is under the cost-attribution limit (or is
// not tracked at all), and the overflow placeholder once the limit has been
// exceeded.
func (s *CostAttributionCleanupService) GetUserAttribution(userID, attribution string) string {
	ca := s.costAttribution
	// A non-positive limit means cost attribution is disabled for this user;
	// short-circuiting also skips the limit check entirely in that case.
	if ca.limits.MaxCostAttributionPerUser(userID) > 0 && ca.attributionLimitExceeded(userID, attribution) {
		return s.invalidValue
	}
	return attribution
}

func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string {
// empty label is not normal, if user set attribution label, the metrics send has to include the label
if attribution == "" {
attribution = s.invalidValue
level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since missing cost attribution label in metrics", s.invalidValue))
} else if s.costAttribution.attributionLimitExceeded(user, attribution, now) {
level.Error(s.logger).Log("msg", "set attribution label to \"\" since missing cost attribution label in metrics")
return attribution
}

if s.costAttribution.attributionLimitExceeded(user, attribution) {
attribution = s.invalidValue
level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue))
}
Expand All @@ -178,12 +263,6 @@ func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribu
}

func (s *CostAttributionCleanupService) iteration(_ context.Context) error {
s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout, s.cleanupFuncs...)
s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout)
return nil
}

// Register registers the cleanup function from metricsCleaner to be called during each cleanup iteration.
// This function is NOT thread safe
func (s *CostAttributionCleanupService) Register(metricsCleaner CostAttributionMetricsCleaner) {
s.cleanupFuncs = append(s.cleanupFuncs, metricsCleaner.RemoveAttributionMetricsForUser)
}

0 comments on commit f1b4e74

Please sign in to comment.