Code Diff
diff --git a/pkg/storage/unified/search/bleve.go b/pkg/storage/unified/search/bleve.go
index 9afdc438b711..c2b4948d9be5 100644
--- a/pkg/storage/unified/search/bleve.go
+++ b/pkg/storage/unified/search/bleve.go
@@ -153,6 +153,9 @@ type bleveBackend struct {
// is empty. Guaranteed non-nil when opts.Snapshot.Store is set.
runningBuildVersion *semver.Version
+ // maxSupportedIndexFormat is the newest Bleve segment format this process can read.
+ maxSupportedIndexFormat string
+
bgTasksCancel func()
bgTasksWg sync.WaitGroup
@@ -202,11 +205,15 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
if opts.Snapshot.Store != nil && runningBuildVersion == nil {
return nil, fmt.Errorf("bleve backend requires non-empty BuildVersion when snapshot store is configured")
}
+ maxSupportedFormat := maxSupportedIndexFormat()
l := opts.Logger
if l == nil {
l = log.New("bleve-backend")
}
+ if opts.Snapshot.Store != nil && maxSupportedFormat == "" {
+ l.Warn("could not detect bleve index format version; snapshot format compatibility gate disabled")
+ }
ownFn := opts.OwnsIndex
if ownFn == nil {
@@ -215,15 +222,16 @@ func NewBleveBackend(opts BleveOptions, indexMetrics *resource.BleveIndexMetrics
}
be := &bleveBackend{
- log: l,
- cache: map[resource.NamespacedResource]*bleveIndex{},
- opts: opts,
- ownsIndexFn: ownFn,
- indexMetrics: indexMetrics,
- selectableFields: opts.SelectableFieldsForKinds,
- runningBuildVersion: runningBuildVersion,
- lastUploadTime: map[resource.NamespacedResource]time.Time{},
- inFlightBuildDirs: map[string]int{},
+ log: l,
+ cache: map[resource.NamespacedResource]*bleveIndex{},
+ opts: opts,
+ ownsIndexFn: ownFn,
+ indexMetrics: indexMetrics,
+ selectableFields: opts.SelectableFieldsForKinds,
+ runningBuildVersion: runningBuildVersion,
+ maxSupportedIndexFormat: maxSupportedFormat,
+ lastUploadTime: map[resource.NamespacedResource]time.Time{},
+ inFlightBuildDirs: map[string]int{},
}
ctx, cancel := context.WithCancel(context.Background())
diff --git a/pkg/storage/unified/search/bleve_snapshot.go b/pkg/storage/unified/search/bleve_snapshot.go
index 81a30284ef30..02bedcc3593e 100644
--- a/pkg/storage/unified/search/bleve_snapshot.go
+++ b/pkg/storage/unified/search/bleve_snapshot.go
@@ -6,11 +6,15 @@ import (
"fmt"
"os"
"path/filepath"
+ "slices"
"sort"
+ "strconv"
+ "strings"
"time"
"github.com/Masterminds/semver"
"github.com/blevesearch/bleve/v2"
+ "github.com/blevesearch/bleve/v2/index/scorch"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@@ -19,6 +23,46 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
+// zapSegmentType is the segment type name used to query scorch for supported
+// segment versions; it is also the type component of the format string built
+// by maxSupportedIndexFormat.
+const zapSegmentType = "zap"
+
+// maxSupportedIndexFormat returns the highest zap segment version reported by
+// scorch.SupportedSegmentTypeVersions, rendered via indexFormat as
+// "zap/<version>". It returns "" when scorch reports no versions for the zap
+// segment type (indexFormat also yields "" if the highest version is 0).
+func maxSupportedIndexFormat() string {
+ versions := scorch.SupportedSegmentTypeVersions(zapSegmentType)
+ if len(versions) == 0 {
+ return ""
+ }
+ return indexFormat(zapSegmentType, slices.Max(versions))
+}
+
+// indexFormat renders a segment type and version as "<type>/<version>".
+// It returns "" when formatType is empty or version is 0.
+func indexFormat(formatType string, version uint32) string {
+ if formatType == "" || version == 0 {
+ return ""
+ }
+ return fmt.Sprintf("%s/%d", formatType, version)
+}
+
+// parseIndexFormat splits a "<type>/<version>" string at the first "/" and
+// returns the type, the version, and whether parsing succeeded. It reports
+// false (with zero values) when there is no "/", the type is empty, or the
+// version is not a non-zero base-10 unsigned integer that fits in 32 bits.
+func parseIndexFormat(format string) (string, uint32, bool) {
+ formatType, versionString, ok := strings.Cut(format, "/")
+ if !ok || formatType == "" {
+ return "", 0, false
+ }
+ // bitSize 32 makes ParseUint fail on values that would not fit in uint32,
+ // so the conversion below cannot truncate.
+ version, err := strconv.ParseUint(versionString, 10, 32)
+ if err != nil || version == 0 {
+ return "", 0, false
+ }
+ return formatType, uint32(version), true
+}
+
+// isSnapshotIndexFormatUnknownOrSupported reports whether a snapshot whose
+// manifest declares snapshotFormat passes the format check against
+// maxSupportedFormat.
+//
+// An empty snapshotFormat is treated as supported, for legacy snapshots
+// uploaded before IndexFormat was added to the manifest. An empty
+// maxSupportedFormat also returns true, so no format filtering happens in
+// that case. Otherwise both values must parse with parseIndexFormat, have the
+// same format type, and the snapshot version must not exceed the max
+// supported version.
+func isSnapshotIndexFormatUnknownOrSupported(snapshotFormat, maxSupportedFormat string) bool {
+ if snapshotFormat == "" || maxSupportedFormat == "" {
+ return true
+ }
+ snapshotType, snapshotVersion, ok := parseIndexFormat(snapshotFormat)
+ maxSupportedType, maxSupportedVersion, maxOK := parseIndexFormat(maxSupportedFormat)
+ // A non-empty format that fails to parse is rejected, not treated as legacy.
+ return ok && maxOK && snapshotType == maxSupportedType && snapshotVersion <= maxSupportedVersion
+}
+
// Labels for the index_server_snapshot_downloads_total counter.
const (
// snapshotPolicyTiered selects via pickBestSnapshot (initial startup;
@@ -121,7 +165,7 @@ func (b *bleveBackend) tryDownloadFreshSameVersionSnapshot(
return b.downloadSelectedSnapshot(ctx, key, resourceDir, policy, spanName, logger,
func(ctx context.Context) (ulid.ULID, *IndexMeta, error) {
- k, m, err := findFreshSnapshotByBuildStart(ctx, b.opts.Snapshot.Store, key, notOlderThan, b.opts.BuildVersion)
+ k, m, err := findFreshSnapshotByBuildStart(ctx, b.opts.Snapshot.Store, key, notOlderThan, b.opts.BuildVersion, b.maxSupportedIndexFormat, logger)
if err != nil {
return ulid.ULID{}, nil, fmt.Errorf("probing for fresh snapshot: %w", err)
}
@@ -161,6 +205,7 @@ func (b *bleveBackend) downloadSelectedSnapshot(
attrs = append(attrs,
attribute.String("snapshot_key", snapKey.String()),
attribute.String("snapshot_version", meta.BuildVersion),
+ attribute.String("snapshot_index_format", meta.IndexFormat),
attribute.Int64("snapshot_rv", meta.LatestResourceVersion),
)
// Zero-value BuildTime means the snapshot was uploaded before that
@@ -197,6 +242,7 @@ func (b *bleveBackend) downloadSelectedSnapshot(
logFields := []any{
"snapshot_key", snapKey.String(),
"snapshot_version", meta.BuildVersion,
+ "snapshot_index_format", meta.IndexFormat,
"snapshot_rv", meta.LatestResourceVersion,
"snapshot_uploaded", meta.UploadTimestamp,
}
@@ -268,8 +314,9 @@ func (b *bleveBackend) downloadSelectedSnapshot(
return idx, name, rv, nil
}
-// pickBestSnapshot applies hard filters (upload time, unparseable version)
-// and the three-tier preference to pick the best snapshot, if any.
+// pickBestSnapshot applies hard filters (upload time, index format,
+// unparseable version) and the three-tier preference to pick the best
+// snapshot, if any.
//
// Tier 0 (ideal): MinBuildVersion <= v <= runningVersion
// Tier 1 (older, acceptable): v < MinBuildVersion
@@ -280,7 +327,7 @@ func (b *bleveBackend) pickBestSnapshot(all map[ulid.ULID]*IndexMeta, notOlderTh
minVersion := b.opts.Snapshot.MinBuildVersion
running := b.runningBuildVersion
- var droppedAge, droppedUnparseable int
+ var droppedAge, droppedUnparseable, droppedFormatUnsupported int
candidates := make([]snapshotCandidate, 0, len(all))
for k, m := range all {
// Hard filter: age.
@@ -288,6 +335,18 @@ func (b *bleveBackend) pickBestSnapshot(all map[ulid.ULID]*IndexMeta, notOlderTh
droppedAge++
continue
}
+ if !isSnapshotIndexFormatUnknownOrSupported(m.IndexFormat, b.maxSupportedIndexFormat) {
+ droppedFormatUnsupported++
+ logger.Debug("index snapshot candidate dropped: unsupported format",
+ "key", k.String(),
+ "snapshot_format", m.IndexFormat,
+ "max_supported_format", b.maxSupportedIndexFormat,
+ "version", m.BuildVersion,
+ "rv", m.LatestResourceVersion,
+ "uploaded", m.UploadTimestamp,
+ )
+ continue
+ }
// Hard filter: unparseable version (we can't tier it). Metadata validation
// lives here rather than in the store so we don't have to duplicate it
// across store implementations.
@@ -307,13 +366,15 @@ func (b *bleveBackend) pickBestSnapshot(all map[ulid.ULID]*IndexMeta, notOlderTh
"key", c.key.String(),
"tier", c.tier,
"version", c.version.String(),
+ "snapshot_format", c.meta.IndexFormat,
+ "max_supported_format", b.maxSupportedIndexFormat,
"rv", c.meta.LatestResourceVersion,
"uploaded", c.meta.UploadTimestamp,
)
}
if len(candidates) == 0 {
- logger.Debug("no index snapshot candidates", "total", len(all), "dropped_age", droppedAge, "dropped_unparseable", droppedUnparseable)
+ logger.Debug("no index snapshot candidates", "total", len(all), "dropped_age", droppedAge, "dropped_unparseable", droppedUnparseable, "dropped_format_unsupported", droppedFormatUnsupported, "max_supported_format", b.maxSupportedIndexFormat)
return snapshotCandidate{}, false
}
@@ -333,9 +394,12 @@ func (b *bleveBackend) pickBestSnapshot(all map[ulid.ULID]*IndexMeta, notOlderTh
logger.Debug("selected index snapshot",
"key", candidates[0].key.String(),
"tier", candidates[0].tier,
+ "snapshot_format", candidates[0].meta.IndexFormat,
+ "max_supported_format", b.maxSupportedIndexFormat,
"candidates", len(candidates),
"dropped_age", droppedAge,
"dropped_unparseable", droppedUnparseable,
+ "dropped_format_unsupported", droppedFormatUnsupported,
)
return candidates[0], true
}
@@ -403,9 +467,9 @@ func (b *bleveBackend) recordSnapshotDownloadOutcome(policy, status string) {
}
// findFreshSnapshotByUploadTime walks namespace snapshots newest-first and
-// returns the first one whose BuildVersion matches runningVersion and
-// whose ULID time is after notOlderThan. Returns a zero key and nil meta when
-// no such snapshot exists.
+// returns the first one whose BuildVersion matches runningVersion, whose
+// index format is not newer than this process can support, and whose ULID time
+// is after notOlderThan. Returns a zero key and nil meta when no such snapshot exists.
//
// Walking (rather than checking only the newest) is necessary in mixed-version
// clusters — either transiently during rolling upgrades, or as a deliberate
@@ -423,8 +487,10 @@ func findFreshSnapshotByUploadTime(
ns resource.NamespacedResource,
notOlderThan time.Time,
runningVersion string,
+ maxSupportedIndexFormat string,
+ logger log.Logger,
) (ulid.ULID, *IndexMeta, error) {
- return findFreshSnapshot(ctx, store, ns, notOlderThan, runningVersion, func(*IndexMeta) bool {
+ return findFreshSnapshot(ctx, store, ns, notOlderThan, runningVersion, maxSupportedIndexFormat, logger, func(*IndexMeta) bool {
return true
})
}
@@ -447,8 +513,10 @@ func findFreshSnapshotByBuildStart(
ns resource.NamespacedResource,
notOlderThan time.Time,
runningVersion string,
+ maxSupportedIndexFormat string,
+ logger log.Logger,
) (ulid.ULID, *IndexMeta, error) {
- return findFreshSnapshot(ctx, store, ns, notOlderThan, runningVersion, func(meta *IndexMeta) bool {
+ return findFreshSnapshot(ctx, store, ns, notOlderThan, runningVersion, maxSupportedIndexFormat, logger, func(meta *IndexMeta) bool {
return !meta.BuildTime.IsZero() && meta.BuildTime.After(notOlderThan)
})
}
@@ -459,6 +527,8 @@ func findFreshSnapshot(
ns resource.NamespacedResource,
notOlderThan time.Time,
runningVersion string,
+ maxSupportedIndexFormat string,
+ logger log.Logger,
isFresh func(*IndexMeta) bool,
) (ulid.ULID, *IndexMeta, error) {
keys, err := retryRemoteIndexStoreValue(ctx, snapshotStoreOpListIndexKeys, nil, func() ([]ulid.ULID, error) {
@@ -489,6 +559,16 @@ func findFreshSnapshot(
return ulid.ULID{}, nil, fmt.Errorf("reading manifest for %s: %w", k, err)
}
+ if !isSnapshotIndexFormatUnknownOrSupported(meta.IndexFormat, maxSupportedIndexFormat) {
+ logger.Debug("index snapshot candidate dropped: unsupported format",
+ "key", k.String(),
+ "snapshot_format", meta.IndexFormat,
+ "max_supported_format", maxSupportedIndexFormat,
+ "version", meta.BuildVersion,
+ )
+ continue
+ }
+
if meta.BuildVersion == runningVersion && isFresh(meta) {
return k, meta, nil
}
diff --git a/pkg/storage/unified/search/bleve_snapshot_test.go b/pkg/storage/unified/search/bleve_snapshot_test.go
index d91dcec7cea9..9a6cc5dfdbcb 100644
--- a/pkg/storage/unified/search/bleve_snapshot_test.go
+++ b/pkg/storage/unified/search/bleve_snapshot_test.go
@@ -73,6 +73,20 @@ func makeULID(t *testing.T, at time.Time) ulid.ULID {
return k
}
+// testIndexFormat returns maxSupportedIndexFormat() for use in test fixtures,
+// requiring that it is non-empty.
+func testIndexFormat(t *testing.T) string {
+ t.Helper()
+ format := maxSupportedIndexFormat()
+ require.NotEmpty(t, format)
+ return format
+}
+
+// testIndexFormatDelta returns the format from testIndexFormat with its
+// version shifted by delta, keeping the same format type (e.g. delta 1 gives
+// a version one above the max supported one).
+// NOTE(review): version+delta <= 0 is not guarded: a zero result makes
+// indexFormat return "", and a negative one wraps in the uint32 conversion.
+func testIndexFormatDelta(t *testing.T, delta int) string {
+ t.Helper()
+ formatType, version, ok := parseIndexFormat(testIndexFormat(t))
+ require.True(t, ok)
+ return indexFormat(formatType, uint32(int(version)+delta))
+}
+
func TestSnapshotTier(t *testing.T) {
running := semver.MustParse("11.5.0")
minV := semver.MustParse("11.4.0")
@@ -111,11 +125,14 @@ func TestPickBestSnapshot(t *testing.T) {
}
}
+ format := testIndexFormat(t)
+
newBackend := func(minVersion *semver.Version) *bleveBackend {
return &bleveBackend{
- log: log.New("bleve-snapshot-test"),
- opts: BleveOptions{Snapshot: SnapshotOptions{MinBuildVersion: minVersion}},
- runningBuildVersion: running,
+ log: log.New("bleve-snapshot-test"),
+ opts: BleveOptions{Snapshot: SnapshotOptions{MinBuildVersion: minVersion}},
+ runningBuildVersion: running,
+ maxSupportedIndexFormat: format,
}
}
cutoff := func(maxAge time.Duration) time.Time { return now.Add(-maxAge) }
@@ -186,6 +203,32 @@ func TestPickBestSnapshot(t *testing.T) {
assert.Equal(t, 2, c.tier)
})
+ t.Run("index format gate", func(t *testing.T) {
+ older := makeULID(t, now.Add(-30*time.Second))
+ same := makeULID(t, now.Add(-20*time.Second))
+ legacy := makeULID(t, now.Add(-10*time.Second))
+ tooNew := makeULID(t, now)
+
+ all := map[ulid.ULID]*IndexMeta{
+ older: snap("11.5.0", 100, time.Minute),
+ same: snap("11.5.0", 200, time.Minute),
+ legacy: snap("11.5.0", 300, time.Minute),
+ tooNew: snap("11.5.0", 400, time.Minute),
+ }
+ all[older].IndexFormat = testIndexFormatDelta(t, -1)
+ all[same].IndexFormat = format
+ all[tooNew].IndexFormat = testIndexFormatDelta(t, 1)
+
+ c, ok := newBackend(minV).pickBestSnapshot(all, cutoff(24*time.Hour), log.New("bleve-snapshot-test"))
+ require.True(t, ok)
+ assert.Equal(t, legacy, c.key, "empty legacy format remains compatible and normal tie-breaking still applies")
+
+ delete(all, legacy)
+ c, ok = newBackend(minV).pickBestSnapshot(all, cutoff(24*time.Hour), log.New("bleve-snapshot-test"))
+ require.True(t, ok)
+ assert.Equal(t, same, c.key, "same format should beat older format by RV")
+ })
+
t.Run("within tier: version desc, then RV desc, then upload desc", func(t *testing.T) {
a := makeULID(t, now.Add(-30*time.Second))
b := makeULID(t, now.Add(-20*time.Second))
@@ -964,7 +1007,7 @@ type probeCase struct {
wantErr string
}
-type probeFn func(ctx context.Context, s RemoteIndexStore, ns resource.NamespacedResource, notOlderThan time.Time, v string) (ulid.ULID, *IndexMeta, error)
+type probeFn func(ctx context.Context, s RemoteIndexStore, ns resource.NamespacedResource, notOlderThan time.Time, v string, f string, logger log.Logger) (ulid.ULID, *IndexMeta, error)
func runProbeCases(t *testing.T, probe probeFn, cases []probeCase) {
t.Helper()
@@ -973,7 +1016,8 @@
... [truncated]