Data race / race condition in in-flight pending allocation sharing (DRA)

MEDIUM
kubernetes/kubernetes
Commit: 61cf993c6b0d
Affected: 1.36.x prior to this fix (including v1.36.0-beta.0 pre-fix)
2026-04-04 17:54 UTC

Description

The commit fixes a data race in the Dynamic Resource Allocation (DRA) pending allocation sharing path. Before the fix, the claim tracker kept pending allocations and their sharer counts in two separate sync.Maps, so the pair could not be read or updated atomically across concurrent scheduling cycles. The fix replaces both with a single plain map from claim UID to a new inFlightAllocation struct (the claim plus a sharers counter), guarded by a dedicated inFlightMutex (a sync.RWMutex), and reworks the accessors (GetPendingAllocation, SignalClaimPendingAllocation, MaybeRemoveClaimPendingAllocation, etc.) so that sharing and removal happen atomically under the lock. While not a classic externally facing vulnerability, race conditions in scheduler state can cause inconsistent state, misallocation, or timing-related bugs in a concurrent system. The change is a genuine security-quality fix that eliminates data races in the allocation-tracking mechanism.
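The refactor described above can be modeled as a reference-counted map behind a single RWMutex. The sketch below is a minimal illustration, not the Kubernetes code: the names inFlightAllocation, sharers, SignalClaimPendingAllocation, and MaybeRemoveClaimPendingAllocation come from the commit, but the types are simplified (a string stands in for *resourceapi.ResourceClaim and types.UID).

```go
package main

import (
	"fmt"
	"sync"
)

// inFlightAllocation models the struct introduced by the fix: the pending
// claim together with a count of actively scheduling pods sharing it.
type inFlightAllocation struct {
	claim   string // simplified stand-in for *resourceapi.ResourceClaim
	sharers int
}

// claimTracker guards one plain map with one RWMutex, so the claim and its
// sharer count are always read and written together.
type claimTracker struct {
	mu       sync.RWMutex
	inFlight map[string]inFlightAllocation
}

// SignalClaimPendingAllocation registers a pending allocation, or bumps the
// sharers count if one is already in flight for this claim UID.
func (c *claimTracker) SignalClaimPendingAllocation(uid, claim string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if a, ok := c.inFlight[uid]; ok {
		a.sharers++
		c.inFlight[uid] = a
		return
	}
	c.inFlight[uid] = inFlightAllocation{claim: claim, sharers: 1}
}

// MaybeRemoveClaimPendingAllocation drops one share; the entry is deleted
// only when the last sharer leaves (or forceRemove is set).
func (c *claimTracker) MaybeRemoveClaimPendingAllocation(uid string, forceRemove bool) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	a, ok := c.inFlight[uid]
	if !ok {
		return false
	}
	if forceRemove || a.sharers == 1 {
		delete(c.inFlight, uid)
		return true
	}
	a.sharers--
	c.inFlight[uid] = a
	return false
}

func main() {
	t := &claimTracker{inFlight: make(map[string]inFlightAllocation)}
	t.SignalClaimPendingAllocation("claim-x", "claim")
	t.SignalClaimPendingAllocation("claim-x", "claim")                 // a second pod shares it
	fmt.Println(t.MaybeRemoveClaimPendingAllocation("claim-x", false)) // false: still shared
	fmt.Println(t.MaybeRemoveClaimPendingAllocation("claim-x", false)) // true: last sharer left
}
```

Because both the lookup and the counter update happen under one lock, there is no window in which another goroutine can observe the claim without its matching sharer count.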

Proof of Concept

```go
// Proof-of-concept: demonstrates the data race in pre-fix in-flight allocation
// sharing and how the fix prevents it. This is a minimal synthetic repro of the
// race between concurrent writers/readers of a shared in-flight allocations
// map, not a real Kubernetes cluster exploit. Run with `go run -race` to see
// the detector flag the pre-fix tracker.
package main

import (
	"fmt"
	"sync"
)

type AllocationResult struct {
	ID     string
	Status string
}

// Pre-fix (unsafe): no synchronization around the map.
type ClaimTrackerNoLock struct {
	inFlightAllocations map[string]*AllocationResult
}

func (c *ClaimTrackerNoLock) SignalClaimPendingAllocation(claimUID string, allocatedClaim *AllocationResult) {
	c.inFlightAllocations[claimUID] = allocatedClaim // race-prone write
}

func (c *ClaimTrackerNoLock) GetPendingAllocation(claimUID string) *AllocationResult {
	return c.inFlightAllocations[claimUID] // race-prone read
}

// Post-fix (safe): an RWMutex guards the map.
type ClaimTrackerLocked struct {
	inFlightAllocations map[string]*AllocationResult
	inFlightMutex       sync.RWMutex
}

func (c *ClaimTrackerLocked) SignalClaimPendingAllocationSafe(claimUID string, allocatedClaim *AllocationResult) {
	c.inFlightMutex.Lock()
	defer c.inFlightMutex.Unlock()
	c.inFlightAllocations[claimUID] = allocatedClaim
}

func (c *ClaimTrackerLocked) GetPendingAllocationSafe(claimUID string) *AllocationResult {
	c.inFlightMutex.RLock()
	defer c.inFlightMutex.RUnlock()
	return c.inFlightAllocations[claimUID]
}

func main() {
	// Pre-fix tracker: a concurrent writer and reader trigger the race.
	pre := &ClaimTrackerNoLock{inFlightAllocations: make(map[string]*AllocationResult)}
	pre.SignalClaimPendingAllocation("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < 100000; i++ {
			pre.SignalClaimPendingAllocation("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 100000; i++ {
			_ = pre.GetPendingAllocation("CLAIM-1")
		}
	}()
	wg.Wait()
	fmt.Println("Pre-fix (unsafe) demonstration completed; a run under -race reports a data race here.")

	// Post-fix tracker: the same workload, now mutex-protected.
	safe := &ClaimTrackerLocked{inFlightAllocations: make(map[string]*AllocationResult)}
	safe.SignalClaimPendingAllocationSafe("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
	var wg2 sync.WaitGroup
	wg2.Add(2)
	go func() {
		defer wg2.Done()
		for i := 0; i < 100000; i++ {
			safe.SignalClaimPendingAllocationSafe("CLAIM-1", &AllocationResult{ID: "CLAIM-1", Status: "pending"})
		}
	}()
	go func() {
		defer wg2.Done()
		for i := 0; i < 100000; i++ {
			_ = safe.GetPendingAllocationSafe("CLAIM-1")
		}
	}()
	wg2.Wait()
	fmt.Println("Post-fix (mutex-protected) demonstration completed; no data races should be reported with -race.")
}
```

Commit Details

Author: Jon Huhn

Date: 2026-03-19 13:42 UTC

Message:

scheduler: fix race in DRA pending allocation sharing

Triage Assessment

Vulnerability Type: Race condition

Confidence: MEDIUM

Reasoning:

The commit addresses a race condition in the DRA pending allocation sharing mechanism by introducing mutexes and refactoring in-flight allocation tracking. While not a traditional security bug (e.g., XSS, SQLi), race conditions can lead to inconsistent state, bypasses, or timing-related vulnerabilities in concurrent systems. The changes reduce potential security risk from data races in allocation tracking.

Verification Assessment

Vulnerability Type: Data race / race condition in in-flight pending allocation sharing (DRA)

Confidence: MEDIUM

Affected Versions: 1.36.x prior to this fix (including v1.36.0-beta.0 pre-fix)

Code Diff

```diff
diff --git a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go
index ed53485c95e89..bcb522c9a2328 100644
--- a/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go
+++ b/pkg/scheduler/framework/autoscaler_contract/lister_contract_test.go
@@ -148,26 +148,18 @@ func (r *resourceClaimTrackerContract) GatherAllocatedState() (*schedulerapi.All
 	return nil, nil
 }
 
-func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
+func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) *resourceapi.AllocationResult {
 	return nil
 }
 
-func (r *resourceClaimTrackerContract) GetPendingAllocation(_ types.UID) (*resourceapi.AllocationResult, bool) {
-	return nil, false
+func (r *resourceClaimTrackerContract) SignalClaimPendingAllocation(_ types.UID, _ *resourceapi.ResourceClaim) error {
+	return nil
 }
 
 func (r *resourceClaimTrackerContract) MaybeRemoveClaimPendingAllocation(_ types.UID, _ bool) (deleted bool) {
 	return false
 }
 
-func (r *resourceClaimTrackerContract) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
-	return nil
-}
-
-func (r *resourceClaimTrackerContract) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
-	return nil
-}
-
 func (r *resourceClaimTrackerContract) AssumeClaimAfterAPICall(_ *resourceapi.ResourceClaim) error {
 	return nil
 }
diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go
index 2676c263956eb..31b787253568f 100644
--- a/pkg/scheduler/framework/cycle_state.go
+++ b/pkg/scheduler/framework/cycle_state.go
@@ -41,11 +41,11 @@ type CycleState struct {
 	// GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins
 	// in the PreBind extension point.
 	parallelPreBindPlugins sets.Set[string]
-	// isPodGroupSchedulingCycle indicates whether this cycle is a pod group scheduling cycle or not.
-	// If set to false, it means that the pod referencing this CycleState either passed the pod group cycle
+	// podGroupCycleState contains the CycleState for this pod's PodGroup.
+	// If set to nil, it means that the pod referencing this CycleState either passed the pod group cycle
 	// or doesn't belong to any pod group.
-	// This field can only be set to true when GenericWorkload feature flag is enabled.
-	isPodGroupSchedulingCycle bool
+	// This field can only be non-nil when GenericWorkload feature flag is enabled.
+	podGroupCycleState fwk.PodGroupCycleState
 }
 
 // NewCycleState initializes a new CycleState and returns its pointer.
@@ -102,11 +102,15 @@ func (c *CycleState) GetParallelPreBindPlugins() sets.Set[string] {
 }
 
 func (c *CycleState) IsPodGroupSchedulingCycle() bool {
-	return c.isPodGroupSchedulingCycle
+	return c.podGroupCycleState != nil
 }
 
-func (c *CycleState) SetPodGroupSchedulingCycle(isPodGroupSchedulingCycle bool) {
-	c.isPodGroupSchedulingCycle = isPodGroupSchedulingCycle
+func (c *CycleState) SetPodGroupSchedulingCycle(podGroupCycleState fwk.PodGroupCycleState) {
+	c.podGroupCycleState = podGroupCycleState
+}
+
+func (c *CycleState) GetPodGroupSchedulingCycle() fwk.PodGroupCycleState {
+	return c.podGroupCycleState
 }
 
 func (c *CycleState) SetSkipAllPostFilterPlugins(flag bool) {
@@ -135,7 +139,7 @@ func (c *CycleState) Clone() fwk.CycleState {
 	copy.skipScorePlugins = c.skipScorePlugins
 	copy.skipPreBindPlugins = c.skipPreBindPlugins
 	copy.parallelPreBindPlugins = c.parallelPreBindPlugins
-	copy.isPodGroupSchedulingCycle = c.isPodGroupSchedulingCycle
+	copy.podGroupCycleState = c.podGroupCycleState
 	copy.skipAllPostFilterPlugins = c.skipAllPostFilterPlugins
 
 	return copy
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
index 4687e55c4c522..a1703319f6c0c 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"iter"
 	"slices"
 	"sync"
 
@@ -63,7 +64,7 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re
 	manager := &DefaultDRAManager{
 		resourceClaimTracker: &claimTracker{
 			cache:               claimsCache,
-			inFlightAllocations: &sync.Map{},
+			inFlightAllocations: make(map[types.UID]inFlightAllocation),
 			allocatedDevices:    newAllocatedDevices(logger),
 			logger:              logger,
 		},
@@ -74,9 +75,6 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, re
 	if utilfeature.DefaultFeatureGate.Enabled(features.DRAExtendedResource) {
 		manager.extendedResourceCache = extendedresourcecache.NewExtendedResourceCache(logger)
 	}
-	if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
-		manager.resourceClaimTracker.inFlightAllocationSharers = &sync.Map{}
-	}
 
 	pgLister := &podGroupLister{}
 	if utilfeature.DefaultFeatureGate.Enabled(features.DRAWorkloadResourceClaims) {
@@ -178,13 +176,16 @@ type claimTracker struct {
 	//   - would make integration with cluster autoscaler harder because it would need
 	//     to trigger informer callbacks.
 	cache *assumecache.AssumeCache
+	// inFlightMutex syncs access to inFlightAllocations.
+	inFlightMutex sync.RWMutex
 	// inFlightAllocations is a map from claim UUIDs to claim objects for those claims
 	// for which allocation was triggered during a scheduling cycle and the
 	// corresponding claim status update call in PreBind has not been done
-	// yet. If another pod needs the claim, the pod is treated as "not
-	// schedulable yet" unless the pod is a member of a PodGroup. For ungrouped
-	// pods, the cluster event for the claim status update will make it
-	// schedulable.
+	// yet. It also includes a reference count tracking how many actively
+	// scheduling Pods in a PodGroup are using that pending allocation. If
+	// another pod outside the PodGroup needs the claim, the pod is treated as
+	// "not schedulable yet". For those pods, the cluster event for the
+	// claim status update will make them schedulable.
 	//
 	// This mechanism avoids the following problem:
 	// - Pod A triggers allocation for claim X.
@@ -215,25 +216,45 @@ type claimTracker struct {
 	// pods is expected to be rare compared to per-pod claim, so we end up
 	// hitting the "multiple goroutines read, write, and overwrite entries
 	// for disjoint sets of keys" case that sync.Map is optimized for.
-	inFlightAllocations *sync.Map
-	// inFlightAllocationSharers counts the actively scheduling pods
-	// sharing a given ResourceClaim.
-	inFlightAllocationSharers *sync.Map
-	allocatedDevices          *allocatedDevices
-	logger                    klog.Logger
+	inFlightAllocations map[types.UID]inFlightAllocation
+	allocatedDevices    *allocatedDevices
+	logger              klog.Logger
+}
+
+type inFlightAllocation struct {
+	claim   *resourceapi.ResourceClaim
+	sharers int
 }
 
-func (c *claimTracker) GetPendingAllocation(claimUID types.UID) (*resourceapi.AllocationResult, bool) {
-	var allocation *resourceapi.AllocationResult
-	claim, found := c.inFlightAllocations.Load(claimUID)
-	if found && claim != nil {
-		allocation = claim.(*resourceapi.ResourceClaim).Status.Allocation
+func (c *claimTracker) GetPendingAllocation(claimUID types.UID) *resourceapi.AllocationResult {
+	c.inFlightMutex.RLock()
+	defer c.inFlightMutex.RUnlock()
+
+	inFlight, found := c.inFlightAllocations[claimUID]
+	if !found || inFlight.claim == nil {
+		return nil
 	}
-	return allocation, found
+	return inFlight.claim.Status.Allocation
 }
 
 func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
-	c.inFlightAllocations.Store(claimUID, allocatedClaim)
+	c.inFlightMutex.Lock()
+	defer c.inFlightMutex.Unlock()
+
+	inFlight, found := c.inFlightAllocations[claimUID]
+	if found {
+		inFlight.sharers++
+		c.inFlightAllocations[claimUID] = inFlight
+
+		claim := inFlight.claim
+		c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers)
+		return nil
+	}
+
+	c.inFlightAllocations[claimUID] = inFlightAllocation{
+		claim:   allocatedClaim,
+		sharers: 1,
+	}
 	// This is the same verbosity as the corresponding log in the assume cache.
 	c.logger.V(5).Info("Added in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion)
 	// There's no reason to return an error in this implementation, but the error is helpful for other implementations.
@@ -242,67 +263,28 @@ func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocate
 	return nil
 }
 
-func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, shareable bool) (deleted bool) {
-	if c.inFlightAllocationSharers != nil && shareable {
-		value, ok := c.inFlightAllocationSharers.Load(claimUID)
-		if ok && value.(int) > 0 {
-			if loggerV := c.logger.V(5); loggerV.Enabled() {
-				claim, found := c.inFlightAllocations.Load(claimUID)
-				if found {
-					claim := claim.(*resourceapi.ResourceClaim)
-					c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
-				}
-			}
-			return false
-		}
-	}
+func (c *claimTracker) MaybeRemoveClaimPendingAllocation(claimUID types.UID, forceRemove bool) (deleted bool) {
+	c.inFlightMutex.Lock()
+	defer c.inFlightMutex.Unlock()
 
-	claim, found := c.inFlightAllocations.LoadAndDelete(claimUID)
+	inFlight, found := c.inFlightAllocations[claimUID]
 	// The assume cache doesn't log this, but maybe it should.
-	if found {
-		claim := claim.(*resourceapi.ResourceClaim)
-		c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
-	} else {
+	if !found {
 		c.logger.V(5).Info("Redundant remove of in-flight claim, not found", "uid", claimUID)
+		return false
 	}
-	return found
-}
+	claim := inFlight.claim
 
-func (c *claimTracker) AddSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
-	newSharers := 1
-	value, loaded := c.inFlightAllocationSharers.LoadOrStore(claimUID, newSharers)
-	if loaded {
-		oldSharers := value.(int)
-		newSharers = oldSharers + 1
-		swapped := c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers)
-		if !swapped {
-			// The value must have changed since we loaded
-			return fmt.Errorf("conflict adding in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID)
-		}
+	if forceRemove || inFlight.sharers == 1 {
+		delete(c.inFlightAllocations, claimUID)
+		c.logger.V(5).Info("Removed in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion)
+		return true
 	}
-	c.logger.V(5).Info("Added share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers)
-	return nil
-}
 
-func (c *claimTracker) RemoveSharedClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
-	value, ok := c.inFlightAllocationSharers.Load(claimUID)
-	if !ok {
-		return nil
-	}
-	oldSharers := value.(int)
-	newSharers := oldSharers - 1
-	var written bool
-	if newSharers == 0 {
-		written = c.inFlightAllocationSharers.CompareAndDelete(claimUID, oldSharers)
-	} else {
-		written = c.inFlightAllocationSharers.CompareAndSwap(claimUID, oldSharers, newSharers)
-	}
-	if !written {
-		// The value must have changed since we loaded
-		return fmt.Errorf("conflict removing in-flight allocation sharer for claim %s/%s, UID=%s", allocatedClaim.Namespace, allocatedClaim.Name, claimUID)
-	}
-	c.logger.V(5).Info("Removed share for in-flight claim", "claim", klog.KObj(allocatedClaim), "uid", claimUID, "version", allocatedClaim.ResourceVersion, "sharers", newSharers)
-	return nil
+	inFlight.sharers--
+	c.inFlightAllocations[claimUID] = inFlight
+	c.logger.V(5).Info("Claim is still shared by other pods, not removing in-flight claim", "claim", klog.KObj(claim), "uid", claimUID, "version", claim.ResourceVersion, "sharers", inFlight.sharers)
+	return false
 }
 
 func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
@@ -360,14 +342,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (a sets.Set[structured.DeviceID
 	allocated, revision := c.allocatedDevices.Get()
 
 	// Whatever is in flight also has to be checked.
-	c.inFlightAllocations.Range(func(key, value any) bool {
-		claim := value.(*resourceapi.ResourceClaim)
+	for _, inFlight := range c.allInFlightAllocationsRLocked() {
+		claim := inFlight.claim
 		foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
 			c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
 			allocated.Insert(deviceID)
 		}, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {})
-		return true
-	})
+	}
 
 	if revision == c.allocatedDevices.Revision() {
 		// Our current result is valid, nothing changed in the meantime.
@@ -433,8 +414,8 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
 	}
 
 	// Whatever is in flight also has to be checked.
-	c.inFlightAllocations.Range(func(key, value any) bool {
-		claim := value.(*resourceapi.ResourceClaim)
+	for _, inFlight := range c.allInFlightAllocationsRLocked() {
+		claim := inFlight.claim
 		foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
 			// dedicatedDeviceCallback
 			c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
@@ -449,9 +430,7 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
 			c.logger.V(6).Info("Device is in flight for allocation", "consumed capacity", capacity, "claim", klog.KObj(claim))
 			aggregatedCapacity.Insert(capacity)
 		})
-		return true
-	})
-
+	}
 	if revision1 == c.allocatedDevices.Revision() {
 		// Our current result is valid, nothing changed in the meantime.
 		return &structured.AllocatedState{
@@ -464,6 +443,18 @@ func (c *claimTracker) GatherAllocatedState() (s *structured.AllocatedState, err
 	return nil, errClaimTrackerConcurrentModification
 }
 
+func (c *claimTracker) allInFlightAllocationsRLocked() iter.Seq2[types.UID, inFlightAllocation] {
+	return func(yield func(types.UID, inFlightAllocation) bool) {
+		c.inFlightMutex.RLock()
+		defer c.inFlightMutex.RUnlock()
+		for uid, inFlight := range c.inFlightAllocations {
+			if !yield(uid, inFlight) {
+				return
+			}
+		}
+	}
+}
+
fun ... [truncated]
```