mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-18 16:21:13 +00:00
remove artificial sleeps that made tests passes
This commit is contained in:
parent
9ece24c33f
commit
33fc5b354b
@ -30,8 +30,6 @@ import (
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
var nodeAddressesRetryPeriod = 5 * time.Second
|
||||
|
||||
// SyncManager is an interface for making requests to a cloud provider
|
||||
type SyncManager interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
@ -46,50 +44,53 @@ type cloudResourceSyncManager struct {
|
||||
// Sync period
|
||||
syncPeriod time.Duration
|
||||
|
||||
nodeAddressesMux sync.Mutex
|
||||
nodeAddressesErr error
|
||||
nodeAddresses []v1.NodeAddress
|
||||
nodeAddressesMonitor *sync.Cond
|
||||
nodeAddressesErr error
|
||||
nodeAddresses []v1.NodeAddress
|
||||
|
||||
nodeName types.NodeName
|
||||
}
|
||||
|
||||
// NewSyncManager creates a manager responsible for collecting resources
|
||||
// from a cloud provider through requests that are sensitive to timeouts and hanging
|
||||
// NewSyncManager creates a manager responsible for collecting resources from a
|
||||
// cloud provider through requests that are sensitive to timeouts and hanging
|
||||
func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
|
||||
return &cloudResourceSyncManager{
|
||||
cloud: cloud,
|
||||
syncPeriod: syncPeriod,
|
||||
nodeName: nodeName,
|
||||
// nodeAddressesMonitor is a monitor that guards a result (nodeAddresses,
|
||||
// nodeAddressesErr) of the sync loop under the condition that a result has
|
||||
// been saved at least once. The semantics here are:
|
||||
//
|
||||
// * Readers of the result will wait on the monitor until the first result
|
||||
// has been saved.
|
||||
// * The sync loop (i.e. the only writer), will signal all waiters every
|
||||
// time it updates the result.
|
||||
nodeAddressesMonitor: sync.NewCond(&sync.Mutex{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *cloudResourceSyncManager) getNodeAddressSafe() ([]v1.NodeAddress, error) {
|
||||
m.nodeAddressesMux.Lock()
|
||||
defer m.nodeAddressesMux.Unlock()
|
||||
func (m *cloudResourceSyncManager) updateAddresses(addrs []v1.NodeAddress, err error) {
|
||||
m.nodeAddressesMonitor.L.Lock()
|
||||
defer m.nodeAddressesMonitor.L.Unlock()
|
||||
defer m.nodeAddressesMonitor.Broadcast()
|
||||
|
||||
return m.nodeAddresses, m.nodeAddressesErr
|
||||
}
|
||||
|
||||
func (m *cloudResourceSyncManager) setNodeAddressSafe(nodeAddresses []v1.NodeAddress, err error) {
|
||||
m.nodeAddressesMux.Lock()
|
||||
defer m.nodeAddressesMux.Unlock()
|
||||
|
||||
m.nodeAddresses = nodeAddresses
|
||||
m.nodeAddresses = addrs
|
||||
m.nodeAddressesErr = err
|
||||
}
|
||||
|
||||
// NodeAddresses does not wait for cloud provider to return a node addresses.
|
||||
// It always returns node addresses or an error.
|
||||
func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
|
||||
m.nodeAddressesMonitor.L.Lock()
|
||||
defer m.nodeAddressesMonitor.L.Unlock()
|
||||
// wait until there is something
|
||||
for {
|
||||
nodeAddresses, err := m.getNodeAddressSafe()
|
||||
if len(nodeAddresses) == 0 && err == nil {
|
||||
klog.V(5).Infof("Waiting for %v for cloud provider to provide node addresses", nodeAddressesRetryPeriod)
|
||||
time.Sleep(nodeAddressesRetryPeriod)
|
||||
continue
|
||||
if addrs, err := m.nodeAddresses, m.nodeAddressesErr; len(addrs) > 0 || err != nil {
|
||||
return addrs, err
|
||||
}
|
||||
return nodeAddresses, err
|
||||
klog.V(5).Infof("Waiting for cloud provider to provide node addresses")
|
||||
m.nodeAddressesMonitor.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,7 +99,7 @@ func (m *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nod
|
||||
|
||||
instances, ok := m.cloud.Instances()
|
||||
if !ok {
|
||||
m.setNodeAddressSafe(nil, fmt.Errorf("failed to get instances from cloud provider"))
|
||||
m.updateAddresses(nil, fmt.Errorf("failed to get instances from cloud provider"))
|
||||
return
|
||||
}
|
||||
|
||||
@ -109,10 +110,10 @@ func (m *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nod
|
||||
|
||||
nodeAddresses, err := instances.NodeAddresses(ctx, nodeName)
|
||||
if err != nil {
|
||||
m.setNodeAddressSafe(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err))
|
||||
m.updateAddresses(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err))
|
||||
klog.V(2).Infof("Node addresses from cloud provider for node %q not collected", nodeName)
|
||||
} else {
|
||||
m.setNodeAddressSafe(nodeAddresses, nil)
|
||||
m.updateAddresses(nodeAddresses, nil)
|
||||
klog.V(5).Infof("Node addresses from cloud provider for node %q collected", nodeName)
|
||||
}
|
||||
}
|
||||
|
@ -35,13 +35,12 @@ func createNodeInternalIPAddress(address string) []v1.NodeAddress {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeAddressesRequest(t *testing.T) {
|
||||
syncPeriod := 300 * time.Millisecond
|
||||
maxRetry := 5
|
||||
func TestNodeAddressesDelay(t *testing.T) {
|
||||
syncPeriod := 100 * time.Millisecond
|
||||
cloud := &fake.FakeCloud{
|
||||
Addresses: createNodeInternalIPAddress("10.0.1.12"),
|
||||
// Set the request delay so the manager timeouts and collects the node addresses later
|
||||
RequestDelay: 400 * time.Millisecond,
|
||||
RequestDelay: 200 * time.Millisecond,
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
@ -61,6 +60,7 @@ func TestNodeAddressesRequest(t *testing.T) {
|
||||
cloud.SetNodeAddresses(createNodeInternalIPAddress("10.0.1.13"))
|
||||
|
||||
// Wait until the IP address changes
|
||||
maxRetry := 5
|
||||
for i := 0; i < maxRetry; i++ {
|
||||
nodeAddresses, err := manager.NodeAddresses()
|
||||
t.Logf("nodeAddresses: %#v, err: %v", nodeAddresses, err)
|
||||
@ -69,7 +69,7 @@ func TestNodeAddressesRequest(t *testing.T) {
|
||||
}
|
||||
// It is safe to read cloud.Addresses since no routine is changing the value at the same time
|
||||
if err == nil && nodeAddresses[0].Address != cloud.Addresses[0].Address {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(syncPeriod)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user