mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
port setNodeAddress to Setter abstraction, port test
also put cloud_request_manager.go in its own package
This commit is contained in:
parent
a3cbbbd931
commit
15b03b8c0c
@ -10,7 +10,6 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"active_deadline.go",
|
||||
"cloud_request_manager.go",
|
||||
"doc.go",
|
||||
"kubelet.go",
|
||||
"kubelet_getters.go",
|
||||
@ -48,6 +47,7 @@ go_library(
|
||||
"//pkg/kubelet/cadvisor:go_default_library",
|
||||
"//pkg/kubelet/certificate:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/cloudresource:go_default_library",
|
||||
"//pkg/kubelet/cm:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/configmap:go_default_library",
|
||||
@ -66,6 +66,7 @@ go_library(
|
||||
"//pkg/kubelet/metrics/collectors:go_default_library",
|
||||
"//pkg/kubelet/mountpod:go_default_library",
|
||||
"//pkg/kubelet/network/dns:go_default_library",
|
||||
"//pkg/kubelet/nodestatus:go_default_library",
|
||||
"//pkg/kubelet/pleg:go_default_library",
|
||||
"//pkg/kubelet/pod:go_default_library",
|
||||
"//pkg/kubelet/preemption:go_default_library",
|
||||
@ -119,7 +120,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
|
||||
@ -147,7 +147,6 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"active_deadline_test.go",
|
||||
"cloud_request_manager_test.go",
|
||||
"kubelet_getters_test.go",
|
||||
"kubelet_network_test.go",
|
||||
"kubelet_node_status_test.go",
|
||||
@ -255,6 +254,7 @@ filegroup(
|
||||
"//pkg/kubelet/checkpoint:all-srcs",
|
||||
"//pkg/kubelet/checkpointmanager:all-srcs",
|
||||
"//pkg/kubelet/client:all-srcs",
|
||||
"//pkg/kubelet/cloudresource:all-srcs",
|
||||
"//pkg/kubelet/cm:all-srcs",
|
||||
"//pkg/kubelet/config:all-srcs",
|
||||
"//pkg/kubelet/configmap:all-srcs",
|
||||
|
39
pkg/kubelet/cloudresource/BUILD
Normal file
39
pkg/kubelet/cloudresource/BUILD
Normal file
@ -0,0 +1,39 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["cloud_request_manager.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/cloudresource",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["cloud_request_manager_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/cloudprovider/providers/fake:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
package cloudresource
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -32,6 +32,14 @@ import (
|
||||
|
||||
var nodeAddressesRetryPeriod = 5 * time.Second
|
||||
|
||||
// SyncManager is an interface for making requests to a cloud provider
|
||||
type SyncManager interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
NodeAddresses() ([]v1.NodeAddress, error)
|
||||
}
|
||||
|
||||
var _ SyncManager = &cloudResourceSyncManager{}
|
||||
|
||||
type cloudResourceSyncManager struct {
|
||||
// Cloud provider interface.
|
||||
cloud cloudprovider.Interface
|
||||
@ -45,9 +53,9 @@ type cloudResourceSyncManager struct {
|
||||
nodeName types.NodeName
|
||||
}
|
||||
|
||||
// NewCloudResourceSyncManager creates a manager responsible for collecting resources
|
||||
// NewSyncManager creates a manager responsible for collecting resources
|
||||
// from a cloud provider through requests that are sensitive to timeouts and hanging
|
||||
func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager {
|
||||
func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
|
||||
return &cloudResourceSyncManager{
|
||||
cloud: cloud,
|
||||
syncPeriod: syncPeriod,
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
package cloudresource
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -64,7 +64,7 @@ func TestNodeAddressesRequest(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
manager := NewCloudResourceSyncManager(cloud, "defaultNode", syncPeriod)
|
||||
manager := NewSyncManager(cloud, "defaultNode", syncPeriod).(*cloudResourceSyncManager)
|
||||
go manager.Run(stopCh)
|
||||
|
||||
nodeAddresses, err := collectNodeAddresses(manager)
|
@ -61,6 +61,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/configmap"
|
||||
@ -521,7 +522,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
}
|
||||
|
||||
if klet.cloud != nil {
|
||||
klet.cloudResourceSyncManager = NewCloudResourceSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
|
||||
klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
|
||||
}
|
||||
|
||||
var secretManager secret.Manager
|
||||
@ -966,7 +967,7 @@ type Kubelet struct {
|
||||
// Cloud provider interface.
|
||||
cloud cloudprovider.Interface
|
||||
// Handles requests to cloud provider with timeout
|
||||
cloudResourceSyncManager *cloudResourceSyncManager
|
||||
cloudResourceSyncManager cloudresource.SyncManager
|
||||
|
||||
// Indicates that the node initialization happens in an external cloud controller
|
||||
externalCloudProvider bool
|
||||
|
@ -31,7 +31,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
@ -40,6 +39,7 @@ import (
|
||||
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||
@ -443,107 +443,6 @@ func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
|
||||
kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
|
||||
}
|
||||
|
||||
// Set IP and hostname addresses for the node.
|
||||
func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
|
||||
if kl.nodeIP != nil {
|
||||
if err := kl.nodeIPValidator(kl.nodeIP); err != nil {
|
||||
return fmt.Errorf("failed to validate nodeIP: %v", err)
|
||||
}
|
||||
glog.V(2).Infof("Using node IP: %q", kl.nodeIP.String())
|
||||
}
|
||||
|
||||
if kl.externalCloudProvider {
|
||||
if kl.nodeIP != nil {
|
||||
if node.ObjectMeta.Annotations == nil {
|
||||
node.ObjectMeta.Annotations = make(map[string]string)
|
||||
}
|
||||
node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = kl.nodeIP.String()
|
||||
}
|
||||
// We rely on the external cloud provider to supply the addresses.
|
||||
return nil
|
||||
}
|
||||
if kl.cloud != nil {
|
||||
nodeAddresses, err := kl.cloudResourceSyncManager.NodeAddresses()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if kl.nodeIP != nil {
|
||||
enforcedNodeAddresses := []v1.NodeAddress{}
|
||||
|
||||
var nodeIPType v1.NodeAddressType
|
||||
for _, nodeAddress := range nodeAddresses {
|
||||
if nodeAddress.Address == kl.nodeIP.String() {
|
||||
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
|
||||
nodeIPType = nodeAddress.Type
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(enforcedNodeAddresses) > 0 {
|
||||
for _, nodeAddress := range nodeAddresses {
|
||||
if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName {
|
||||
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
|
||||
}
|
||||
}
|
||||
|
||||
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: kl.GetHostname()})
|
||||
node.Status.Addresses = enforcedNodeAddresses
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", kl.nodeIP)
|
||||
}
|
||||
|
||||
// Only add a NodeHostName address if the cloudprovider did not specify any addresses.
|
||||
// (we assume the cloudprovider is authoritative if it specifies any addresses)
|
||||
if len(nodeAddresses) == 0 {
|
||||
nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: kl.GetHostname()}}
|
||||
}
|
||||
node.Status.Addresses = nodeAddresses
|
||||
} else {
|
||||
var ipAddr net.IP
|
||||
var err error
|
||||
|
||||
// 1) Use nodeIP if set
|
||||
// 2) If the user has specified an IP to HostnameOverride, use it
|
||||
// 3) Lookup the IP from node name by DNS and use the first valid IPv4 address.
|
||||
// If the node does not have a valid IPv4 address, use the first valid IPv6 address.
|
||||
// 4) Try to get the IP from the network interface used as default gateway
|
||||
if kl.nodeIP != nil {
|
||||
ipAddr = kl.nodeIP
|
||||
} else if addr := net.ParseIP(kl.hostname); addr != nil {
|
||||
ipAddr = addr
|
||||
} else {
|
||||
var addrs []net.IP
|
||||
addrs, _ = net.LookupIP(node.Name)
|
||||
for _, addr := range addrs {
|
||||
if err = kl.nodeIPValidator(addr); err == nil {
|
||||
if addr.To4() != nil {
|
||||
ipAddr = addr
|
||||
break
|
||||
}
|
||||
if addr.To16() != nil && ipAddr == nil {
|
||||
ipAddr = addr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ipAddr == nil {
|
||||
ipAddr, err = utilnet.ChooseHostInterface()
|
||||
}
|
||||
}
|
||||
|
||||
if ipAddr == nil {
|
||||
// We tried everything we could, but the IP address wasn't fetchable; error out
|
||||
return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err)
|
||||
}
|
||||
node.Status.Addresses = []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: ipAddr.String()},
|
||||
{Type: v1.NodeHostName, Address: kl.GetHostname()},
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
|
||||
// Note: avoid blindly overwriting the capacity in case opaque
|
||||
// resources are being advertised.
|
||||
@ -1083,8 +982,13 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// if cloud is not nil, we expect the cloud resource sync manager to exist
|
||||
var nodeAddressesFunc func() ([]v1.NodeAddress, error)
|
||||
if kl.cloud != nil {
|
||||
nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
|
||||
}
|
||||
return []func(*v1.Node) error{
|
||||
kl.setNodeAddress,
|
||||
nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
|
||||
withoutError(kl.setNodeStatusInfo),
|
||||
withoutError(kl.setNodeOODCondition),
|
||||
withoutError(kl.setNodeMemoryPressureCondition),
|
||||
|
@ -140,160 +140,6 @@ func (lcm *localCM) GetCapacity() v1.ResourceList {
|
||||
return lcm.capacity
|
||||
}
|
||||
|
||||
func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
kubelet := testKubelet.kubelet
|
||||
kubelet.kubeClient = nil // ensure only the heartbeat client is used
|
||||
kubelet.hostname = testKubeletHostname
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
nodeIP net.IP
|
||||
nodeAddresses []v1.NodeAddress
|
||||
expectedAddresses []v1.NodeAddress
|
||||
shouldError bool
|
||||
}{
|
||||
{
|
||||
name: "A single InternalIP",
|
||||
nodeIP: net.ParseIP("10.1.1.1"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "NodeIP is external",
|
||||
nodeIP: net.ParseIP("55.55.55.55"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
// Accommodating #45201 and #49202
|
||||
name: "InternalIP and ExternalIP are the same",
|
||||
nodeIP: net.ParseIP("55.55.55.55"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "An Internal/ExternalIP, an Internal/ExternalDNS",
|
||||
nodeIP: net.ParseIP("10.1.1.1"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"},
|
||||
{Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"},
|
||||
{Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "An Internal with multiple internal IPs",
|
||||
nodeIP: net.ParseIP("10.1.1.1"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeInternalIP, Address: "10.2.2.2"},
|
||||
{Type: v1.NodeInternalIP, Address: "10.3.3.3"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "An InternalIP that isn't valid: should error",
|
||||
nodeIP: net.ParseIP("10.2.2.2"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: nil,
|
||||
shouldError: true,
|
||||
},
|
||||
}
|
||||
for _, testCase := range cases {
|
||||
// testCase setup
|
||||
existingNode := v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)},
|
||||
Spec: v1.NodeSpec{},
|
||||
}
|
||||
|
||||
kubelet.nodeIP = testCase.nodeIP
|
||||
|
||||
fakeCloud := &fakecloud.FakeCloud{
|
||||
Addresses: testCase.nodeAddresses,
|
||||
Err: nil,
|
||||
}
|
||||
kubelet.cloud = fakeCloud
|
||||
kubelet.cloudResourceSyncManager = NewCloudResourceSyncManager(kubelet.cloud, kubelet.nodeName, kubelet.nodeStatusUpdateFrequency)
|
||||
stopCh := make(chan struct{})
|
||||
go kubelet.cloudResourceSyncManager.Run(stopCh)
|
||||
kubelet.nodeIPValidator = func(nodeIP net.IP) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// execute method
|
||||
err := kubelet.setNodeAddress(&existingNode)
|
||||
close(stopCh)
|
||||
if err != nil && !testCase.shouldError {
|
||||
t.Errorf("Unexpected error for test %s: %q", testCase.name, err)
|
||||
continue
|
||||
} else if err != nil && testCase.shouldError {
|
||||
// expected an error
|
||||
continue
|
||||
}
|
||||
|
||||
// Sort both sets for consistent equality
|
||||
sortNodeAddresses(testCase.expectedAddresses)
|
||||
sortNodeAddresses(existingNode.Status.Addresses)
|
||||
|
||||
assert.True(
|
||||
t,
|
||||
apiequality.Semantic.DeepEqual(
|
||||
testCase.expectedAddresses,
|
||||
existingNode.Status.Addresses,
|
||||
),
|
||||
fmt.Sprintf("Test %s failed %%s", testCase.name),
|
||||
diff.ObjectDiff(testCase.expectedAddresses, existingNode.Status.Addresses),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// sortableNodeAddress is a type for sorting []v1.NodeAddress
|
||||
type sortableNodeAddress []v1.NodeAddress
|
||||
|
||||
|
@ -1,11 +1,17 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["setters.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/nodestatus",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//vendor/k8s.io/api/core/v1:go_default_library"],
|
||||
deps = [
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/kubelet/apis:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
@ -21,3 +27,17 @@ filegroup(
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["setters_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/cloudprovider/providers/fake:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -17,9 +17,125 @@ limitations under the License.
|
||||
package nodestatus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Setter modifies the node in-place, and returns an error if the modification failed.
|
||||
// Setters may partially mutate the node before returning an error.
|
||||
type Setter func(node *v1.Node) error
|
||||
|
||||
// NodeAddress returns a Setter that updates address-related information on the node.
|
||||
func NodeAddress(nodeIP net.IP, // typically Kubelet.nodeIP
|
||||
validateNodeIPFunc func(net.IP) error, // typically Kubelet.nodeIPValidator
|
||||
hostname string, // typically Kubelet.hostname
|
||||
externalCloudProvider bool, // typically Kubelet.externalCloudProvider
|
||||
cloud cloudprovider.Interface, // typically Kubelet.cloud
|
||||
nodeAddressesFunc func() ([]v1.NodeAddress, error), // typically Kubelet.cloudResourceSyncManager.NodeAddresses
|
||||
) Setter {
|
||||
return func(node *v1.Node) error {
|
||||
if nodeIP != nil {
|
||||
if err := validateNodeIPFunc(nodeIP); err != nil {
|
||||
return fmt.Errorf("failed to validate nodeIP: %v", err)
|
||||
}
|
||||
glog.V(2).Infof("Using node IP: %q", nodeIP.String())
|
||||
}
|
||||
|
||||
if externalCloudProvider {
|
||||
if nodeIP != nil {
|
||||
if node.ObjectMeta.Annotations == nil {
|
||||
node.ObjectMeta.Annotations = make(map[string]string)
|
||||
}
|
||||
node.ObjectMeta.Annotations[kubeletapis.AnnotationProvidedIPAddr] = nodeIP.String()
|
||||
}
|
||||
// We rely on the external cloud provider to supply the addresses.
|
||||
return nil
|
||||
}
|
||||
if cloud != nil {
|
||||
nodeAddresses, err := nodeAddressesFunc()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if nodeIP != nil {
|
||||
enforcedNodeAddresses := []v1.NodeAddress{}
|
||||
|
||||
var nodeIPType v1.NodeAddressType
|
||||
for _, nodeAddress := range nodeAddresses {
|
||||
if nodeAddress.Address == nodeIP.String() {
|
||||
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
|
||||
nodeIPType = nodeAddress.Type
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(enforcedNodeAddresses) > 0 {
|
||||
for _, nodeAddress := range nodeAddresses {
|
||||
if nodeAddress.Type != nodeIPType && nodeAddress.Type != v1.NodeHostName {
|
||||
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: nodeAddress.Type, Address: nodeAddress.Address})
|
||||
}
|
||||
}
|
||||
|
||||
enforcedNodeAddresses = append(enforcedNodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname})
|
||||
node.Status.Addresses = enforcedNodeAddresses
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to get node address from cloud provider that matches ip: %v", nodeIP)
|
||||
}
|
||||
|
||||
// Only add a NodeHostName address if the cloudprovider did not specify any addresses.
|
||||
// (we assume the cloudprovider is authoritative if it specifies any addresses)
|
||||
if len(nodeAddresses) == 0 {
|
||||
nodeAddresses = []v1.NodeAddress{{Type: v1.NodeHostName, Address: hostname}}
|
||||
}
|
||||
node.Status.Addresses = nodeAddresses
|
||||
} else {
|
||||
var ipAddr net.IP
|
||||
var err error
|
||||
|
||||
// 1) Use nodeIP if set
|
||||
// 2) If the user has specified an IP to HostnameOverride, use it
|
||||
// 3) Lookup the IP from node name by DNS and use the first valid IPv4 address.
|
||||
// If the node does not have a valid IPv4 address, use the first valid IPv6 address.
|
||||
// 4) Try to get the IP from the network interface used as default gateway
|
||||
if nodeIP != nil {
|
||||
ipAddr = nodeIP
|
||||
} else if addr := net.ParseIP(hostname); addr != nil {
|
||||
ipAddr = addr
|
||||
} else {
|
||||
var addrs []net.IP
|
||||
addrs, _ = net.LookupIP(node.Name)
|
||||
for _, addr := range addrs {
|
||||
if err = validateNodeIPFunc(addr); err == nil {
|
||||
if addr.To4() != nil {
|
||||
ipAddr = addr
|
||||
break
|
||||
}
|
||||
if addr.To16() != nil && ipAddr == nil {
|
||||
ipAddr = addr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ipAddr == nil {
|
||||
ipAddr, err = utilnet.ChooseHostInterface()
|
||||
}
|
||||
}
|
||||
|
||||
if ipAddr == nil {
|
||||
// We tried everything we could, but the IP address wasn't fetchable; error out
|
||||
return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err)
|
||||
}
|
||||
node.Status.Addresses = []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: ipAddr.String()},
|
||||
{Type: v1.NodeHostName, Address: hostname},
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
200
pkg/kubelet/nodestatus/setters_test.go
Normal file
200
pkg/kubelet/nodestatus/setters_test.go
Normal file
@ -0,0 +1,200 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package nodestatus
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const (
|
||||
testKubeletHostname = "127.0.0.1"
|
||||
)
|
||||
|
||||
// TODO(mtaufen): below is ported from the old kubelet_node_status_test.go code, potentially add more test coverage for NodeAddress setter in future
|
||||
func TestNodeAddress(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
nodeIP net.IP
|
||||
nodeAddresses []v1.NodeAddress
|
||||
expectedAddresses []v1.NodeAddress
|
||||
shouldError bool
|
||||
}{
|
||||
{
|
||||
name: "A single InternalIP",
|
||||
nodeIP: net.ParseIP("10.1.1.1"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "NodeIP is external",
|
||||
nodeIP: net.ParseIP("55.55.55.55"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
// Accommodating #45201 and #49202
|
||||
name: "InternalIP and ExternalIP are the same",
|
||||
nodeIP: net.ParseIP("55.55.55.55"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "An Internal/ExternalIP, an Internal/ExternalDNS",
|
||||
nodeIP: net.ParseIP("10.1.1.1"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"},
|
||||
{Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeInternalDNS, Address: "ip-10-1-1-1.us-west-2.compute.internal"},
|
||||
{Type: v1.NodeExternalDNS, Address: "ec2-55-55-55-55.us-west-2.compute.amazonaws.com"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "An Internal with multiple internal IPs",
|
||||
nodeIP: net.ParseIP("10.1.1.1"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeInternalIP, Address: "10.2.2.2"},
|
||||
{Type: v1.NodeInternalIP, Address: "10.3.3.3"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
name: "An InternalIP that isn't valid: should error",
|
||||
nodeIP: net.ParseIP("10.2.2.2"),
|
||||
nodeAddresses: []v1.NodeAddress{
|
||||
{Type: v1.NodeInternalIP, Address: "10.1.1.1"},
|
||||
{Type: v1.NodeExternalIP, Address: "55.55.55.55"},
|
||||
{Type: v1.NodeHostName, Address: testKubeletHostname},
|
||||
},
|
||||
expectedAddresses: nil,
|
||||
shouldError: true,
|
||||
},
|
||||
}
|
||||
for _, testCase := range cases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
// testCase setup
|
||||
existingNode := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Annotations: make(map[string]string)},
|
||||
Spec: v1.NodeSpec{},
|
||||
}
|
||||
|
||||
nodeIP := testCase.nodeIP
|
||||
nodeIPValidator := func(nodeIP net.IP) error {
|
||||
return nil
|
||||
}
|
||||
hostname := testKubeletHostname
|
||||
externalCloudProvider := false
|
||||
cloud := &fakecloud.FakeCloud{
|
||||
Addresses: testCase.nodeAddresses,
|
||||
Err: nil,
|
||||
}
|
||||
nodeAddressesFunc := func() ([]v1.NodeAddress, error) {
|
||||
return testCase.nodeAddresses, nil
|
||||
}
|
||||
|
||||
// construct setter
|
||||
setter := NodeAddress(nodeIP,
|
||||
nodeIPValidator,
|
||||
hostname,
|
||||
externalCloudProvider,
|
||||
cloud,
|
||||
nodeAddressesFunc)
|
||||
|
||||
// call setter on existing node
|
||||
err := setter(existingNode)
|
||||
if err != nil && !testCase.shouldError {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if err != nil && testCase.shouldError {
|
||||
// expected an error, and got one, so just return early here
|
||||
return
|
||||
}
|
||||
|
||||
// Sort both sets for consistent equality
|
||||
sortNodeAddresses(testCase.expectedAddresses)
|
||||
sortNodeAddresses(existingNode.Status.Addresses)
|
||||
|
||||
assert.True(t, apiequality.Semantic.DeepEqual(testCase.expectedAddresses, existingNode.Status.Addresses),
|
||||
"Diff: %s", diff.ObjectDiff(testCase.expectedAddresses, existingNode.Status.Addresses))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test Helpers:
|
||||
|
||||
// sortableNodeAddress is a type for sorting []v1.NodeAddress
|
||||
type sortableNodeAddress []v1.NodeAddress
|
||||
|
||||
func (s sortableNodeAddress) Len() int { return len(s) }
|
||||
func (s sortableNodeAddress) Less(i, j int) bool {
|
||||
return (string(s[i].Type) + s[i].Address) < (string(s[j].Type) + s[j].Address)
|
||||
}
|
||||
func (s sortableNodeAddress) Swap(i, j int) { s[j], s[i] = s[i], s[j] }
|
||||
|
||||
func sortNodeAddresses(addrs sortableNodeAddress) {
|
||||
sort.Sort(addrs)
|
||||
}
|
Loading…
Reference in New Issue
Block a user