diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD
index d3c48293f0f..2dbef978894 100644
--- a/pkg/controller/cloud/BUILD
+++ b/pkg/controller/cloud/BUILD
@@ -19,6 +19,7 @@ go_library(
         "//pkg/scheduler/api:go_default_library",
         "//pkg/util/node:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
diff --git a/pkg/controller/cloud/node_controller.go b/pkg/controller/cloud/node_controller.go
index c833b57e56e..17fedb72b13 100644
--- a/pkg/controller/cloud/node_controller.go
+++ b/pkg/controller/cloud/node_controller.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -42,6 +43,35 @@ import (
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 )
 
+// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
+// primaryKey and secondaryKey are keys of labels to reconcile.
+// - If both keys exist but their values don't match, use the value from the
+//   primaryKey as the source of truth to reconcile.
+// - If ensureSecondaryExists is true and the secondaryKey does not
+//   exist, secondaryKey will be added with the value of the primaryKey.
+var labelReconcileInfo = []struct {
+	primaryKey            string
+	secondaryKey          string
+	ensureSecondaryExists bool
+}{
+	{
+		// Reconcile the beta and the GA zone label using the beta label as
+		// the source of truth
+		// TODO: switch the primary key to GA labels in v1.21
+		primaryKey:            v1.LabelZoneFailureDomain,
+		secondaryKey:          v1.LabelZoneFailureDomainStable,
+		ensureSecondaryExists: true,
+	},
+	{
+		// Reconcile the beta and the GA region label using the beta label as
+		// the source of truth
+		// TODO: switch the primary key to GA labels in v1.21
+		primaryKey:            v1.LabelZoneRegion,
+		secondaryKey:          v1.LabelZoneRegionStable,
+		ensureSecondaryExists: true,
+	},
+}
+
 var UpdateNodeSpecBackoff = wait.Backoff{
 	Steps:    20,
 	Duration: 50 * time.Millisecond,
@@ -125,6 +155,63 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
 	for i := range nodes.Items {
 		cnc.updateNodeAddress(ctx, &nodes.Items[i], instances)
 	}
+
+	for _, node := range nodes.Items {
+		err = cnc.reconcileNodeLabels(node.Name)
+		if err != nil {
+			klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
+		}
+	}
+}
+
+// reconcileNodeLabels reconciles node labels transitioning from beta to GA
+func (cnc *CloudNodeController) reconcileNodeLabels(nodeName string) error {
+	node, err := cnc.nodeInformer.Lister().Get(nodeName)
+	if err != nil {
+		// If the node is not found, just ignore it.
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+
+		return err
+	}
+
+	if node.Labels == nil {
+		// Nothing to reconcile.
+		return nil
+	}
+
+	labelsToUpdate := map[string]string{}
+	for _, r := range labelReconcileInfo {
+		primaryValue, primaryExists := node.Labels[r.primaryKey]
+		secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
+
+		if !primaryExists {
+			// The primary label key does not exist. This should not happen
+			// within our supported version skew range, when no external
+			// components/factors are modifying the node object. Ignore this case.
+			continue
+		}
+		if secondaryExists && primaryValue != secondaryValue {
+			// Secondary label exists, but is not consistent with the primary
+			// label. Need to reconcile.
+			labelsToUpdate[r.secondaryKey] = primaryValue
+
+		} else if !secondaryExists && r.ensureSecondaryExists {
+			// Apply the secondary label based on the primary label.
+			labelsToUpdate[r.secondaryKey] = primaryValue
+		}
+	}
+
+	if len(labelsToUpdate) == 0 {
+		return nil
+	}
+
+	if !cloudnodeutil.AddOrUpdateLabelsOnNode(cnc.kubeClient, labelsToUpdate, node) {
+		return fmt.Errorf("failed to update labels for node %+v", node)
+	}
+
+	return nil
 }
 
 // UpdateNodeAddress updates the nodeAddress of a single node
@@ -298,10 +385,14 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
 		if zone.FailureDomain != "" {
 			klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
 			curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
+			klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain)
+			curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain
 		}
 		if zone.Region != "" {
 			klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
 			curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
+			klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region)
+			curNode.ObjectMeta.Labels[v1.LabelZoneRegionStable] = zone.Region
 		}
 	}
 
diff --git a/pkg/controller/cloud/node_controller_test.go b/pkg/controller/cloud/node_controller_test.go
index 6271e3ab492..5e10eb9236a 100644
--- a/pkg/controller/cloud/node_controller_test.go
+++ b/pkg/controller/cloud/node_controller_test.go
@@ -19,6 +19,7 @@ package cloud
 import (
 	"context"
 	"errors"
+	"reflect"
 	"testing"
 	"time"
 
@@ -460,8 +461,12 @@ func TestZoneInitialized(t *testing.T) {
 	assert.Equal(t, 1, len(fnh.UpdatedNodes), "Node was not updated")
 	assert.Equal(t, "node0", fnh.UpdatedNodes[0].Name, "Node was not updated")
-	assert.Equal(t, 2, len(fnh.UpdatedNodes[0].ObjectMeta.Labels),
+	assert.Equal(t, 4, len(fnh.UpdatedNodes[0].ObjectMeta.Labels),
 		"Node label for Region and Zone were not set")
+	assert.Equal(t, "us-west", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneRegionStable],
+		"Node Region not correctly updated")
+	assert.Equal(t, "us-west-1a", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneFailureDomainStable],
+		"Node FailureDomain not correctly updated")
 	assert.Equal(t, "us-west", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneRegion],
 		"Node Region not correctly updated")
 	assert.Equal(t, "us-west-1a", fnh.UpdatedNodes[0].ObjectMeta.Labels[v1.LabelZoneFailureDomain],
@@ -672,6 +677,105 @@ func TestNodeProvidedIPAddresses(t *testing.T) {
 	assert.Equal(t, "10.0.0.1", updatedNodes[0].Status.Addresses[0].Address, "Node Addresses not correctly updated")
 }
 
+func Test_reconcileNodeLabels(t *testing.T) {
+	testcases := []struct {
+		name           string
+		labels         map[string]string
+		expectedLabels map[string]string
+		expectedErr    error
+	}{
+		{
+			name: "requires reconcile",
+			labels: map[string]string{
+				v1.LabelZoneFailureDomain: "foo",
+				v1.LabelZoneRegion:        "bar",
+			},
+			expectedLabels: map[string]string{
+				v1.LabelZoneFailureDomain:       "foo",
+				v1.LabelZoneRegion:              "bar",
+				v1.LabelZoneFailureDomainStable: "foo",
+				v1.LabelZoneRegionStable:        "bar",
+			},
+			expectedErr: nil,
+		},
+		{
+			name: "doesn't require reconcile",
+			labels: map[string]string{
+				v1.LabelZoneFailureDomain:       "foo",
+				v1.LabelZoneRegion:              "bar",
+				v1.LabelZoneFailureDomainStable: "foo",
+				v1.LabelZoneRegionStable:        "bar",
+			},
+			expectedLabels: map[string]string{
+				v1.LabelZoneFailureDomain:       "foo",
+				v1.LabelZoneRegion:              "bar",
+				v1.LabelZoneFailureDomainStable: "foo",
+				v1.LabelZoneRegionStable:        "bar",
+			},
+			expectedErr: nil,
+		},
+		{
+			name: "require reconcile -- secondary labels are different from primary",
+			labels: map[string]string{
+				v1.LabelZoneFailureDomain:       "foo",
+				v1.LabelZoneRegion:              "bar",
+				v1.LabelZoneFailureDomainStable: "wrongfoo",
+				v1.LabelZoneRegionStable:        "wrongbar",
+			},
+			expectedLabels: map[string]string{
+				v1.LabelZoneFailureDomain:       "foo",
+				v1.LabelZoneRegion:              "bar",
+				v1.LabelZoneFailureDomainStable: "foo",
+				v1.LabelZoneRegionStable:        "bar",
+			},
+			expectedErr: nil,
+		},
+	}
+
+	for _, test := range testcases {
+		t.Run(test.name, func(t *testing.T) {
+			testNode := &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:   "node01",
+					Labels: test.labels,
+				},
+			}
+
+			clientset := fake.NewSimpleClientset(testNode)
+			factory := informers.NewSharedInformerFactory(clientset, 0)
+
+			cnc := &CloudNodeController{
+				kubeClient:   clientset,
+				nodeInformer: factory.Core().V1().Nodes(),
+			}
+
+			// activate node informer
+			factory.Core().V1().Nodes().Informer()
+			factory.Start(nil)
+			factory.WaitForCacheSync(nil)
+
+			err := cnc.reconcileNodeLabels("node01")
+			if err != test.expectedErr {
+				t.Logf("actual err: %v", err)
+				t.Logf("expected err: %v", test.expectedErr)
+				t.Errorf("unexpected error")
+			}
+
+			actualNode, err := clientset.CoreV1().Nodes().Get("node01", metav1.GetOptions{})
+			if err != nil {
+				t.Fatalf("error getting updated node: %v", err)
+			}
+
+			if !reflect.DeepEqual(actualNode.Labels, test.expectedLabels) {
+				t.Logf("actual node labels: %v", actualNode.Labels)
+				t.Logf("expected node labels: %v", test.expectedLabels)
+				t.Errorf("updated node did not match expected node")
+			}
+		})
+	}
+
+}
+
 // Tests that node address changes are detected correctly
 func TestNodeAddressesChangeDetected(t *testing.T) {
 	addressSet1 := []v1.NodeAddress{
diff --git a/staging/src/k8s.io/cloud-provider/node/helpers/BUILD b/staging/src/k8s.io/cloud-provider/node/helpers/BUILD
index e6c77e354a9..a1fc3a33de7 100644
--- a/staging/src/k8s.io/cloud-provider/node/helpers/BUILD
+++ b/staging/src/k8s.io/cloud-provider/node/helpers/BUILD
@@ -5,6 +5,7 @@ go_library(
     srcs = [
         "address.go",
         "conditions.go",
+        "labels.go",
         "taints.go",
     ],
     importmap = "k8s.io/kubernetes/vendor/k8s.io/cloud-provider/node/helpers",
@@ -15,10 +16,12 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/util/retry:go_default_library",
+        "//vendor/k8s.io/klog:go_default_library",
     ],
 )
 
diff --git a/staging/src/k8s.io/cloud-provider/node/helpers/labels.go b/staging/src/k8s.io/cloud-provider/node/helpers/labels.go
new file mode 100644
index 00000000000..80e77bb145c
--- /dev/null
+++ b/staging/src/k8s.io/cloud-provider/node/helpers/labels.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helpers
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/apimachinery/pkg/util/wait"
+	clientset "k8s.io/client-go/kubernetes"
+	clientretry "k8s.io/client-go/util/retry"
+	"k8s.io/klog"
+)
+
+var updateLabelBackoff = wait.Backoff{
+	Steps:    5,
+	Duration: 100 * time.Millisecond,
+	Jitter:   1.0,
+}
+
+// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
+// success and false on failure.
+func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
+	err := addOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
+	if err != nil {
+		utilruntime.HandleError(
+			fmt.Errorf(
+				"unable to update labels %+v for Node %q: %v",
+				labelsToUpdate,
+				node.Name,
+				err))
+		return false
+	}
+
+	klog.V(4).Infof("Updated labels %+v to Node %v", labelsToUpdate, node.Name)
+	return true
+}
+
+func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
+	firstTry := true
+	return clientretry.RetryOnConflict(updateLabelBackoff, func() error {
+		var err error
+		var node *v1.Node
+		// First we try getting node from the API server cache, as it's cheaper. If it fails
+		// we get it from etcd to be sure to have fresh data.
+		if firstTry {
+			node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
+			firstTry = false
+		} else {
+			node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		}
+		if err != nil {
+			return err
+		}
+
+		// Make a copy of the node and update the labels.
+		newNode := node.DeepCopy()
+		if newNode.Labels == nil {
+			newNode.Labels = make(map[string]string)
+		}
+		for key, value := range labelsToUpdate {
+			newNode.Labels[key] = value
+		}
+
+		oldData, err := json.Marshal(node)
+		if err != nil {
+			return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
+		}
+		newData, err := json.Marshal(newNode)
+		if err != nil {
+			return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
+		}
+		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
+		if err != nil {
+			return fmt.Errorf("failed to create a two-way merge patch: %v", err)
+		}
+		if _, err := kubeClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes); err != nil {
+			return fmt.Errorf("failed to patch the node: %v", err)
+		}
+		return nil
+	})
+}
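
Note on the patch mechanics: `addOrUpdateLabelsOnNode` applies labels with a two-way strategic merge patch rather than a full object update, so the request carries only the label delta and concurrent writers to unrelated node fields are not clobbered. Below is a minimal standalone sketch of the same `CreateTwoWayMergePatch` pattern, not part of the change itself; the node name and zone value are invented, and the label key in the printed patch is whatever `v1.LabelZoneFailureDomainStable` resolves to in the release being built.

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	// A node that only carries the beta zone label.
	oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "node01", // illustrative name
		Labels: map[string]string{v1.LabelZoneFailureDomain: "us-west-1a"},
	}}

	// Desired state: the GA (stable) label mirrors the beta label.
	newNode := oldNode.DeepCopy()
	newNode.Labels[v1.LabelZoneFailureDomainStable] = "us-west-1a"

	oldData, err := json.Marshal(oldNode)
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(newNode)
	if err != nil {
		panic(err)
	}

	// The resulting patch contains only the label delta, roughly:
	// {"metadata":{"labels":{"<stable zone label key>":"us-west-1a"}}}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patchBytes))
}
```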
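The exported `AddOrUpdateLabelsOnNode` can likewise be exercised against a fake clientset, in the same style as `Test_reconcileNodeLabels` above. This is a hedged sketch under the same assumptions, with an invented fixture and the context-free client-go signatures the rest of this diff uses:

```go
package helpers_test

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/cloud-provider/node/helpers"
)

func TestAddOrUpdateLabelsOnNode(t *testing.T) {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node01"}}
	clientset := fake.NewSimpleClientset(node)

	// The helper patches the node and retries on conflict internally;
	// it reports success as a bare bool.
	if !helpers.AddOrUpdateLabelsOnNode(clientset, map[string]string{
		v1.LabelZoneFailureDomainStable: "us-west-1a",
	}, node) {
		t.Fatal("AddOrUpdateLabelsOnNode returned false")
	}

	got, err := clientset.CoreV1().Nodes().Get("node01", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("error getting updated node: %v", err)
	}
	if got.Labels[v1.LabelZoneFailureDomainStable] != "us-west-1a" {
		t.Errorf("stable zone label not applied, labels: %v", got.Labels)
	}
}
```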