Remove custom timeout in test that is never exercised

and misc cleanup
This commit is contained in:
Mike Danese 2018-12-04 13:02:24 -08:00
parent d2570d3971
commit 9ece24c33f
3 changed files with 25 additions and 41 deletions

View File

@ -21,6 +21,7 @@ go_test(
deps = [ deps = [
"//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
], ],
) )

View File

@ -63,27 +63,27 @@ func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, sync
} }
} }
func (manager *cloudResourceSyncManager) getNodeAddressSafe() ([]v1.NodeAddress, error) { func (m *cloudResourceSyncManager) getNodeAddressSafe() ([]v1.NodeAddress, error) {
manager.nodeAddressesMux.Lock() m.nodeAddressesMux.Lock()
defer manager.nodeAddressesMux.Unlock() defer m.nodeAddressesMux.Unlock()
return manager.nodeAddresses, manager.nodeAddressesErr return m.nodeAddresses, m.nodeAddressesErr
} }
func (manager *cloudResourceSyncManager) setNodeAddressSafe(nodeAddresses []v1.NodeAddress, err error) { func (m *cloudResourceSyncManager) setNodeAddressSafe(nodeAddresses []v1.NodeAddress, err error) {
manager.nodeAddressesMux.Lock() m.nodeAddressesMux.Lock()
defer manager.nodeAddressesMux.Unlock() defer m.nodeAddressesMux.Unlock()
manager.nodeAddresses = nodeAddresses m.nodeAddresses = nodeAddresses
manager.nodeAddressesErr = err m.nodeAddressesErr = err
} }
// NodeAddresses does not wait for cloud provider to return a node addresses. // NodeAddresses does not wait for cloud provider to return a node addresses.
// It always returns node addresses or an error. // It always returns node addresses or an error.
func (manager *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) { func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
// wait until there is something // wait until there is something
for { for {
nodeAddresses, err := manager.getNodeAddressSafe() nodeAddresses, err := m.getNodeAddressSafe()
if len(nodeAddresses) == 0 && err == nil { if len(nodeAddresses) == 0 && err == nil {
klog.V(5).Infof("Waiting for %v for cloud provider to provide node addresses", nodeAddressesRetryPeriod) klog.V(5).Infof("Waiting for %v for cloud provider to provide node addresses", nodeAddressesRetryPeriod)
time.Sleep(nodeAddressesRetryPeriod) time.Sleep(nodeAddressesRetryPeriod)
@ -93,12 +93,12 @@ func (manager *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, erro
} }
} }
func (manager *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nodeName types.NodeName) { func (m *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nodeName types.NodeName) {
klog.V(5).Infof("Requesting node addresses from cloud provider for node %q", nodeName) klog.V(5).Infof("Requesting node addresses from cloud provider for node %q", nodeName)
instances, ok := manager.cloud.Instances() instances, ok := m.cloud.Instances()
if !ok { if !ok {
manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get instances from cloud provider")) m.setNodeAddressSafe(nil, fmt.Errorf("failed to get instances from cloud provider"))
return return
} }
@ -109,16 +109,17 @@ func (manager *cloudResourceSyncManager) collectNodeAddresses(ctx context.Contex
nodeAddresses, err := instances.NodeAddresses(ctx, nodeName) nodeAddresses, err := instances.NodeAddresses(ctx, nodeName)
if err != nil { if err != nil {
manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err)) m.setNodeAddressSafe(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err))
klog.V(2).Infof("Node addresses from cloud provider for node %q not collected", nodeName) klog.V(2).Infof("Node addresses from cloud provider for node %q not collected", nodeName)
} else { } else {
manager.setNodeAddressSafe(nodeAddresses, nil) m.setNodeAddressSafe(nodeAddresses, nil)
klog.V(5).Infof("Node addresses from cloud provider for node %q collected", nodeName) klog.V(5).Infof("Node addresses from cloud provider for node %q collected", nodeName)
} }
} }
func (manager *cloudResourceSyncManager) Run(stopCh <-chan struct{}) { // Run starts the cloud resource sync manager's sync loop.
func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
wait.Until(func() { wait.Until(func() {
manager.collectNodeAddresses(context.TODO(), manager.nodeName) m.collectNodeAddresses(context.TODO(), m.nodeName)
}, manager.syncPeriod, stopCh) }, m.syncPeriod, stopCh)
} }

View File

@ -17,33 +17,15 @@ limitations under the License.
package cloudresource package cloudresource
import ( import (
"fmt"
"reflect" "reflect"
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
) )
func collectNodeAddresses(manager *cloudResourceSyncManager) ([]v1.NodeAddress, error) {
var nodeAddresses []v1.NodeAddress
var err error
collected := make(chan struct{}, 1)
go func() {
nodeAddresses, err = manager.NodeAddresses()
close(collected)
}()
select {
case <-collected:
return nodeAddresses, err
case <-time.Tick(2 * nodeAddressesRetryPeriod):
return nil, fmt.Errorf("Timeout after %v waiting for address to appear", 2*nodeAddressesRetryPeriod)
}
}
func createNodeInternalIPAddress(address string) []v1.NodeAddress { func createNodeInternalIPAddress(address string) []v1.NodeAddress {
return []v1.NodeAddress{ return []v1.NodeAddress{
{ {
@ -67,12 +49,12 @@ func TestNodeAddressesRequest(t *testing.T) {
manager := NewSyncManager(cloud, "defaultNode", syncPeriod).(*cloudResourceSyncManager) manager := NewSyncManager(cloud, "defaultNode", syncPeriod).(*cloudResourceSyncManager)
go manager.Run(stopCh) go manager.Run(stopCh)
nodeAddresses, err := collectNodeAddresses(manager) nodeAddresses, err := manager.NodeAddresses()
if err != nil { if err != nil {
t.Errorf("Unexpected err: %q\n", err) t.Errorf("Unexpected err: %q\n", err)
} }
if !reflect.DeepEqual(nodeAddresses, cloud.Addresses) { if !reflect.DeepEqual(nodeAddresses, cloud.Addresses) {
t.Errorf("Unexpected list of node addresses %#v, expected %#v: %v", nodeAddresses, cloud.Addresses, err) t.Errorf("Unexpected diff of node addresses: %v", diff.ObjectReflectDiff(nodeAddresses, cloud.Addresses))
} }
// Change the IP address // Change the IP address
@ -80,7 +62,7 @@ func TestNodeAddressesRequest(t *testing.T) {
// Wait until the IP address changes // Wait until the IP address changes
for i := 0; i < maxRetry; i++ { for i := 0; i < maxRetry; i++ {
nodeAddresses, err := collectNodeAddresses(manager) nodeAddresses, err := manager.NodeAddresses()
t.Logf("nodeAddresses: %#v, err: %v", nodeAddresses, err) t.Logf("nodeAddresses: %#v, err: %v", nodeAddresses, err)
if err != nil { if err != nil {
t.Errorf("Unexpected err: %q\n", err) t.Errorf("Unexpected err: %q\n", err)