Ingester: add tracing to shipper.Sync #9656

Open · wants to merge 1 commit into base: main

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@
 * `-memberlist.acquire-writer-timeout`
 * [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594
 * [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645
+* [ENHANCEMENT] Ingester: Emit traces for block syncing, to join up block-upload traces. #9656
 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
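For context on the new calls in the diff below: Mimir's `pkg/util/spanlogger` couples a tracing span with a go-kit logger, so `spanlogger.NewWithLogger(ctx, logger, name)` both starts a span named `name` and returns a logger whose lines are also recorded on that span, plus a context carrying the span for child operations. Here is a minimal sketch of that pattern, assuming an OpenTracing-style API; it illustrates the shape only and is not Mimir's actual implementation.

```go
// Sketch of the spanlogger pattern used in this PR (assumption: Mimir's real
// implementation differs in detail).
package spanlogger

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/opentracing/opentracing-go"
)

// SpanLogger couples a tracing span with a go-kit logger, so every line
// written through it lands in the logs and on the span.
type SpanLogger struct {
	base log.Logger
	span opentracing.Span
}

// NewWithLogger starts a span named method and returns a logger bound to it,
// plus a context that carries the span to child operations.
func NewWithLogger(ctx context.Context, l log.Logger, method string) (*SpanLogger, context.Context) {
	span, ctx := opentracing.StartSpanFromContext(ctx, method)
	return &SpanLogger{base: log.With(l, "method", method), span: span}, ctx
}

// Log implements log.Logger: each line goes to the base logger and is also
// mirrored onto the span, so traces carry the same detail as the logs.
func (s *SpanLogger) Log(kvps ...interface{}) error {
	s.span.LogKV(kvps...)
	return s.base.Log(kvps...)
}

// DebugLog mirrors the convenience method used in the diff below.
func (s *SpanLogger) DebugLog(kvps ...interface{}) {
	level.Debug(s).Log(kvps...)
}

// Finish closes the span; callers defer it when the operation returns.
func (s *SpanLogger) Finish() {
	s.span.Finish()
}
```
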
23 changes: 13 additions & 10 deletions pkg/ingester/shipper.go
@@ -27,6 +27,7 @@ import (

 	mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
 	"github.com/grafana/mimir/pkg/storage/tsdb/block"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
 // shipperMetrics holds the shipper metrics. Mimir runs 1 shipper for each tenant but
@@ -103,13 +104,15 @@ func newShipper(
 //
 // It is not concurrency-safe; however, it is compactor-safe (running concurrently with the compactor is OK).
 func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {
+	log, ctx := spanlogger.NewWithLogger(ctx, s.logger, "Ingester.Shipper.Sync")
+	defer log.Finish()
 	shippedBlocks, err := readShippedBlocks(s.dir)
 	if err != nil {
 		// If we encounter any error, proceed with a new list of shipped blocks.
 		// The meta file will be overridden later. Note that the meta file is only
 		// used to avoid unnecessary bucket.Exists calls, which are properly handled
 		// by the system if they occur anyway.
-		level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
+		level.Warn(log).Log("msg", "reading meta file failed, will override it", "err", err)
 
 		// Reset the shipped blocks slice, so we can rebuild it only with blocks that still exist locally.
 		shippedBlocks = map[ulid.ULID]time.Time{}
@@ -132,7 +135,7 @@ func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {

 		if m.Stats.NumSamples == 0 {
 			// Ignore empty blocks.
-			level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID)
+			log.DebugLog("msg", "ignoring empty block", "block", m.ULID)
 			continue
 		}
 
@@ -155,24 +158,24 @@ func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {
 			continue
 		}
 
-		level.Info(s.logger).Log("msg", "uploading new block to long-term storage", "block", m.ULID)
-		if err := s.upload(ctx, m); err != nil {
+		level.Info(log).Log("msg", "uploading new block to long-term storage", "block", m.ULID)
+		if err := s.upload(ctx, log, m); err != nil {
 			// No error returned, just log line. This is because we want other blocks to be shipped even
 			// though this one failed. It will be retried on second Sync iteration.
-			level.Error(s.logger).Log("msg", "uploading new block to long-term storage failed", "block", m.ULID, "err", err)
+			level.Error(log).Log("msg", "uploading new block to long-term storage failed", "block", m.ULID, "err", err)
 			uploadErrs++
 			continue
 		}
-		level.Info(s.logger).Log("msg", "finished uploading new block to long-term storage", "block", m.ULID)
+		level.Info(log).Log("msg", "finished uploading new block to long-term storage", "block", m.ULID)
 
 		meta.Shipped[m.ULID] = model.Now()
 		shipped++
 		s.metrics.uploads.Inc()
 		s.metrics.lastSuccessfulUploadTime.SetToCurrentTime()
 	}
 
-	if err := writeShipperMetaFile(s.logger, s.dir, meta); err != nil {
-		level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err)
+	if err := writeShipperMetaFile(log, s.dir, meta); err != nil {
+		level.Warn(log).Log("msg", "updating meta file failed", "err", err)
 	}
 
 	if uploadErrs > 0 {
@@ -186,7 +189,7 @@ func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {
 // upload uploads the block to blocks storage. The block is uploaded with an updated meta.json file with extra details.
 // This updated version of meta.json is, however, not persisted locally on disk, to avoid a race condition where the TSDB
 // library could unload the block if it found the meta.json file missing.
-func (s *shipper) upload(ctx context.Context, meta *block.Meta) error {
+func (s *shipper) upload(ctx context.Context, logger log.Logger, meta *block.Meta) error {
 	blockDir := filepath.Join(s.dir, meta.ULID.String())
 
 	meta.Thanos.Source = s.source
@@ -198,7 +201,7 @@ func (s *shipper) upload(ctx context.Context, meta *block.Meta) error {
 	}
 
 	// Upload block with custom metadata.
-	return block.Upload(ctx, s.logger, s.bucket, blockDir, meta)
+	return block.Upload(ctx, logger, s.bucket, blockDir, meta)
 }
 
 // blockMetasFromOldest returns the block meta of each block found in dir
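Why `Sync` threads the returned `ctx` (and the span-scoped logger) into `upload`, and from there into `block.Upload`: any span started from that context becomes a child of `Ingester.Shipper.Sync`, which is what joins the shipper's work up with the existing block-upload traces mentioned in the changelog entry. Below is a runnable toy demonstrating the mechanism; the `Shipper.Upload` span name is invented for the example.

```go
package main

import (
	"context"
	"fmt"

	"github.com/opentracing/opentracing-go"
)

// sync mimics shipper.Sync: it starts a root span and passes the
// span-carrying context down to the upload step.
func sync(ctx context.Context) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.Shipper.Sync")
	defer span.Finish()
	upload(ctx)
}

// upload mimics shipper.upload: because its span is started from the context
// produced in sync, it becomes a child of Ingester.Shipper.Sync rather than a
// detached root span. ("Shipper.Upload" is a made-up name for illustration.)
func upload(ctx context.Context) {
	span, _ := opentracing.StartSpanFromContext(ctx, "Shipper.Upload")
	defer span.Finish()
	fmt.Println("uploading block")
}

func main() {
	// With no global tracer registered, opentracing falls back to a no-op
	// tracer, so this runs standalone; in Mimir a real tracer is installed.
	sync(context.Background())
}
```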