Merge pull request #108724 from sanposhiho/cycle-state

use `sync.Map` in CycleState for better performance
This commit is contained in:
Kubernetes Prow Robot 2022-03-29 17:35:13 -07:00 committed by GitHub
commit 92c30bf6bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -41,18 +41,18 @@ type StateKey string
// StateData stored by one plugin can be read, altered, or deleted by another plugin. // StateData stored by one plugin can be read, altered, or deleted by another plugin.
// CycleState does not provide any data protection, as all plugins are assumed to be // CycleState does not provide any data protection, as all plugins are assumed to be
// trusted. // trusted.
// Note: CycleState uses a sync.Map to back the storage. It's aimed to optimize for the "write once and read many times" scenarios.
// It is the recommended pattern used in all in-tree plugins - plugin-specific state is written once in PreFilter/PreScore and afterwards read many times in Filter/Score.
type CycleState struct { type CycleState struct {
mx sync.RWMutex // storage is keyed with StateKey, and valued with StateData.
storage map[StateKey]StateData storage sync.Map
// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
recordPluginMetrics bool recordPluginMetrics bool
} }
// NewCycleState initializes a new CycleState and returns its pointer. // NewCycleState initializes a new CycleState and returns its pointer.
func NewCycleState() *CycleState { func NewCycleState() *CycleState {
return &CycleState{ return &CycleState{}
storage: make(map[StateKey]StateData),
}
} }
// ShouldRecordPluginMetrics returns whether PluginExecutionDuration metrics should be recorded. // ShouldRecordPluginMetrics returns whether PluginExecutionDuration metrics should be recorded.
@ -78,36 +78,32 @@ func (c *CycleState) Clone() *CycleState {
return nil return nil
} }
copy := NewCycleState() copy := NewCycleState()
for k, v := range c.storage { c.storage.Range(func(k, v interface{}) bool {
copy.Write(k, v.Clone()) copy.storage.Store(k, v.(StateData).Clone())
} return true
})
return copy return copy
} }
// Read retrieves data with the given "key" from CycleState. If the key is not // Read retrieves data with the given "key" from CycleState. If the key is not
// present an error is returned. // present an error is returned.
// This function is thread safe by acquiring an internal lock first. // This function is thread safe by using sync.Map.
func (c *CycleState) Read(key StateKey) (StateData, error) { func (c *CycleState) Read(key StateKey) (StateData, error) {
c.mx.RLock() if v, ok := c.storage.Load(key); ok {
defer c.mx.RUnlock() return v.(StateData), nil
if v, ok := c.storage[key]; ok {
return v, nil
} }
return nil, ErrNotFound return nil, ErrNotFound
} }
// Write stores the given "val" in CycleState with the given "key". // Write stores the given "val" in CycleState with the given "key".
// This function is thread safe by acquiring an internal lock first. // This function is thread safe by using sync.Map.
func (c *CycleState) Write(key StateKey, val StateData) { func (c *CycleState) Write(key StateKey, val StateData) {
c.mx.Lock() c.storage.Store(key, val)
c.storage[key] = val
c.mx.Unlock()
} }
// Delete deletes data with the given key from CycleState. // Delete deletes data with the given key from CycleState.
// This function is thread safe by acquiring an internal lock first. // This function is thread safe by using sync.Map.
func (c *CycleState) Delete(key StateKey) { func (c *CycleState) Delete(key StateKey) {
c.mx.Lock() c.storage.Delete(key)
delete(c.storage, key)
c.mx.Unlock()
} }