From 2dc9cb6cb97b7a3339e5a6942b86e49ce71f8004 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 1 Apr 2026 23:23:37 +0000 Subject: [PATCH 1/4] Prevent partition compaction to compact any blocks marked for deletion Signed-off-by: Alex Le --- pkg/compactor/compactor.go | 9 +- pkg/compactor/compactor_paritioning_test.go | 3 +- pkg/compactor/compactor_test.go | 2 +- pkg/compactor/partition_compaction_planner.go | 24 +++ .../partition_compaction_planner_test.go | 202 ++++++++++++++++++ 5 files changed, 234 insertions(+), 6 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 85a847b0a96..ab6f44a7399 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -139,7 +139,7 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics, _ *block.IgnoreDeletionMarkFilter) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } @@ -152,10 +152,10 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter) compact.Planner { if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { - return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics) + return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics, ignoreDeletionMarkFilter) } else { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } @@ -226,6 +226,7 @@ type PlannerFactory func( blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics, + ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, ) compact.Planner type CompactionLifecycleCallbackFactory func( @@ -1091,7 +1092,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { ulogger, syncer, c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor), - c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics), + c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics, ignoreDeletionMarkFilter), c.blocksCompactor, c.blockDeletableCheckerFactory(currentCtx, bucket, ulogger), c.compactionLifecycleCallbackFactory(currentCtx, bucket, ulogger, c.compactorCfg.MetaSyncConcurrency, c.compactDirForUser(userID), userID, c.compactorMetrics), diff --git a/pkg/compactor/compactor_paritioning_test.go b/pkg/compactor/compactor_paritioning_test.go index 9f7dd417fe5..bc94290f777 100644 --- a/pkg/compactor/compactor_paritioning_test.go +++ b/pkg/compactor/compactor_paritioning_test.go @@ -1463,7 +1463,7 @@ func prepareForPartitioning(t *testing.T, compactorCfg Config, bucketClient objs blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { return tsdbCompactor, - func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { + func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics, _ *block.IgnoreDeletionMarkFilter) compact.Planner { tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter) return tsdbPlanner }, @@ -1972,6 +1972,7 @@ func (s *raceConditionTestSetup) createPlanner() *PartitionCompactionPlanner { 10*time.Minute, time.Minute, metrics, + nil, ) } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 57a4e064c02..dff52d9695c 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1607,7 +1607,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Instrument blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { return tsdbCompactor, - func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { + func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics, _ *block.IgnoreDeletionMarkFilter) compact.Planner { tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter) return tsdbPlanner }, diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go index 989315de468..16bf9de1bc4 100644 --- a/pkg/compactor/partition_compaction_planner.go +++ b/pkg/compactor/partition_compaction_planner.go @@ -10,6 +10,7 @@ import ( "github.com/oklog/ulid/v2" "github.com/pkg/errors" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -32,6 +33,7 @@ type PartitionCompactionPlanner struct { partitionVisitMarkerTimeout time.Duration partitionVisitMarkerFileUpdateInterval time.Duration compactorMetrics *compactorMetrics + ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter } func NewPartitionCompactionPlanner( @@ -46,6 +48,7 @@ func NewPartitionCompactionPlanner( partitionVisitMarkerTimeout time.Duration, partitionVisitMarkerFileUpdateInterval time.Duration, compactorMetrics *compactorMetrics, + ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, ) *PartitionCompactionPlanner { return &PartitionCompactionPlanner{ ctx: ctx, @@ -59,6 +62,7 @@ func NewPartitionCompactionPlanner( partitionVisitMarkerTimeout: partitionVisitMarkerTimeout, partitionVisitMarkerFileUpdateInterval: partitionVisitMarkerFileUpdateInterval, compactorMetrics: compactorMetrics, + ignoreDeletionMarkFilter: ignoreDeletionMarkFilter, } } @@ -171,6 +175,26 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB return nil, nil } + if p.ignoreDeletionMarkFilter != nil { + resultMetasMap := make(map[ulid.ULID]*metadata.Meta, len(resultMetas)) + for _, m := range resultMetas { + resultMetasMap[m.ULID] = m + } + err = p.ignoreDeletionMarkFilter.Filter(p.ctx, resultMetasMap, p.compactorMetrics.metaFetcherSynced, p.compactorMetrics.metaFetcherModified) + if err != nil { + return nil, err + } + var deletedBlocks []string + for deletedBlock, _ := range p.ignoreDeletionMarkFilter.DeletionMarkBlocks() { + deletedBlocks = append(deletedBlocks, deletedBlock.String()) + } + if len(deletedBlocks) > 0 { + visitMarkerManager.MarkWithStatus(p.ctx, Failed) + level.Warn(p.logger).Log("msg", "partitioned group contains deleted blocks", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "deleted_blocks", deletedBlocks) + return nil, fmt.Errorf("partitioned group contains %d deleted blocks", len(deletedBlocks)) + } + } + go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false) return resultMetas, nil diff --git a/pkg/compactor/partition_compaction_planner_test.go b/pkg/compactor/partition_compaction_planner_test.go index a4659ff2a0b..4d0efc1d76f 100644 --- a/pkg/compactor/partition_compaction_planner_test.go +++ b/pkg/compactor/partition_compaction_planner_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "io" + "path" "testing" "time" @@ -15,6 +17,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -328,6 +331,7 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) { visitMarkerTimeout, time.Minute, metrics, + nil, ) actual, err := p.Plan(context.Background(), testData.blocks, nil, &cortextsdb.CortexMetaExtensions{ PartitionInfo: &cortextsdb.PartitionInfo{ @@ -352,3 +356,201 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) { }) } } + +func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { + currentCompactor := "test-compactor" + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + + partitionedGroupID := uint32(1) + partitionID := 0 + visitMarkerTimeout := 5 * time.Minute + + setupBucket := func(t *testing.T, deletionMarkBlockIDs []ulid.ULID) (*bucket.ClientMock, *partitionVisitMarker) { + bkt := &bucket.ClientMock{} + uploadedVisitMarker := &partitionVisitMarker{} + + expireTime := time.Now() + visitMarker := partitionVisitMarker{ + CompactorID: currentCompactor, + PartitionedGroupID: partitionedGroupID, + PartitionID: partitionID, + VisitTime: expireTime.Unix(), + Status: Pending, + Version: PartitionVisitMarkerVersion1, + } + visitMarkerFileContent, _ := json.Marshal(visitMarker) + visitMarkerFile := GetPartitionVisitMarkerFilePath(partitionedGroupID, partitionID) + + partitionedGroupInfo := PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 1, + Partitions: []Partition{ + {PartitionID: partitionID, Blocks: []ulid.ULID{}}, + }, + RangeStart: 0, + RangeEnd: 2 * time.Hour.Milliseconds(), + CreationTime: time.Now().Unix(), + Version: PartitionedGroupInfoVersion1, + } + partitionedGroupContent, _ := json.Marshal(partitionedGroupInfo) + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) + + bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil) + bkt.MockGet(partitionedGroupFile, string(partitionedGroupContent), nil) + + // Capture uploaded visit marker content + bkt.On("Upload", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + reader := args.Get(2).(io.Reader) + data, err := io.ReadAll(reader) + if err == nil { + _ = json.Unmarshal(data, uploadedVisitMarker) + } + }).Return(nil) + + // Mock deletion marks for specified blocks + deletionMarkIDs := make(map[ulid.ULID]struct{}) + for _, id := range deletionMarkBlockIDs { + deletionMarkIDs[id] = struct{}{} + deletionMark := metadata.DeletionMark{ + ID: id, + Version: metadata.DeletionMarkVersion1, + DeletionTime: time.Now().Unix(), + } + content, _ := json.Marshal(deletionMark) + markPath := path.Join(id.String(), metadata.DeletionMarkFilename) + bkt.MockGet(markPath, string(content), nil) + } + + // For blocks without deletion marks, mock not-found + for _, id := range []ulid.ULID{block1ulid, block2ulid, block3ulid} { + if _, ok := deletionMarkIDs[id]; !ok { + markPath := path.Join(id.String(), metadata.DeletionMarkFilename) + bkt.MockGet(markPath, "", nil) + } + } + + return bkt, uploadedVisitMarker + } + + createPlanner := func(bkt *bucket.ClientMock, partitionedGroupInfo PartitionedGroupInfo) *PartitionCompactionPlanner { + registerer := prometheus.NewPedanticRegistry() + metrics := newCompactorMetrics(registerer) + logger := log.NewLogfmtLogger(&concurrency.SyncBuffer{}) + instrBkt := objstore.WithNoopInstr(bkt) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, instrBkt, 0, 1) + + return NewPartitionCompactionPlanner( + context.Background(), + instrBkt, + logger, + []int64{2 * time.Hour.Milliseconds()}, + func() map[ulid.ULID]*metadata.NoCompactMark { return nil }, + currentCompactor, + "test-user", + 10*time.Millisecond, + visitMarkerTimeout, + time.Minute, + metrics, + ignoreDeletionMarkFilter, + ) + } + + t.Run("should plan successfully when no blocks are marked for deletion", func(t *testing.T) { + bkt, uploadedVisitMarker := setupBucket(t, nil) + partitionedGroupContent := PartitionedGroupInfo{} + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) + raw, _ := bkt.Get(context.Background(), partitionedGroupFile) + buf := make([]byte, 4096) + n, _ := raw.Read(buf) + _ = json.Unmarshal(buf[:n], &partitionedGroupContent) + + p := createPlanner(bkt, partitionedGroupContent) + + blocks := []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + {BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + } + + actual, err := p.Plan(context.Background(), blocks, nil, &cortextsdb.CortexMetaExtensions{ + PartitionInfo: &cortextsdb.PartitionInfo{ + PartitionCount: 1, + PartitionID: partitionID, + PartitionedGroupID: partitionedGroupID, + PartitionedGroupCreationTime: partitionedGroupContent.CreationTime, + }, + }) + + require.NoError(t, err) + require.Len(t, actual, 2) + assert.Equal(t, block1ulid, actual[0].ULID) + assert.Equal(t, block2ulid, actual[1].ULID) + // Visit marker should not be marked as Failed + assert.NotEqual(t, Failed, uploadedVisitMarker.GetStatus()) + }) + + t.Run("should fail when blocks are marked for deletion", func(t *testing.T) { + bkt, uploadedVisitMarker := setupBucket(t, []ulid.ULID{block2ulid}) + partitionedGroupContent := PartitionedGroupInfo{} + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) + raw, _ := bkt.Get(context.Background(), partitionedGroupFile) + buf := make([]byte, 4096) + n, _ := raw.Read(buf) + _ = json.Unmarshal(buf[:n], &partitionedGroupContent) + + p := createPlanner(bkt, partitionedGroupContent) + + blocks := []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + {BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + {BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + } + + actual, err := p.Plan(context.Background(), blocks, nil, &cortextsdb.CortexMetaExtensions{ + PartitionInfo: &cortextsdb.PartitionInfo{ + PartitionCount: 1, + PartitionID: partitionID, + PartitionedGroupID: partitionedGroupID, + PartitionedGroupCreationTime: partitionedGroupContent.CreationTime, + }, + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "partitioned group contains 1 deleted blocks") + assert.Nil(t, actual) + assert.Equal(t, Failed, uploadedVisitMarker.GetStatus()) + }) + + t.Run("should fail when multiple blocks are marked for deletion", func(t *testing.T) { + bkt, uploadedVisitMarker := setupBucket(t, []ulid.ULID{block1ulid, block3ulid}) + partitionedGroupContent := PartitionedGroupInfo{} + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) + raw, _ := bkt.Get(context.Background(), partitionedGroupFile) + buf := make([]byte, 4096) + n, _ := raw.Read(buf) + _ = json.Unmarshal(buf[:n], &partitionedGroupContent) + + p := createPlanner(bkt, partitionedGroupContent) + + blocks := []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + {BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + {BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, + } + + actual, err := p.Plan(context.Background(), blocks, nil, &cortextsdb.CortexMetaExtensions{ + PartitionInfo: &cortextsdb.PartitionInfo{ + PartitionCount: 1, + PartitionID: partitionID, + PartitionedGroupID: partitionedGroupID, + PartitionedGroupCreationTime: partitionedGroupContent.CreationTime, + }, + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "partitioned group contains 2 deleted blocks") + assert.Nil(t, actual) + assert.Equal(t, Failed, uploadedVisitMarker.GetStatus()) + }) +} From 0bd62fdee148cec59d5559d89164b025d1b76516 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 1 Apr 2026 23:36:04 +0000 Subject: [PATCH 2/4] update CHANGELOG Signed-off-by: Alex Le --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca162be539e..135bbdf17ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 * [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355 +* [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391 * [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372 * [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 From 9283dc2be9bf6f1d6abf93b7556f047de176c67a Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 3 Apr 2026 01:30:54 +0000 Subject: [PATCH 3/4] fix lint Signed-off-by: Alex Le --- pkg/compactor/partition_compaction_planner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go index 16bf9de1bc4..a952a838d65 100644 --- a/pkg/compactor/partition_compaction_planner.go +++ b/pkg/compactor/partition_compaction_planner.go @@ -185,7 +185,7 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB return nil, err } var deletedBlocks []string - for deletedBlock, _ := range p.ignoreDeletionMarkFilter.DeletionMarkBlocks() { + for deletedBlock := range p.ignoreDeletionMarkFilter.DeletionMarkBlocks() { deletedBlocks = append(deletedBlocks, deletedBlock.String()) } if len(deletedBlocks) > 0 { From 78442213dfc51a5cb731c45c43eda37c93fce4ad Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 7 Apr 2026 16:17:49 -0700 Subject: [PATCH 4/4] address comments and fix test race condition Signed-off-by: Alex Le --- pkg/compactor/compactor.go | 6 ++-- pkg/compactor/partition_compaction_planner.go | 2 ++ .../partition_compaction_planner_test.go | 33 ++++++------------- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index ab6f44a7399..dc008f9908b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -1048,8 +1048,10 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), } - // Add ignoreDeletionMarkFilter only when not using bucket index discovery. - if blockDiscoveryStrategy != cortex_tsdb.BucketIndexDiscovery { + // Add ignoreDeletionMarkFilter only when not using bucket index discovery or using default compaction strategy. + // CompactionStrategyDefault would mark parent blocks for deletion after compaction is finished. ShuffleShardingGrouper + // should ignore blocks marked for deletion during grouping stage directly. + if blockDiscoveryStrategy != cortex_tsdb.BucketIndexDiscovery || c.compactorCfg.CompactionStrategy == util.CompactionStrategyDefault { filterList = append(filterList, ignoreDeletionMarkFilter) } diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go index a952a838d65..24732260a16 100644 --- a/pkg/compactor/partition_compaction_planner.go +++ b/pkg/compactor/partition_compaction_planner.go @@ -182,6 +182,8 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB } err = p.ignoreDeletionMarkFilter.Filter(p.ctx, resultMetasMap, p.compactorMetrics.metaFetcherSynced, p.compactorMetrics.metaFetcherModified) if err != nil { + visitMarkerManager.MarkWithStatus(p.ctx, Failed) + level.Warn(p.logger).Log("msg", "unable to filter blocks by deletion marker", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err) return nil, err } var deletedBlocks []string diff --git a/pkg/compactor/partition_compaction_planner_test.go b/pkg/compactor/partition_compaction_planner_test.go index 4d0efc1d76f..f25c648c4c7 100644 --- a/pkg/compactor/partition_compaction_planner_test.go +++ b/pkg/compactor/partition_compaction_planner_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "path" "testing" "time" @@ -367,9 +366,8 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { partitionID := 0 visitMarkerTimeout := 5 * time.Minute - setupBucket := func(t *testing.T, deletionMarkBlockIDs []ulid.ULID) (*bucket.ClientMock, *partitionVisitMarker) { + setupBucket := func(t *testing.T, deletionMarkBlockIDs []ulid.ULID) *bucket.ClientMock { bkt := &bucket.ClientMock{} - uploadedVisitMarker := &partitionVisitMarker{} expireTime := time.Now() visitMarker := partitionVisitMarker{ @@ -400,14 +398,7 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil) bkt.MockGet(partitionedGroupFile, string(partitionedGroupContent), nil) - // Capture uploaded visit marker content - bkt.On("Upload", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - reader := args.Get(2).(io.Reader) - data, err := io.ReadAll(reader) - if err == nil { - _ = json.Unmarshal(data, uploadedVisitMarker) - } - }).Return(nil) + bkt.On("Upload", mock.Anything, mock.Anything, mock.Anything).Return(nil) // Mock deletion marks for specified blocks deletionMarkIDs := make(map[ulid.ULID]struct{}) @@ -431,10 +422,10 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { } } - return bkt, uploadedVisitMarker + return bkt } - createPlanner := func(bkt *bucket.ClientMock, partitionedGroupInfo PartitionedGroupInfo) *PartitionCompactionPlanner { + createPlanner := func(bkt *bucket.ClientMock) *PartitionCompactionPlanner { registerer := prometheus.NewPedanticRegistry() metrics := newCompactorMetrics(registerer) logger := log.NewLogfmtLogger(&concurrency.SyncBuffer{}) @@ -458,7 +449,7 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { } t.Run("should plan successfully when no blocks are marked for deletion", func(t *testing.T) { - bkt, uploadedVisitMarker := setupBucket(t, nil) + bkt := setupBucket(t, nil) partitionedGroupContent := PartitionedGroupInfo{} partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) raw, _ := bkt.Get(context.Background(), partitionedGroupFile) @@ -466,7 +457,7 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { n, _ := raw.Read(buf) _ = json.Unmarshal(buf[:n], &partitionedGroupContent) - p := createPlanner(bkt, partitionedGroupContent) + p := createPlanner(bkt) blocks := []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, @@ -486,12 +477,10 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { require.Len(t, actual, 2) assert.Equal(t, block1ulid, actual[0].ULID) assert.Equal(t, block2ulid, actual[1].ULID) - // Visit marker should not be marked as Failed - assert.NotEqual(t, Failed, uploadedVisitMarker.GetStatus()) }) t.Run("should fail when blocks are marked for deletion", func(t *testing.T) { - bkt, uploadedVisitMarker := setupBucket(t, []ulid.ULID{block2ulid}) + bkt := setupBucket(t, []ulid.ULID{block2ulid}) partitionedGroupContent := PartitionedGroupInfo{} partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) raw, _ := bkt.Get(context.Background(), partitionedGroupFile) @@ -499,7 +488,7 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { n, _ := raw.Read(buf) _ = json.Unmarshal(buf[:n], &partitionedGroupContent) - p := createPlanner(bkt, partitionedGroupContent) + p := createPlanner(bkt) blocks := []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, @@ -519,11 +508,10 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "partitioned group contains 1 deleted blocks") assert.Nil(t, actual) - assert.Equal(t, Failed, uploadedVisitMarker.GetStatus()) }) t.Run("should fail when multiple blocks are marked for deletion", func(t *testing.T) { - bkt, uploadedVisitMarker := setupBucket(t, []ulid.ULID{block1ulid, block3ulid}) + bkt := setupBucket(t, []ulid.ULID{block1ulid, block3ulid}) partitionedGroupContent := PartitionedGroupInfo{} partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) raw, _ := bkt.Get(context.Background(), partitionedGroupFile) @@ -531,7 +519,7 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { n, _ := raw.Read(buf) _ = json.Unmarshal(buf[:n], &partitionedGroupContent) - p := createPlanner(bkt, partitionedGroupContent) + p := createPlanner(bkt) blocks := []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}}, @@ -551,6 +539,5 @@ func TestPartitionCompactionPlanner_PlanWithDeletionMarkFilter(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "partitioned group contains 2 deleted blocks") assert.Nil(t, actual) - assert.Equal(t, Failed, uploadedVisitMarker.GetStatus()) }) }