Code Diff
diff --git a/pkg/storage/unified/search/bleve_snapshot_test.go b/pkg/storage/unified/search/bleve_snapshot_test.go
index d56588169f68..e7728b8fc0fb 100644
--- a/pkg/storage/unified/search/bleve_snapshot_test.go
+++ b/pkg/storage/unified/search/bleve_snapshot_test.go
@@ -72,6 +72,25 @@ func (f *fakeRemoteIndexStore) ListIndexes(context.Context, resource.NamespacedR
return out, nil
}
+// ListIndexKeys returns every key held by the fake, or listErr when it is set.
+// Keys come straight from ranging over the backing map, so their order varies.
+func (f *fakeRemoteIndexStore) ListIndexKeys(context.Context, resource.NamespacedResource) ([]ulid.ULID, error) {
+	if err := f.listErr; err != nil {
+		return nil, err
+	}
+	out := make([]ulid.ULID, 0, len(f.data))
+	for key := range f.data {
+		out = append(out, key)
+	}
+	return out, nil
+}
+
+// GetIndexMeta looks the key up in the fake's data map and reports
+// ErrSnapshotNotFound when it is absent.
+func (f *fakeRemoteIndexStore) GetIndexMeta(_ context.Context, _ resource.NamespacedResource, k ulid.ULID) (*IndexMeta, error) {
+	if meta, found := f.data[k]; found {
+		return meta, nil
+	}
+	return nil, ErrSnapshotNotFound
+}
+
func (f *fakeRemoteIndexStore) DownloadIndex(_ context.Context, _ resource.NamespacedResource, k ulid.ULID, destDir string) (*IndexMeta, error) {
f.downloadCalls.Add(1)
if f.downloadErr != nil {
diff --git a/pkg/storage/unified/search/bleve_snapshot_upload_test.go b/pkg/storage/unified/search/bleve_snapshot_upload_test.go
index 742930ce4bc8..cb76908487e8 100644
--- a/pkg/storage/unified/search/bleve_snapshot_upload_test.go
+++ b/pkg/storage/unified/search/bleve_snapshot_upload_test.go
@@ -78,6 +78,14 @@ func (s *uploadTestStore) ListIndexes(context.Context, resource.NamespacedResour
panic("ListIndexes not implemented for uploadTestStore")
}
+// ListIndexKeys is not implemented for the upload tests and panics if called.
+func (s *uploadTestStore) ListIndexKeys(_ context.Context, _ resource.NamespacedResource) ([]ulid.ULID, error) {
+	panic("ListIndexKeys not implemented for uploadTestStore")
+}
+
+// GetIndexMeta is not implemented for the upload tests and panics if called.
+func (s *uploadTestStore) GetIndexMeta(_ context.Context, _ resource.NamespacedResource, _ ulid.ULID) (*IndexMeta, error) {
+	panic("GetIndexMeta not implemented for uploadTestStore")
+}
+
func (s *uploadTestStore) DeleteIndex(context.Context, resource.NamespacedResource, ulid.ULID) error {
panic("DeleteIndex not implemented for uploadTestStore")
}
diff --git a/pkg/storage/unified/search/remote_index_cleanup_test.go b/pkg/storage/unified/search/remote_index_cleanup_test.go
index 09b2c96d72ce..d7eb3fe60c3c 100644
--- a/pkg/storage/unified/search/remote_index_cleanup_test.go
+++ b/pkg/storage/unified/search/remote_index_cleanup_test.go
@@ -501,6 +501,12 @@ func (s *recordingStore) ListIndexes(ctx context.Context, r resource.NamespacedR
s.mu.Unlock()
return s.inner.ListIndexes(ctx, r)
}
+// ListIndexKeys forwards to the wrapped store without recording the call.
+func (s *recordingStore) ListIndexKeys(ctx context.Context, nr resource.NamespacedResource) ([]ulid.ULID, error) {
+	return s.inner.ListIndexKeys(ctx, nr)
+}
+
+// GetIndexMeta forwards to the wrapped store without recording the call.
+// NOTE(review): unlike DeleteIndex (which bumps a per-namespace counter),
+// neither of these two methods is counted; add counters if a test needs to
+// observe them.
+func (s *recordingStore) GetIndexMeta(ctx context.Context, nr resource.NamespacedResource, key ulid.ULID) (*IndexMeta, error) {
+	return s.inner.GetIndexMeta(ctx, nr, key)
+}
func (s *recordingStore) DeleteIndex(ctx context.Context, r resource.NamespacedResource, k ulid.ULID) error {
s.mu.Lock()
s.deleteIndex[r.Namespace]++
@@ -622,6 +628,12 @@ func (s *controllableLockStore) LockNamespaceForCleanup(_ context.Context, ns st
func (s *controllableLockStore) ListIndexes(ctx context.Context, r resource.NamespacedResource) (map[ulid.ULID]*IndexMeta, error) {
return s.inner.ListIndexes(ctx, r)
}
+// ListIndexKeys delegates to the wrapped store unchanged.
+func (s *controllableLockStore) ListIndexKeys(ctx context.Context, nr resource.NamespacedResource) ([]ulid.ULID, error) {
+	return s.inner.ListIndexKeys(ctx, nr)
+}
+
+// GetIndexMeta delegates to the wrapped store unchanged.
+func (s *controllableLockStore) GetIndexMeta(ctx context.Context, nr resource.NamespacedResource, key ulid.ULID) (*IndexMeta, error) {
+	return s.inner.GetIndexMeta(ctx, nr, key)
+}
func (s *controllableLockStore) DeleteIndex(ctx context.Context, r resource.NamespacedResource, k ulid.ULID) error {
return s.inner.DeleteIndex(ctx, r, k)
}
diff --git a/pkg/storage/unified/search/remote_index_store.go b/pkg/storage/unified/search/remote_index_store.go
index 4d3fa7a14790..d4e0ec4c3bb1 100644
--- a/pkg/storage/unified/search/remote_index_store.go
+++ b/pkg/storage/unified/search/remote_index_store.go
@@ -37,6 +37,17 @@ const (
// ErrNonRegularFile is returned when a non-regular file (symlink, pipe, socket, device) is found during index upload.
var ErrNonRegularFile = errors.New("non-regular file found in index directory")
+// Sentinel errors for snapshot-manifest lookups. ErrInvalidManifest is
+// returned wrapped with extra detail, so compare against these with errors.Is.
+var (
+	// ErrSnapshotNotFound reports that no snapshot manifest exists for the
+	// requested index key: either the snapshot was deleted, or its upload is
+	// still in progress and has not written the manifest yet.
+	ErrSnapshotNotFound = errors.New("snapshot not found")
+
+	// ErrInvalidManifest reports that a snapshot manifest is present but
+	// structurally unusable: oversized, unparseable, listing no files, or
+	// containing non-canonical paths. It is distinct both from
+	// ErrSnapshotNotFound (manifest absent) and from transient download errors.
+	ErrInvalidManifest = errors.New("invalid manifest")
+)
+
// IndexMeta contains metadata about a remote index snapshot.
type IndexMeta struct {
// GrafanaBuildVersion is the version of Grafana that built this index.
@@ -95,6 +106,20 @@ type RemoteIndexStore interface {
// Note: indexes may be deleted between listing and subsequent operations.
ListIndexes(ctx context.Context, nsResource resource.NamespacedResource) (map[ulid.ULID]*IndexMeta, error)
+ // ListIndexKeys returns the ULID keys of all index snapshots under the
+ // given namespaced resource. The returned list may include incomplete
+ // uploads (snapshots whose manifest has not yet been written); callers
+ // that need to distinguish complete from incomplete snapshots should
+ // follow up with GetIndexMeta. Ordering is unspecified. Unlike
+ // ListIndexes, producing the key list does not require reading manifests.
+ ListIndexKeys(ctx context.Context, nsResource resource.NamespacedResource) ([]ulid.ULID, error)
+
+ // GetIndexMeta returns the manifest for a single index snapshot. It
+ // returns ErrSnapshotNotFound if no manifest exists for the given key,
+ // or an error wrapping ErrInvalidManifest if the manifest exists but is
+ // structurally invalid (oversized, unparseable, empty file list, or
+ // non-canonical paths). Any other error (e.g. a transient storage read
+ // failure) matches neither sentinel and says nothing about whether the
+ // snapshot exists or is valid; callers should retry later rather than
+ // act on it (for instance, rather than deleting the snapshot).
+ GetIndexMeta(ctx context.Context, nsResource resource.NamespacedResource, indexKey ulid.ULID) (*IndexMeta, error)
+
// DeleteIndex deletes all files for an index snapshot.
DeleteIndex(ctx context.Context, nsResource resource.NamespacedResource, indexKey ulid.ULID) error
@@ -326,20 +351,9 @@ func (s *BucketRemoteIndexStore) uploadFile(ctx context.Context, objectKey, loca
func (s *BucketRemoteIndexStore) DownloadIndex(ctx context.Context, nsResource resource.NamespacedResource, indexKey ulid.ULID, destDir string) (_ *IndexMeta, retErr error) {
pfx := indexPrefix(nsResource, indexKey.String())
- // Download and parse the snapshot manifest with a size limit to avoid OOM on malicious files.
- var metaBuf bytes.Buffer
- if err := s.bucket.Download(ctx, pfx+snapshotManifestFile, &resource.LimitedWriter{W: &metaBuf, N: maxSnapshotManifestSize}, nil); err != nil {
- return nil, fmt.Errorf("reading snapshot manifest: %w", err)
- }
- var meta IndexMeta
- if err := json.Unmarshal(metaBuf.Bytes(), &meta); err != nil {
- return nil, fmt.Errorf("parsing snapshot manifest: %w", err)
- }
- if len(meta.Files) == 0 {
- return nil, fmt.Errorf("snapshot manifest has empty file manifest for index %q", indexKey)
- }
- if err := validateManifestPaths(meta.Files); err != nil {
- return nil, fmt.Errorf("invalid manifest: %w", err)
+ meta, err := s.GetIndexMeta(ctx, nsResource, indexKey)
+ if err != nil {
+ return nil, err
}
// fail if destDir already exist
@@ -394,7 +408,7 @@ func (s *BucketRemoteIndexStore) DownloadIndex(ctx context.Context, nsResource r
}
tmpDir = "" // prevent deferred cleanup of the now-renamed directory
- return &meta, nil
+ return meta, nil
}
// validateManifestPaths rejects manifest entries that are not already in canonical form.
@@ -432,6 +446,60 @@ func (s *BucketRemoteIndexStore) downloadFile(ctx context.Context, objectKey, lo
return f.Close()
}
+// ListIndexKeys lists the ULID-keyed snapshot subdirectories under the
+// namespaced-resource prefix using a delimited list, without reading any
+// manifest bodies.
+//
+// Subdirectories whose name is not exactly the canonical string form of a
+// ULID are skipped silently. Examples are the sibling `locks/` prefix, or a
+// lowercase or otherwise non-canonical key. For every returned key,
+// key.String() equals the listed directory name. Per-snapshot lookups such
+// as GetIndexMeta and DownloadIndex build their object paths from
+// indexKey.String(), so they address the same directory.
+func (s *BucketRemoteIndexStore) ListIndexKeys(ctx context.Context, nsResource resource.NamespacedResource) ([]ulid.ULID, error) {
+	pfx := nsPrefix(nsResource)
+	iter := s.bucket.List(&blob.ListOptions{Prefix: pfx, Delimiter: "/"})
+	var keys []ulid.ULID
+	for {
+		obj, err := iter.Next(ctx)
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		if err != nil {
+			return nil, fmt.Errorf("listing index keys: %w", err)
+		}
+		// With a delimiter, each snapshot appears once as a directory entry;
+		// plain objects sitting directly under the prefix are not snapshots.
+		if !obj.IsDir {
+			continue
+		}
+		name := strings.TrimSuffix(strings.TrimPrefix(obj.Key, pfx), "/")
+		key, err := ulid.Parse(name)
+		// ulid.Parse is lenient: it decodes lowercase input, and invalid
+		// characters yield an undefined ULID rather than an error. Requiring
+		// the canonical encoding to match the directory name ensures the key
+		// addresses the prefix we actually listed, and avoids duplicates.
+		if err != nil || key.String() != name {
+			continue // skip non-ULID / non-canonical subdirs (e.g. /locks)
+		}
+		keys = append(keys, key)
+	}
+	return keys, nil
+}
+
+// GetIndexMeta downloads and validates the snapshot manifest for indexKey.
+//
+// The download is capped at maxSnapshotManifestSize bytes, so a malicious or
+// corrupt manifest cannot exhaust memory. The method returns:
+//   - ErrSnapshotNotFound when the manifest object does not exist;
+//   - an error wrapping ErrInvalidManifest when the manifest is oversized,
+//     unparseable, lists no files, or contains non-canonical paths;
+//   - a wrapped bucket error for any other read failure, which callers
+//     should treat as transient.
+//
+// Underlying causes are wrapped with %w alongside the sentinel, so
+// errors.Is and errors.As can still reach them.
+func (s *BucketRemoteIndexStore) GetIndexMeta(ctx context.Context, nsResource resource.NamespacedResource, indexKey ulid.ULID) (*IndexMeta, error) {
+	manifestKey := indexPrefix(nsResource, indexKey.String()) + snapshotManifestFile
+	var buf bytes.Buffer
+	if err := s.bucket.Download(ctx, manifestKey, &resource.LimitedWriter{W: &buf, N: maxSnapshotManifestSize}, nil); err != nil {
+		if gcerrors.Code(err) == gcerrors.NotFound {
+			return nil, ErrSnapshotNotFound
+		}
+		if errors.Is(err, resource.ErrWriteLimitExceeded) {
+			return nil, fmt.Errorf("%w: oversized snapshot manifest: %w", ErrInvalidManifest, err)
+		}
+		return nil, fmt.Errorf("reading snapshot manifest: %w", err)
+	}
+	var meta IndexMeta
+	if err := json.Unmarshal(buf.Bytes(), &meta); err != nil {
+		return nil, fmt.Errorf("%w: parsing snapshot manifest: %w", ErrInvalidManifest, err)
+	}
+	if len(meta.Files) == 0 {
+		return nil, fmt.Errorf("%w: empty file manifest for index %q", ErrInvalidManifest, indexKey)
+	}
+	if err := validateManifestPaths(meta.Files); err != nil {
+		return nil, fmt.Errorf("%w: %w", ErrInvalidManifest, err)
+	}
+	return &meta, nil
+}
+
func (s *BucketRemoteIndexStore) ListIndexes(ctx context.Context, nsResource resource.NamespacedResource) (map[ulid.ULID]*IndexMeta, error) {
nsPfx := nsPrefix(nsResource)
result := make(map[ulid.ULID]*IndexMeta)
@@ -464,22 +532,12 @@ func (s *BucketRemoteIndexStore) ListIndexes(ctx context.Context, nsResource res
continue
}
- // Fetch and parse the snapshot manifest with a size limit.
- var metaBuf bytes.Buffer
- if err := s.bucket.Download(ctx, obj.Key, &resource.LimitedWriter{W: &metaBuf, N: maxSnapshotManifestSize}, nil); err != nil {
- s.log.Error("failed to read snapshot manifest", "key", obj.Key, "err", err)
- continue
- }
- var meta IndexMeta
- if err := json.Unmarshal(metaBuf.Bytes(), &meta); err != nil {
- s.log.Error("failed to parse snapshot manifest", "key", obj.Key, "err", err)
- continue
- }
- if len(meta.Files) == 0 || validateManifestPaths(meta.Files) != nil {
- s.log.Warn("skipping index snapshot with invalid manifest", "key", obj.Key)
+ meta, err := s.GetIndexMeta(ctx, nsResource, indexKey)
+ if err != nil {
+ s.log.Warn("skipping index snapshot with invalid manifest", "key", obj.Key, "err", err)
continue
}
- result[indexKey] = &meta
+ result[indexKey] = meta
}
return result, nil
@@ -622,12 +680,22 @@ func (s *BucketRemoteIndexStore) CleanupIncompleteUploads(ctx context.Context, n
cleaned := 0
for keyStr, info := range prefixes {
if info.metaKey != "" {
- valid, err := s.isValidManifest(ctx, info.metaKey)
+ // keyStr was produced by ulid.Parse on the way in, so re-parsing
+ // here is infallible.
+ indexKey, err := ulid.Parse(keyStr)
if err != nil {
- s.log.Warn("skipping prefix due to manifest read error", "key", keyStr, "err", err)
continue
}
- if valid {
+ _, err = s.GetIndexMeta(ctx, nsResource, indexKey)
+ switch {
+ case err == nil:
+ continue // valid manifest, prefix is complete
+ case errors.Is(err, ErrInvalidManifest):
+ // fall through to delete
+ default:
+ // Transient error or ErrSnapshotNotFound (manifest deleted
+ // between list and read — race; defer to next pass).
+ s.log.Warn("skipping prefix due to manifest read error", "key", keyStr, "err", err)
continue
}
}
@@ -642,25 +710,3 @@ func (s *BucketRemoteIndexStore) CleanupIncompleteUploads(ctx context.Context, n
return cleaned, nil
}
-
-// isValidManifest downloads and parses a snapshot manifest object with a size limit.
-// Returns (true, nil) for a valid manifest, (false, nil) for a positively
-// invalid one (oversized, corrupt JSON, or empty Files), and (false, err) for
-// transient download errors.
-func (s *BucketRemoteIndexStore) isValidManifest(ctx context.Context, metaKey string) (bool, error) {
- var buf bytes.Buffer
- if err := s.bucket.Download(ctx, metaKey, &resource.LimitedWriter{W: &buf, N: maxSnapshotManifestSize}, nil); err != nil {
- if errors.Is(err, resource.ErrWriteLimitExceeded) {
- return false, nil // positively invalid: oversized
- }
- return false, err // transient download error, skip this prefix
- }
- var meta IndexMeta
- if err := json.Unmarshal(buf.Bytes(), &meta); err != nil {
- return false, nil // positively invalid
- }
- if len(meta.Files) == 0 || validateManifestPaths(meta.Files) != nil {
- return false, nil
- }
- return true, nil
-}
diff --git a/pkg/storage/unified/search/remote_index_store_test.go b/pkg/storage/unified/search/remote_index_store_test.go
index cc5ff93b3819..0dbfec1875c3 100644
--- a/pkg/storage/unified/search/remote_index_store_test.go
+++ b/pkg/storage/unified/search/remote_index_store_test.go
@@ -271,8 +271,7 @@ func TestRemoteIndexStore_DownloadRejectsCorruptMetaJSON(t *testing.T) {
t.Run("missing snapshot manifest", func(t *testing.T) {
_, err := store.DownloadIndex(ctx, ns, key, t.TempDir())
- require.Error(t, err)
- require.Contains(t, err.Error(), "reading snapshot manifest")
+ require.ErrorIs(t, err, ErrSnapshotNotFound)
})
t.Run("invalid JSON", func(t *testing.T) {