Code Diff
diff --git a/pkg/registry/apis/datasource/migrator/migrator.go b/pkg/registry/apis/datasource/migrator/migrator.go
index ef4b0cdcb7f3b..b28cdc33485d9 100644
--- a/pkg/registry/apis/datasource/migrator/migrator.go
+++ b/pkg/registry/apis/datasource/migrator/migrator.go
@@ -8,6 +8,7 @@ import (
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime/schema"
common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
datasourceV0 "github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
@@ -15,12 +16,17 @@ import (
secret "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/storage/unified/migrations"
+ "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
// DataSourceMigrator migrates datasources out of legacy SQL storage.
type DataSourceMigrator interface {
MigrateDataSources(ctx context.Context, orgId int64, opts migrations.MigrateOptions, stream resourcepb.BulkStore_BulkProcessClient) error
+ // PluginGroups returns the distinct per-plugin GroupResources for namespace:
+ // groups found in legacy datasources merged with stale groups that unified
+ // storage (queried via client) still holds, for bulk stream pre-authorization.
+ PluginGroups(ctx context.Context, namespace string, client resource.SearchClient) ([]schema.GroupResource, error)
}
type dataSourceMigrator struct {
@@ -186,6 +192,50 @@ func (m *dataSourceMigrator) MigrateDataSources(ctx context.Context, orgId int64
return nil
}
+// PluginGroups collects the API groups that hold datasources for namespace.
+// It takes each non-empty group of the datasources returned by m.getter
+// (first occurrence only, always paired with the "datasources" resource) and
+// merges them with the groups reported by storageGroupsForDatasources.
+// An error from either lookup is returned unchanged.
+func (m *dataSourceMigrator) PluginGroups(ctx context.Context, namespace string, client resource.SearchClient) ([]schema.GroupResource, error) {
+	items, err := m.getter(ctx, namespace)
+	if err != nil {
+		return nil, err
+	}
+
+	known := make(map[string]bool, len(items))
+	fromLegacy := make([]schema.GroupResource, 0, len(items))
+	for _, item := range items {
+		name := item.GroupVersionKind().Group
+		if name != "" && !known[name] {
+			known[name] = true
+			fromLegacy = append(fromLegacy, schema.GroupResource{Group: name, Resource: "datasources"})
+		}
+	}
+
+	fromStorage, err := storageGroupsForDatasources(ctx, namespace, client)
+	if err != nil {
+		return nil, err
+	}
+	return migrations.MergeGroupResources(fromLegacy, fromStorage), nil
+}
+
+// storageGroupsForDatasources asks unified storage, through client.GetStats,
+// which API groups report a "datasources" resource in the given namespace.
+// Including them lets stale groups (migrated previously but since deleted
+// from legacy) take part in the bulk collection, so their data is cleaned up
+// on re-migration.
+func storageGroupsForDatasources(ctx context.Context, namespace string, client resource.SearchClient) ([]schema.GroupResource, error) {
+	stats, err := client.GetStats(ctx, &resourcepb.ResourceStatsRequest{Namespace: namespace})
+	switch {
+	case err != nil:
+		return nil, fmt.Errorf("getting storage stats: %w", err)
+	case stats.Error != nil:
+		return nil, fmt.Errorf("getting storage stats: %s", stats.Error.Message)
+	}
+
+	var groups []schema.GroupResource
+	for _, entry := range stats.Stats {
+		if entry.Resource != "datasources" {
+			continue
+		}
+		groups = append(groups, schema.GroupResource{Group: entry.Group, Resource: entry.Resource})
+	}
+	return groups, nil
+}
+
func (m *dataSourceMigrator) createSecrets(ctx context.Context, dsSecrets common.InlineSecureValues, objRef common.ObjectReference) (common.InlineSecureValues, error) {
if len(dsSecrets) == 0 {
return nil, nil
diff --git a/pkg/registry/apis/datasource/migrator/registrar.go b/pkg/registry/apis/datasource/migrator/registrar.go
index 4f4ae7afbaee8..1c123629c87a6 100644
--- a/pkg/registry/apis/datasource/migrator/registrar.go
+++ b/pkg/registry/apis/datasource/migrator/registrar.go
@@ -25,6 +25,7 @@ func DataSourceMigration(dsMigrator DataSourceMigrator) migrations.MigrationDefi
DataSourceCountValidation(),
},
// data_source table is still used by other code paths
- RenameTables: []string{},
+ RenameTables: []string{},
+ ResourceGroupsFunc: dsMigrator.PluginGroups,
}
}
diff --git a/pkg/storage/unified/migrations/migrator.go b/pkg/storage/unified/migrations/migrator.go
index f9262418dfc2c..3d6c9549cbc23 100644
--- a/pkg/storage/unified/migrations/migrator.go
+++ b/pkg/storage/unified/migrations/migrator.go
@@ -111,22 +111,24 @@ func (m *unifiedMigration) Migrate(ctx context.Context, opts MigrateOptions) (*r
origResources := opts.Resources
- // TODO... the migrator must be able to dynamically define the groups
- // The bulk processor will clean up any resources in these groups, and
- // initialize authorization scoped to this set of resources
- if len(opts.Resources) == 1 && opts.Resources[0].Group == "datasource.grafana.app" {
- // This should be loaded from the DB, or the plugin scanning
- plugins := []string{
- "alertmanager", "azuremonitor", "cloud-monitoring", "cloudwatch", "dashboard", "elasticsearch",
- "grafana-postgresql-datasource", "grafana-pyroscope-datasource", "grafana-testdata-datasource",
- "graphite", "influxdb", "jaeger", "loki", "mixed", "mssql", "mysql", "opentsdb", "parca", "prometheus",
- "tempo", "zipkin",
+ // If a definition provides a dynamic group resolver, call it to discover
+ // which groups actually exist in this namespace. The resolver receives the
+ // SearchClient so it can also query unified storage for stale groups and
+ // merge them in — keeping all resource-specific logic in the resolver.
+ //
+ // If the result is empty (namespace has no data at all), keep
+ // opts.Resources unchanged so the stream can still open and close cleanly.
+ for _, res := range origResources {
+ resolveFn := m.registry.GetResourceGroupsFunc(res)
+ if resolveFn == nil {
+ continue
+ }
+ resolved, err := resolveFn(ctx, opts.Namespace, m.client)
+ if err != nil {
+ return nil, fmt.Errorf("resolving resource groups for %s/%s: %w", res.Group, res.Resource, err)
}
- opts.Resources = make([]schema.GroupResource, 0, len(plugins))
- for _, p := range plugins {
- opts.Resources = append(opts.Resources, schema.GroupResource{
- Group: p + ".datasource.grafana.app", Resource: "datasources",
- })
+ if len(resolved) > 0 {
+ opts.Resources = resolved
}
}
@@ -157,6 +159,19 @@ func (m *unifiedMigration) Migrate(ctx context.Context, opts MigrateOptions) (*r
return stream.CloseAndRecv()
}
+// MergeGroupResources returns the union of a and b, deduplicated by Group.
+//
+// Order is preserved: entries of a come first, followed by entries of b whose
+// Group has not been seen yet. When several entries share a Group the first
+// one wins, even if their Resource differs. The result is always a freshly
+// allocated, non-nil slice and neither input is modified.
+func MergeGroupResources(a, b []schema.GroupResource) []schema.GroupResource {
+	seen := make(map[string]struct{}, len(a)+len(b))
+	result := make([]schema.GroupResource, 0, len(a)+len(b))
+	// Walk the inputs one after the other rather than ranging over
+	// append(a, b...): append may write b's elements into spare capacity of
+	// a's backing array, clobbering data visible through any other slice that
+	// shares that array.
+	for _, list := range [][]schema.GroupResource{a, b} {
+		for _, gr := range list {
+			if _, dup := seen[gr.Group]; dup {
+				continue
+			}
+			seen[gr.Group] = struct{}{}
+			result = append(result, gr)
+		}
+	}
+	return result
+}
+
type RebuildIndexOptions struct {
UsingDistributor bool
NamespaceInfo authlib.NamespaceInfo
diff --git a/pkg/storage/unified/migrations/registry.go b/pkg/storage/unified/migrations/registry.go
index 1ff0d151a85db..0a9e0d1934585 100644
--- a/pkg/storage/unified/migrations/registry.go
+++ b/pkg/storage/unified/migrations/registry.go
@@ -7,6 +7,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/infra/log"
+ "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/xorm"
)
@@ -43,6 +44,11 @@ type MigrationDefinition struct {
Validators []ValidatorFactory // Validator factories (validators created lazily)
RenameTables []string // Legacy tables to rename with _legacy suffix after successful migration
SkipWhenMissing bool // For fully migrated resources, the table may not exist at all
+ // ResourceGroupsFunc, when set, is called before opening the bulk stream to
+ // resolve the actual groups present in the namespace, replacing the static
+ // Resources list for stream pre-authorization. The SearchClient is provided
+ // so implementations can also account for stale groups in unified storage.
+ ResourceGroupsFunc func(ctx context.Context, namespace string, client resource.SearchClient) ([]schema.GroupResource, error)
}
// CreateValidators creates validators from the stored factory functions.
@@ -165,3 +171,17 @@ func (r *MigrationRegistry) HasResource(gr schema.GroupResource) bool {
}
return false
}
+
+// GetResourceGroupsFunc finds a registered definition whose Migrators contain
+// gr and hands back that definition's ResourceGroupsFunc. The result is nil
+// when no definition covers gr, or when the covering definition has no
+// dynamic resolver set. The registry is read under its read lock.
+func (r *MigrationRegistry) GetResourceGroupsFunc(gr schema.GroupResource) func(ctx context.Context, namespace string, client resource.SearchClient) ([]schema.GroupResource, error) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	for _, definition := range r.definitions {
+		_, covered := definition.Migrators[gr]
+		if !covered {
+			continue
+		}
+		return definition.ResourceGroupsFunc
+	}
+	return nil
+}