Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Sep 27, 2024
1 parent 1107f71 commit 2dbf562
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 88 deletions.
24 changes: 12 additions & 12 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,6 @@
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "Maximum number of cost attributions allowed per user.",
"fieldValue": null,
"fieldDefaultValue": 200,
"fieldFlag": "max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_eviction_interval",
Expand Down Expand Up @@ -4010,13 +3999,24 @@
"kind": "field",
"name": "cost_attribution_label",
"required": false,
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total",
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'attrib' cost attribution's label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "validation.cost-attribution-label",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "validation.max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_fetched_chunks_per_query",
Expand Down
6 changes: 3 additions & 3 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1679,8 +1679,6 @@ Usage of ./cmd/mimir/mimir:
[experimental] Burst size, i.e., maximum number of messages that can be logged at once, temporarily exceeding the configured maximum logs per second. (default 1000)
-log.rate-limit-logs-per-second float
[experimental] Maximum number of messages per second to be logged. (default 10000)
-max-cost-attribution-per-user int
[experimental] Maximum number of cost attributions allowed per user. (default 200)
-max-separate-metrics-groups-per-user int
[experimental] Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated. (default 1000)
-mem-ballast-size-bytes int
Expand Down Expand Up @@ -3064,11 +3062,13 @@ Usage of ./cmd/mimir/mimir:
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.cost-attribution-label string
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.
-validation.create-grace-period duration
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-cost-attribution-per-user int
[experimental] The maximum number of cost attribution labels allowed per user, across the cluster. Set to 0 to disable cost attribution.
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
Expand Down
23 changes: 13 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Distributor struct {
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32
costAttributionsvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

Expand Down Expand Up @@ -306,10 +306,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides,
activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService,
ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing,
canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
Expand Down Expand Up @@ -344,7 +341,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
HATracker: haTracker,
costAttributionsvr: costAttributionClenaupService,
costAttributionSvc: costAttributionClenaupService,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Expand Down Expand Up @@ -1442,7 +1439,9 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID))
now := mtime.Now()

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID), now)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
Expand Down Expand Up @@ -1673,14 +1672,18 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) {
func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string, now time.Time) {
var receivedSamples, receivedExemplars, receivedMetadata int
costAttribution := make(map[string]int)
costAttributionSize := 0
if costAttributionLabel != "" {
costAttributionSize = d.limits.MaxCostAttributionPerUser(userID)
}
costAttribution := make(map[string]int, costAttributionSize)
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now, costAttributionSize)
costAttribution[attribution]++
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7956,7 +7956,7 @@ func TestCheckStartedMiddleware(t *testing.T) {
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
distributor, err := New(distributorConfig, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "user")
Expand Down
4 changes: 4 additions & 0 deletions pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

memPostings := index.NewMemPostings()
for i, l := range series {
Expand Down
20 changes: 20 additions & 0 deletions pkg/ingester/activeseries/active_native_histogram_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD

activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -67,11 +71,15 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -115,11 +123,15 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -159,11 +171,15 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -200,11 +216,15 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
>>>>>>> 3c422a8f57 (new service for tracking cost attribution)
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
12 changes: 12 additions & 0 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -58,7 +62,11 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -90,7 +98,11 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
<<<<<<< HEAD
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "")
=======
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
>>>>>>> 7e628c3508 (address comments)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
Loading

0 comments on commit 2dbf562

Please sign in to comment.