diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f82a189c6..682366df62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` * [CHANGE] Distributor: Drop experimental `-distributor.direct-otlp-translation-enabled` flag, since direct OTLP translation is well tested at this point. #9647 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. #9322 * `-alertmanager.alertmanager-client.grpc-compression=s2` diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 913598e465..31994ffbf5 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -20,6 +20,7 @@ type InstantVectorFunctionOperatorFactory func( memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, + timeRange types.QueryTimeRange, ) (types.InstantVectorOperator, error) type ScalarFunctionOperatorFactory func( @@ -37,7 +38,7 @@ type ScalarFunctionOperatorFactory func( // - name: The name of the function // - f: The function implementation func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVector) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -101,7 +102,7 @@ func FunctionOverRangeVectorOperatorFactory( name string, f functions.FunctionOverRangeVector, ) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args)) @@ -123,7 +124,7 @@ func FunctionOverRangeVectorOperatorFactory( } } -func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { +func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args)) @@ -139,7 +140,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem } func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 5 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 5 argument for label_replace, got %v", len(args)) @@ -190,7 +191,7 @@ func LabelReplaceFunctionOperatorFactory() InstantVectorFunctionOperatorFactory } func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 3 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 3 argument for clamp, got %v", len(args)) @@ -224,7 +225,7 @@ func ClampFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { } func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory { - return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error) { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) { if len(args) != 2 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 2 argument for %s, got %v", functionName, len(args)) @@ -251,6 +252,42 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant } } +func RoundFunctionOperatorFactory() InstantVectorFunctionOperatorFactory { + return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) { + if len(args) != 1 && len(args) != 2 { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected exactly 1 or 2 argument for round, got %v", len(args)) + } + + inner, ok := args[0].(types.InstantVectorOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected an instant vector for 1st argument for round, got %T", args[0]) + } + + var toNearest types.ScalarOperator + if len(args) == 2 { + toNearest, ok = args[1].(types.ScalarOperator) + if !ok { + // Should be caught by the PromQL parser, but we check here for safety. + return nil, fmt.Errorf("expected a scalar for 2nd argument for round, got %T", args[1]) + } + } else { + toNearest = operators.NewScalarConstant(float64(1), timeRange, memoryConsumptionTracker, expressionPosition) + } + + f := functions.FunctionOverInstantVector{ + SeriesDataFunc: functions.Round, + // TODO(jhesketh): With the currently vendored prometheus, round does not consistently drop the __name__ label + // (as verified by our tests). We match this for consistency, but will + // need to drop them once prometheus 3.0 is vendored in. + SeriesMetadataFunction: functions.SeriesMetadataFunctionDefinition{}, + } + + return operators.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition), nil + } +} + // These functions return an instant-vector. var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{ // Please keep this list sorted alphabetically. @@ -265,8 +302,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "avg_over_time": FunctionOverRangeVectorOperatorFactory("avg_over_time", functions.AvgOverTime), "ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil), "clamp": ClampFunctionOperatorFactory(), - "clamp_min": ClampMinMaxFunctionOperatorFactory("clamp_min", true), "clamp_max": ClampMinMaxFunctionOperatorFactory("clamp_max", false), + "clamp_min": ClampMinMaxFunctionOperatorFactory("clamp_min", true), "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), @@ -278,14 +315,15 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "increase": FunctionOverRangeVectorOperatorFactory("increase", functions.Increase), "label_replace": LabelReplaceFunctionOperatorFactory(), "last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime), - "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), - "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), "ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln), "log10": InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10), "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), + "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), + "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), "present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime), "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), + "round": RoundFunctionOperatorFactory(), "sgn": InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn), "sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin), "sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh), diff --git a/pkg/streamingpromql/functions/math.go b/pkg/streamingpromql/functions/math.go index f6d31cb6ff..571ad76999 100644 --- a/pkg/streamingpromql/functions/math.go +++ b/pkg/streamingpromql/functions/math.go @@ -134,3 +134,32 @@ func ClampMinMaxFactory(isMin bool) InstantVectorSeriesFunction { return seriesData, nil } } + +// round returns a number rounded to toNearest. +// Ties are solved by rounding up. +var Round InstantVectorSeriesFunction = func(seriesData types.InstantVectorSeriesData, scalarArgsData []types.ScalarData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { + toNearest := scalarArgsData[0] + + // There will always be a scalar at every step of the query. + // However, there may not be a sample at a step. So we need to + // keep track of where we are up to step-wise with the scalars, + // incrementing through the scalars until their timestamp matches + // the samples. + argIdx := 0 + + for step, data := range seriesData.Floats { + for data.T > toNearest.Samples[argIdx].T { + argIdx++ + } + + // Invert as it seems to cause fewer floating point accuracy issues. + toNearestInverse := 1.0 / toNearest.Samples[argIdx].F + + // We reuse the existing FPoint slice in place + seriesData.Floats[step].F = math.Floor(data.F*toNearestInverse+0.5) / toNearestInverse + } + // Histograms are dropped from Round + types.HPointSlicePool.Put(seriesData.Histograms, memoryConsumptionTracker) + seriesData.Histograms = nil + return seriesData, nil +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index 8f9758df56..8418cfcd2e 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -302,7 +302,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call) (type args[i] = a } - return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange) + return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, q.timeRange) } func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVectorOperator, error) { diff --git a/pkg/streamingpromql/testdata/ours-only/native_histograms.test b/pkg/streamingpromql/testdata/ours-only/native_histograms.test index 4b243d036f..a9513c3b23 100644 --- a/pkg/streamingpromql/testdata/ours-only/native_histograms.test +++ b/pkg/streamingpromql/testdata/ours-only/native_histograms.test @@ -13,3 +13,9 @@ eval range from 0 to 5m step 1m clamp_min(mixed_metric, 2) eval range from 0 to 5m step 1m clamp_max(mixed_metric, 2) {} _ 1 2 2 + +# round ignores any histograms +# Prometheus currently returns 0 instead of no value as per the documentation +# https://github.com/prometheus/prometheus/pull/15176 +eval range from 0 to 5m step 1m round(mixed_metric) + mixed_metric _ 1 2 3 diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 525812f43d..83cbebb23c 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -436,3 +436,21 @@ eval range from 0 to 54m step 6m clamp_min(series, scalar(mins)) eval range from 0 to 54m step 6m clamp_max(series, scalar(maxes)) {} 0 _ 0 0 5 _ 3 NaN 2 NaN + +clear + +load 6m + toNearest 1 _ 3 4 0.1 + series{a="b"} -5.5 2.75 _ _ 6.87 + +eval range from 0 to 24m step 6m round(series, scalar(toNearest)) + series{a="b"} -5 NaN _ _ 6.9 + +clear + +load 6m + toNearest 1 -0.1 0.5 10 0.1 0.25 100 0.0001 5 1000 NaN _ _ NaN _ 2 2 + series{a="b"} -5.5 2.75 0.25 15.5 9.999 3.14159 -0.999999 1000.01 0.49999999 -1e6 10 20 NaN NaN _ _ 10.5 + +eval range from 0 to 96m step 6m round(series, scalar(toNearest)) + series{a="b"} -5 2.7 0.5 20 10 3.25 0 1000.01 0 -1000000 NaN NaN NaN NaN _ _ 10 diff --git a/pkg/streamingpromql/testdata/upstream/aggregators.test b/pkg/streamingpromql/testdata/upstream/aggregators.test index e29151a04d..eb7991bcb2 100644 --- a/pkg/streamingpromql/testdata/upstream/aggregators.test +++ b/pkg/streamingpromql/testdata/upstream/aggregators.test @@ -132,70 +132,58 @@ eval instant at 50m ceil(0.004 * http_requests{group="production",job="api-serve {group="production", instance="0", job="api-server"} 1 {group="production", instance="1", job="api-server"} 1 -# Unsupported by streaming engine. -# eval instant at 50m round(0.004 * http_requests{group="production",job="api-server"}) -# {group="production", instance="0", job="api-server"} 0 -# {group="production", instance="1", job="api-server"} 1 +eval instant at 50m round(0.004 * http_requests{group="production",job="api-server"}) + {group="production", instance="0", job="api-server"} 0 + {group="production", instance="1", job="api-server"} 1 # Round should correctly handle negative numbers. -# Unsupported by streaming engine. -# eval instant at 50m round(-1 * (0.004 * http_requests{group="production",job="api-server"})) -# {group="production", instance="0", job="api-server"} 0 -# {group="production", instance="1", job="api-server"} -1 +eval instant at 50m round(-1 * (0.004 * http_requests{group="production",job="api-server"})) + {group="production", instance="0", job="api-server"} 0 + {group="production", instance="1", job="api-server"} -1 # Round should round half up. -# Unsupported by streaming engine. -# eval instant at 50m round(0.005 * http_requests{group="production",job="api-server"}) -# {group="production", instance="0", job="api-server"} 1 -# {group="production", instance="1", job="api-server"} 1 +eval instant at 50m round(0.005 * http_requests{group="production",job="api-server"}) + {group="production", instance="0", job="api-server"} 1 + {group="production", instance="1", job="api-server"} 1 -# Unsupported by streaming engine. -# eval instant at 50m round(-1 * (0.005 * http_requests{group="production",job="api-server"})) -# {group="production", instance="0", job="api-server"} 0 -# {group="production", instance="1", job="api-server"} -1 +eval instant at 50m round(-1 * (0.005 * http_requests{group="production",job="api-server"})) + {group="production", instance="0", job="api-server"} 0 + {group="production", instance="1", job="api-server"} -1 -# Unsupported by streaming engine. -# eval instant at 50m round(1 + 0.005 * http_requests{group="production",job="api-server"}) -# {group="production", instance="0", job="api-server"} 2 -# {group="production", instance="1", job="api-server"} 2 +eval instant at 50m round(1 + 0.005 * http_requests{group="production",job="api-server"}) + {group="production", instance="0", job="api-server"} 2 + {group="production", instance="1", job="api-server"} 2 -# Unsupported by streaming engine. -# eval instant at 50m round(-1 * (1 + 0.005 * http_requests{group="production",job="api-server"})) -# {group="production", instance="0", job="api-server"} -1 -# {group="production", instance="1", job="api-server"} -2 +eval instant at 50m round(-1 * (1 + 0.005 * http_requests{group="production",job="api-server"})) + {group="production", instance="0", job="api-server"} -1 + {group="production", instance="1", job="api-server"} -2 # Round should accept the number to round nearest to. -# Unsupported by streaming engine. -# eval instant at 50m round(0.0005 * http_requests{group="production",job="api-server"}, 0.1) -# {group="production", instance="0", job="api-server"} 0.1 -# {group="production", instance="1", job="api-server"} 0.1 +eval instant at 50m round(0.0005 * http_requests{group="production",job="api-server"}, 0.1) + {group="production", instance="0", job="api-server"} 0.1 + {group="production", instance="1", job="api-server"} 0.1 -# Unsupported by streaming engine. -# eval instant at 50m round(2.1 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1) -# {group="production", instance="0", job="api-server"} 2.2 -# {group="production", instance="1", job="api-server"} 2.2 +eval instant at 50m round(2.1 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1) + {group="production", instance="0", job="api-server"} 2.2 + {group="production", instance="1", job="api-server"} 2.2 -# Unsupported by streaming engine. -# eval instant at 50m round(5.2 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1) -# {group="production", instance="0", job="api-server"} 5.3 -# {group="production", instance="1", job="api-server"} 5.3 +eval instant at 50m round(5.2 + 0.0005 * http_requests{group="production",job="api-server"}, 0.1) + {group="production", instance="0", job="api-server"} 5.3 + {group="production", instance="1", job="api-server"} 5.3 # Round should work correctly with negative numbers and multiple decimal places. -# Unsupported by streaming engine. -# eval instant at 50m round(-1 * (5.2 + 0.0005 * http_requests{group="production",job="api-server"}), 0.1) -# {group="production", instance="0", job="api-server"} -5.2 -# {group="production", instance="1", job="api-server"} -5.3 +eval instant at 50m round(-1 * (5.2 + 0.0005 * http_requests{group="production",job="api-server"}), 0.1) + {group="production", instance="0", job="api-server"} -5.2 + {group="production", instance="1", job="api-server"} -5.3 # Round should work correctly with big toNearests. -# Unsupported by streaming engine. -# eval instant at 50m round(0.025 * http_requests{group="production",job="api-server"}, 5) -# {group="production", instance="0", job="api-server"} 5 -# {group="production", instance="1", job="api-server"} 5 +eval instant at 50m round(0.025 * http_requests{group="production",job="api-server"}, 5) + {group="production", instance="0", job="api-server"} 5 + {group="production", instance="1", job="api-server"} 5 -# Unsupported by streaming engine. -# eval instant at 50m round(0.045 * http_requests{group="production",job="api-server"}, 5) -# {group="production", instance="0", job="api-server"} 5 -# {group="production", instance="1", job="api-server"} 10 +eval instant at 50m round(0.045 * http_requests{group="production",job="api-server"}, 5) + {group="production", instance="0", job="api-server"} 5 + {group="production", instance="1", job="api-server"} 10 # Standard deviation and variance. eval instant at 50m stddev(http_requests)