// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package promql

import (
	"bytes"
	"container/heap"
	"context"
	"fmt"
	"math"
	"regexp"
	"runtime"
	"sort"
	"strconv"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/opentracing/opentracing-go"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/uber/jaeger-client-go"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/stats"
)

const (
	namespace            = "prometheus"
	subsystem            = "engine"
	queryTag             = "query"
	env                  = "query execution"
	defaultLookbackDelta = 5 * time.Minute

	// The largest SampleValue that can be converted to an int64 without overflow.
	maxInt64 = 9223372036854774784
	// The smallest SampleValue that can be converted to an int64 without underflow.
	minInt64 = -9223372036854775808
)

type engineMetrics struct {
	currentQueries       prometheus.Gauge
	maxConcurrentQueries prometheus.Gauge
	queryLogEnabled      prometheus.Gauge
	queryLogFailures     prometheus.Counter
	queryQueueTime       prometheus.Observer
	queryPrepareTime     prometheus.Observer
	queryInnerEval       prometheus.Observer
	queryResultSort      prometheus.Observer
}

// convertibleToInt64 returns true if v does not over-/underflow an int64.
func convertibleToInt64(v float64) bool {
	return v <= maxInt64 && v >= minInt64
}
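
// A minimal illustrative sketch (not part of the original file) of guarding a
// conversion with convertibleToInt64 before casting; the helper name and the
// zero fallback are assumptions for illustration only.
//
//	func toInt64OrZero(v float64) int64 {
//		if !convertibleToInt64(v) {
//			return 0 // e.g. 1e19 would over-flow an int64
//		}
//		return int64(v)
//	}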

type (
	// ErrQueryTimeout is returned if a query timed out during processing.
	ErrQueryTimeout string
	// ErrQueryCanceled is returned if a query was canceled during processing.
	ErrQueryCanceled string
	// ErrTooManySamples is returned if a query would load more than the maximum allowed samples into memory.
	ErrTooManySamples string
	// ErrStorage is returned if an error was encountered in the storage layer
	// during query handling.
	ErrStorage struct{ Err error }
)

func (e ErrQueryTimeout) Error() string {
	return fmt.Sprintf("query timed out in %s", string(e))
}
func (e ErrQueryCanceled) Error() string {
	return fmt.Sprintf("query was canceled in %s", string(e))
}
func (e ErrTooManySamples) Error() string {
	return fmt.Sprintf("query processing would load too many samples into memory in %s", string(e))
}
func (e ErrStorage) Error() string {
	return e.Err.Error()
}

// QueryLogger is an interface that can be used to log all the queries executed
// by the engine.
type QueryLogger interface {
	Log(...interface{}) error
	Close() error
}
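
// A minimal sketch (not part of the original file) of a QueryLogger
// implementation; the writer-backed type and its field are illustrative
// assumptions, not the logger Prometheus ships with (assumes the io and fmt
// packages).
//
//	type writerQueryLogger struct{ w io.Writer }
//
//	func (l writerQueryLogger) Log(kv ...interface{}) error {
//		_, err := fmt.Fprintln(l.w, kv...)
//		return err
//	}
//
//	func (l writerQueryLogger) Close() error { return nil }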

// A Query is derived from a raw query string and can be run against an engine
// it is associated with.
type Query interface {
	// Exec processes the query. Can only be called once.
	Exec(ctx context.Context) *Result
	// Close recovers memory used by the query result.
	Close()
	// Statement returns the parsed statement of the query.
	Statement() parser.Statement
	// Stats returns statistics about the lifetime of the query.
	Stats() *stats.QueryTimers
	// Cancel signals that a running query execution should be aborted.
	Cancel()
}

// query implements the Query interface.
type query struct {
	// Underlying data provider.
	queryable storage.Queryable
	// The original query string.
	q string
	// Statement of the parsed query.
	stmt parser.Statement
	// Timer stats for the query execution.
	stats *stats.QueryTimers
	// Result matrix for reuse.
	matrix Matrix
	// Cancellation function for the query.
	cancel func()

	// The engine against which the query is executed.
	ng *Engine
}

// QueryOrigin is used as a context key for attaching additional query origin
// information to a query's context; it is included in the query log.
type QueryOrigin struct{}

// Statement implements the Query interface.
func (q *query) Statement() parser.Statement {
	return q.stmt
}

// Stats implements the Query interface.
func (q *query) Stats() *stats.QueryTimers {
	return q.stats
}

// Cancel implements the Query interface.
func (q *query) Cancel() {
	if q.cancel != nil {
		q.cancel()
	}
}

// Close implements the Query interface.
func (q *query) Close() {
	for _, s := range q.matrix {
		putPointSlice(s.Points)
	}
}

// Exec implements the Query interface.
func (q *query) Exec(ctx context.Context) *Result {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span.SetTag(queryTag, q.stmt.String())
	}

	// Exec query.
	res, warnings, err := q.ng.exec(ctx, q)

	return &Result{Err: err, Value: res, Warnings: warnings}
}

// contextDone returns an error if the context was canceled or timed out.
func contextDone(ctx context.Context, env string) error {
	if err := ctx.Err(); err != nil {
		return contextErr(err, env)
	}
	return nil
}

func contextErr(err error, env string) error {
	switch err {
	case context.Canceled:
		return ErrQueryCanceled(env)
	case context.DeadlineExceeded:
		return ErrQueryTimeout(env)
	default:
		return err
	}
}

// EngineOpts contains configuration options used when creating a new Engine.
type EngineOpts struct {
	Logger             log.Logger
	Reg                prometheus.Registerer
	MaxSamples         int
	Timeout            time.Duration
	ActiveQueryTracker *ActiveQueryTracker
	// LookbackDelta determines the time since the last sample after which a time
	// series is considered stale.
	LookbackDelta time.Duration

	// NoStepSubqueryIntervalFn returns the default evaluation interval, in milliseconds,
	// to use for a subquery whose range vector did not specify a step, e.g. `[30m:]`.
	NoStepSubqueryIntervalFn func(rangeMillis int64) int64
}
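
// A minimal sketch (not part of the original file) of constructing an Engine
// from EngineOpts; the concrete values below are illustrative assumptions only.
//
//	opts := EngineOpts{
//		Logger:                   log.NewNopLogger(),
//		Reg:                      prometheus.NewRegistry(),
//		MaxSamples:               50000000,
//		Timeout:                  2 * time.Minute,
//		LookbackDelta:            5 * time.Minute,
//		NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
//	}
//	engine := NewEngine(opts)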

// Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier.
type Engine struct {
	logger                   log.Logger
	metrics                  *engineMetrics
	timeout                  time.Duration
	maxSamplesPerQuery       int
	activeQueryTracker       *ActiveQueryTracker
	queryLogger              QueryLogger
	queryLoggerLock          sync.RWMutex
	lookbackDelta            time.Duration
	noStepSubqueryIntervalFn func(rangeMillis int64) int64
}

// NewEngine returns a new engine.
func NewEngine(opts EngineOpts) *Engine {
	if opts.Logger == nil {
		opts.Logger = log.NewNopLogger()
	}

	queryResultSummary := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  namespace,
		Subsystem:  subsystem,
		Name:       "query_duration_seconds",
		Help:       "Query timings",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	},
		[]string{"slice"},
	)

	metrics := &engineMetrics{
		currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queries",
			Help:      "The current number of queries being executed or waiting.",
		}),
		queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "query_log_enabled",
			Help:      "State of the query log.",
		}),
		queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "query_log_failures_total",
			Help:      "The number of query log failures.",
		}),
		maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queries_concurrent_max",
			Help:      "The max number of concurrent queries.",
		}),
		queryQueueTime:   queryResultSummary.WithLabelValues("queue_time"),
		queryPrepareTime: queryResultSummary.WithLabelValues("prepare_time"),
		queryInnerEval:   queryResultSummary.WithLabelValues("inner_eval"),
		queryResultSort:  queryResultSummary.WithLabelValues("result_sort"),
	}

	if t := opts.ActiveQueryTracker; t != nil {
		metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent()))
	} else {
		metrics.maxConcurrentQueries.Set(-1)
	}

	if opts.LookbackDelta == 0 {
		opts.LookbackDelta = defaultLookbackDelta
		if l := opts.Logger; l != nil {
			level.Debug(l).Log("msg", "Lookback delta is zero, setting to default value", "value", defaultLookbackDelta)
		}
	}

	if opts.Reg != nil {
		opts.Reg.MustRegister(
			metrics.currentQueries,
			metrics.maxConcurrentQueries,
			metrics.queryLogEnabled,
			metrics.queryLogFailures,
			queryResultSummary,
		)
	}

	return &Engine{
		timeout:                  opts.Timeout,
		logger:                   opts.Logger,
		metrics:                  metrics,
		maxSamplesPerQuery:       opts.MaxSamples,
		activeQueryTracker:       opts.ActiveQueryTracker,
		lookbackDelta:            opts.LookbackDelta,
		noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
	}
}

// SetQueryLogger sets the query logger.
func (ng *Engine) SetQueryLogger(l QueryLogger) {
	ng.queryLoggerLock.Lock()
	defer ng.queryLoggerLock.Unlock()

	if ng.queryLogger != nil {
		// An error closing the old file descriptor should
		// not make reload fail; only log a warning.
		err := ng.queryLogger.Close()
		if err != nil {
			level.Warn(ng.logger).Log("msg", "Error while closing the previous query log file", "err", err)
		}
	}

	ng.queryLogger = l

	if l != nil {
		ng.metrics.queryLogEnabled.Set(1)
	} else {
		ng.metrics.queryLogEnabled.Set(0)
	}
}
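
// A minimal usage sketch (not part of the original file), reusing the
// hypothetical writerQueryLogger from the QueryLogger example above:
//
//	engine.SetQueryLogger(writerQueryLogger{w: os.Stderr})
//	// ...
//	engine.SetQueryLogger(nil) // disables query logging again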

// NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) {
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, err
	}
	qry := ng.newQuery(q, expr, ts, ts, 0)
	qry.q = qs

	return qry, nil
}

// NewRangeQuery returns an evaluation query for the given time range and with
// the resolution set by the interval.
func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) {
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, err
	}
	if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
		return nil, errors.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
	}
	qry := ng.newQuery(q, expr, start, end, interval)
	qry.q = qs

	return qry, nil
}
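
// A minimal usage sketch (not part of the original file); the queryable value,
// the context, and the PromQL expression are illustrative assumptions.
//
//	qry, err := engine.NewInstantQuery(queryable, `up == 1`, time.Now())
//	if err != nil {
//		return err
//	}
//	defer qry.Close()
//	res := qry.Exec(ctx)
//	if res.Err != nil {
//		return res.Err
//	}
//	// res.Value holds a Vector for this instant vector expression.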

func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end time.Time, interval time.Duration) *query {
	es := &parser.EvalStmt{
		Expr:     expr,
		Start:    start,
		End:      end,
		Interval: interval,
	}
	qry := &query{
		stmt:      es,
		ng:        ng,
		stats:     stats.NewQueryTimers(),
		queryable: q,
	}
	return qry
}

func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
	qry := &query{
		q:     "test statement",
		stmt:  parser.TestStmt(f),
		ng:    ng,
		stats: stats.NewQueryTimers(),
	}
	return qry
}

// exec executes the query.
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storage.Warnings, err error) {
	ng.metrics.currentQueries.Inc()
	defer ng.metrics.currentQueries.Dec()

	ctx, cancel := context.WithTimeout(ctx, ng.timeout)
	q.cancel = cancel

	defer func() {
		ng.queryLoggerLock.RLock()
		if l := ng.queryLogger; l != nil {
			params := make(map[string]interface{}, 4)
			params["query"] = q.q
			if eq, ok := q.Statement().(*parser.EvalStmt); ok {
				params["start"] = formatDate(eq.Start)
				params["end"] = formatDate(eq.End)
				// The step provided by the user is in seconds.
				params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond))
			}
			f := []interface{}{"params", params}
			if err != nil {
				f = append(f, "error", err)
			}
			f = append(f, "stats", stats.NewQueryStats(q.Stats()))
			if span := opentracing.SpanFromContext(ctx); span != nil {
				if spanCtx, ok := span.Context().(jaeger.SpanContext); ok {
					f = append(f, "spanID", spanCtx.SpanID())
				}
			}
			if origin := ctx.Value(QueryOrigin{}); origin != nil {
				for k, v := range origin.(map[string]interface{}) {
					f = append(f, k, v)
				}
			}
			if err := l.Log(f...); err != nil {
				ng.metrics.queryLogFailures.Inc()
				level.Error(ng.logger).Log("msg", "can't log query", "err", err)
			}
		}
		ng.queryLoggerLock.RUnlock()
	}()

	execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
	defer execSpanTimer.Finish()

	queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
	// Log query in active log. The active log guarantees that we don't run over
	// MaxConcurrent queries.
	if ng.activeQueryTracker != nil {
		queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
		if err != nil {
			queueSpanTimer.Finish()
			return nil, nil, contextErr(err, "query queue")
		}
		defer ng.activeQueryTracker.Delete(queryIndex)
	}
	queueSpanTimer.Finish()

	// Cancel when execution is done or an error was raised.
	defer q.cancel()

	const env = "query execution"

	evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime)
	defer evalSpanTimer.Finish()

	// The base context might already be canceled on the first iteration (e.g. during shutdown).
	if err := contextDone(ctx, env); err != nil {
		return nil, nil, err
	}

	switch s := q.Statement().(type) {
	case *parser.EvalStmt:
		return ng.execEvalStmt(ctx, q, s)
	case parser.TestStmt:
		return nil, nil, s(ctx)
	}

	panic(errors.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
}

func timeMilliseconds(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
}

func durationMilliseconds(d time.Duration) int64 {
	return int64(d / (time.Millisecond / time.Nanosecond))
}

// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) {
	prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
	mint := ng.findMinTime(s)
	querier, err := query.queryable.Querier(ctxPrepare, timestamp.FromTime(mint), timestamp.FromTime(s.End))
	if err != nil {
		prepareSpanTimer.Finish()
		return nil, nil, err
	}
	defer querier.Close()

	ng.populateSeries(querier, s)
	prepareSpanTimer.Finish()

	evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
	// Instant evaluation. This is executed as a range evaluation with one step.
	if s.Start == s.End && s.Interval == 0 {
		start := timeMilliseconds(s.Start)
		evaluator := &evaluator{
			startTimestamp:           start,
			endTimestamp:             start,
			interval:                 1,
			ctx:                      ctxInnerEval,
			maxSamples:               ng.maxSamplesPerQuery,
			logger:                   ng.logger,
			lookbackDelta:            ng.lookbackDelta,
			noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
		}

		val, warnings, err := evaluator.Eval(s.Expr)
		if err != nil {
			return nil, warnings, err
		}

		evalSpanTimer.Finish()

		var mat Matrix

		switch result := val.(type) {
		case Matrix:
			mat = result
		case String:
			return result, warnings, nil
		default:
			panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
		}

		query.matrix = mat
		switch s.Expr.Type() {
		case parser.ValueTypeVector:
			// Convert matrix with one value per series into vector.
			vector := make(Vector, len(mat))
			for i, s := range mat {
				// Point might have a different timestamp, force it to the evaluation
				// timestamp as that is when we ran the evaluation.
				vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}}
			}
			return vector, warnings, nil
		case parser.ValueTypeScalar:
			return Scalar{V: mat[0].Points[0].V, T: start}, warnings, nil
		case parser.ValueTypeMatrix:
			return mat, warnings, nil
		default:
			panic(errors.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type()))
		}
	}

	// Range evaluation.
	evaluator := &evaluator{
		startTimestamp:           timeMilliseconds(s.Start),
		endTimestamp:             timeMilliseconds(s.End),
		interval:                 durationMilliseconds(s.Interval),
		ctx:                      ctxInnerEval,
		maxSamples:               ng.maxSamplesPerQuery,
		logger:                   ng.logger,
		lookbackDelta:            ng.lookbackDelta,
		noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
	}
	val, warnings, err := evaluator.Eval(s.Expr)
	if err != nil {
		return nil, warnings, err
	}
	evalSpanTimer.Finish()

	mat, ok := val.(Matrix)
	if !ok {
		panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
	}
	query.matrix = mat

	if err := contextDone(ctx, "expression evaluation"); err != nil {
		return nil, warnings, err
	}

	// TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
	sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort)
	sort.Sort(mat)
	sortSpanTimer.Finish()

	return mat, warnings, nil
}

// subqueryOffsetRange returns the sum of offsets and ranges of all subqueries in the path.
func (ng *Engine) subqueryOffsetRange(path []parser.Node) (time.Duration, time.Duration) {
	var (
		subqOffset time.Duration
		subqRange  time.Duration
	)
	for _, node := range path {
		switch n := node.(type) {
		case *parser.SubqueryExpr:
			subqOffset += n.Offset
			subqRange += n.Range
		}
	}
	return subqOffset, subqRange
}
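
// An illustrative sketch (not part of the original file): when the inner
// vector selector of
//
//	max_over_time(rate(foo[5m])[10m:1m] offset 2m)
//
// is visited, the path contains one SubqueryExpr with Range 10m and Offset 2m,
// so subqueryOffsetRange returns (2m, 10m); nested subqueries add up.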

func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
	var maxOffset time.Duration
	parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
		subqOffset, subqRange := ng.subqueryOffsetRange(path)
		switch n := node.(type) {
		case *parser.VectorSelector:
			if maxOffset < ng.lookbackDelta+subqOffset+subqRange {
				maxOffset = ng.lookbackDelta + subqOffset + subqRange
			}
			if n.Offset+ng.lookbackDelta+subqOffset+subqRange > maxOffset {
				maxOffset = n.Offset + ng.lookbackDelta + subqOffset + subqRange
			}
		case *parser.MatrixSelector:
			if maxOffset < n.Range+subqOffset+subqRange {
				maxOffset = n.Range + subqOffset + subqRange
			}
			if m := n.VectorSelector.(*parser.VectorSelector).Offset + n.Range + subqOffset + subqRange; m > maxOffset {
				maxOffset = m
			}
		}
		return nil
	})
	return s.Start.Add(-maxOffset)
}
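
// An illustrative worked example (not part of the original file): with the
// default lookback delta of 5m, a statement whose expression is `foo offset 10m`
// yields maxOffset = 10m + 5m, so findMinTime returns s.Start minus 15 minutes.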

func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
	// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
	// The evaluation of the VectorSelector inside then evaluates the given range and unsets
	// the variable.
	var evalRange time.Duration

	parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
		switch n := node.(type) {
		case *parser.VectorSelector:
			hints := &storage.SelectHints{
				Start: timestamp.FromTime(s.Start),
				End:   timestamp.FromTime(s.End),
				Step:  durationMilliseconds(s.Interval),
			}

			// We need to make sure we select the timerange selected by the subquery.
			// The subqueryOffsetRange function gives the sum of range and the
			// sum of offset.
			// TODO(bwplotka): Add support for better hints when subquerying. See: https://github.com/prometheus/prometheus/issues/7630.
			subqOffset, subqRange := ng.subqueryOffsetRange(path)
			offsetMilliseconds := durationMilliseconds(subqOffset)
			hints.Start = hints.Start - offsetMilliseconds - durationMilliseconds(subqRange)
			hints.End = hints.End - offsetMilliseconds

			if evalRange == 0 {
				hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
			} else {
				hints.Range = durationMilliseconds(evalRange)
				// For all matrix queries we want to ensure that we have (end-start) + range selected;
				// this way we have `range` data before the start time.
				hints.Start = hints.Start - durationMilliseconds(evalRange)
				evalRange = 0
			}

			hints.Func = extractFuncFromPath(path)
			hints.By, hints.Grouping = extractGroupsFromPath(path)
			if n.Offset > 0 {
				offsetMilliseconds := durationMilliseconds(n.Offset)
				hints.Start = hints.Start - offsetMilliseconds
				hints.End = hints.End - offsetMilliseconds
			}

			n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...)
		case *parser.MatrixSelector:
			evalRange = n.Range
		}
		return nil
	})
}
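
// An illustrative sketch (not part of the original file): for a range query
// over `rate(foo[5m] offset 1m)`, the vector selector for `foo` gets hints with
// Func "rate", Range set to 5m in milliseconds, and Start/End both shifted back
// by the 1m offset, with Start additionally extended back by the 5m range.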

// extractFuncFromPath walks up the path and searches for the first instance of
// a function or aggregation.
func extractFuncFromPath(p []parser.Node) string {
	if len(p) == 0 {
		return ""
	}
	switch n := p[len(p)-1].(type) {
	case *parser.AggregateExpr:
		return n.Op.String()
	case *parser.Call:
		return n.Func.Name
	case *parser.BinaryExpr:
		// If we hit a binary expression we terminate since we only care about functions
		// or aggregations over a single metric.
		return ""
	}
	return extractFuncFromPath(p[:len(p)-1])
}
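
// An illustrative sketch (not part of the original file): while selecting
// series for `foo` in
//
//	sum(rate(foo[5m]))
//
// the nearest enclosing Call is rate, so extractFuncFromPath returns "rate";
// for a bare `sum(foo)` it would return "sum".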

// extractGroupsFromPath checks the immediately enclosing expression and extracts
// grouping information if an aggregation with `by` or `without` was used.
func extractGroupsFromPath(p []parser.Node) (bool, []string) {
	if len(p) == 0 {
		return false, nil
	}
	switch n := p[len(p)-1].(type) {
	case *parser.AggregateExpr:
		return !n.Without, n.Grouping
	}
	return false, nil
}
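
// An illustrative sketch (not part of the original file): for the selector
// `foo` inside
//
//	sum by (job) (foo)
//
// extractGroupsFromPath returns (true, []string{"job"}); with `without (job)`
// it returns (false, []string{"job"}).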

func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (storage.Warnings, error) {
	switch e := expr.(type) {
	case *parser.MatrixSelector:
		return checkAndExpandSeriesSet(ctx, e.VectorSelector)
	case *parser.VectorSelector:
		if e.Series != nil {
			return nil, nil
		}
		series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
		e.Series = series
		return ws, err
	}
	return nil, nil
}

func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, ws storage.Warnings, err error) {
	for it.Next() {
		select {
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		default:
		}
		res = append(res, it.At())
	}
	return res, it.Warnings(), it.Err()
}

type errWithWarnings struct {
	err      error
	warnings storage.Warnings
}

func (e errWithWarnings) Error() string { return e.err.Error() }

// An evaluator evaluates given expressions over given fixed timestamps. It
// is attached to an engine through which it connects to a querier and reports
// errors. On timeout or cancellation of its context it terminates.
type evaluator struct {
	ctx context.Context

	startTimestamp int64 // Start time in milliseconds.
	endTimestamp   int64 // End time in milliseconds.
	interval       int64 // Interval in milliseconds.

	maxSamples               int
	currentSamples           int
	logger                   log.Logger
	lookbackDelta            time.Duration
	noStepSubqueryIntervalFn func(rangeMillis int64) int64
}

// errorf causes a panic with the input formatted into an error.
func (ev *evaluator) errorf(format string, args ...interface{}) {
	ev.error(errors.Errorf(format, args...))
}

// error causes a panic with the given error.
func (ev *evaluator) error(err error) {
	panic(err)
}

// recover is the handler that turns panics into returns from the top level of evaluation.
func (ev *evaluator) recover(ws *storage.Warnings, errp *error) {
	e := recover()
	if e == nil {
		return
	}

	switch err := e.(type) {
	case runtime.Error:
		// Print the stack trace but do not inhibit the running application.
		buf := make([]byte, 64<<10)
		buf = buf[:runtime.Stack(buf, false)]

		level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf))
		*errp = errors.Wrap(err, "unexpected error")
	case errWithWarnings:
		*errp = err.err
		*ws = append(*ws, err.warnings...)
	default:
		*errp = e.(error)
	}
}

func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws storage.Warnings, err error) {
	defer ev.recover(&ws, &err)

	v, ws = ev.eval(expr)
	return v, ws, nil
}

// EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
type EvalNodeHelper struct {
	// Evaluation timestamp.
	Ts int64
	// Vector that can be used for output.
	Out Vector

	// Caches.
	// DropMetricName and label_*.
	Dmn map[uint64]labels.Labels
	// signatureFunc.
	sigf map[string]string
	// funcHistogramQuantile.
	signatureToMetricWithBuckets map[string]*metricWithBuckets
	// label_replace.
	regex *regexp.Regexp

	lb           *labels.Builder
	lblBuf       []byte
	lblResultBuf []byte

	// For binary vector matching.
	rightSigs    map[string]Sample
	matchedSigs  map[string]map[uint64]struct{}
	resultMetric map[string]labels.Labels
}

// DropMetricName is a cached version of the dropMetricName helper.
func (enh *EvalNodeHelper) DropMetricName(l labels.Labels) labels.Labels {
	if enh.Dmn == nil {
		enh.Dmn = make(map[uint64]labels.Labels, len(enh.Out))
	}
	h := l.Hash()
	ret, ok := enh.Dmn[h]
	if ok {
		return ret
	}
	ret = dropMetricName(l)
	enh.Dmn[h] = ret
	return ret
}

func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) string {
	if enh.sigf == nil {
		enh.sigf = make(map[string]string, len(enh.Out))
	}
	f := signatureFunc(on, enh.lblBuf, names...)
	return func(l labels.Labels) string {
		enh.lblBuf = l.Bytes(enh.lblBuf)
		ret, ok := enh.sigf[string(enh.lblBuf)]
		if ok {
			return ret
		}
		ret = f(l)
		enh.sigf[string(enh.lblBuf)] = ret
		return ret
	}
}

// rangeEval evaluates the given expressions, and then for each step calls
// the given function with the values computed for each expression at that
// step. The return value is the combination into time series of all the
// function call results.
func (ev *evaluator) rangeEval(f func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
	matrixes := make([]Matrix, len(exprs))
	origMatrixes := make([]Matrix, len(exprs))
	originalNumSamples := ev.currentSamples

	var warnings storage.Warnings
	for i, e := range exprs {
		// Functions will take string arguments from the expressions, not the values.
		if e != nil && e.Type() != parser.ValueTypeString {
			// ev.currentSamples will be updated to the correct value within the ev.eval call.
			val, ws := ev.eval(e)
			warnings = append(warnings, ws...)
			matrixes[i] = val.(Matrix)

			// Keep a copy of the original point slices so that they
			// can be returned to the pool.
			origMatrixes[i] = make(Matrix, len(matrixes[i]))
			copy(origMatrixes[i], matrixes[i])
		}
	}

	vectors := make([]Vector, len(exprs))    // Input vectors for the function.
	args := make([]parser.Value, len(exprs)) // Argument to function.
	// Create an output vector that is as big as the input matrix with
	// the most time series.
	biggestLen := 1
	for i := range exprs {
		vectors[i] = make(Vector, 0, len(matrixes[i]))
		if len(matrixes[i]) > biggestLen {
			biggestLen = len(matrixes[i])
		}
	}
	enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
	seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
	tempNumSamples := ev.currentSamples
	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
			ev.error(err)
		}
		// Reset number of samples in memory after each timestamp.
		ev.currentSamples = tempNumSamples
		// Gather input vectors for this timestamp.
		for i := range exprs {
			vectors[i] = vectors[i][:0]
			for si, series := range matrixes[i] {
				for _, point := range series.Points {
					if point.T == ts {
						if ev.currentSamples < ev.maxSamples {
							vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point})
							// Move input vectors forward so we don't have to re-scan the same
							// past points at the next step.
							matrixes[i][si].Points = series.Points[1:]
							ev.currentSamples++
						} else {
							ev.error(ErrTooManySamples(env))
						}
					}
					break
				}
			}
			args[i] = vectors[i]
		}
		// Make the function call.
		enh.Ts = ts
		result, ws := f(args, enh)
		if result.ContainsSameLabelset() {
			ev.errorf("vector cannot contain metrics with the same labelset")
		}
		enh.Out = result[:0] // Reuse result vector.
		warnings = append(warnings, ws...)

		ev.currentSamples += len(result)
		// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
		// needs to include the samples from the result here, as they're still in memory.
		tempNumSamples += len(result)

		if ev.currentSamples > ev.maxSamples {
			ev.error(ErrTooManySamples(env))
		}

		// If this could be an instant query, shortcut so as not to change sort order.
		if ev.endTimestamp == ev.startTimestamp {
			mat := make(Matrix, len(result))
			for i, s := range result {
				s.Point.T = ts
				mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}}
			}
			ev.currentSamples = originalNumSamples + mat.TotalSamples()
			return mat, warnings
		}

		// Add samples in output vector to output series.
		for _, sample := range result {
			h := sample.Metric.Hash()
			ss, ok := seriess[h]
			if !ok {
				ss = Series{
					Metric: sample.Metric,
					Points: getPointSlice(numSteps),
				}
			}
			sample.Point.T = ts
			ss.Points = append(ss.Points, sample.Point)
			seriess[h] = ss
		}
	}

	// Reuse the original point slices.
	for _, m := range origMatrixes {
		for _, s := range m {
			putPointSlice(s.Points)
		}
	}
	// Assemble the output matrix. By the time we get here we know we don't have too many samples.
	mat := make(Matrix, 0, len(seriess))
	for _, ss := range seriess {
		mat = append(mat, ss)
	}
	ev.currentSamples = originalNumSamples + mat.TotalSamples()
	return mat, warnings
}

// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, storage.Warnings) {
	val, ws := ev.eval(subq)
	mat := val.(Matrix)
	vs := &parser.VectorSelector{
		Offset: subq.Offset,
		Series: make([]storage.Series, 0, len(mat)),
	}
	ms := &parser.MatrixSelector{
		Range:          subq.Range,
		VectorSelector: vs,
	}
	for _, s := range mat {
		vs.Series = append(vs.Series, NewStorageSeries(s))
	}
	return ms, ws
}

// eval evaluates the given expression as the given AST expression node requires.
func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
	// This is the top-level evaluation method.
	// Thus, we check for timeout/cancellation here.
	if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
		ev.error(err)
	}
	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1

	switch e := expr.(type) {
	case *parser.AggregateExpr:
		unwrapParenExpr(&e.Param)
		if s, ok := e.Param.(*parser.StringLiteral); ok {
			return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
				return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh), nil
			}, e.Expr)
		}
		return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
			var param float64
			if e.Param != nil {
				param = v[0].(Vector)[0].V
			}
			return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh), nil
		}, e.Param, e.Expr)