Skip to content

Commit

Permalink
MQE: Add support for increase function (#9420)
Browse files Browse the repository at this point in the history
* MQE: Add support for increase function

* Update CHANGELOG

* Address review feedback
  • Loading branch information
jhesketh authored Sep 26, 2024
1 parent 8d3c2c6 commit be0fe27
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 167 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* `cortex_alertmanager_silences`
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399 #9403 #9417 #9418 #9419
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 #9398 #9399 #9403 #9417 #9418 #9419 #9420
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
217 changes: 113 additions & 104 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,13 +1434,15 @@ func TestAnnotations(t *testing.T) {
metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}}
`

testCases := map[string]struct {
type testCase struct {
data string
expr string
expectedWarningAnnotations []string
expectedInfoAnnotations []string
skipComparisonWithPrometheusReason string
}{
}

testCases := map[string]testCase{
"sum() with float and native histogram at same step": {
data: mixedFloatHistogramData,
expr: "sum by (series) (metric)",
Expand Down Expand Up @@ -1477,68 +1479,6 @@ func TestAnnotations(t *testing.T) {
expr: `avg(metric{type="histogram"})`,
},

"rate() over metric without counter suffix containing only floats": {
data: mixedFloatHistogramData,
expr: `rate(metric{type="float"}[1m])`,
expectedInfoAnnotations: []string{`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "metric" (1:6)`},
},
"rate() over metric without counter suffix containing only native histograms": {
data: mixedFloatHistogramData,
expr: `rate(metric{type="histogram"}[1m])`,
},
"rate() over metric ending in _total": {
data: `some_metric_total 0+1x3`,
expr: `rate(some_metric_total[1m])`,
},
"rate() over metric ending in _sum": {
data: `some_metric_sum 0+1x3`,
expr: `rate(some_metric_sum[1m])`,
},
"rate() over metric ending in _count": {
data: `some_metric_count 0+1x3`,
expr: `rate(some_metric_count[1m])`,
},
"rate() over metric ending in _bucket": {
data: `some_metric_bucket 0+1x3`,
expr: `rate(some_metric_bucket[1m])`,
},
"rate() over multiple metric names": {
data: `
not_a_counter{env="prod", series="1"} 0+1x3
a_total{series="2"} 1+1x3
a_sum{series="3"} 2+1x3
a_count{series="4"} 3+1x3
a_bucket{series="5"} 4+1x3
not_a_counter{env="test", series="6"} 5+1x3
also_not_a_counter{env="test", series="7"} 6+1x3
`,
expr: `rate({__name__!=""}[1m])`,
expectedInfoAnnotations: []string{
`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "not_a_counter" (1:6)`,
`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "also_not_a_counter" (1:6)`,
},
},
"rate() over series with both floats and histograms": {
data: `some_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
expr: `rate(some_metric_count[1m])`,
expectedWarningAnnotations: []string{`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric_count" (1:6)`},
},
"rate() over series with first histogram that is not a counter": {
data: `some_metric {{schema:0 sum:1 count:1 buckets:[1] counter_reset_hint:gauge}} {{schema:0 sum:2 count:2 buckets:[2]}}`,
expr: `rate(some_metric[1m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:6)`},
},
"rate() over series with last histogram that is not a counter": {
data: `some_metric {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:0 sum:2 count:2 buckets:[2] counter_reset_hint:gauge}}`,
expr: `rate(some_metric[1m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:6)`},
},
"rate() over series with a histogram that is not a counter that is neither the first or last in the range": {
data: `some_metric {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:0 sum:2 count:2 buckets:[2] counter_reset_hint:gauge}} {{schema:0 sum:3 count:3 buckets:[3]}}`,
expr: `rate(some_metric[2m] @ 2m)`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:6)`},
},

"sum() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `sum(metric{series=~"exponential-buckets|custom-buckets-1"})`,
Expand All @@ -1554,41 +1494,6 @@ func TestAnnotations(t *testing.T) {
},
},

"rate() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `rate(metric{series="mixed-exponential-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:6)`,
},
},
"rate() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `rate(metric{series="incompatible-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:6)`,
},
},
"rate() over metric without counter suffix with single float or histogram in range": {
data: `
series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}}
`,
expr: "rate(series[45s])",
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
},
"rate() over one point in range": {
data: `
series 1
`,
expr: "rate(series[1m])",
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
},

"sum_over_time() over series with both floats and histograms": {
data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
expr: `sum_over_time(some_metric[1m])`,
Expand Down Expand Up @@ -1663,6 +1568,108 @@ func TestAnnotations(t *testing.T) {
},
}

// rate and increase use the same annotations
for _, function := range []string{"rate", "increase"} {
position := len(fmt.Sprintf("%s(", function)) + 1
testCases[fmt.Sprintf("%s() over metric without counter suffix containing only floats", function)] = testCase{
data: mixedFloatHistogramData,
expr: fmt.Sprintf(`%s(metric{type="float"}[1m])`, function),
expectedInfoAnnotations: []string{fmt.Sprintf(`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "metric" (1:%d)`, position)},
}

testCases[fmt.Sprintf("%s() over metric without counter suffix containing only native histograms", function)] = testCase{
data: mixedFloatHistogramData,
expr: fmt.Sprintf(`%s(metric{type="histogram"}[1m])`, function),
}
testCases[fmt.Sprintf("%s() over metric ending in _total", function)] = testCase{
data: `some_metric_total 0+1x3`,
expr: fmt.Sprintf(`%s(some_metric_total[1m])`, function),
}
testCases[fmt.Sprintf("%s() over metric ending in _sum", function)] = testCase{
data: `some_metric_sum 0+1x3`,
expr: fmt.Sprintf(`%s(some_metric_sum[1m])`, function),
}
testCases[fmt.Sprintf("%s() over metric ending in _count", function)] = testCase{
data: `some_metric_count 0+1x3`,
expr: fmt.Sprintf(`%s(some_metric_count[1m])`, function),
}
testCases[fmt.Sprintf("%s() over metric ending in _bucket", function)] = testCase{
data: `some_metric_bucket 0+1x3`,
expr: fmt.Sprintf(`%s(some_metric_bucket[1m])`, function),
}
testCases[fmt.Sprintf("%s() over multiple metric names", function)] = testCase{
data: `
not_a_counter{env="prod", series="1"} 0+1x3
a_total{series="2"} 1+1x3
a_sum{series="3"} 2+1x3
a_count{series="4"} 3+1x3
a_bucket{series="5"} 4+1x3
not_a_counter{env="test", series="6"} 5+1x3
also_not_a_counter{env="test", series="7"} 6+1x3
`,
expr: fmt.Sprintf(`%s({__name__!=""}[1m])`, function),
expectedInfoAnnotations: []string{
fmt.Sprintf(`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "not_a_counter" (1:%d)`, position),
fmt.Sprintf(`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "also_not_a_counter" (1:%d)`, position),
},
}
testCases[fmt.Sprintf("%s() over series with both floats and histograms", function)] = testCase{
data: `some_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
expr: fmt.Sprintf(`%s(some_metric_count[1m])`, function),
expectedWarningAnnotations: []string{fmt.Sprintf(`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric_count" (1:%d)`, position)},
}
testCases[fmt.Sprintf("%s() over series with first histogram that is not a counter", function)] = testCase{
data: `some_metric {{schema:0 sum:1 count:1 buckets:[1] counter_reset_hint:gauge}} {{schema:0 sum:2 count:2 buckets:[2]}}`,
expr: fmt.Sprintf(`%s(some_metric[1m])`, function),
expectedWarningAnnotations: []string{fmt.Sprintf(`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:%d)`, position)},
}
testCases[fmt.Sprintf("%s() over series with last histogram that is not a counter", function)] = testCase{
data: `some_metric {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:0 sum:2 count:2 buckets:[2] counter_reset_hint:gauge}}`,
expr: fmt.Sprintf(`%s(some_metric[1m])`, function),
expectedWarningAnnotations: []string{fmt.Sprintf(`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:%d)`, position)},
}
testCases[fmt.Sprintf("%s() over series with a histogram that is not a counter that is neither the first or last in the range", function)] = testCase{
data: `some_metric {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:0 sum:2 count:2 buckets:[2] counter_reset_hint:gauge}} {{schema:0 sum:3 count:3 buckets:[3]}}`,
expr: fmt.Sprintf(`%s(some_metric[2m] @ 2m)`, function),
expectedWarningAnnotations: []string{fmt.Sprintf(`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:%d)`, position)},
}

testCases[fmt.Sprintf("%s() over native histograms with both exponential and custom buckets", function)] = testCase{
data: nativeHistogramsWithCustomBucketsData,
expr: fmt.Sprintf(`%s(metric{series="mixed-exponential-custom-buckets"}[1m])`, function),
expectedWarningAnnotations: []string{
fmt.Sprintf(`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:%d)`, position),
},
}
testCases[fmt.Sprintf("%s() over native histograms with incompatible custom buckets", function)] = testCase{
data: nativeHistogramsWithCustomBucketsData,
expr: fmt.Sprintf(`%s(metric{series="incompatible-custom-buckets"}[1m])`, function),
expectedWarningAnnotations: []string{
fmt.Sprintf(`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:%d)`, position),
},
}
testCases[fmt.Sprintf("%s() over metric without counter suffix with single float or histogram in range", function)] = testCase{
data: `
series 3 1 {{schema:3 sum:12 count:7 buckets:[2 2 3]}}
`,
expr: fmt.Sprintf("%s(series[45s])", function),
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
}
testCases[fmt.Sprintf("%s() over one point in range", function)] = testCase{
data: `
series 1
`,
expr: fmt.Sprintf("%s(series[1m])", function),
expectedWarningAnnotations: []string{},
expectedInfoAnnotations: []string{},
// This can be removed once https://github.com/prometheus/prometheus/pull/14910 is vendored.
skipComparisonWithPrometheusReason: "Prometheus only considers the type of the last point in the vector selector rather than the output value",
}
}

opts := NewTestEngineOpts()
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)
Expand Down Expand Up @@ -1867,7 +1874,7 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) {
runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData)
}

func TestCompareVariousMixedMetricsRate(t *testing.T) {
func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {
labelsToUse, pointsPerSeries, seriesData := getMixedMetricsForTests()

// Test each label individually to catch edge cases in with single series
Expand All @@ -1881,10 +1888,12 @@ func TestCompareVariousMixedMetricsRate(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[45s])`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`rate(series{label=~"(%s)"}[1m])`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(rate(series{label=~"(%s)"}[2m15s]))`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(rate(series{label=~"(%s)"}[5m]))`, labelRegex))
for _, function := range []string{"rate", "increase"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[5m]))`, function, labelRegex))
}
}

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData)
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase),
"last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime),
"max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime),
"min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime),
Expand Down
Loading

0 comments on commit be0fe27

Please sign in to comment.