Unverified Commit 8744510c by Bartlomiej Plotka Committed by GitHub

Merge pull request #6518 from prometheus/fix-2.15.1

Cut 2.15.1 + cherry-picked fix.
parents ec1868b0 e6e48e6d
## 2.15.1 / 2019-12-25
* [BUGFIX] Fixed race on concurrent queries against same data. #6512
## 2.15.0 / 2019-12-23
* [CHANGE] Discovery: Removed `prometheus_sd_kubernetes_cache_*` metrics. Additionally `prometheus_sd_kubernetes_workqueue_latency_seconds` and `prometheus_sd_kubernetes_workqueue_work_duration_seconds` metrics now show correct values in seconds. #6393
......
......@@ -456,16 +456,14 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
type Reader struct {
// The underlying bytes holding the encoded series data.
// Each slice holds the data for a different segment.
bs []ByteSlice
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
bs []ByteSlice
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
}
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs, crc32: newCRC32()}
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64
for i, b := range cr.bs {
......@@ -541,6 +539,7 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
// Get the lower 4 bytes.
// These contain the segment offset where the data for this chunk starts.
chkStart = int((ref << 32) >> 32)
chkCRC32 = newCRC32()
)
if sgmIndex >= len(s.bs) {
......@@ -569,12 +568,12 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
}
sum := sgmBytes.Range(chkEnd-crc32.Size, chkEnd)
s.crc32.Reset()
if _, err := s.crc32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
sum := sgmBytes.Range(chkDataEnd, chkEnd)
if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
return nil, err
}
if act := s.crc32.Sum(s.buf[:0]); !bytes.Equal(act, sum) {
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
}
......
......@@ -25,6 +25,7 @@ import (
"path/filepath"
"sort"
"strconv"
"sync"
"testing"
"time"
......@@ -2485,9 +2486,9 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Equals(t, 1000.0, sum)
}
// TestChunkWriter ensures that chunk segment are cut at the set segment size and
// TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and
// that the resulted segments includes the expected chunks data.
func TestChunkWriter(t *testing.T) {
func TestChunkWriter_ReadAfterWrite(t *testing.T) {
chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
......@@ -2611,22 +2612,19 @@ func TestChunkWriter(t *testing.T) {
for i, test := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_chunk_witer")
tempDir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
chunkw, err := chunks.NewWriterWithSegSize(tmpdir, chunks.SegmentHeaderSize+int64(test.segmentSize))
chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
testutil.Ok(t, err)
for _, chks := range test.chks {
chunkw.WriteChunks(chks...)
testutil.Ok(t, chunkw.WriteChunks(chks...))
}
testutil.Ok(t, chunkw.Close())
files, err := ioutil.ReadDir(tmpdir)
files, err := ioutil.ReadDir(tempDir)
testutil.Ok(t, err)
testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
......@@ -2655,7 +2653,7 @@ func TestChunkWriter(t *testing.T) {
testutil.Equals(t, sizeExp, sizeAct)
// Check the content of the chunks.
r, err := chunks.NewDirReader(tmpdir, nil)
r, err := chunks.NewDirReader(tempDir, nil)
testutil.Ok(t, err)
for _, chks := range test.chks {
......@@ -2668,3 +2666,43 @@ func TestChunkWriter(t *testing.T) {
})
}
}
// TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently.
// Regression test for https://github.com/prometheus/prometheus/pull/6514.
func TestChunkReader_ConcurrentReads(t *testing.T) {
chks := []chunks.Meta{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}),
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}),
}
tempDir, err := ioutil.TempDir("", "test_chunk_writer")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
chunkw, err := chunks.NewWriter(tempDir)
testutil.Ok(t, err)
testutil.Ok(t, chunkw.WriteChunks(chks...))
testutil.Ok(t, chunkw.Close())
r, err := chunks.NewDirReader(tempDir, nil)
testutil.Ok(t, err)
var wg sync.WaitGroup
for _, chk := range chks {
for i := 0; i < 100; i++ {
wg.Add(1)
go func(chunk chunks.Meta) {
defer wg.Done()
chkAct, err := r.Chunk(chunk.Ref)
testutil.Ok(t, err)
testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes())
}(chk)
}
wg.Wait()
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment