Unverified Commit 50c26150 by Ben Ye Committed by GitHub

add tsdb cmds into promtool (#6088)

Signed-off-by: 's avataryeya24 <yb532204897@gmail.com>

update tsdb cli in makefile and promu
Signed-off-by: 's avataryeya24 <yb532204897@gmail.com>

remove building tsdb bin
Signed-off-by: 's avataryeya24 <yb532204897@gmail.com>

remove useless func
Signed-off-by: 's avataryeya24 <yb532204897@gmail.com>

refactor analyzeBlock
Signed-off-by: 's avataryeya24 <yb532204897@gmail.com>

Fix Makefile
Signed-off-by: 's avatarSimon Pasquier <spasquie@redhat.com>
parent 9801f52b
......@@ -6,11 +6,11 @@
/prometheus
/promtool
/tsdb/tsdb
benchmark.txt
/data
/cmd/prometheus/data
/cmd/prometheus/debug
/benchout
!/.travis.yml
!/.promu.yml
......
......@@ -10,8 +10,6 @@ build:
path: ./cmd/prometheus
- name: promtool
path: ./cmd/promtool
- name: tsdb
path: ./tsdb/cmd/tsdb
flags: -mod=vendor -a -tags netgo,builtinassets
ldflags: |
-X github.com/prometheus/common/version.Version={{.Version}}
......
......@@ -20,12 +20,10 @@ REACT_APP_OUTPUT_DIR = web/ui/static/react
REACT_APP_NODE_MODULES_PATH = $(REACT_APP_PATH)/node_modules
REACT_APP_NPM_LICENSES_TARBALL = "npm_licenses.tar.bz2"
TSDB_PROJECT_DIR = "./tsdb"
TSDB_CLI_DIR="$(TSDB_PROJECT_DIR)/cmd/tsdb"
TSDB_BIN = "$(TSDB_CLI_DIR)/tsdb"
PROMTOOL = ./promtool
TSDB_BENCHMARK_NUM_METRICS ?= 1000
TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json"
TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout"
TSDB_BENCHMARK_DATASET ?= ./tsdb/testdata/20kseries.json
TSDB_BENCHMARK_OUTPUT_DIR ?= ./benchout
include Makefile.common
......@@ -80,16 +78,14 @@ docker: npm_licenses common-docker
.PHONY: build
build: assets common-build
.PHONY: build_tsdb
build_tsdb:
GO111MODULE=$(GO111MODULE) $(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)
.PHONY: bench_tsdb
bench_tsdb: build_tsdb
bench_tsdb: $(PROMU)
@echo ">> building promtool"
@GO111MODULE=$(GO111MODULE) $(PROMU) build --prefix $(PREFIX) promtool
@echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)"
@$(TSDB_BIN) bench write --metrics=$(TSDB_BENCHMARK_NUM_METRICS) --out=$(TSDB_BENCHMARK_OUTPUT_DIR) $(TSDB_BENCHMARK_DATASET)
@$(GO) tool pprof -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/cpu.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/cpuprof.svg
@$(GO) tool pprof --inuse_space -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/mem.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/memprof.inuse.svg
@$(GO) tool pprof --alloc_space -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/mem.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/memprof.alloc.svg
@$(GO) tool pprof -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/block.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/blockprof.svg
@$(GO) tool pprof -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/mutex.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/mutexprof.svg
@$(PROMTOOL) tsdb bench write --metrics=$(TSDB_BENCHMARK_NUM_METRICS) --out=$(TSDB_BENCHMARK_OUTPUT_DIR) $(TSDB_BENCHMARK_DATASET)
@$(GO) tool pprof -svg $(PROMTOOL) $(TSDB_BENCHMARK_OUTPUT_DIR)/cpu.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/cpuprof.svg
@$(GO) tool pprof --inuse_space -svg $(PROMTOOL) $(TSDB_BENCHMARK_OUTPUT_DIR)/mem.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/memprof.inuse.svg
@$(GO) tool pprof --alloc_space -svg $(PROMTOOL) $(TSDB_BENCHMARK_OUTPUT_DIR)/mem.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/memprof.alloc.svg
@$(GO) tool pprof -svg $(PROMTOOL) $(TSDB_BENCHMARK_OUTPUT_DIR)/block.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/blockprof.svg
@$(GO) tool pprof -svg $(PROMTOOL) $(TSDB_BENCHMARK_OUTPUT_DIR)/mutex.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/mutexprof.svg
......@@ -37,7 +37,7 @@ import (
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/rulefmt"
......@@ -105,6 +105,29 @@ func main() {
"The unit test file.",
).Required().ExistingFiles()
defaultDBPath := "data/"
tsdbCmd := app.Command("tsdb", "Run tsdb commands.")
tsdbBenchCmd := tsdbCmd.Command("bench", "Run benchmarks.")
tsdbBenchWriteCmd := tsdbBenchCmd.Command("write", "Run a write performance benchmark.")
benchWriteOutPath := tsdbBenchWriteCmd.Flag("out", "Set the output path.").Default("benchout").String()
benchWriteNumMetrics := tsdbBenchWriteCmd.Flag("metrics", "Number of metrics to read.").Default("10000").Int()
benchSamplesFile := tsdbBenchWriteCmd.Arg("file", "Input file with samples data, default is ("+filepath.Join("..", "..", "tsdb", "testdata", "20kseries.json")+").").Default(filepath.Join("..", "..", "tsdb", "testdata", "20kseries.json")).String()
tsdbAnalyzeCmd := tsdbCmd.Command("analyze", "Analyze churn, label pair cardinality.")
analyzePath := tsdbAnalyzeCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
analyzeBlockID := tsdbAnalyzeCmd.Arg("block id", "Block to analyze (default is the last block).").String()
analyzeLimit := tsdbAnalyzeCmd.Flag("limit", "How many items to show in each list.").Default("20").Int()
tsdbListCmd := tsdbCmd.Command("list", "List tsdb blocks.")
listHumanReadable := tsdbListCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
listPath := tsdbListCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
tsdbDumpCmd := tsdbCmd.Command("dump", "Dump samples from a TSDB.")
dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
var p printer
......@@ -148,6 +171,18 @@ func main() {
case testRulesCmd.FullCommand():
os.Exit(RulesUnitTest(*testRulesFiles...))
case tsdbBenchWriteCmd.FullCommand():
os.Exit(checkErr(benchmarkWrite(*benchWriteOutPath, *benchSamplesFile, *benchWriteNumMetrics)))
case tsdbAnalyzeCmd.FullCommand():
os.Exit(checkErr(analyzeBlock(*analyzePath, *analyzeBlockID, *analyzeLimit)))
case tsdbListCmd.FullCommand():
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
case tsdbDumpCmd.FullCommand():
os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime)))
}
}
......
......@@ -19,7 +19,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
......@@ -38,109 +37,11 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"gopkg.in/alecthomas/kingpin.v2"
)
func main() {
if err := execute(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
var merr tsdb_errors.MultiError
func execute() (err error) {
var (
defaultDBPath = filepath.Join("benchout", "storage")
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
var merr tsdb_errors.MultiError
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
case benchWriteCmd.FullCommand():
wb := &writeBenchmark{
outPath: *benchWriteOutPath,
numMetrics: *benchWriteNumMetrics,
samplesFile: *benchSamplesFile,
logger: logger,
}
return wb.run()
case listCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*listPath, nil)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
blocks, err := db.Blocks()
if err != nil {
return err
}
printBlocks(blocks, listCmdHumanReadable)
case analyzeCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*analyzePath, nil)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
blocks, err := db.Blocks()
if err != nil {
return err
}
var block tsdb.BlockReader
if *analyzeBlockID != "" {
for _, b := range blocks {
if b.Meta().ULID.String() == *analyzeBlockID {
block = b
break
}
}
} else if len(blocks) > 0 {
block = blocks[len(blocks)-1]
}
if block == nil {
return fmt.Errorf("block not found")
}
return analyzeBlock(block, *analyzeLimit)
case dumpCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*dumpPath, nil)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
}
return nil
}
const timeDelta = 30000
type writeBenchmark struct {
outPath string
......@@ -157,7 +58,13 @@ type writeBenchmark struct {
logger log.Logger
}
func (b *writeBenchmark) run() error {
func benchmarkWrite(outPath, samplesFile string, numMetrics int) error {
b := &writeBenchmark{
outPath: outPath,
samplesFile: samplesFile,
numMetrics: numMetrics,
logger: log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)),
}
if b.outPath == "" {
dir, err := ioutil.TempDir("", "tsdb_bench")
if err != nil {
......@@ -184,24 +91,24 @@ func (b *writeBenchmark) run() error {
if err != nil {
return err
}
st.DisableCompactions()
b.storage = st
var labels []labels.Labels
var lbs []labels.Labels
_, err = measureTime("readData", func() error {
if _, err = measureTime("readData", func() error {
f, err := os.Open(b.samplesFile)
if err != nil {
return err
}
defer f.Close()
labels, err = readPrometheusLabels(f, b.numMetrics)
lbs, err = readPrometheusLabels(f, b.numMetrics)
if err != nil {
return err
}
return nil
})
if err != nil {
}); err != nil {
return err
}
......@@ -211,7 +118,7 @@ func (b *writeBenchmark) run() error {
if err := b.startProfiling(); err != nil {
return err
}
total, err = b.ingestScrapes(labels, 3000)
total, err = b.ingestScrapes(lbs, 3000)
if err != nil {
return err
}
......@@ -224,23 +131,19 @@ func (b *writeBenchmark) run() error {
fmt.Println(" > total samples:", total)
fmt.Println(" > samples/sec:", float64(total)/dur.Seconds())
_, err = measureTime("stopStorage", func() error {
if _, err = measureTime("stopStorage", func() error {
if err := b.storage.Close(); err != nil {
return err
}
if err := b.stopProfiling(); err != nil {
return err
}
return nil
})
if err != nil {
return b.stopProfiling()
}); err != nil {
return err
}
return nil
}
const timeDelta = 30000
func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
var mu sync.Mutex
var total uint64
......@@ -399,10 +302,10 @@ func (b *writeBenchmark) stopProfiling() error {
func measureTime(stage string, f func() error) (time.Duration, error) {
fmt.Printf(">> start stage=%s\n", stage)
start := time.Now()
err := f()
if err != nil {
if err := f(); err != nil {
return 0, err
}
fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start))
return time.Since(start), nil
}
......@@ -438,7 +341,25 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
return mets, nil
}
func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) {
func listBlocks(path string, humanReadable bool) error {
db, err := tsdb.OpenDBReadOnly(path, nil)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
blocks, err := db.Blocks()
if err != nil {
return err
}
printBlocks(blocks, humanReadable)
return nil
}
func printBlocks(blocks []tsdb.BlockReader, humanReadable bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()
......@@ -458,20 +379,56 @@ func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) {
}
}
func getFormatedTime(timestamp int64, humanReadable *bool) string {
if *humanReadable {
func getFormatedTime(timestamp int64, humanReadable bool) string {
if humanReadable {
return time.Unix(timestamp/1000, 0).UTC().String()
}
return strconv.FormatInt(timestamp, 10)
}
func analyzeBlock(b tsdb.BlockReader, limit int) error {
meta := b.Meta()
func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
db, err := tsdb.OpenDBReadOnly(path, nil)
if err != nil {
return nil, nil, err
}
blocks, err := db.Blocks()
if err != nil {
return nil, nil, err
}
var block tsdb.BlockReader
if blockID != "" {
for _, b := range blocks {
if b.Meta().ULID.String() == blockID {
block = b
break
}
}
} else if len(blocks) > 0 {
block = blocks[len(blocks)-1]
}
if block == nil {
return nil, nil, fmt.Errorf("block %s not found", blockID)
}
return db, block, nil
}
func analyzeBlock(path, blockID string, limit int) error {
db, block, err := openBlock(path, blockID)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(db.Close())
err = merr.Err()
}()
meta := block.Meta()
fmt.Printf("Block ID: %s\n", meta.ULID)
// Presume 1ms resolution that Prometheus uses.
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
ir, err := b.Index()
ir, err := block.Index()
if err != nil {
return err
}
......@@ -605,27 +562,31 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
return nil
}
func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
q, err := db.Querier(context.TODO(), mint, maxt)
func dumpSamples(path string, mint, maxt int64) (err error) {
db, err := tsdb.OpenDBReadOnly(path, nil)
if err != nil {
return err
}
defer func() {
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(q.Close())
merr.Add(db.Close())
err = merr.Err()
}()
q, err := db.Querier(context.TODO(), mint, maxt)
if err != nil {
return err
}
defer q.Close()
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
for ss.Next() {
series := ss.At()
labels := series.Labels()
lbs := series.Labels()
it := series.Iterator()
for it.Next() {
ts, val := it.At()
fmt.Printf("%s %g %d\n", labels, val, ts)
fmt.Printf("%s %g %d\n", lbs, val, ts)
}
if it.Err() != nil {
return ss.Err()
......@@ -645,3 +606,11 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
}
return nil
}
func checkErr(err error) int {
if err != nil {
fmt.Fprintln(os.Stderr, err)
return 1
}
return 0
}
testdata*
tsdb
benchout
TODO:
- [ ] add tabular output
- [ ] break commands in separate files
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment