Code Diff
diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go
index 4d86232ed7907..113e360e459bf 100644
--- a/pkg/setting/setting.go
+++ b/pkg/setting/setting.go
@@ -713,6 +713,15 @@ type Cfg struct {
VectorPromotionThreshold int // row count per tenant to trigger promotion
VectorPromoterInterval time.Duration // promoter tick interval; 0 disables
+ // VectorSearch per-tenant query-embedding cache (DB-backed, FIFO).
+ VectorQueryCacheEnabled bool
+ VectorQueryCacheMaxPerTenant int
+
+ // VectorSearch per-tenant rate limit (DB-backed, sliding window).
+ VectorRateLimitEnabled bool
+ VectorRateLimitPerTenant int
+ VectorRateLimitWindow time.Duration
+
// Embedding provider used by the VectorSearch RPC. "" = disabled.
EmbeddingProvider string // "vertex" | "bedrock" | ""
VertexProjectID string
diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go
index 44aa54b7c52fc..7f131c30a43fc 100644
--- a/pkg/setting/setting_unified_storage.go
+++ b/pkg/setting/setting_unified_storage.go
@@ -284,6 +284,13 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
cfg.VectorPromotionThreshold = vectorSection.Key("promotion_threshold").MustInt(10000)
cfg.VectorPromoterInterval = vectorSection.Key("promoter_interval").MustDuration(0) // zero means disabled
+ // Per-tenant query-embedding cache + rate limit.
+ cfg.VectorQueryCacheEnabled = vectorSection.Key("query_cache_enabled").MustBool(true)
+ cfg.VectorQueryCacheMaxPerTenant = vectorSection.Key("query_cache_max_per_tenant").MustInt(1000)
+ cfg.VectorRateLimitEnabled = vectorSection.Key("rate_limit_enabled").MustBool(true)
+ cfg.VectorRateLimitPerTenant = vectorSection.Key("rate_limit_per_tenant").MustInt(60)
+ cfg.VectorRateLimitWindow = vectorSection.Key("rate_limit_window").MustDuration(time.Minute)
+
// Embedding provider for the VectorSearch RPC. Empty = disabled (RPC
// returns Unimplemented). When set, the matching provider's connection
// fields must also be configured.
diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go
index 948f7e0cf6a3d..60433bbc24e04 100644
--- a/pkg/storage/unified/resource/search.go
+++ b/pkg/storage/unified/resource/search.go
@@ -3,6 +3,8 @@ package resource
import (
"cmp"
"context"
+ "crypto/sha256"
+ "encoding/hex"
"fmt"
"hash/fnv"
"math/rand"
@@ -170,6 +172,12 @@ type searchServer struct {
initWorkers int
initMinSize int
+ queryCache vector.QueryEmbeddingCache
+ queryCacheMaxPerTenant int
+ rateLimiter vector.RateLimiter
+ rateLimitPerTenant int
+ rateLimitWindow time.Duration
+
ownsIndexFn func(key NamespacedResource) (bool, error)
buildIndex singleflight.Group
@@ -270,6 +278,12 @@ func newSearchServer(opts SearchOptions, storage StorageBackend, vectorBackend v
selectableFields: opts.SelectableFieldsForKinds,
injectFailuresPercent: opts.InjectFailuresPercent,
indexModificationCacheTTL: opts.IndexModificationCacheTTL,
+
+ queryCache: opts.QueryCache,
+ queryCacheMaxPerTenant: opts.QueryCacheMaxPerTenant,
+ rateLimiter: opts.RateLimiter,
+ rateLimitPerTenant: opts.RateLimitPerTenant,
+ rateLimitWindow: opts.RateLimitWindow,
}
s.rebuildQueue = debouncer.NewQueue(combineRebuildRequests)
@@ -602,26 +616,18 @@ func (s *searchServer) VectorSearch(ctx context.Context, req *resourcepb.VectorS
attribute.Int("limit", limit),
)
- // Embed the query as a retrieval *query* (different task hint than the
- // retrieval *document* hint used at index time — providers tune
- // projections per side of the retrieval pair).
- out, err := s.embedder.EmbedText(ctx, embedder.EmbedTextInput{
- Texts: []string{req.Query},
- Normalize: s.embedder.ShouldNormalize(),
- Task: embedder.TaskRetrievalQuery,
- })
- if err != nil {
- s.log.Error("vector search: embed query", "err", err)
- return nil, status.Error(codes.Internal, "embed query")
+ if err := s.checkVectorSearchRateLimit(ctx, req.Key.Namespace); err != nil {
+ return nil, err
}
- if len(out.Embeddings) != 1 || len(out.Embeddings[0].Dense) == 0 {
- s.log.Error("vector search: embedder returned no vectors")
- return nil, status.Error(codes.Internal, "embed query: empty result")
+
+ dense, err := s.embedVectorSearchQuery(ctx, req.Key.Namespace, req.Query)
+ if err != nil {
+ return nil, err
}
results, err := s.vectorBackend.Search(ctx,
req.Key.Namespace, s.embedder.Model, req.Key.Resource,
- out.Embeddings[0].Dense, limit, translateVectorSearchFilters(req.Filters)...)
+ dense, limit, translateVectorSearchFilters(req.Filters)...)
if err != nil {
s.log.Error("vector search: backend", "err", err)
return nil, status.Error(codes.Internal, "vector search backend")
@@ -685,6 +691,109 @@ func validateVectorSearchRequest(req *resourcepb.VectorSearchRequest) *resourcep
return nil
}
+// checkVectorSearchRateLimit enforces the per-tenant VectorSearch quota. It
+// returns nil when the request may proceed, a codes.ResourceExhausted status
+// when the tenant is over its limit, and a codes.Unavailable status when the
+// limiter itself errors (the check fails closed).
+//
+// Limiting is skipped when no limiter is configured or the window is
+// non-positive. A non-positive window is degenerate, and
+// startRateBucketSweeper refuses to run for one, so buckets written by
+// Allow would otherwise never be reaped.
+func (s *searchServer) checkVectorSearchRateLimit(ctx context.Context, namespace string) error {
+	if s.rateLimiter == nil || s.rateLimitWindow <= 0 {
+		return nil
+	}
+	allowed, count, err := s.rateLimiter.Allow(ctx, namespace, s.rateLimitWindow, s.rateLimitPerTenant)
+	if err != nil {
+		s.log.Error("vector search: rate-limit check failed, fail-closed", "err", err, "namespace", namespace)
+		if s.vectorMetrics != nil {
+			s.vectorMetrics.RateLimiterErrorsTotal.Inc()
+		}
+		return status.Error(codes.Unavailable, "rate limiter unavailable")
+	}
+	if !allowed {
+		if s.vectorMetrics != nil {
+			s.vectorMetrics.RateLimitedRequestsTotal.Inc()
+		}
+		return status.Errorf(codes.ResourceExhausted, "tenant rate limit exceeded: %d requests in window", count)
+	}
+	return nil
+}
+
+// embedVectorSearchQuery returns the dense embedding for query in namespace.
+// It serves the vector from the per-tenant query cache on a hit. On a miss
+// it calls the embedder and writes the result back to the cache
+// (best-effort, enforcing the per-tenant cap). The cache key is the SHA-256
+// hex digest of the raw query text.
+//
+// Only embedder failures are returned, as codes.Internal status errors;
+// cache failures never fail the search.
+func (s *searchServer) embedVectorSearchQuery(ctx context.Context, namespace, query string) ([]float32, error) {
+	queryHash := sha256Hex(query)
+	if dense, ok := s.lookupCachedQueryEmbedding(ctx, namespace, queryHash); ok {
+		return dense, nil
+	}
+	// Record the miss here, before embedding, so a miss whose embed call then
+	// fails is still counted. Only count when a cache is configured, so a
+	// disabled cache does not report a stream of misses.
+	if s.queryCache != nil && s.vectorMetrics != nil {
+		s.vectorMetrics.QueryCacheMissesTotal.WithLabelValues(s.embedder.Model).Inc()
+	}
+
+	// Embed the query as a retrieval *query* (different task hint than
+	// the retrieval *document* hint used at index time — providers tune
+	// projections per side of the retrieval pair).
+	out, err := s.embedder.EmbedText(ctx, embedder.EmbedTextInput{
+		Texts:     []string{query},
+		Normalize: s.embedder.ShouldNormalize(),
+		Task:      embedder.TaskRetrievalQuery,
+	})
+	if err != nil {
+		s.log.Error("vector search: embed query", "err", err)
+		return nil, status.Error(codes.Internal, "embed query")
+	}
+	if len(out.Embeddings) != 1 || len(out.Embeddings[0].Dense) == 0 {
+		s.log.Error("vector search: embedder returned no vectors")
+		return nil, status.Error(codes.Internal, "embed query: empty result")
+	}
+	dense := out.Embeddings[0].Dense
+	s.storeCachedQueryEmbedding(ctx, namespace, queryHash, dense)
+	return dense, nil
+}
+
+// lookupCachedQueryEmbedding returns (embedding, true) on a usable cache hit
+// and (nil, false) otherwise. Lookup errors are logged and treated as a miss,
+// so cache failures never fail the search. A cached entry with an empty
+// vector is also treated as a miss. The embedder path rejects empty results,
+// so such an entry is not a usable embedding and must not be searched with.
+func (s *searchServer) lookupCachedQueryEmbedding(ctx context.Context, namespace, queryHash string) ([]float32, bool) {
+	if s.queryCache == nil {
+		return nil, false
+	}
+	emb, hit, err := s.queryCache.Get(ctx, namespace, s.embedder.Model, queryHash)
+	if err != nil {
+		s.log.Warn("vector search: cache lookup failed, falling through", "err", err, "namespace", namespace)
+		return nil, false
+	}
+	if !hit || len(emb) == 0 {
+		return nil, false
+	}
+	if s.vectorMetrics != nil {
+		s.vectorMetrics.QueryCacheHitsTotal.WithLabelValues(s.embedder.Model).Inc()
+	}
+	return emb, true
+}
+
+// vectorQueryCacheEvictTargetFraction is the fraction of the per-tenant cap
+// that a tenant's cache is trimmed down to when an eviction round runs.
+const vectorQueryCacheEvictTargetFraction = 0.8
+
+// storeCachedQueryEmbedding writes a freshly embedded query vector into the
+// cache. It is best-effort: failures are logged, never returned. When the
+// tenant already holds at least queryCacheMaxPerTenant entries, the oldest
+// are evicted first. Eviction trims to vectorQueryCacheEvictTargetFraction
+// of the cap in one pass, so steady-state misses don't each issue a DELETE.
+// A nil cache or a non-positive cap disables writes entirely.
+func (s *searchServer) storeCachedQueryEmbedding(ctx context.Context, namespace, queryHash string, dense []float32) {
+	if s.queryCache == nil || s.queryCacheMaxPerTenant <= 0 {
+		return
+	}
+
+	n, err := s.queryCache.Count(ctx, namespace)
+	switch {
+	case err != nil:
+		// Still attempt the Put below, but surface the failure: a Count that
+		// keeps failing means the per-tenant cap is not being enforced.
+		s.log.Warn("vector search: cache count failed, skipping eviction", "err", err, "namespace", namespace)
+	case n >= int64(s.queryCacheMaxPerTenant):
+		target := int64(float64(s.queryCacheMaxPerTenant) * vectorQueryCacheEvictTargetFraction)
+		deleted, evictErr := s.queryCache.EvictOldest(ctx, namespace, int(n-target))
+		if evictErr != nil {
+			s.log.Warn("vector search: cache evict failed", "err", evictErr)
+		} else if deleted > 0 && s.vectorMetrics != nil {
+			s.vectorMetrics.QueryCacheEvictionsTotal.Add(float64(deleted))
+		}
+	}
+
+	if putErr := s.queryCache.Put(ctx, namespace, s.embedder.Model, queryHash, dense); putErr != nil {
+		s.log.Warn("vector search: cache put failed", "err", putErr)
+	}
+}
+
// vectorAuthzKey de-dupes (UID, Folder) so sub-resources of the same
// parent (e.g. dashboard panels) share a single batch-check entry.
type vectorAuthzKey struct{ uid, folder string }
@@ -1028,6 +1137,8 @@ func (s *searchServer) init(ctx context.Context) error {
s.bgTaskWg.Add(1)
go s.runPeriodicScanForIndexesToRebuild(subctx)
+ s.startRateBucketSweeper(subctx)
+
end := time.Now().Unix()
s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs())
return nil
@@ -1080,6 +1191,42 @@ func (s *searchServer) runPeriodicScanForIndexesToRebuild(ctx context.Context) {
}
}
+// sha256Hex returns the lowercase hexadecimal SHA-256 digest of s; it is
+// used to key the query-embedding cache.
+func sha256Hex(s string) string {
+	digest := sha256.New()
+	_, _ = digest.Write([]byte(s)) // hash.Hash.Write never returns an error
+	return hex.EncodeToString(digest.Sum(nil))
+}
+
+// startRateBucketSweeper starts a background goroutine, tracked by
+// s.bgTaskWg, that periodically deletes rate-limit buckets older than two
+// windows. Old buckets never affect Allow(), so sweeping is housekeeping
+// only: failures are logged and retried on the next tick. The goroutine
+// exits when ctx is cancelled. It is a no-op without a limiter or with a
+// non-positive window.
+func (s *searchServer) startRateBucketSweeper(ctx context.Context) {
+	if s.rateLimiter == nil || s.rateLimitWindow <= 0 {
+		return
+	}
+	// Sweep twice per window, but no more often than once a minute.
+	interval := max(s.rateLimitWindow/2, time.Minute)
+
+	// Same Add/Done pattern init uses for runPeriodicScanForIndexesToRebuild.
+	s.bgTaskWg.Add(1)
+	go func() {
+		defer s.bgTaskWg.Done()
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+			cutoff := time.Now().Add(-2 * s.rateLimitWindow)
+			deleted, err := s.rateLimiter.SweepOlderThan(ctx, cutoff)
+			switch {
+			case err != nil && ctx.Err() != nil:
+				// Cancelled mid-sweep during shutdown; not worth a warning.
+				return
+			case err != nil:
+				s.log.Warn("rate-bucket sweep failed", "err", err)
+			case deleted > 0:
+				s.log.Debug("rate-bucket sweep", "deleted", deleted, "cutoff", cutoff)
+			}
+		}
+	}()
+}
+
// jitterForKey returns a deterministic jitter duration for the given key,
// bounded to [0, maxAge/2). This spreads index rebuilds across scan intervals
// to avoid thundering herd CPU spikes when many indexes become stale at once.
diff --git a/pkg/storage/unified/resource/search_vector_cache_test.go b/pkg/storage/unified/resource/search_vector_cache_test.go
new file mode 100644
index 0000000000000..abcf4ea7b8f9c
--- /dev/null
+++ b/pkg/storage/unified/resource/search_vector_cache_test.go
@@ -0,0 +1,233 @@
+package resource
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/grafana/grafana/pkg/storage/unified/resourcepb"
+ "github.com/grafana/grafana/pkg/storage/unified/search/embed/embedder"
+ "github.com/grafana/grafana/pkg/storage/unified/search/vector"
+)
+
+// fakeQueryCache is an in-memory query-embedding cache for tests. Entries
+// are keyed by cacheKey(ns, model, hash). The counters record cache traffic
+// for assertions, and getErr/putErr inject failures into Get/Put.
+type fakeQueryCache struct {
+	mu sync.Mutex
+
+	entries map[string][]float32
+
+	// Call and effect counters.
+	getCalls int
+	putCalls int
+	evicted  int64
+
+	// Injected errors.
+	putErr error
+	getErr error
+}
+
+// newFakeQueryCache returns an empty fakeQueryCache ready for use.
+func newFakeQueryCache() *fakeQueryCache {
+	c := new(fakeQueryCache)
+	c.entries = make(map[string][]float32)
+	return c
+}
+
+// cacheKey joins namespace, model and query hash into one map key.
+func cacheKey(ns, model, hash string) string {
+	const sep = "|"
+	return ns + sep + model + sep + hash
+}
+
+// Get returns the stored embedding for (ns, model, hash). It returns getErr
+// when one is injected, and reports a miss as (nil, false, nil).
+func (f *fakeQueryCache) Get(_ context.Context, ns, model, hash string) ([]float32, bool, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	f.getCalls++
+	if err := f.getErr; err != nil {
+		return nil, false, err
+	}
+	v, found := f.entries[cacheKey(ns, model, hash)]
+	if !found {
+		return nil, false, nil
+	}
+	return v, true, nil
+}
+
+// Put stores emb under (ns, model, hash), overwriting any prior value,
+// unless putErr is injected, in which case nothing is stored.
+func (f *fakeQueryCache) Put(_ context.Context, ns, model, hash string, emb []float32) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	f.putCalls++
+	if err := f.putErr; err != nil {
+		return err
+	}
+	key := cacheKey(ns, model, hash)
+	f.entries[key] = emb
+	return nil
+}
+
+// Count returns how many entries belong to namespace ns, across all models.
+// It matches on ns plus cacheKey's "|" separator so that a namespace does
+// not also count entries of namespaces it is a prefix of (e.g. "ns" vs
+// "ns2").
+func (f *fakeQueryCache) Count(_ context.Context, ns string) (int64, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	prefix := ns + "|"
+	var n int64
+	for k := range f.entries {
+		if len(k) >= len(prefix) && k[:len(prefix)] == prefix {
+			n++
+		}
+	}
+	return n, nil
+}
+
+// EvictOldest deletes up to n entries belonging to namespace ns and returns
+// how many were removed. It does not track insertion order: Go map iteration
+// order is unspecified, so the evicted entries are arbitrary. Matching on ns
+// plus cacheKey's "|" separator keeps eviction from spilling into namespaces
+// that ns is a prefix of.
+func (f *fakeQueryCache) EvictOldest(_ context.Context, ns string, n int) (int64, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	prefix := ns + "|"
+	var deleted int64
+	for k := range f.entries {
+		if deleted >= int64(n) {
+			break
+		}
+		if len(k) >= len(prefix) && k[:len(prefix)] == prefix {
+			delete(f.entries, k)
+			deleted++
+		}
+	}
+	f.evicted += deleted
+	return deleted, nil
+}
+
+// fakeRateLimiter is a scripted rate-limiter test double. Allow returns the
+// canned (allow, count) pair or err. SweepOlderThan returns (swept,
+// sweepEr). calls and sweeps count invocations of each.
+type fakeRateLimiter struct {
+	mu sync.Mutex
+
+	// Allow behavior.
+	calls int
+	count int64
+	allow bool
+	err   error
+
+	// SweepOlderThan behavior.
+	sweeps  int
+	swept   int64
+	sweepEr error
+}
+
+// Allow records the call and returns the scripted decision. When err is set
+// it returns (false, 0, err) regardless of allow/count.
+func (f *fakeRateLimiter) Allow(_ context.Context, _ string, _ time.Duration, _ int) (bool, int64, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	f.calls++
+	allowed, n, err := f.allow, f.count, f.err
+	if err != nil {
+		allowed, n = false, 0
+	}
+	return allowed, n, err
+}
+
+// SweepOlderThan records the call and returns the scripted (swept, sweepEr).
+func (f *fakeRateLimiter) SweepOlderThan(_ context.Context, _ time.Time) (int64, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	f.sweeps++
+	deleted, err := f.swept, f.sweepEr
+	return deleted, err
+}
+
+// newTestSearchServerWithCache builds a test searchServer via
+// newTestSearchServer and wires in the given query cache and rate limiter.
+// The cache cap is 1000 entries per tenant; the rate limit is 60 requests
+// per one-minute window.
+func newTestSearchServerWithCache(emb *embedder.Embedder, backend vector.VectorBackend, cache vector.QueryEmbeddingCache, rl vector.RateLimiter) *searchServer {
+	srv := newTestSearchServer(emb, backend)
+
+	// Query-embedding cache.
+	srv.queryCache, srv.queryCacheMaxPerTenant = cache, 1000
+
+	// Rate limiting.
+	srv.rateLimiter = rl
+	srv.rateLimitPerTenant, srv.rateLimitWindow = 60, time.Minute
+	return srv
+}
+
+func TestVectorSearch_CacheMissThenHitSkipsEmbedder(t *testing.T) {
+ fake := &fakeTextEmbedder{dim: 4}
+ emb := newTestEmbedder(fake)
+ backend := &fakeVectorBackend{results: []vector.VectorSearchResult{{UID: "u", Title: "t"}}}
+ cache := newFakeQueryCache()
+ s := newTestSearchServerWithCache(emb, backend, cache, nil)
+
+ req := &resourcepb.VectorSearchRequest{Key: validKey(), Query: "same query", Limit: 5}
+
+ _, err := s.VectorSearch(authedCtx(), req)
+ require.NoError(t, err)
+ assert.Equal(t, 1, cache.getCalls)
+ assert.Equal(t, 1, cache.putCalls)
+ assert.Equal(t, "same query", fake.gotIn.Texts[0])
+
+ // Reset the captur
... [truncated]