Skip to content

Commit

Permalink
renaming of service
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Sep 25, 2024
1 parent 8ae4571 commit 8dd5e80
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type Distributor struct {
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32
costAttributionSvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

Expand Down Expand Up @@ -343,7 +343,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
HATracker: haTracker,
costAttributionSvr: costAttributionClenaupService,
costAttributionSvc: costAttributionClenaupService,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
maxCostAttributionPerUser: maxCostAttributionPerUser,

Expand Down Expand Up @@ -1661,7 +1661,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := d.costAttributionSvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
costAttribution[attribution]++
}
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type ActiveSeries struct {
lastMatchersUpdate time.Time

CostAttributionLabel string
costAttributionSvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
Expand All @@ -67,7 +67,7 @@ type seriesStripe struct {
// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64
costAttributionSvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
mu sync.RWMutex
refs map[storage.SeriesRef]seriesEntry
active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
Expand Down Expand Up @@ -98,17 +98,17 @@ func NewActiveSeries(
timeout time.Duration,
userID string,
costAttributionLabel string,
costAttributionSvr *util.CostAttributionCleanupService,
costAttributionSvc *util.CostAttributionCleanupService,
) *ActiveSeries {
c := &ActiveSeries{
matchers: asm, timeout: timeout, userID: userID,
CostAttributionLabel: costAttributionLabel,
costAttributionSvr: costAttributionSvr,
costAttributionSvc: costAttributionSvc,
}

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvr)
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc)
}

return c
Expand All @@ -125,7 +125,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionSvr)
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionSvc)
}
c.matchers = asm
c.lastMatchersUpdate = now
Expand Down Expand Up @@ -429,7 +429,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
// here if we have a cost attribution label, we can split the serie count based on the value of the label
// we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly
if s.costAttributionLabel != "" {
attributionValue := s.costAttributionSvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
Expand Down Expand Up @@ -462,7 +462,7 @@ func (s *seriesStripe) reinitialize(
deleted *deletedSeries,
userID string,
costAttributionLabel string,
costAttributionSvr *util.CostAttributionCleanupService,
costAttributionSvc *util.CostAttributionCleanupService,
) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -479,7 +479,7 @@ func (s *seriesStripe) reinitialize(
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.costAttributionLabel = costAttributionLabel
s.costAttributionSvr = costAttributionSvr
s.costAttributionSvc = costAttributionSvc
}

func (s *seriesStripe) purge(keepUntil time.Time) {
Expand Down

0 comments on commit 8dd5e80

Please sign in to comment.