Skip to content

Commit

Permalink
Rebase on main and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Sep 27, 2024
1 parent 2dbf562 commit d103ba7
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 146 deletions.
6 changes: 1 addition & 5 deletions pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

memPostings := index.NewMemPostings()
for i, l := range series {
Expand Down
51 changes: 5 additions & 46 deletions pkg/ingester/activeseries/active_native_histogram_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -71,15 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -123,15 +106,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -171,15 +146,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -216,15 +183,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
18 changes: 3 additions & 15 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -62,11 +58,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -98,11 +90,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
44 changes: 23 additions & 21 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/util/zeropool"
"go.uber.org/atomic"

asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
)

const (
Expand Down Expand Up @@ -45,7 +47,7 @@ type ActiveSeries struct {

// matchersMutex protects matchers and lastMatchersUpdate.
matchersMutex sync.RWMutex
matchers *Matchers
matchers *asmodel.Matchers
lastMatchersUpdate time.Time

costAttributionLabel string
Expand All @@ -60,7 +62,7 @@ type ActiveSeries struct {

// seriesStripe holds a subset of the series timestamps for a single tenant.
type seriesStripe struct {
matchers *Matchers
matchers *asmodel.Matchers

deleted *deletedSeries

Expand All @@ -87,16 +89,16 @@ type seriesStripe struct {

// seriesEntry holds a timestamp for single series.
type seriesEntry struct {
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches preAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches asmodel.PreAllocDynamicSlice // Index of the matcher matching
numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram.
// keep the value corresponding the label configured in serieStripe
deleted bool // This series was marked as deleted, so before purging we need to remove the refence to it from the deletedSeries.
attributionValue string
}

func NewActiveSeries(
asm *Matchers,
asm *asmodel.Matchers,
timeout time.Duration,
userID string,
costAttributionLabel string,
Expand Down Expand Up @@ -124,7 +126,7 @@ func (c *ActiveSeries) CurrentMatcherNames() []string {
return c.matchers.MatcherNames()
}

func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) {
c.matchersMutex.Lock()
defer c.matchersMutex.Unlock()

Expand All @@ -135,7 +137,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
c.lastMatchersUpdate = now
}

func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig {
func (c *ActiveSeries) CurrentConfig() asmodel.CustomTrackersConfig {
c.matchersMutex.RLock()
defer c.matchersMutex.RUnlock()
return c.matchers.Config()
Expand Down Expand Up @@ -372,21 +374,21 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
entry, ok := s.refs[ref]
if ok {
if entry.numNativeHistogramBuckets != numNativeHistogramBuckets {
matches := s.matchers.matches(series)
matchesLen := matches.len()
matches := s.matchers.Matches(series)
matchesLen := matches.Len()
if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 {
// change number of buckets but still a histogram
diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets
s.activeNativeHistogramBuckets = uint32(int(s.activeNativeHistogramBuckets) + diff)
for i := 0; i < matchesLen; i++ {
s.activeMatchingNativeHistogramBuckets[matches.get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.get(i)]) + diff)
s.activeMatchingNativeHistogramBuckets[matches.Get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.Get(i)]) + diff)
}
} else if numNativeHistogramBuckets >= 0 {
// change from float to histogram
s.activeNativeHistograms++
s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets)
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatchingNativeHistograms[match]++
s.activeMatchingNativeHistogramBuckets[match] += uint32(numNativeHistogramBuckets)
}
Expand All @@ -395,7 +397,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets)
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatchingNativeHistograms[match]--
s.activeMatchingNativeHistogramBuckets[match] -= uint32(entry.numNativeHistogramBuckets)
}
Expand All @@ -406,8 +408,8 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
return entry.nanos, false
}

matches := s.matchers.matches(series)
matchesLen := matches.len()
matches := s.matchers.Matches(series)
matchesLen := matches.Len()

s.active++

Expand All @@ -416,7 +418,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets)
}
for i := 0; i < matchesLen; i++ {
match := matches.get(i)
match := matches.Get(i)
s.activeMatching[match]++
if numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
Expand Down Expand Up @@ -462,7 +464,7 @@ func (s *seriesStripe) clear() {

// Reinitialize assigns new matchers and corresponding size activeMatching slices.
func (s *seriesStripe) reinitialize(
asm *Matchers,
asm *asmodel.Matchers,
deleted *deletedSeries,
userID string,
costAttributionLabel string,
Expand Down Expand Up @@ -528,9 +530,9 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
if entry.attributionValue != "" {
s.costAttributionValues[entry.attributionValue]++
}
ml := entry.matches.len()
ml := entry.matches.Len()
for i := 0; i < ml; i++ {
match := entry.matches.get(i)
match := entry.matches.Get(i)
s.activeMatching[match]++
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]++
Expand Down Expand Up @@ -573,9 +575,9 @@ func (s *seriesStripe) remove(ref storage.SeriesRef) {
s.activeNativeHistograms--
s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets)
}
ml := entry.matches.len()
ml := entry.matches.Len()
for i := 0; i < ml; i++ {
match := entry.matches.get(i)
match := entry.matches.Get(i)
s.activeMatching[match]--
if entry.numNativeHistogramBuckets >= 0 {
s.activeMatchingNativeHistograms[match]--
Expand Down
Loading

0 comments on commit d103ba7

Please sign in to comment.