// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13

14
package scrape
15 16

import (
17
	"fmt"
18
	"hash/fnv"
19
	"net"
20
	"net/url"
21
	"strings"
22
	"sync"
23
	"time"
24

25
	"github.com/pkg/errors"
26
	"github.com/prometheus/common/model"
27

28
	"github.com/prometheus/prometheus/config"
29
	"github.com/prometheus/prometheus/discovery/targetgroup"
30 31
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/relabel"
32
	"github.com/prometheus/prometheus/pkg/textparse"
33
	"github.com/prometheus/prometheus/pkg/value"
34
	"github.com/prometheus/prometheus/storage"
35 36
)

37
// TargetHealth describes the health state of a target.
type TargetHealth string

// The possible health states of a target based on the last performed scrape.
const (
	// HealthUnknown is the state a target starts in (see NewTarget).
	HealthUnknown TargetHealth = "unknown"
	// HealthGood is recorded when the last scrape reported no error.
	HealthGood TargetHealth = "up"
	// HealthBad is recorded when the last scrape reported an error.
	HealthBad TargetHealth = "down"
)

47 48
// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
49
	// Labels before any processing.
50
	discoveredLabels labels.Labels
51
	// Any labels that are added to this target and its metrics.
52
	labels labels.Labels
53
	// Additional URL parameters that are part of the target URL.
54
	params url.Values
55

56 57 58 59 60
	mtx                sync.RWMutex
	lastError          error
	lastScrape         time.Time
	lastScrapeDuration time.Duration
	health             TargetHealth
61
	metadata           MetricMetadataStore
62 63
}

64
// NewTarget creates a reasonably configured target for querying.
65
func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
66
	return &Target{
67 68 69 70
		labels:           labels,
		discoveredLabels: discoveredLabels,
		params:           params,
		health:           HealthUnknown,
71
	}
72
}
73

74
func (t *Target) String() string {
75
	return t.URL().String()
76 77
}

78 79 80
type MetricMetadataStore interface {
	ListMetadata() []MetricMetadata
	GetMetadata(metric string) (MetricMetadata, bool)
81 82
	SizeMetadata() int
	LengthMetadata() int
83 84 85 86 87 88 89
}

// MetricMetadata is a piece of metadata for a metric.
type MetricMetadata struct {
	Metric string
	Type   textparse.MetricType
	Help   string
90
	Unit   string
91 92 93 94 95 96 97 98 99
}

func (t *Target) MetadataList() []MetricMetadata {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	if t.metadata == nil {
		return nil
	}
100
	return t.metadata.ListMetadata()
101 102
}

103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
// MetadataSize returns the SizeMetadata value of the target's metadata store,
// or 0 when no store has been set.
func (t *Target) MetadataSize() int {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	if store := t.metadata; store != nil {
		return store.SizeMetadata()
	}
	return 0
}

// MetadataLength returns the LengthMetadata value of the target's metadata
// store, or 0 when no store has been set.
func (t *Target) MetadataLength() int {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	if store := t.metadata; store != nil {
		return store.LengthMetadata()
	}
	return 0
}

125 126 127 128 129 130 131 132
// Metadata returns type and help metadata for the given metric.
func (t *Target) Metadata(metric string) (MetricMetadata, bool) {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	if t.metadata == nil {
		return MetricMetadata{}, false
	}
133
	return t.metadata.GetMetadata(metric)
134 135
}

136
func (t *Target) SetMetadataStore(s MetricMetadataStore) {
137 138 139 140 141
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.metadata = s
}

142 143 144
// hash returns an identifying hash for the target.
func (t *Target) hash() uint64 {
	h := fnv.New64a()
145
	//nolint: errcheck
146
	h.Write([]byte(fmt.Sprintf("%016d", t.labels.Hash())))
147
	//nolint: errcheck
148
	h.Write([]byte(t.URL().String()))
149

150
	return h.Sum64()
151 152 153
}

// offset returns the time until the next scrape cycle for the target.
154 155
// It includes the global server jitterSeed for scrapes from multiple Prometheus to try to be at different times.
func (t *Target) offset(interval time.Duration, jitterSeed uint64) time.Duration {
156 157
	now := time.Now().UnixNano()

158
	// Base is a pinned to absolute time, no matter how often offset is called.
159
	var (
160
		base   = int64(interval) - now%int64(interval)
161
		offset = (t.hash() ^ jitterSeed) % uint64(interval)
162 163 164 165 166 167 168 169 170
		next   = base + int64(offset)
	)

	if next > int64(interval) {
		next -= int64(interval)
	}
	return time.Duration(next)
}

171
// Labels returns a copy of the set of all public labels of the target.
172 173 174 175 176
func (t *Target) Labels() labels.Labels {
	lset := make(labels.Labels, 0, len(t.labels))
	for _, l := range t.labels {
		if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
			lset = append(lset, l)
177 178 179
		}
	}
	return lset
180 181
}

182
// DiscoveredLabels returns a copy of the target's labels before any processing.
183
func (t *Target) DiscoveredLabels() labels.Labels {
184 185
	t.mtx.Lock()
	defer t.mtx.Unlock()
186 187 188
	lset := make(labels.Labels, len(t.discoveredLabels))
	copy(lset, t.discoveredLabels)
	return lset
189 190
}

191 192
// SetDiscoveredLabels sets new DiscoveredLabels
func (t *Target) SetDiscoveredLabels(l labels.Labels) {
193 194
	t.mtx.Lock()
	defer t.mtx.Unlock()
195 196 197
	t.discoveredLabels = l
}

198 199 200 201
// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
	params := url.Values{}

202
	for k, v := range t.params {
203 204 205
		params[k] = make([]string, len(v))
		copy(params[k], v)
	}
206 207
	for _, l := range t.labels {
		if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
208 209
			continue
		}
210
		ks := l.Name[len(model.ParamLabelPrefix):]
211 212

		if len(params[ks]) > 0 {
213
			params[ks][0] = l.Value
214
		} else {
215
			params[ks] = []string{l.Value}
216 217 218 219
		}
	}

	return &url.URL{
220 221 222
		Scheme:   t.labels.Get(model.SchemeLabel),
		Host:     t.labels.Get(model.AddressLabel),
		Path:     t.labels.Get(model.MetricsPathLabel),
223 224 225 226
		RawQuery: params.Encode(),
	}
}

227 228
// Report sets target data about the last scrape.
func (t *Target) Report(start time.Time, dur time.Duration, err error) {
229 230 231 232 233 234 235 236 237 238 239
	t.mtx.Lock()
	defer t.mtx.Unlock()

	if err == nil {
		t.health = HealthGood
	} else {
		t.health = HealthBad
	}

	t.lastError = err
	t.lastScrape = start
240
	t.lastScrapeDuration = dur
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
}

// LastError returns the error encountered during the last scrape.
func (t *Target) LastError() error {
	t.mtx.RLock()
	err := t.lastError
	t.mtx.RUnlock()

	return err
}

// LastScrape returns the time of the last scrape.
func (t *Target) LastScrape() time.Time {
	t.mtx.RLock()
	when := t.lastScrape
	t.mtx.RUnlock()

	return when
}

259 260 261 262 263 264 265 266
// LastScrapeDuration returns how long the last scrape of the target took.
func (t *Target) LastScrapeDuration() time.Duration {
	t.mtx.RLock()
	took := t.lastScrapeDuration
	t.mtx.RUnlock()

	return took
}

267 268 269 270 271 272
// Health returns the last known health state of the target.
func (t *Target) Health() TargetHealth {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	return t.health
273 274
}

275 276 277 278 279 280 281
// Targets is a sortable list of targets, ordered by URL string.
type Targets []*Target

// Len returns the number of targets in the list.
func (ts Targets) Len() int {
	return len(ts)
}

// Less reports whether the URL string of target i sorts before that of target j.
func (ts Targets) Less(i, j int) bool {
	left, right := ts[i].URL().String(), ts[j].URL().String()
	return left < right
}

// Swap exchanges the targets at positions i and j.
func (ts Targets) Swap(i, j int) {
	ts[i], ts[j] = ts[j], ts[i]
}

282 283
var errSampleLimit = errors.New("sample limit exceeded")

284 285 286 287 288 289 290 291
// limitAppender limits the number of total appended samples in a batch.
type limitAppender struct {
	storage.Appender

	limit int
	i     int
}

292
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
293 294 295
	if !value.IsStaleNaN(v) {
		app.i++
		if app.i > app.limit {
296
			return 0, errSampleLimit
297
		}
298 299 300
	}
	ref, err := app.Appender.Add(lset, t, v)
	if err != nil {
301
		return 0, err
302 303 304 305
	}
	return ref, nil
}

306
func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error {
307 308 309 310 311
	if !value.IsStaleNaN(v) {
		app.i++
		if app.i > app.limit {
			return errSampleLimit
		}
312
	}
313
	err := app.Appender.AddFast(ref, t, v)
Krasi Georgiev committed
314
	return err
315 316
}

317 318 319 320 321 322
// timeLimitAppender wraps an Appender and rejects samples whose timestamp
// lies after maxTime.
type timeLimitAppender struct {
	storage.Appender

	// maxTime is the greatest sample timestamp that is still accepted.
	maxTime int64
}

323
func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
324
	if t > app.maxTime {
325
		return 0, storage.ErrOutOfBounds
326 327 328 329
	}

	ref, err := app.Appender.Add(lset, t, v)
	if err != nil {
330
		return 0, err
331 332 333 334
	}
	return ref, nil
}

335
func (app *timeLimitAppender) AddFast(ref uint64, t int64, v float64) error {
336 337 338
	if t > app.maxTime {
		return storage.ErrOutOfBounds
	}
339
	err := app.Appender.AddFast(ref, t, v)
Krasi Georgiev committed
340
	return err
341 342
}

343 344
// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
345
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
346 347 348 349 350 351
func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
	// Copy labels into the labelset for the target if they are not set already.
	scrapeLabels := []labels.Label{
		{Name: model.JobLabel, Value: cfg.JobName},
		{Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
		{Name: model.SchemeLabel, Value: cfg.Scheme},
352
	}
353 354 355 356 357
	lb := labels.NewBuilder(lset)

	for _, l := range scrapeLabels {
		if lv := lset.Get(l.Name); lv == "" {
			lb.Set(l.Name, l.Value)
358 359 360 361 362
		}
	}
	// Encode scrape query parameters as labels.
	for k, v := range cfg.Params {
		if len(v) > 0 {
363
			lb.Set(model.ParamLabelPrefix+k, v[0])
364 365 366
		}
	}

367 368
	preRelabelLabels := lb.Labels()
	lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)
369 370 371

	// Check if the target was dropped.
	if lset == nil {
372
		return nil, preRelabelLabels, nil
373
	}
374
	if v := lset.Get(model.AddressLabel); v == "" {
375
		return nil, nil, errors.New("no address")
376
	}
377

378 379
	lb = labels.NewBuilder(lset)

380 381 382 383 384 385 386 387 388 389 390 391
	// addPort checks whether we should add a default port to the address.
	// If the address is not valid, we don't append a port either.
	addPort := func(s string) bool {
		// If we can split, a port exists and we don't have to add one.
		if _, _, err := net.SplitHostPort(s); err == nil {
			return false
		}
		// If adding a port makes it valid, the previous error
		// was not due to an invalid address and we can append a port.
		_, _, err := net.SplitHostPort(s + ":1234")
		return err == nil
	}
392
	addr := lset.Get(model.AddressLabel)
393
	// If it's an address with no trailing port, infer it based on the used scheme.
394
	if addPort(addr) {
395
		// Addresses reaching this point are already wrapped in [] if necessary.
396
		switch lset.Get(model.SchemeLabel) {
397 398 399 400 401
		case "http", "":
			addr = addr + ":80"
		case "https":
			addr = addr + ":443"
		default:
402
			return nil, nil, errors.Errorf("invalid scheme: %q", cfg.Scheme)
403
		}
404
		lb.Set(model.AddressLabel, addr)
405
	}
406 407

	if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
408 409 410 411 412
		return nil, nil, err
	}

	// Meta labels are deleted after relabelling. Other internal labels propagate to
	// the target which decides whether they will be part of their label set.
413 414 415
	for _, l := range lset {
		if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
			lb.Del(l.Name)
416 417 418 419
		}
	}

	// Default the instance label to the target address.
420 421
	if v := lset.Get(model.InstanceLabel); v == "" {
		lb.Set(model.InstanceLabel, addr)
422
	}
423 424 425 426 427

	res = lb.Labels()
	for _, l := range res {
		// Check label values are valid, drop the target if not.
		if !model.LabelValue(l.Value).IsValid() {
428
			return nil, nil, errors.Errorf("invalid label value for %q: %q", l.Name, l.Value)
429 430 431
		}
	}
	return res, preRelabelLabels, nil
432 433 434
}

// targetsFromGroup builds targets based on the given TargetGroup and config.
435
func targetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig) ([]*Target, error) {
436 437
	targets := make([]*Target, 0, len(tg.Targets))

438 439 440 441 442 443
	for i, tlset := range tg.Targets {
		lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))

		for ln, lv := range tlset {
			lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
		}
444
		for ln, lv := range tg.Labels {
445 446
			if _, ok := tlset[ln]; !ok {
				lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
447 448
			}
		}
449 450 451 452

		lset := labels.New(lbls...)

		lbls, origLabels, err := populateLabels(lset, cfg)
453
		if err != nil {
454
			return nil, errors.Wrapf(err, "instance %d in group %s", i, tg)
455
		}
456
		if lbls != nil || origLabels != nil {
457
			targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
458 459 460 461
		}
	}
	return targets, nil
}