Merge pull request #123331 from aojea/ccm_update

CCM wait for providerID to initialize the Node object
This commit is contained in:
Kubernetes Prow Robot 2024-02-29 01:50:02 -08:00 committed by GitHub
commit 411c29c39f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 346 additions and 24 deletions

View File

@ -109,6 +109,8 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
return TestServer{}, err
}
s.Generic.LeaderElection.LeaderElect = false
cloudInitializer := func(config *config.CompletedConfig) cloudprovider.Interface {
capturedConfig = *config
// send signal to indicate the capturedConfig has been properly set

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@ -44,6 +45,7 @@ import (
cloudnodeutil "k8s.io/cloud-provider/node/helpers"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/controller-manager/pkg/features"
"k8s.io/klog/v2"
)
@ -282,11 +284,6 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
}
cnc.updateNodeAddress(ctx, node, instanceMetadata)
err = cnc.reconcileNodeLabels(node.Name)
if err != nil {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
}
workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc)
@ -422,9 +419,8 @@ func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) e
cloudTaint := getCloudTaint(curNode.Spec.Taints)
if cloudTaint == nil {
// Node object received from event had the cloud taint but was outdated,
// the node has actually already been initialized, so this sync event can be ignored.
return nil
// Node object was already initialized, only need to reconcile the labels
return cnc.reconcileNodeLabels(nodeName)
}
klog.Infof("Initializing node %s with cloud provider", nodeName)
@ -487,6 +483,19 @@ func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) e
modify(newNode)
}
// spec.ProviderID is required by multiple controllers, like loadbalancers, so we should not
// untaint the node until it is set. Once it is set, the field is immutable, so there is no need to reconcile it.
// We only set this value during initialization and it is never reconciled, so if for some reason
// we are not able to set it, the instance will never be able to acquire it.
// Before external cloud providers were enabled by default, the field was set by the kubelet, and the
// node was created with the value.
// xref: https://issues.k8s.io/123024
if !utilfeature.DefaultFeatureGate.Enabled(features.OptionalProviderID) {
if newNode.Spec.ProviderID == "" {
return fmt.Errorf("failed to get provider ID for node %s at cloudprovider", nodeName)
}
}
_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
if err != nil {
return err

View File

@ -32,12 +32,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
fakecloud "k8s.io/cloud-provider/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/controller-manager/pkg/features"
_ "k8s.io/controller-manager/pkg/features/register"
"github.com/google/go-cmp/cmp"
@ -46,10 +49,12 @@ import (
func Test_syncNode(t *testing.T) {
tests := []struct {
name string
fakeCloud *fakecloud.Cloud
existingNode *v1.Node
updatedNode *v1.Node
name string
fakeCloud *fakecloud.Cloud
existingNode *v1.Node
updatedNode *v1.Node
optionalProviderID bool
expectedErr bool
}{
{
name: "node initialized with provider ID",
@ -545,7 +550,8 @@ func Test_syncNode(t *testing.T) {
},
},
{
name: "provided node IP address is not valid",
name: "provided node IP address is not valid",
expectedErr: true,
fakeCloud: &fakecloud.Cloud{
EnableInstancesV2: false,
Addresses: []v1.NodeAddress{
@ -643,7 +649,8 @@ func Test_syncNode(t *testing.T) {
},
},
{
name: "provided node IP address is not present",
name: "provided node IP address is not present",
expectedErr: true,
fakeCloud: &fakecloud.Cloud{
EnableInstancesV2: false,
Addresses: []v1.NodeAddress{
@ -836,7 +843,8 @@ func Test_syncNode(t *testing.T) {
},
},
{
name: "provider ID not implemented",
name: "provider ID not implemented and optional",
optionalProviderID: true,
fakeCloud: &fakecloud.Cloud{
EnableInstancesV2: false,
InstanceTypes: map[types.NodeName]string{},
@ -892,6 +900,71 @@ func Test_syncNode(t *testing.T) {
},
},
},
{
name: "provider ID not implemented and required",
optionalProviderID: false,
expectedErr: true,
fakeCloud: &fakecloud.Cloud{
EnableInstancesV2: false,
InstanceTypes: map[types.NodeName]string{},
Provider: "test",
ExtID: map[types.NodeName]string{},
ExtIDErr: map[types.NodeName]error{
types.NodeName("node0"): cloudprovider.NotImplemented,
},
Err: nil,
},
existingNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: cloudproviderapi.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
},
},
updatedNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: cloudproviderapi.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
},
},
},
{
name: "[instanceV2] node initialized with provider ID",
fakeCloud: &fakecloud.Cloud{
@ -1641,7 +1714,8 @@ func Test_syncNode(t *testing.T) {
},
},
{
name: "[instanceV2] provider ID not implemented",
name: "[instanceV2] provider ID not implemented",
optionalProviderID: true,
fakeCloud: &fakecloud.Cloud{
EnableInstancesV2: true,
InstanceTypes: map[types.NodeName]string{},
@ -1698,7 +1772,8 @@ func Test_syncNode(t *testing.T) {
},
},
{
name: "[instanceV2] error getting InstanceMetadata",
name: "[instanceV2] error getting InstanceMetadata",
expectedErr: true,
fakeCloud: &fakecloud.Cloud{
EnableInstancesV2: true,
InstanceTypes: map[types.NodeName]string{},
@ -1764,6 +1839,7 @@ func Test_syncNode(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.OptionalProviderID, test.optionalProviderID)()
clientset := fake.NewSimpleClientset(test.existingNode)
factory := informers.NewSharedInformerFactory(clientset, 0)
@ -1786,7 +1862,10 @@ func Test_syncNode(t *testing.T) {
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
err := cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
if (err != nil) != test.expectedErr {
t.Fatalf("error got: %v expected: %v", err, test.expectedErr)
}
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{})
if err != nil {

View File

@ -47,6 +47,13 @@ const (
// `alpha.kubernetes.io/provided-node-ip` annotation
CloudDualStackNodeIPs featuregate.Feature = "CloudDualStackNodeIPs"
// owner: @aojea
// Deprecated: v1.30
// issue: https://issues.k8s.io/123024
//
// If enabled, the ProviderID field is not required for the node initialization.
OptionalProviderID featuregate.Feature = "OptionalProviderID"
// owner: @alexanderConstantinescu
// kep: http://kep.k8s.io/3458
// beta: v1.27
@ -66,5 +73,6 @@ func SetupCurrentKubernetesSpecificFeatureGates(featuregates featuregate.Mutable
var cloudPublicFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha},
CloudDualStackNodeIPs: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
OptionalProviderID: {Default: false, PreRelease: featuregate.Deprecated}, // remove after 1.31
StableLoadBalancerNodeSet: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30, remove in 1.31
}

View File

@ -43,7 +43,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@ -655,6 +655,11 @@ func (g *Cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder,
g.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: g.client.CoreV1().Events("")})
g.eventRecorder = g.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "g-cloudprovider"})
go func() {
defer g.eventBroadcaster.Shutdown()
<-stop
}()
go g.watchClusterID(stop)
go g.metricsCollector.Run(stop)
}

View File

@ -111,12 +111,12 @@ func newLoadBalancerMetrics() loadbalancerMetricsCollector {
func (lm *LoadBalancerMetrics) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Loadbalancer Metrics initialized. Metrics will be exported at an interval of %v", metricsInterval)
// Compute and export metrics periodically.
go func() {
// Wait for service states to be populated in the cache before computing metrics.
time.Sleep(metricsInterval)
select {
case <-stopCh:
return
case <-time.After(metricsInterval): // Wait for service states to be populated in the cache before computing metrics.
wait.Until(lm.export, metricsInterval, stopCh)
}()
<-stopCh
}
}
// SetL4ILBService implements loadbalancerMetricsCollector.

View File

@ -0,0 +1,192 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudprovider
import (
"context"
"io"
"os"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
ccmservertesting "k8s.io/cloud-provider/app/testing"
fakecloud "k8s.io/cloud-provider/fake"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
"k8s.io/kubernetes/test/integration/framework"
)
// Test_RemoveExternalCloudProviderTaint starts a test kube-apiserver and a test
// cloud-controller-manager backed by a fake cloud provider, creates a Node that
// carries the external cloud provider taint, and then polls until the Node is
// left with exactly one taint whose key is v1.TaintNodeNotReady.
func Test_RemoveExternalCloudProviderTaint(t *testing.T) {
	const nodeName = "node0"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The ServiceAccount admission plugin is disabled because no serviceaccount
	// controller is running in this test.
	apiServerFlags := []string{"--disable-admission-plugins=ServiceAccount"}
	server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, framework.SharedEtcd())
	defer server.TearDownFn()

	client := clientset.NewForConfigOrDie(server.ClientConfig)

	ns := framework.CreateNamespaceOrDie(client, "config-map", t)
	defer framework.DeleteNamespaceOrDie(client, ns, t)

	// Register the fake Node before the cloud-controller-manager is started.
	if _, err := client.CoreV1().Nodes().Create(ctx, makeNode(nodeName), metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create Node %v", err)
	}

	// The cloud-controller-manager reaches the API server through a kubeconfig file.
	kubeconfig := createKubeconfigFileForRestConfig(server.ClientConfig)
	// nolint:errcheck // Ignore the error trying to delete the kubeconfig file used for the test
	defer os.Remove(kubeconfig)

	ccmArgs := []string{
		"--kubeconfig=" + kubeconfig,
		"--cloud-provider=fakeCloud",
		"--cidr-allocator-type=" + string(ipam.RangeAllocatorType),
		"--configure-cloud-routes=false",
	}

	// Fake cloud that knows node0: it has a provider ID, an instance type,
	// an external ID and a set of addresses, and reports no errors.
	fakeCloud := &fakecloud.Cloud{
		Zone: cloudprovider.Zone{
			FailureDomain: "zone-0",
			Region:        "region-1",
		},
		EnableInstancesV2:  true,
		ExistsByProviderID: true,
		ProviderID: map[types.NodeName]string{
			types.NodeName(nodeName): "12345",
		},
		InstanceTypes: map[types.NodeName]string{
			types.NodeName(nodeName): "t1.micro",
		},
		ExtID: map[types.NodeName]string{
			types.NodeName(nodeName): "12345",
		},
		Addresses: []v1.NodeAddress{
			{Type: v1.NodeHostName, Address: "node0.cloud.internal"},
			{Type: v1.NodeInternalIP, Address: "10.0.0.1"},
			{Type: v1.NodeExternalIP, Address: "132.143.154.163"},
		},
	}

	// Make the fake cloud available under the name passed via --cloud-provider.
	cloudprovider.RegisterCloudProvider(
		"fakeCloud",
		func(config io.Reader) (cloudprovider.Interface, error) {
			return fakeCloud, nil
		})

	ccm := ccmservertesting.StartTestServerOrDie(ctx, ccmArgs)
	defer ccm.TearDownFn()

	// Only the TaintNodeNotReady taint, added by the admission plugin
	// TaintNodesByCondition, should remain on the Node.
	onlyNotReadyTaintLeft := func(ctx context.Context) (bool, error) {
		node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		taints := node.Spec.Taints
		return len(taints) == 1 && taints[0].Key == v1.TaintNodeNotReady, nil
	}
	if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 50*time.Second, true, onlyNotReadyTaintLeft); err != nil {
		t.Logf("Fake Cloud Provider calls: %v", fakeCloud.Calls)
		t.Fatalf("expected node to not have Taint: %v", err)
	}
}
// createKubeconfigFileForRestConfig writes a kubeconfig built from restConfig
// to a newly created temporary file and returns the path of that file. The
// caller is responsible for removing the file.
//
// The kubeconfig has a single cluster, user and context ("default-cluster",
// "default-user", "default-context") populated from the host, TLS server name,
// CA data, client certificate/key data and bearer token of restConfig.
//
// The function panics if the temporary file cannot be created or the
// kubeconfig cannot be written, because the signature has no way to report an
// error and the returned path would otherwise point to an unusable file.
//
// NOTE(review): the original comment here only named
// sigs.k8s.io/controller-runtime/pkg/envtest, presumably the package this
// helper was adapted from — confirm.
func createKubeconfigFileForRestConfig(restConfig *rest.Config) string {
	clusters := make(map[string]*clientcmdapi.Cluster)
	clusters["default-cluster"] = &clientcmdapi.Cluster{
		Server:                   restConfig.Host,
		TLSServerName:            restConfig.ServerName,
		CertificateAuthorityData: restConfig.CAData,
	}
	contexts := make(map[string]*clientcmdapi.Context)
	contexts["default-context"] = &clientcmdapi.Context{
		Cluster:  "default-cluster",
		AuthInfo: "default-user",
	}
	authinfos := make(map[string]*clientcmdapi.AuthInfo)
	authinfos["default-user"] = &clientcmdapi.AuthInfo{
		ClientCertificateData: restConfig.CertData,
		ClientKeyData:         restConfig.KeyData,
		Token:                 restConfig.BearerToken,
	}
	clientConfig := clientcmdapi.Config{
		Kind:           "Config",
		APIVersion:     "v1",
		Clusters:       clusters,
		Contexts:       contexts,
		CurrentContext: "default-context",
		AuthInfos:      authinfos,
	}

	kubeConfigFile, err := os.CreateTemp("", "kubeconfig")
	if err != nil {
		// Previously this error was dropped and the nil file was dereferenced below.
		panic("creating temporary kubeconfig file: " + err.Error())
	}
	kubeConfigPath := kubeConfigFile.Name()
	// The file is written by name below, so the handle is only needed to reserve
	// the path. Nothing was written through it, so a Close error cannot lose data.
	_ = kubeConfigFile.Close()

	if err := clientcmd.WriteToFile(clientConfig, kubeConfigPath); err != nil {
		// Best-effort cleanup: the panic below already reports the real failure.
		_ = os.Remove(kubeConfigPath)
		panic("writing kubeconfig to " + kubeConfigPath + ": " + err.Error())
	}
	return kubeConfigPath
}
// makeNode returns a schedulable Node called name that carries the external
// cloud provider NoSchedule taint and a single NodeReady condition with
// status Unknown whose last heartbeat is the current time.
func makeNode(name string) *v1.Node {
	// Taint that marks the node as not yet processed by an external cloud provider.
	cloudTaint := v1.Taint{
		Key:    cloudproviderapi.TaintExternalCloudProvider,
		Value:  "true",
		Effect: v1.TaintEffectNoSchedule,
	}
	readyUnknown := v1.NodeCondition{
		Type:              v1.NodeReady,
		Status:            v1.ConditionUnknown,
		LastHeartbeatTime: metav1.Time{Time: time.Now()},
	}

	node := &v1.Node{}
	node.ObjectMeta.Name = name
	node.Spec.Taints = []v1.Taint{cloudTaint}
	node.Spec.Unschedulable = false
	node.Status.Conditions = []v1.NodeCondition{readyUnknown}
	return node
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudprovider
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
// TestMain is the entry point for the tests of this package. It does not call
// m.Run itself; it passes it to framework.EtcdMain, which decides how the
// tests are run.
// NOTE(review): presumably EtcdMain provides the etcd instance used by the
// integration tests and exits with the test result — confirm in the framework package.
func TestMain(m *testing.M) {
	runTests := m.Run
	framework.EtcdMain(runTests)
}