diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go
index 973f2f8a22b..5cc76b5b795 100644
--- a/cmd/cloud-controller-manager/app/controllermanager.go
+++ b/cmd/cloud-controller-manager/app/controllermanager.go
@@ -211,7 +211,8 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
 	nodeController := nodecontroller.NewCloudNodeController(
 		sharedInformers.Core().V1().Nodes(),
 		client("cloud-node-controller"), cloud,
-		s.NodeMonitorPeriod.Duration)
+		s.NodeMonitorPeriod.Duration,
+		s.NodeStatusUpdateFrequency.Duration)
 	nodeController.Run()
 
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
diff --git a/cmd/cloud-controller-manager/app/options/options.go b/cmd/cloud-controller-manager/app/options/options.go
index 330601c4fb1..576c6922cd4 100644
--- a/cmd/cloud-controller-manager/app/options/options.go
+++ b/cmd/cloud-controller-manager/app/options/options.go
@@ -37,6 +37,9 @@ type CloudControllerManagerServer struct {
 	Master string
 	Kubeconfig string
+
+	// NodeStatusUpdateFrequency is the frequency at which the controller updates nodes' status
+	NodeStatusUpdateFrequency metav1.Duration
 }
 
 // NewCloudControllerManagerServer creates a new ExternalCMServer with a default config.
@@ -56,6 +59,7 @@ func NewCloudControllerManagerServer() *CloudControllerManagerServer {
 			LeaderElection:          leaderelection.DefaultLeaderElectionConfiguration(),
 			ControllerStartInterval: metav1.Duration{Duration: 0 * time.Second},
 		},
+		NodeStatusUpdateFrequency: metav1.Duration{Duration: 5 * time.Minute},
 	}
 	s.LeaderElection.LeaderElect = true
 	return &s
@@ -70,6 +74,7 @@ func (s *CloudControllerManagerServer) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.MinResyncPeriod.Duration, "min-resync-period", s.MinResyncPeriod.Duration, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod")
 	fs.DurationVar(&s.NodeMonitorPeriod.Duration, "node-monitor-period", s.NodeMonitorPeriod.Duration, "The period for syncing NodeStatus in NodeController.")
+	fs.DurationVar(&s.NodeStatusUpdateFrequency.Duration, "node-status-update-frequency", s.NodeStatusUpdateFrequency.Duration, "Specifies how often the controller updates nodes' status.")
 	fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA or ECDSA key used to sign service account tokens.")
 	fs.BoolVar(&s.UseServiceAccountCredentials, "use-service-account-credentials", s.UseServiceAccountCredentials, "If true, use individual service account credentials for each controller.")
 	fs.DurationVar(&s.RouteReconciliationPeriod.Duration, "route-reconciliation-period", s.RouteReconciliationPeriod.Duration, "The period for reconciling routes created for Nodes by cloud provider.")
diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go
index ab13c5ff811..69b57edbe48 100644
--- a/cmd/kubelet/app/options/options.go
+++ b/cmd/kubelet/app/options/options.go
@@ -77,6 +77,10 @@ type KubeletFlags struct {
 	// DockershimRootDirectory is the path to the dockershim root directory. Defaults to
 	// /var/lib/dockershim if unset. Exposed for integration testing (e.g. in OpenShift).
 	DockershimRootDirectory string
+
+	// This flag, if set, sets the unique id of the instance that an external provider (i.e.
cloudprovider) + // can use to identify a specific node + ProviderID string } // KubeletServer encapsulates all of the parameters necessary for starting up @@ -136,6 +140,7 @@ func (f *KubeletFlags) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&f.NodeIP, "node-ip", f.NodeIP, "IP address of the node. If set, kubelet will use this IP address for the node") fs.StringVar(&f.DockershimRootDirectory, "experimental-dockershim-root-directory", f.DockershimRootDirectory, "Path to the dockershim root directory.") + fs.StringVar(&f.ProviderID, "provider-id", f.ProviderID, "Unique identifier for identifying the node in a machine database, i.e cloudprovider") fs.MarkHidden("experimental-dockershim-root-directory") } diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 30bf6b95067..d01f80bfa39 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -811,7 +811,7 @@ func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.Kubele if kubeDeps.OSInterface == nil { kubeDeps.OSInterface = kubecontainer.RealOS{} } - k, err := builder(kubeCfg, kubeDeps, standaloneMode, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.DockershimRootDirectory) + k, err := builder(kubeCfg, kubeDeps, standaloneMode, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.DockershimRootDirectory, kubeFlags.ProviderID) if err != nil { return fmt.Errorf("failed to create kubelet: %v", err) } @@ -891,11 +891,11 @@ func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg } } -func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir string) (k kubelet.KubeletBootstrap, err error) { +func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir, providerID string) (k kubelet.KubeletBootstrap, err error) { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations - k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode, hostnameOverride, nodeIP, dockershimRootDir) + k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode, hostnameOverride, nodeIP, dockershimRootDir, providerID) if err != nil { return nil, err } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 41197f106e8..80e9af6f47e 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -545,6 +545,7 @@ private-mountns prom-push-gateway protect-kernel-defaults proto-import +provider-id proxy-bindall proxy-client-cert-file proxy-client-key-file diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index ad0e620bba5..d7b59ac7ffa 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -28,7 +28,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" ) -const ProviderName = "fake" +const defaultProviderName = "fake" // FakeBalancer is a fake storage of balancer information type FakeBalancer struct { @@ -61,6 +61,8 @@ type FakeCloud struct { UpdateCalls []FakeUpdateBalancerCall RouteMap map[string]*FakeRoute Lock sync.Mutex + Provider string + addCallLock sync.Mutex cloudprovider.Zone } @@ -70,6 +72,8 @@ type FakeRoute struct { } func (f *FakeCloud) addCall(desc string) { + f.addCallLock.Lock() + defer f.addCallLock.Unlock() 
f.Calls = append(f.Calls, desc) } @@ -92,7 +96,10 @@ func (f *FakeCloud) Clusters() (cloudprovider.Clusters, bool) { // ProviderName returns the cloud provider ID. func (f *FakeCloud) ProviderName() string { - return ProviderName + if f.Provider == "" { + return defaultProviderName + } + return f.Provider } // ScrubDNS filters DNS settings for pods. diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index 2c8b4962bf2..b267fd43a2e 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -18,7 +18,9 @@ go_library( "//pkg/api/v1/node:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/retry:go_default_library", "//pkg/cloudprovider:go_default_library", + "//pkg/util/node:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", @@ -26,6 +28,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) @@ -46,6 +49,7 @@ go_test( "//pkg/controller/node/testutil:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/controller/cloud/nodecontroller.go b/pkg/controller/cloud/nodecontroller.go index 09ad6ccdf7d..8d21c09e50d 100644 --- a/pkg/controller/cloud/nodecontroller.go +++ b/pkg/controller/cloud/nodecontroller.go @@ -28,15 +28,24 @@ import ( "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" - nodeutil "k8s.io/kubernetes/pkg/api/v1/node" + nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + clientretry "k8s.io/kubernetes/pkg/client/retry" "k8s.io/kubernetes/pkg/cloudprovider" + nodeutil "k8s.io/kubernetes/pkg/util/node" ) +var UpdateNodeSpecBackoff = wait.Backoff{ + Steps: 20, + Duration: 50 * time.Millisecond, + Jitter: 1.0, +} + type CloudNodeController struct { nodeInformer coreinformers.NodeInformer kubeClient clientset.Interface @@ -48,6 +57,8 @@ type CloudNodeController struct { // check node status posted from kubelet. 
This value should be lower than nodeMonitorGracePeriod // set in controller-manager nodeMonitorPeriod time.Duration + + nodeStatusUpdateFrequency time.Duration } const ( @@ -63,96 +74,358 @@ func NewCloudNodeController( nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface, cloud cloudprovider.Interface, - nodeMonitorPeriod time.Duration) *CloudNodeController { + nodeMonitorPeriod time.Duration, + nodeStatusUpdateFrequency time.Duration) *CloudNodeController { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloudcontrollermanager"}) eventBroadcaster.StartLogging(glog.Infof) if kubeClient != nil { glog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) } else { glog.V(0).Infof("No api server defined - no events will be sent to API server.") } cnc := &CloudNodeController{ - nodeInformer: nodeInformer, - kubeClient: kubeClient, - recorder: recorder, - cloud: cloud, - nodeMonitorPeriod: nodeMonitorPeriod, + nodeInformer: nodeInformer, + kubeClient: kubeClient, + recorder: recorder, + cloud: cloud, + nodeMonitorPeriod: nodeMonitorPeriod, + nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, } + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cnc.AddCloudNode, + }) + return cnc } // This controller deletes a node if kubelet is not reporting // and the node is gone from the cloud provider. func (cnc *CloudNodeController) Run() { - go func() { - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrash() - go wait.Until(func() { - nodes, err := cnc.kubeClient.Core().Nodes().List(metav1.ListOptions{ResourceVersion: "0"}) - if err != nil { - glog.Errorf("Error monitoring node status: %v", err) - } + // The following loops run communicate with the APIServer with a worst case complexity + // of O(num_nodes) per cycle. These functions are justified here because these events fire + // very infrequently. DO NOT MODIFY this to perform frequent operations. - for i := range nodes.Items { - var currentReadyCondition *v1.NodeCondition - node := &nodes.Items[i] - // Try to get the current node status - // If node status is empty, then kubelet has not posted ready status yet. In this case, process next node - for rep := 0; rep < nodeStatusUpdateRetry; rep++ { - _, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady) - if currentReadyCondition != nil { - break - } - name := node.Name - node, err = cnc.kubeClient.Core().Nodes().Get(name, metav1.GetOptions{}) - if err != nil { - glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name) - break - } - time.Sleep(retrySleepTime) - } - if currentReadyCondition == nil { - glog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count.", node.Name) - continue - } - // If the known node status says that Node is NotReady, then check if the node has been removed - // from the cloud provider. 
If node cannot be found in cloudprovider, then delete the node immediately - if currentReadyCondition != nil { - if currentReadyCondition.Status != v1.ConditionTrue { - instances, ok := cnc.cloud.Instances() - if !ok { - glog.Errorf("cloud provider does not support instances.") - continue - } - // Check with the cloud provider to see if the node still exists. If it - // doesn't, delete the node immediately. - if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil { - if err == cloudprovider.InstanceNotFound { - glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name) - ref := &v1.ObjectReference{ - Kind: "Node", - Name: node.Name, - UID: types.UID(node.UID), - Namespace: "", - } - glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name) - cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode") - go func(nodeName string) { - defer utilruntime.HandleCrash() - if err := cnc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil { - glog.Errorf("unable to delete node %q: %v", node.Name, err) - } - }(node.Name) - } - glog.Errorf("Error getting node data from cloud: %v", err) - } - } - } - } - }, cnc.nodeMonitorPeriod, wait.NeverStop) - }() + // Start a loop to periodically update the node addresses obtained from the cloud + go wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, wait.NeverStop) + + // Start a loop to periodically check if any nodes have been deleted from cloudprovider + go wait.Until(cnc.MonitorNode, cnc.nodeMonitorPeriod, wait.NeverStop) +} + +// UpdateNodeStatus updates the node status, such as node addresses +func (cnc *CloudNodeController) UpdateNodeStatus() { + instances, ok := cnc.cloud.Instances() + if !ok { + utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) + return + } + + nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"}) + if err != nil { + glog.Errorf("Error monitoring node status: %v", err) + return + } + + for i := range nodes.Items { + cnc.updateNodeAddress(&nodes.Items[i], instances) + } +} + +// UpdateNodeAddress updates the nodeAddress of a single node +func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) { + // Do not process nodes that are still tainted + cloudTaint := getCloudTaint(node.Spec.Taints) + if cloudTaint != nil { + glog.V(5).Infof("This node %s is still tainted. 
Will not process.", node.Name) + return + } + + nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node) + if err != nil { + glog.Errorf("%v", err) + return + } + // Check if a hostname address exists in the cloud provided addresses + hostnameExists := false + for i := range nodeAddresses { + if nodeAddresses[i].Type == v1.NodeHostName { + hostnameExists = true + } + } + // If hostname was not present in cloud provided addresses, use the hostname + // from the existing node (populated by kubelet) + if !hostnameExists { + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeHostName { + nodeAddresses = append(nodeAddresses, addr) + } + } + } + // If nodeIP was suggested by user, ensure that + // it can be found in the cloud as well (consistent with the behaviour in kubelet) + if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok { + if nodeIP == nil { + glog.Errorf("Specified Node IP not found in cloudprovider") + return + } + nodeAddresses = []v1.NodeAddress{*nodeIP} + } + nodeCopy, err := api.Scheme.DeepCopy(node) + if err != nil { + glog.Errorf("failed to copy node to a new object") + return + } + newNode := nodeCopy.(*v1.Node) + newNode.Status.Addresses = nodeAddresses + if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { + return + } + _, err = nodeutil.PatchNodeStatus(cnc.kubeClient, types.NodeName(node.Name), node, newNode) + if err != nil { + glog.Errorf("Error patching node with cloud ip addresses = [%v]", err) + } +} + +// Monitor node queries the cloudprovider for non-ready nodes and deletes them +// if they cannot be found in the cloud provider +func (cnc *CloudNodeController) MonitorNode() { + instances, ok := cnc.cloud.Instances() + if !ok { + utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider")) + return + } + + nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"}) + if err != nil { + glog.Errorf("Error monitoring node status: %v", err) + return + } + + for i := range nodes.Items { + var currentReadyCondition *v1.NodeCondition + node := &nodes.Items[i] + // Try to get the current node status + // If node status is empty, then kubelet has not posted ready status yet. In this case, process next node + for rep := 0; rep < nodeStatusUpdateRetry; rep++ { + _, currentReadyCondition = nodeutilv1.GetNodeCondition(&node.Status, v1.NodeReady) + if currentReadyCondition != nil { + break + } + name := node.Name + node, err = cnc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{}) + if err != nil { + glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name) + break + } + time.Sleep(retrySleepTime) + } + if currentReadyCondition == nil { + glog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count.", node.Name) + continue + } + // If the known node status says that Node is NotReady, then check if the node has been removed + // from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately + if currentReadyCondition != nil { + if currentReadyCondition.Status != v1.ConditionTrue { + // Check with the cloud provider to see if the node still exists. If it + // doesn't, delete the node immediately. 
+				if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil {
+					if err == cloudprovider.InstanceNotFound {
+						glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name)
+						ref := &v1.ObjectReference{
+							Kind:      "Node",
+							Name:      node.Name,
+							UID:       types.UID(node.UID),
+							Namespace: "",
+						}
+						glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
+						cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
+						go func(nodeName string) {
+							defer utilruntime.HandleCrash()
+							if err := cnc.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
+								glog.Errorf("unable to delete node %q: %v", node.Name, err)
+							}
+						}(node.Name)
+					}
+					glog.Errorf("Error getting node data from cloud: %v", err)
+				}
+			}
+		}
+	}
+}
+
+// AddCloudNode processes nodes that were added to the cluster, and initializes them with the cloud provider if appropriate
+func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
+	node := obj.(*v1.Node)
+
+	instances, ok := cnc.cloud.Instances()
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
+		return
+	}
+
+	cloudTaint := getCloudTaint(node.Spec.Taints)
+	if cloudTaint == nil {
+		glog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
+		return
+	}
+
+	err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
+		curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode)
+		if err != nil {
+			glog.Errorf("%v", err)
+			return nil
+		}
+
+		// If user provided an IP address, ensure that IP address is found
+		// in the cloud provider before removing the taint on the node
+		if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok {
+			if nodeIP == nil {
+				glog.Errorf("failed to get specified nodeIP in cloudprovider")
+				return nil
+			}
+		}
+
+		if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil {
+			glog.Errorf("%v", err)
+			return err
+		} else if instanceType != "" {
+			glog.Infof("Adding node label from cloud provider: %s=%s", metav1.LabelInstanceType, instanceType)
+			curNode.ObjectMeta.Labels[metav1.LabelInstanceType] = instanceType
+		}
+
+		// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
+		// Since there are node taints, do we still need this?
+ // This condition marks the node as unusable until routes are initialized in the cloud provider + if cnc.cloud.ProviderName() == "gce" { + curNode.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{ + Type: v1.NodeNetworkUnavailable, + Status: v1.ConditionTrue, + Reason: "NoRouteCreated", + Message: "Node created without a route", + LastTransitionTime: metav1.Now(), + }) + } + + if zones, ok := cnc.cloud.Zones(); ok { + zone, err := zones.GetZone() + if err != nil { + return fmt.Errorf("failed to get zone from cloud provider: %v", err) + } + if zone.FailureDomain != "" { + glog.Infof("Adding node label from cloud provider: %s=%s", metav1.LabelZoneFailureDomain, zone.FailureDomain) + curNode.ObjectMeta.Labels[metav1.LabelZoneFailureDomain] = zone.FailureDomain + } + if zone.Region != "" { + glog.Infof("Adding node label from cloud provider: %s=%s", metav1.LabelZoneRegion, zone.Region) + curNode.ObjectMeta.Labels[metav1.LabelZoneRegion] = zone.Region + } + } + + curNode.Spec.Taints = excludeTaintFromList(curNode.Spec.Taints, *cloudTaint) + + _, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode) + if err != nil { + return err + } + // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses + // So that users do not see any significant delay in IP addresses being filled into the node + cnc.updateNodeAddress(curNode, instances) + return nil + }) + if err != nil { + utilruntime.HandleError(err) + return + } +} + +func getCloudTaint(taints []v1.Taint) *v1.Taint { + for _, taint := range taints { + if taint.Key == metav1.TaintExternalCloudProvider { + return &taint + } + } + return nil +} + +func excludeTaintFromList(taints []v1.Taint, toExclude v1.Taint) []v1.Taint { + newTaints := []v1.Taint{} + for _, taint := range taints { + if toExclude.MatchTaint(&taint) { + continue + } + newTaints = append(newTaints, taint) + } + return newTaints +} + +func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) { + nodeAddresses, err := instances.NodeAddressesByProviderID(node.Spec.ProviderID) + if err != nil { + providerIDErr := err + nodeAddresses, err = instances.NodeAddresses(types.NodeName(node.Name)) + if err != nil { + return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) + } + } + return nodeAddresses, nil +} + +func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool { + if len(addressSet1) != len(addressSet2) { + return true + } + addressMap1 := map[v1.NodeAddressType]string{} + addressMap2 := map[v1.NodeAddressType]string{} + + for i := range addressSet1 { + addressMap1[addressSet1[i].Type] = addressSet1[i].Address + addressMap2[addressSet2[i].Type] = addressSet2[i].Address + } + + for k, v := range addressMap1 { + if addressMap2[k] != v { + return true + } + } + return false +} + +func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (*v1.NodeAddress, bool) { + var nodeIP *v1.NodeAddress + nodeIPExists := false + if providedIP, ok := node.ObjectMeta.Annotations[metav1.AnnotationProvidedIPAddr]; ok { + nodeIPExists = true + for i := range nodeAddresses { + if nodeAddresses[i].Address == providedIP { + nodeIP = &nodeAddresses[i] + break + } + } + } + return nodeIP, nodeIPExists +} + +func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) { + instanceType, err := instances.InstanceTypeByProviderID(node.Spec.ProviderID) + if err 
!= nil { + providerIDErr := err + instanceType, err = instances.InstanceType(types.NodeName(node.Name)) + if err != nil { + return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err) + } + } + return instanceType, err } diff --git a/pkg/controller/cloud/nodecontroller_test.go b/pkg/controller/cloud/nodecontroller_test.go index 74f2267101b..73ad2b990a3 100644 --- a/pkg/controller/cloud/nodecontroller_test.go +++ b/pkg/controller/cloud/nodecontroller_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" @@ -103,11 +104,12 @@ func TestNodeDeleted(t *testing.T) { eventBroadcaster := record.NewBroadcaster() cloudNodeController := &CloudNodeController{ - kubeClient: fnh, - nodeInformer: factory.Core().V1().Nodes(), - cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound}, - nodeMonitorPeriod: 5 * time.Second, - recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"}), + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound}, + nodeMonitorPeriod: 1 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + nodeStatusUpdateFrequency: 1 * time.Second, } eventBroadcaster.StartLogging(glog.Infof) @@ -122,3 +124,682 @@ func TestNodeDeleted(t *testing.T) { t.Errorf("Node was not deleted") } } + +// This test checks that a node with the external cloud provider taint is cloudprovider initialized +func TestNodeInitialized(t *testing.T) { + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{ + { + Key: metav1.TaintExternalCloudProvider, + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + fakeCloud := &fakecloud.FakeCloud{ + InstanceTypes: map[types.NodeName]string{ + types.NodeName("node0"): "t1.micro", + }, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + Err: nil, + } + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fakeCloud, + nodeMonitorPeriod: 1 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + nodeStatusUpdateFrequency: 1 * time.Second, + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.AddCloudNode(fnh.Existing[0]) + + if len(fnh.UpdatedNodes) != 1 || 
fnh.UpdatedNodes[0].Name != "node0" { + t.Errorf("Node was not updated") + } + + if len(fnh.UpdatedNodes[0].Spec.Taints) != 0 { + t.Errorf("Node Taint was not removed after cloud init") + } + +} + +// This test checks that a node without the external cloud provider taint are NOT cloudprovider initialized +func TestNodeIgnored(t *testing.T) { + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + fakeCloud := &fakecloud.FakeCloud{ + InstanceTypes: map[types.NodeName]string{ + types.NodeName("node0"): "t1.micro", + }, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + Err: nil, + } + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fakeCloud, + nodeMonitorPeriod: 5 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.AddCloudNode(fnh.Existing[0]) + + if len(fnh.UpdatedNodes) != 0 { + t.Errorf("Node was wrongly updated") + } + +} + +// This test checks that a node with the external cloud provider taint is cloudprovider initialized and +// the GCE route condition is added if cloudprovider is GCE +func TestGCECondition(t *testing.T) { + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{ + { + Key: metav1.TaintExternalCloudProvider, + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + fakeCloud := &fakecloud.FakeCloud{ + InstanceTypes: map[types.NodeName]string{ + types.NodeName("node0"): "t1.micro", + }, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + Provider: "gce", + Err: nil, + } + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fakeCloud, + nodeMonitorPeriod: 1 * time.Second, + recorder: 
eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.AddCloudNode(fnh.Existing[0]) + + if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" { + t.Errorf("Node was not updated") + } + + if len(fnh.UpdatedNodes[0].Status.Conditions) != 2 { + t.Errorf("No new conditions were added for GCE") + } + + conditionAdded := false + for _, cond := range fnh.UpdatedNodes[0].Status.Conditions { + if cond.Status == "True" && cond.Type == "NetworkUnavailable" && cond.Reason == "NoRouteCreated" { + conditionAdded = true + } + } + + if !conditionAdded { + t.Errorf("Network Route Condition for GCE not added by external cloud intializer") + } +} + +// This test checks that a node with the external cloud provider taint is cloudprovider initialized and +// and that zone labels are added correctly +func TestZoneInitialized(t *testing.T) { + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{}, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{ + { + Key: metav1.TaintExternalCloudProvider, + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + fakeCloud := &fakecloud.FakeCloud{ + InstanceTypes: map[types.NodeName]string{ + types.NodeName("node0"): "t1.micro", + }, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + Provider: "aws", + Zone: cloudprovider.Zone{ + FailureDomain: "us-west-1a", + Region: "us-west", + }, + Err: nil, + } + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fakeCloud, + nodeMonitorPeriod: 5 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.AddCloudNode(fnh.Existing[0]) + + if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" { + t.Errorf("Node was not updated") + } + + if len(fnh.UpdatedNodes[0].ObjectMeta.Labels) != 2 { + t.Errorf("Node label for Region and Zone were not set") + } + + if fnh.UpdatedNodes[0].ObjectMeta.Labels[metav1.LabelZoneRegion] != "us-west" { + t.Errorf("Node Region not correctly updated") + } + + if fnh.UpdatedNodes[0].ObjectMeta.Labels[metav1.LabelZoneFailureDomain] != "us-west-1a" { + t.Errorf("Node FailureDomain not correctly updated") + } +} + +// This test checks that a node with the external cloud provider taint is cloudprovider initialized and +// and nodeAddresses are updated from the cloudprovider +func TestNodeAddresses(t *testing.T) { + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: 
"node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{}, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{ + { + Key: "ImproveCoverageTaint", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: metav1.TaintExternalCloudProvider, + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + fakeCloud := &fakecloud.FakeCloud{ + InstanceTypes: map[types.NodeName]string{}, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + Provider: "aws", + Zone: cloudprovider.Zone{ + FailureDomain: "us-west-1a", + Region: "us-west", + }, + Err: nil, + } + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fakeCloud, + nodeMonitorPeriod: 5 * time.Second, + nodeStatusUpdateFrequency: 1 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.AddCloudNode(fnh.Existing[0]) + + if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" { + t.Errorf("Node was not updated") + } + + if len(fnh.UpdatedNodes[0].Status.Addresses) != 3 { + t.Errorf("Node status not updated") + } + + fakeCloud.Addresses = []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + } + + cloudNodeController.Run() + + <-time.After(2 * time.Second) + + updatedNodes := fnh.GetUpdatedNodesCopy() + + if len(updatedNodes[0].Status.Addresses) != 2 { + t.Errorf("Node Addresses not correctly updated") + } +} + +// This test checks that a node with the external cloud provider taint is cloudprovider initialized and +// and the provided node ip is validated with the cloudprovider and nodeAddresses are updated from the cloudprovider +func TestNodeProvidedIPAddresses(t *testing.T) { + fnh := &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{}, + Annotations: map[string]string{ + metav1.AnnotationProvidedIPAddr: "10.0.0.1", + }, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionUnknown, + LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC), + }, + }, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeHostName, + Address: "node0.cloud.internal", + }, + }, + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{ + { + Key: "ImproveCoverageTaint", + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: metav1.TaintExternalCloudProvider, + Value: "true", + Effect: v1.TaintEffectNoSchedule, + }, + }, + 
ProviderID: "node0.aws.12345", + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{}), + DeleteWaitChan: make(chan struct{}), + } + + factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc()) + + fakeCloud := &fakecloud.FakeCloud{ + InstanceTypes: map[types.NodeName]string{ + types.NodeName("node0"): "t1.micro", + types.NodeName("node0.aws.12345"): "t2.macro", + }, + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + }, + Provider: "aws", + Zone: cloudprovider.Zone{ + FailureDomain: "us-west-1a", + Region: "us-west", + }, + Err: nil, + } + + eventBroadcaster := record.NewBroadcaster() + cloudNodeController := &CloudNodeController{ + kubeClient: fnh, + nodeInformer: factory.Core().V1().Nodes(), + cloud: fakeCloud, + nodeMonitorPeriod: 5 * time.Second, + nodeStatusUpdateFrequency: 1 * time.Second, + recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}), + } + eventBroadcaster.StartLogging(glog.Infof) + + cloudNodeController.AddCloudNode(fnh.Existing[0]) + + if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" { + t.Errorf("Node was not updated") + } + + if len(fnh.UpdatedNodes[0].Status.Addresses) != 1 { + t.Errorf("Node status unexpectedly updated") + } + + cloudNodeController.Run() + + <-time.After(2 * time.Second) + + updatedNodes := fnh.GetUpdatedNodesCopy() + + if len(updatedNodes[0].Status.Addresses) != 1 || updatedNodes[0].Status.Addresses[0].Address != "10.0.0.1" { + t.Errorf("Node Addresses not correctly updated") + } +} + +// Tests that node address changes are detected correctly +func TestNodeAddressesChangeDetected(t *testing.T) { + addressSet1 := []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + } + addressSet2 := []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + } + if nodeAddressesChangeDetected(addressSet1, addressSet2) { + t.Errorf("Node address changes are not detected correctly") + } + + addressSet1 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.164", + }, + } + addressSet2 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + } + if !nodeAddressesChangeDetected(addressSet1, addressSet2) { + t.Errorf("Node address changes are not detected correctly") + } + + addressSet1 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.164", + }, + { + Type: v1.NodeHostName, + Address: "hostname.zone.region.aws.test", + }, + } + addressSet2 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.164", + }, + } + if !nodeAddressesChangeDetected(addressSet1, addressSet2) { + t.Errorf("Node address changes are not detected correctly") + } + + addressSet1 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.164", + }, + } + addressSet2 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.164", + }, + 
{ + Type: v1.NodeHostName, + Address: "hostname.zone.region.aws.test", + }, + } + if !nodeAddressesChangeDetected(addressSet1, addressSet2) { + t.Errorf("Node address changes are not detected correctly") + } + + addressSet1 = []v1.NodeAddress{ + { + Type: v1.NodeExternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeInternalIP, + Address: "132.143.154.163", + }, + } + addressSet2 = []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: "10.0.0.1", + }, + { + Type: v1.NodeExternalIP, + Address: "132.143.154.163", + }, + } + if !nodeAddressesChangeDetected(addressSet1, addressSet2) { + t.Errorf("Node address changes are not detected correctly") + } +} diff --git a/pkg/controller/node/testutil/test_utils.go b/pkg/controller/node/testutil/test_utils.go index 7d22059ce99..12aa786f546 100644 --- a/pkg/controller/node/testutil/test_utils.go +++ b/pkg/controller/node/testutil/test_utils.go @@ -90,6 +90,11 @@ func (c *FakeNodeHandler) Core() v1core.CoreV1Interface { return &FakeLegacyHandler{c.Clientset.Core(), c} } +// CoreV1 returns fake CoreV1Interface +func (c *FakeNodeHandler) CoreV1() v1core.CoreV1Interface { + return &FakeLegacyHandler{c.Clientset.CoreV1(), c} +} + // Nodes return fake NodeInterfaces. func (m *FakeLegacyHandler) Nodes() v1core.NodeInterface { return m.n diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d7a28aeacb9..e04a8bdf956 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -186,7 +186,7 @@ type KubeletBootstrap interface { } // create and initialize a Kubelet instance -type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir string) (KubeletBootstrap, error) +type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir, providerID string) (KubeletBootstrap, error) // KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed // at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping @@ -281,7 +281,7 @@ func getRuntimeAndImageServices(config *componentconfig.KubeletConfiguration) (i // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. -func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir string) (*Kubelet, error) { +func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir, providerID string) (*Kubelet, error) { if kubeCfg.RootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory) } @@ -433,6 +433,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub diskSpaceManager: diskSpaceManager, cloud: kubeDeps.Cloud, autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider), + externalCloudProvider: cloudprovider.IsExternal(kubeCfg.CloudProvider), + providerID: providerID, nodeRef: nodeRef, nodeLabels: kubeCfg.NodeLabels, nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration, @@ -904,7 +906,8 @@ type Kubelet struct { // Cloud provider interface. 
 	cloud cloudprovider.Interface
 	autoDetectCloudProvider bool
-
+	// Indicates that the node initialization happens in an external cloud controller
+	externalCloudProvider bool
 	// Reference to this node.
 	nodeRef *clientv1.ObjectReference
@@ -998,6 +1001,9 @@ type Kubelet struct {
 	// If non-nil, use this IP address for the node
 	nodeIP net.IP
 
+	// If non-empty, this is a unique identifier for the node in an external database, e.g. cloudprovider
+	providerID string
+
 	// clock is an interface that provides time related functionality in a way that makes it
 	// easy to test the code.
 	clock clock.Clock
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index 38fd1562894..3bdb7f84f7e 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -202,6 +202,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
 			Unschedulable: !kl.registerSchedulable,
 		},
 	}
+	nodeTaints := make([]v1.Taint, 0)
 	if len(kl.kubeletConfiguration.RegisterWithTaints) > 0 {
 		taints := make([]v1.Taint, len(kl.kubeletConfiguration.RegisterWithTaints))
 		for i := range kl.kubeletConfiguration.RegisterWithTaints {
@@ -209,7 +210,19 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
 				return nil, err
 			}
 		}
-		node.Spec.Taints = taints
+		nodeTaints = append(nodeTaints, taints...)
+	}
+	if kl.externalCloudProvider {
+		taint := v1.Taint{
+			Key:    metav1.TaintExternalCloudProvider,
+			Value:  "true",
+			Effect: v1.TaintEffectNoSchedule,
+		}
+
+		nodeTaints = append(nodeTaints, taint)
+	}
+	if len(nodeTaints) > 0 {
+		node.Spec.Taints = nodeTaints
 	}
 	// Initially, set NodeNetworkUnavailable to true.
 	if kl.providerRequiresNetworkingConfiguration() {
@@ -241,6 +254,10 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
 		node.ObjectMeta.Labels[k] = v
 	}
 
+	if kl.providerID != "" {
+		node.Spec.ProviderID = kl.providerID
+	}
+
 	if kl.cloud != nil {
 		instances, ok := kl.cloud.Instances()
 		if !ok {
@@ -259,9 +276,11 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
 		// TODO: We can't assume that the node has credentials to talk to the
 		// cloudprovider from arbitrary nodes. At most, we should talk to a
 		// local metadata server here.
-		node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
-		if err != nil {
-			return nil, err
+		if node.Spec.ProviderID == "" {
+			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
+			if err != nil {
+				return nil, err
+			}
 		}
 
 		instanceType, err := instances.InstanceType(kl.nodeName)
@@ -443,6 +462,7 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
 	// 4) Try to get the IP from the network interface used as default gateway
 	if kl.nodeIP != nil {
 		ipAddr = kl.nodeIP
+		node.ObjectMeta.Annotations[metav1.AnnotationProvidedIPAddr] = kl.nodeIP.String()
 	} else if addr := net.ParseIP(kl.hostname); addr != nil {
 		ipAddr = addr
 	} else {
diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/BUILD b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/BUILD
index 3624e1364d3..b639f24c45a 100644
--- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/BUILD
+++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/BUILD
@@ -44,6 +44,7 @@ go_library(
         "types.go",
         "types_swagger_doc_generated.go",
         "watch.go",
+        "well_known_annotations.go",
         "well_known_labels.go",
         "zz_generated.deepcopy.go",
         "zz_generated.defaults.go",
diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_annotations.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_annotations.go
new file mode 100644
index 00000000000..937c54c7976
--- /dev/null
+++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_annotations.go
@@ -0,0 +1,25 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+const (
+	// When kubelet is started with the "external" cloud provider, then
+	// it sets this annotation on the node to denote an IP address set from the
+	// command line flag. This IP is verified with the cloudprovider as valid by
+	// the cloud-controller-manager
+	AnnotationProvidedIPAddr = "alpha.kubernetes.io/provided-node-ip"
+)
diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_labels.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_labels.go
index 0e9ad56dbe8..6d0b4696e99 100644
--- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_labels.go
+++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/well_known_labels.go
@@ -36,6 +36,12 @@ const (
 	// when node becomes unreachable (corresponding to NodeReady status ConditionUnknown)
 	// and removed when node becomes reachable (NodeReady status ConditionTrue).
 	TaintNodeUnreachable = "node.alpha.kubernetes.io/unreachable"
+
+	// When kubelet is started with the "external" cloud provider, then
+	// it sets this taint on a node to mark it as unusable, until a controller
+	// from the cloud-controller-manager initializes this node, and then removes
+	// the taint
+	TaintExternalCloudProvider = "node.cloudprovider.kubernetes.io/uninitialized"
 )
 
 // Role labels are applied to Nodes to mark their purpose. In particular, we