Data race / race condition in in-flight pending allocation sharing (DRA)
Description
The commit addresses a data race in the Dynamic Resource Allocation (DRA) pending allocation sharing path by refactoring in-flight allocation tracking to be guarded by a mutex and by switching from a sync.Map-based approach to a map guarded by an RWMutex. The changes introduce a dedicated inFlightMutex, an inFlightAllocation struct (holding a claim and a sharers counter), and updated accessors (GetPendingAllocation, SignalClaimPendingAllocation, MaybeRemoveClaimPendingAllocation, etc.). This is intended to prevent concurrent readers/writers from corrupting the in-flight allocations map and to ensure consistent sharing/removal semantics across concurrent scheduling cycles. While not a classic external-facing vulnerability, race conditions in scheduler state can cause inconsistent state, misallocation, or timing-related vulnerabilities in a concurrent system. The change is a genuine security-quality fix aimed at eliminating data races in the allocation-tracking mechanism.
Proof of Concept
// Proof-of-concept: demonstrates a data race in pre-fix in-flight allocation sharing and how the fix prevents it.
// The following code is a minimal repro to illustrate the race condition between concurrent writers/readers
// of a shared in-flight allocations map. It is not a real Kubernetes cluster exploit, but a synthetic
// demonstration of the race that the fix targets.
package main
import (
"fmt"
"sync"
"time"
)
type AllocationResult struct {
ID string
Status string
}
// Pre-fix (unsafe): no synchronization around the map
type ClaimTrackerNoLock struct {
inFlightAllocations map[string]*AllocationResult
}
func (c *ClaimTrackerNoLock) SignalClaimPendingAllocation(claimUID string, allocatedClaim *AllocationResult) {
// race-prone write
c.inFlightAllocations[claimUID] = allocatedClaim
}
func (c *ClaimTrackerNoLock) GetPendingAllocation(claimUID string) *AllocationResult {
// race-prone read
return c.inFlightAllocations[claimUID]
}
// Post-fix (safe): use mutex to guard the map
type ClaimTrackerLocked struct {
inFlightAllocations map[string]*AllocationResult
inFlightMutex sync.RWMutex
}
func (c *ClaimTrackerLocked) SignalClaimPendingAllocationSafe(claimUID string, allocatedClaim *AllocationResult) {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
c.inFlightAllocations[claimUID] = allocatedClaim
}
func (c *ClaimTrackerLocked) GetPendingAllocationSafe(claimUID string) *AllocationResult {
c.inFlightMutex.RLock()
defer c.inFlightMutex.RUnlock()
return c.inFlightAllocations[claimUID]
}
func main() {
// Setup pre-fix tracker (unsafe)
pre := &ClaimTrackerNoLock{inFlightAllocations: make(map[string]*AllocationResult)}
// Seed and launch concurrent writer/reader to trigger race
pre.SignalClaimPendingAllocation("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 100000; i++ {
pre.SignalClaimPendingAllocation("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
time.Sleep(time.Millisecond * 0)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 100000; i++ {
_ = pre.GetPendingAllocation("CLAIM-1")
time.Sleep(time.Millisecond * 0)
}
}()
wg.Wait()
fmt.Println("Pre-fix (unsafe) race demonstration completed. A race detector in a real run would typically report a data race here.")
// Setup post-fix tracker (safe)
safe := &ClaimTrackerLocked{inFlightAllocations: make(map[string]*AllocationResult)}
safe.SignalClaimPendingAllocationSafe("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
defer wg2.Done()
for i := 0; i < 100000; i++ {
safe.SignalClaimPendingAllocationSafe("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
time.Sleep(time.Millisecond * 0)
}
}()
go func() {
defer wg2.Done()
for i := 0; i < 100000; i++ {
_ = safe.GetPendingAllocationSafe("CLAIM-1")
time.Sleep(time.Millisecond * 0)
}
}()
wg2.Wait()
fmt.Println("Post-fix (mutex-protected) race demonstration completed. No data races should be reported when run with -race.")
}
Commit Details
Author: Jon Huhn
Date: 2026-03-19 13:42 UTC
Message:
scheduler: fix race in DRA pending allocation sharing
Triage Assessment
Vulnerability Type: Race condition
Confidence: MEDIUM
Reasoning:
The commit addresses a race condition in the DRA pending allocation sharing mechanism by introducing mutexes and refactoring in-flight allocation tracking. While not a traditional security bug (e.g., XSS, SQLi), race conditions can lead to inconsistent state, bypasses, or timing-related vulnerabilities in concurrent systems. The changes reduce potential security risk from data races in allocation tracking.
Verification Assessment
Vulnerability Type: Data race / race condition in in-flight pending allocation sharing (DRA)
Confidence: MEDIUM
Affected Versions: 1.36.x prior to this fix (including v1.36.0-beta.0 pre-fix)
Code Diff
diff --git a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go
index ed53485c95e89..bcb522c9a2328 100644
--- a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go
+++ b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go
@@ -148,26 +148,18 @@ func (r *resourceClaimTrackerContract) GatherAllocatedState() (*schedulerapi.All
return nil, nil
}
-func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
+func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) *resourceapi.AllocationResult {
return nil
}
-func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) (*resourceapi.AllocationResult, bool) {
- return nil, false
+func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
+ return nil
}
func (r *resourceClaimTrackerContract) MaybeRemoveClaimPendingAllocation(_ types.UID, _ bool) (deleted bool) {
return false
}
-func (r *resourceClaimTrackerContract) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
- return nil
-}
-
-func (r *resourceClaimTrackerContract) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
- return nil
-}
-
func (r *resourceClaimTrackerContract) AssumeClaimAfterAPICall(_ *resourceapi.ResourceClaim) error {
return nil
}
diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go
index 2676c263956eb..31b787253568f 100644
--- a/pkg/scheduler/framework/cycle_state.go
+++ b/pkg/scheduler/framework/cycle_state.go
@@ -41,11 +41,11 @@ type CycleState struct {
// GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins
// in the PreBind extension point.
parallelPreBindPlugins sets.Set[string]
- // isPodGroupSchedulingCycle indicates whether this cycle is a pod group scheduling cycle or not.
- // If set to false, it means that the pod referencing this CycleState either passed the pod group cycle
+ // podGroupCycleState contains the CycleState for this pod's PodGroup.
+ // If set to nil, it means that the pod referencing this CycleState either passed the pod group cycle
// or doesn't belong to any pod group.
- // This field can only be set to true when GenericWorkload feature flag is enabled.
- isPodGroupSchedulingCycle bool
+ // This field can only be non-nil when GenericWorkload feature flag is enabled.
+ podGroupCycleState fwk.PodGroupCycleState
}
// NewCycleState initializes a new CycleState and returns its pointer.
@@ -102,11 +102,15 @@ func (c *CycleState) GetParallelPreBindPlugins() sets.Set[string] {
}
func (c *CycleState) IsPodGroupSchedulingCycle() bool {
- return c.isPodGroupSchedulingCycle
+ return c.podGroupCycleState != nil
}
-func (c *CycleState) SetPodGroupSchedulingCycle(isPodGroupSchedulingCycle bool) {
- c.isPodGroupSchedulingCycle = isPodGroupSchedulingCycle
+func (c *CycleState) SetPodGroupSchedulingCycle(podGroupCycleState fwk.PodGroupCycleState) {
+ c.podGroupCycleState = podGroupCycleState
+}
+
+func (c *CycleState) GetPodGroupSchedulingCycle() fwk.PodGroupCycleState {
+ return c.podGroupCycleState
}
func (c *CycleState) SetSkipAllPostFilterPlugins(flag bool) {
@@ -135,7 +139,7 @@ func (c *CycleState) Clone() fwk.CycleState {
copy.skipScorePlugins = c.skipScorePlugins
copy.skipPreBindPlugins = c.skipPreBindPlugins
copy.parallelPreBindPlugins = c.parallelPreBindPlugins
- copy.isPodGroupSchedulingCycle = c.isPodGroupSchedulingCycle
+ copy.podGroupCycleState = c.podGroupCycleState
copy.skipAllPostFilterPlugins = c.skipAllPostFilterPlugins
return copy
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
index 4687e55c4c522..a1703319f6c0c 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
+ "iter"
"slices"
"sync"
@@ -63,7 +64,7 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re
manager := &DefaultDRAManager{
resourceClaimTracker: &claimTracker{
cache: claimsCache,
- inFlightAllocations: &sync.Map{},
+ inFlightAllocations: make(map[types.UID]inFlightAllocation),
allocatedDevices: newAllocatedDevices(logger),
logger: logger,
},
@@ -74,9 +75,6 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re
if utilfeature.DefaultFeatureGate.Enabled(features.DRAExtendedResource) {
manager.extendedResourceCache = extendedresourcecache.NewExtendedResourceCache(logger)
}
- if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
- manager.resourceClaimTracker.inFlightAllocationSharers = &sync.Map{}
- }
pgLister := &podGroupLister{}
if utilfeature.DefaultFeatureGate.Enabled(features.DRAWorkloadResourceClaims) {
@@ -178,13 +176,16 @@ type claimTracker struct {
// - would make integration with cluster autoscaler harder because it would need
// to trigger informer callbacks.
cache *assumecache.AssumeCache
+ // inFlightMutex syncs access to inFlightAllocations.
+ inFlightMutex sync.RWMutex
// inFlightAllocations is a map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the
// corresponding claim status update call in PreBind has not been done
- // yet. If another pod needs the claim, the pod is treated as "not
- // schedulable yet" unless the pod is a member of a PodGroup. For ungrouped
- // pods, the cluster event for the claim status update will make it
- // schedulable.
+ // yet. It also includes a reference count tracking how many actively
+ // scheduling Pods in a PodGroup are using that pending allocation. If
+ // another pod outside the PodGroup needs the claim, the pod is treated as
+ // "not schedulable yet". For those pods, the cluster event for the
+ // claim status update will make them schedulable.
//
// This mechanism avoids the following problem:
// - Pod A triggers allocation for claim X.
@@ -215,25 +216,45 @@ type claimTracker struct {
// pods is expected to be rare compared to per-pod claim, so we end up
// hitting the "multiple goroutines read, write, and overwrite entries
// for disjoint sets of keys" case that sync.Map is optimized for.
- inFlightAllocations *sync.Map
- // inFlightAllocationSharers counts the actively scheduling pods
- // sharing a given ResourceClaim.
- inFlightAllocationSharers *sync.Map
- allocatedDevices *allocatedDevices
- logger klog.Logger
+ inFlightAllocations map[types.UID]inFlightAllocation
+ allocatedDevices *allocatedDevices
+ logger klog.Logger
+}
+
+type inFlightAllocation struct {
+ claim *resourceapi.ResourceClaim
+ sharers int
}
-func (c *claimTracker) GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) {
- var allocation *resourceapi.AllocationResult
- claim, found := c.inFlightAllocations.Load(claimUID)
- if found && claim != nil {
- allocation = claim.(*resourceapi.ResourceClaim).Status.Allocation
+func (c *claimTracker) GetPendingAllocation(claimUID types.UID) *resourceapi.AllocationResult {
+ c.inFlightMutex.RLock()
+ defer c.inFlightMutex.RUnlock()
+
+ inFlight, found := c.inFlightAllocations[claimUID]
+ if !found || inFlight.claim == nil {
+ return nil
}
- return allocation, found
+ return inFlight.claim.Status.Allocation
}
func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
- c.inFlightAllocations.Store(claimUID, allocatedClaim)
+ c.inFlightMutex.Lock()
+ defer c.inFlightMutex.Unlock()
+
+ inFlight, found := c.inFlightAllocations[claimUID]
+ if found {
+ inFlight.sharers++
+ c.inFlightAllocations[claimUID] = inFlight
+
+ claim := inFlight.claim
+ c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers)
+ return nil
+ }
+
+ c.inFlightAllocations[claimUID] = inFlightAllocation{
+ claim: allocatedClaim,
+ sharers: 1,
+ }
// This is the same verbosity as the corresponding log in the assume cache.
c.logger.V(5).Info("Added in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion)
// There's no reason to return an error in this implementation, but the error is helpful for other implementations.
@@ -242,67 +263,28 @@ func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocate
return nil
}
-func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) {
- if c.inFlightAllocationSharers != nil && shareable {
- value, ok := c.inFlightAllocationSharers.Load(claimUID)
- if ok && value.(int) > 0 {
- if loggerV := c.logger.V(5); loggerV.Enabled() {
- claim, found := c.inFlightAllocations.Load(claimUID)
- if found {
- claim := claim.(*resourceapi.ResourceClaim)
- c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
- }
- }
- return false
- }
- }
+func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, forceRemove bool) (deleted bool) {
+ c.inFlightMutex.Lock()
+ defer c.inFlightMutex.Unlock()
- claim, found := c.inFlightAllocations.LoadAndDelete(claimUID)
+ inFlight, found := c.inFlightAllocations[claimUID]
// The assume cache doesn't log this, but maybe it should.
- if found {
- claim := claim.(*resourceapi.ResourceClaim)
- c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
- } else {
+ if !found {
c.logger.V(5).Info("Redundant remove of in-flight claim, not found", "uid", claimUID)
+ return false
}
- return found
-}
+ claim := inFlight.claim
-func (c *claimTracker) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
- newSharers := 1
- value, loaded := c.inFlightAllocationSharers.LoadOrStore(claimUID, newSharers)
- if loaded {
- oldSharers := value.(int)
- newSharers = oldSharers + 1
- swapped := c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers)
- if !swapped {
- // The value must have changed since we loaded
- return fmt.Errorf("conflict adding in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID)
- }
+ if forceRemove || inFlight.sharers == 1 {
+ delete(c.inFlightAllocations, claimUID)
+ c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
+ return true
}
- c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers)
- return nil
-}
-func (c *claimTracker) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
- value, ok := c.inFlightAllocationSharers.Load(claimUID)
- if !ok {
- return nil
- }
- oldSharers := value.(int)
- newSharers := oldSharers - 1
- var written bool
- if newSharers == 0 {
- written = c.inFlightAllocationSharers.CompareAndDelete(claimUID, oldSharers)
- } else {
- written = c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers)
- }
- if !written {
- // The value must have changed since we loaded
- return fmt.Errorf("conflict removing in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID)
- }
- c.logger.V(5).Info("Removed share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers)
- return nil
+ inFlight.sharers--
+ c.inFlightAllocations[claimUID] = inFlight
+ c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers)
+ return false
}
func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
@@ -360,14 +342,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (a sets.Set[structured.DeviceID
allocated, revision := c.allocatedDevices.Get()
// Whatever is in flight also has to be checked.
- c.inFlightAllocations.Range(func(key, value any) bool {
- claim := value.(*resourceapi.ResourceClaim)
+ for _, inFlight := range c.allInFlightAllocationsRLocked() {
+ claim := inFlight.claim
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
}, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {})
- return true
- })
+ }
if revision == c.allocatedDevices.Revision() {
// Our current result is valid, nothing changed in the meantime.
@@ -433,8 +414,8 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
}
// Whatever is in flight also has to be checked.
- c.inFlightAllocations.Range(func(key, value any) bool {
- claim := value.(*resourceapi.ResourceClaim)
+ for _, inFlight := range c.allInFlightAllocationsRLocked() {
+ claim := inFlight.claim
foreachAllocatedDevice(claim,
func(deviceID structured.DeviceID) { // dedicatedDeviceCallback
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
@@ -449,9 +430,7 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
c.logger.V(6).Info("Device is in flight for allocation", "consumed capacity", capacity, "claim", klog.KObj(claim))
aggregatedCapacity.Insert(capacity)
})
- return true
- })
-
+ }
if revision1 == c.allocatedDevices.Revision() {
// Our current result is valid, nothing changed in the meantime.
return &structured.AllocatedState{
@@ -464,6 +443,18 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
return nil, errClaimTrackerConcurrentModification
}
+func (c *claimTracker) allInFlightAllocationsRLocked() iter.Seq2[types.UID, inFlightAllocation] {
+ return func(yield func(types.UID, inFlightAllocation) bool) {
+ c.inFlightMutex.RLock()
+ defer c.inFlightMutex.RUnlock()
+ for uid, inFlight := range c.inFlightAllocations {
+ if !yield(uid, inFlight) {
+ return
+ }
+ }
+ }
+}
+
fun
... [truncated]