From 15b03b8c0cb30f762f31f68e35024ade0f147efb Mon Sep 17 00:00:00 2001 From: Michael Taufen Date: Mon, 25 Jun 2018 16:39:05 -0700 Subject: [PATCH] port setNodeAddress to Setter abstraction, port test also put cloud_request_manager.go in its own package --- pkg/kubelet/BUILD | 6 +- pkg/kubelet/cloudresource/BUILD | 39 ++++ .../cloud_request_manager.go | 14 +- .../cloud_request_manager_test.go | 4 +- pkg/kubelet/kubelet.go | 5 +- pkg/kubelet/kubelet_node_status.go | 110 +--------- pkg/kubelet/kubelet_node_status_test.go | 154 -------------- pkg/kubelet/nodestatus/BUILD | 24 ++- pkg/kubelet/nodestatus/setters.go | 116 ++++++++++ pkg/kubelet/nodestatus/setters_test.go | 200 ++++++++++++++++++ 10 files changed, 403 insertions(+), 269 deletions(-) create mode 100644 pkg/kubelet/cloudresource/BUILD rename pkg/kubelet/{ => cloudresource}/cloud_request_manager.go (89%) rename pkg/kubelet/{ => cloudresource}/cloud_request_manager_test.go (96%) create mode 100644 pkg/kubelet/nodestatus/setters_test.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index f0fd5b421c8..f284d6918f2 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -10,7 +10,6 @@ go_library( name = "go_default_library", srcs = [ "active_deadline.go", - "cloud_request_manager.go", "doc.go", "kubelet.go", "kubelet_getters.go", @@ -48,6 +47,7 @@ go_library( "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cloudresource:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/configmap:go_default_library", @@ -66,6 +66,7 @@ go_library( "//pkg/kubelet/metrics/collectors:go_default_library", "//pkg/kubelet/mountpod:go_default_library", "//pkg/kubelet/network/dns:go_default_library", + "//pkg/kubelet/nodestatus:go_default_library", "//pkg/kubelet/pleg:go_default_library", "//pkg/kubelet/pod:go_default_library", "//pkg/kubelet/preemption:go_default_library", @@ -119,7 +120,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", @@ -147,7 +147,6 @@ go_test( name = "go_default_test", srcs = [ "active_deadline_test.go", - "cloud_request_manager_test.go", "kubelet_getters_test.go", "kubelet_network_test.go", "kubelet_node_status_test.go", @@ -255,6 +254,7 @@ filegroup( "//pkg/kubelet/checkpoint:all-srcs", "//pkg/kubelet/checkpointmanager:all-srcs", "//pkg/kubelet/client:all-srcs", + "//pkg/kubelet/cloudresource:all-srcs", "//pkg/kubelet/cm:all-srcs", "//pkg/kubelet/config:all-srcs", "//pkg/kubelet/configmap:all-srcs", diff --git a/pkg/kubelet/cloudresource/BUILD b/pkg/kubelet/cloudresource/BUILD new file mode 100644 index 00000000000..ec93978d78b --- /dev/null +++ b/pkg/kubelet/cloudresource/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["cloud_request_manager.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cloudresource", + visibility = ["//visibility:public"], + deps = [ + "//pkg/cloudprovider:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["cloud_request_manager_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/cloudprovider/providers/fake:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/cloud_request_manager.go b/pkg/kubelet/cloudresource/cloud_request_manager.go similarity index 89% rename from pkg/kubelet/cloud_request_manager.go rename to pkg/kubelet/cloudresource/cloud_request_manager.go index 58752bf19d3..f7ba8a197c4 100644 --- a/pkg/kubelet/cloud_request_manager.go +++ b/pkg/kubelet/cloudresource/cloud_request_manager.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kubelet +package cloudresource import ( "context" @@ -32,6 +32,14 @@ import ( var nodeAddressesRetryPeriod = 5 * time.Second +// SyncManager is an interface for making requests to a cloud provider +type SyncManager interface { + Run(stopCh <-chan struct{}) + NodeAddresses() ([]v1.NodeAddress, error) +} + +var _ SyncManager = &cloudResourceSyncManager{} + type cloudResourceSyncManager struct { // Cloud provider interface. cloud cloudprovider.Interface @@ -45,9 +53,9 @@ type cloudResourceSyncManager struct { nodeName types.NodeName } -// NewCloudResourceSyncManager creates a manager responsible for collecting resources +// NewSyncManager creates a manager responsible for collecting resources // from a cloud provider through requests that are sensitive to timeouts and hanging -func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager { +func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager { return &cloudResourceSyncManager{ cloud: cloud, syncPeriod: syncPeriod, diff --git a/pkg/kubelet/cloud_request_manager_test.go b/pkg/kubelet/cloudresource/cloud_request_manager_test.go similarity index 96% rename from pkg/kubelet/cloud_request_manager_test.go rename to pkg/kubelet/cloudresource/cloud_request_manager_test.go index f29293c504f..a0777f5bf5c 100644 --- a/pkg/kubelet/cloud_request_manager_test.go +++ b/pkg/kubelet/cloudresource/cloud_request_manager_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kubelet +package cloudresource import ( "fmt" @@ -64,7 +64,7 @@ func TestNodeAddressesRequest(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager := NewCloudResourceSyncManager(cloud, "defaultNode", syncPeriod) + manager := NewSyncManager(cloud, "defaultNode", syncPeriod).(*cloudResourceSyncManager) go manager.Run(stopCh) nodeAddresses, err := collectNodeAddresses(manager) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 36edfe6cb93..3c6232a3b5d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cloudresource" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/configmap" @@ -521,7 +522,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } if klet.cloud != nil { - klet.cloudResourceSyncManager = NewCloudResourceSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency) + klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency) } var secretManager secret.Manager @@ -966,7 +967,7 @@ type Kubelet struct { // Cloud provider interface. cloud cloudprovider.Interface // Handles requests to cloud provider with timeout - cloudResourceSyncManager *cloudResourceSyncManager + cloudResourceSyncManager cloudresource.SyncManager // Indicates that the node initialization happens in an external cloud controller externalCloudProvider bool diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index ad080d8ba42..23400c25e1f 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - utilnet "k8s.io/apimachinery/pkg/util/net" utilfeature "k8s.io/apiserver/pkg/util/feature" k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -40,6 +39,7 @@ import ( kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/nodestatus" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/scheduler/algorithm" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -443,107 +443,6 @@ func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) { kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event) } -// Set IP and hostname addresses for the node. -func (kl *Kubelet) setNodeAddress(node *v1.Node) error { - if kl.nodeIP != nil { - if err := kl.nodeIPValidator(kl.nodeIP); err != nil { - return fmt.Errorf("failed to validate nodeIP: %v", err) - } - glog.V(2).Infof("Using node IP: %q", kl.nodeIP.String()) - } - - if kl.externalCloudProvider { - if kl.nodeIP != nil { - if node.ObjectMeta.Annotations == nil { - node.ObjectMeta.Annotations = make(map[string]string) - } - node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = kl.nodeIP.String() - } - // We rely on the external cloud provider to supply the addresses. - return nil - } - if kl.cloud != nil { - nodeAddresses, err := kl.cloudResourceSyncManager.NodeAddresses() - if err != nil { - return err - } - - if kl.nodeIP != nil { - enforcedNodeAddresses := []v1.NodeAddress{} - - var nodeIPType v1.NodeAddressType - for _, nodeAddress := range nodeAddresses { - if nodeAddress.Address == kl.nodeIP.String() { - enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) - nodeIPType = nodeAddress.Type - break - } - } - if len(enforcedNodeAddresses) > 0 { - for _, nodeAddress := range nodeAddresses { - if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName { - enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) - } - } - - enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: kl.GetHostname()}) - node.Status.Addresses = enforcedNodeAddresses - return nil - } - return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", kl.nodeIP) - } - - // Only add a NodeHostName address if the cloudprovider did not specify any addresses. - // (we assume the cloudprovider is authoritative if it specifies any addresses) - if len(nodeAddresses) == 0 { - nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: kl.GetHostname()}} - } - node.Status.Addresses = nodeAddresses - } else { - var ipAddr net.IP - var err error - - // 1) Use nodeIP if set - // 2) If the user has specified an IP to HostnameOverride, use it - // 3) Lookup the IP from node name by DNS and use the first valid IPv4 address. - // If the node does not have a valid IPv4 address, use the first valid IPv6 address. - // 4) Try to get the IP from the network interface used as default gateway - if kl.nodeIP != nil { - ipAddr = kl.nodeIP - } else if addr := net.ParseIP(kl.hostname); addr != nil { - ipAddr = addr - } else { - var addrs []net.IP - addrs, _ = net.LookupIP(node.Name) - for _, addr := range addrs { - if err = kl.nodeIPValidator(addr); err == nil { - if addr.To4() != nil { - ipAddr = addr - break - } - if addr.To16() != nil && ipAddr == nil { - ipAddr = addr - } - } - } - - if ipAddr == nil { - ipAddr, err = utilnet.ChooseHostInterface() - } - } - - if ipAddr == nil { - // We tried everything we could, but the IP address wasn't fetchable; error out - return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err) - } - node.Status.Addresses = []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: ipAddr.String()}, - {Type: v1.NodeHostName, Address: kl.GetHostname()}, - } - } - return nil -} - func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) { // Note: avoid blindly overwriting the capacity in case opaque // resources are being advertised. @@ -1083,8 +982,13 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error { return nil } } + // if cloud is not nil, we expect the cloud resource sync manager to exist + var nodeAddressesFunc func() ([]v1.NodeAddress, error) + if kl.cloud != nil { + nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses + } return []func(*v1.Node) error{ - kl.setNodeAddress, + nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc), withoutError(kl.setNodeStatusInfo), withoutError(kl.setNodeOODCondition), withoutError(kl.setNodeMemoryPressureCondition), diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 6187c99e827..0958c2d9e61 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -140,160 +140,6 @@ func (lcm *localCM) GetCapacity() v1.ResourceList { return lcm.capacity } -func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - defer testKubelet.Cleanup() - kubelet := testKubelet.kubelet - kubelet.kubeClient = nil // ensure only the heartbeat client is used - kubelet.hostname = testKubeletHostname - - cases := []struct { - name string - nodeIP net.IP - nodeAddresses []v1.NodeAddress - expectedAddresses []v1.NodeAddress - shouldError bool - }{ - { - name: "A single InternalIP", - nodeIP: net.ParseIP("10.1.1.1"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "NodeIP is external", - nodeIP: net.ParseIP("55.55.55.55"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - // Accommodating #45201 and #49202 - name: "InternalIP and ExternalIP are the same", - nodeIP: net.ParseIP("55.55.55.55"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "An Internal/ExternalIP, an Internal/ExternalDNS", - nodeIP: net.ParseIP("10.1.1.1"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, - {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, - {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "An Internal with multiple internal IPs", - nodeIP: net.ParseIP("10.1.1.1"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeInternalIP, Address: "10.2.2.2"}, - {Type: v1.NodeInternalIP, Address: "10.3.3.3"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - shouldError: false, - }, - { - name: "An InternalIP that isn't valid: should error", - nodeIP: net.ParseIP("10.2.2.2"), - nodeAddresses: []v1.NodeAddress{ - {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, - {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, - {Type: v1.NodeHostName, Address: testKubeletHostname}, - }, - expectedAddresses: nil, - shouldError: true, - }, - } - for _, testCase := range cases { - // testCase setup - existingNode := v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, - Spec: v1.NodeSpec{}, - } - - kubelet.nodeIP = testCase.nodeIP - - fakeCloud := &fakecloud.FakeCloud{ - Addresses: testCase.nodeAddresses, - Err: nil, - } - kubelet.cloud = fakeCloud - kubelet.cloudResourceSyncManager = NewCloudResourceSyncManager(kubelet.cloud, kubelet.nodeName, kubelet.nodeStatusUpdateFrequency) - stopCh := make(chan struct{}) - go kubelet.cloudResourceSyncManager.Run(stopCh) - kubelet.nodeIPValidator = func(nodeIP net.IP) error { - return nil - } - - // execute method - err := kubelet.setNodeAddress(&existingNode) - close(stopCh) - if err != nil && !testCase.shouldError { - t.Errorf("Unexpected error for test %s: %q", testCase.name, err) - continue - } else if err != nil && testCase.shouldError { - // expected an error - continue - } - - // Sort both sets for consistent equality - sortNodeAddresses(testCase.expectedAddresses) - sortNodeAddresses(existingNode.Status.Addresses) - - assert.True( - t, - apiequality.Semantic.DeepEqual( - testCase.expectedAddresses, - existingNode.Status.Addresses, - ), - fmt.Sprintf("Test %s failed %%s", testCase.name), - diff.ObjectDiff(testCase.expectedAddresses, existingNode.Status.Addresses), - ) - } -} - // sortableNodeAddress is a type for sorting []v1.NodeAddress type sortableNodeAddress []v1.NodeAddress diff --git a/pkg/kubelet/nodestatus/BUILD b/pkg/kubelet/nodestatus/BUILD index fd239ccd3cf..b7ac0ca3407 100644 --- a/pkg/kubelet/nodestatus/BUILD +++ b/pkg/kubelet/nodestatus/BUILD @@ -1,11 +1,17 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = ["setters.go"], importpath = "k8s.io/kubernetes/pkg/kubelet/nodestatus", visibility = ["//visibility:public"], - deps = ["//vendor/k8s.io/api/core/v1:go_default_library"], + deps = [ + "//pkg/cloudprovider:go_default_library", + "//pkg/kubelet/apis:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], ) filegroup( @@ -21,3 +27,17 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["setters_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/cloudprovider/providers/fake:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + ], +) diff --git a/pkg/kubelet/nodestatus/setters.go b/pkg/kubelet/nodestatus/setters.go index 432853a5059..3d84c5e18aa 100644 --- a/pkg/kubelet/nodestatus/setters.go +++ b/pkg/kubelet/nodestatus/setters.go @@ -17,9 +17,125 @@ limitations under the License. package nodestatus import ( + "fmt" + "net" + "k8s.io/api/core/v1" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/kubernetes/pkg/cloudprovider" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + + "github.com/golang/glog" ) // Setter modifies the node in-place, and returns an error if the modification failed. // Setters may partially mutate the node before returning an error. type Setter func(node *v1.Node) error + +// NodeAddress returns a Setter that updates address-related information on the node. +func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP + validateNodeIPFunc func(net.IP) error, // typically Kubelet.nodeIPValidator + hostname string, // typically Kubelet.hostname + externalCloudProvider bool, // typically Kubelet.externalCloudProvider + cloud cloudprovider.Interface, // typically Kubelet.cloud + nodeAddressesFunc func() ([]v1.NodeAddress, error), // typically Kubelet.cloudResourceSyncManager.NodeAddresses +) Setter { + return func(node *v1.Node) error { + if nodeIP != nil { + if err := validateNodeIPFunc(nodeIP); err != nil { + return fmt.Errorf("failed to validate nodeIP: %v", err) + } + glog.V(2).Infof("Using node IP: %q", nodeIP.String()) + } + + if externalCloudProvider { + if nodeIP != nil { + if node.ObjectMeta.Annotations == nil { + node.ObjectMeta.Annotations = make(map[string]string) + } + node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = nodeIP.String() + } + // We rely on the external cloud provider to supply the addresses. + return nil + } + if cloud != nil { + nodeAddresses, err := nodeAddressesFunc() + if err != nil { + return err + } + if nodeIP != nil { + enforcedNodeAddresses := []v1.NodeAddress{} + + var nodeIPType v1.NodeAddressType + for _, nodeAddress := range nodeAddresses { + if nodeAddress.Address == nodeIP.String() { + enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) + nodeIPType = nodeAddress.Type + break + } + } + if len(enforcedNodeAddresses) > 0 { + for _, nodeAddress := range nodeAddresses { + if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName { + enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address}) + } + } + + enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname}) + node.Status.Addresses = enforcedNodeAddresses + return nil + } + return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", nodeIP) + } + + // Only add a NodeHostName address if the cloudprovider did not specify any addresses. + // (we assume the cloudprovider is authoritative if it specifies any addresses) + if len(nodeAddresses) == 0 { + nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: hostname}} + } + node.Status.Addresses = nodeAddresses + } else { + var ipAddr net.IP + var err error + + // 1) Use nodeIP if set + // 2) If the user has specified an IP to HostnameOverride, use it + // 3) Lookup the IP from node name by DNS and use the first valid IPv4 address. + // If the node does not have a valid IPv4 address, use the first valid IPv6 address. + // 4) Try to get the IP from the network interface used as default gateway + if nodeIP != nil { + ipAddr = nodeIP + } else if addr := net.ParseIP(hostname); addr != nil { + ipAddr = addr + } else { + var addrs []net.IP + addrs, _ = net.LookupIP(node.Name) + for _, addr := range addrs { + if err = validateNodeIPFunc(addr); err == nil { + if addr.To4() != nil { + ipAddr = addr + break + } + if addr.To16() != nil && ipAddr == nil { + ipAddr = addr + } + } + } + + if ipAddr == nil { + ipAddr, err = utilnet.ChooseHostInterface() + } + } + + if ipAddr == nil { + // We tried everything we could, but the IP address wasn't fetchable; error out + return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err) + } + node.Status.Addresses = []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: ipAddr.String()}, + {Type: v1.NodeHostName, Address: hostname}, + } + } + return nil + } +} diff --git a/pkg/kubelet/nodestatus/setters_test.go b/pkg/kubelet/nodestatus/setters_test.go new file mode 100644 index 00000000000..f3f86ea63c8 --- /dev/null +++ b/pkg/kubelet/nodestatus/setters_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodestatus + +import ( + "net" + "sort" + "testing" + + "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/diff" + fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" + + "github.com/stretchr/testify/assert" +) + +const ( + testKubeletHostname = "127.0.0.1" +) + +// TODO(mtaufen): below is ported from the old kubelet_node_status_test.go code, potentially add more test coverage for NodeAddress setter in future +func TestNodeAddress(t *testing.T) { + cases := []struct { + name string + nodeIP net.IP + nodeAddresses []v1.NodeAddress + expectedAddresses []v1.NodeAddress + shouldError bool + }{ + { + name: "A single InternalIP", + nodeIP: net.ParseIP("10.1.1.1"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "NodeIP is external", + nodeIP: net.ParseIP("55.55.55.55"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + // Accommodating #45201 and #49202 + name: "InternalIP and ExternalIP are the same", + nodeIP: net.ParseIP("55.55.55.55"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "An Internal/ExternalIP, an Internal/ExternalDNS", + nodeIP: net.ParseIP("10.1.1.1"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, + {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"}, + {Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "An Internal with multiple internal IPs", + nodeIP: net.ParseIP("10.1.1.1"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeInternalIP, Address: "10.2.2.2"}, + {Type: v1.NodeInternalIP, Address: "10.3.3.3"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + shouldError: false, + }, + { + name: "An InternalIP that isn't valid: should error", + nodeIP: net.ParseIP("10.2.2.2"), + nodeAddresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.1.1.1"}, + {Type: v1.NodeExternalIP, Address: "55.55.55.55"}, + {Type: v1.NodeHostName, Address: testKubeletHostname}, + }, + expectedAddresses: nil, + shouldError: true, + }, + } + for _, testCase := range cases { + t.Run(testCase.name, func(t *testing.T) { + // testCase setup + existingNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)}, + Spec: v1.NodeSpec{}, + } + + nodeIP := testCase.nodeIP + nodeIPValidator := func(nodeIP net.IP) error { + return nil + } + hostname := testKubeletHostname + externalCloudProvider := false + cloud := &fakecloud.FakeCloud{ + Addresses: testCase.nodeAddresses, + Err: nil, + } + nodeAddressesFunc := func() ([]v1.NodeAddress, error) { + return testCase.nodeAddresses, nil + } + + // construct setter + setter := NodeAddress(nodeIP, + nodeIPValidator, + hostname, + externalCloudProvider, + cloud, + nodeAddressesFunc) + + // call setter on existing node + err := setter(existingNode) + if err != nil && !testCase.shouldError { + t.Fatalf("unexpected error: %v", err) + } else if err != nil && testCase.shouldError { + // expected an error, and got one, so just return early here + return + } + + // Sort both sets for consistent equality + sortNodeAddresses(testCase.expectedAddresses) + sortNodeAddresses(existingNode.Status.Addresses) + + assert.True(t, apiequality.Semantic.DeepEqual(testCase.expectedAddresses, existingNode.Status.Addresses), + "Diff: %s", diff.ObjectDiff(testCase.expectedAddresses, existingNode.Status.Addresses)) + }) + } +} + +// Test Helpers: + +// sortableNodeAddress is a type for sorting []v1.NodeAddress +type sortableNodeAddress []v1.NodeAddress + +func (s sortableNodeAddress) Len() int { return len(s) } +func (s sortableNodeAddress) Less(i, j int) bool { + return (string(s[i].Type) + s[i].Address) < (string(s[j].Type) + s[j].Address) +} +func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] } + +func sortNodeAddresses(addrs sortableNodeAddress) { + sort.Sort(addrs) +}