mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #65226 from ingvagabund/store-cloud-provider-latest-node-addresses
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Store the latest cloud provider node addresses **What this PR does / why we need it**: Buffer the recently retrieved node address so they can be used as soon as the next node status update is run. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #65814 **Special notes for your reviewer**: **Release note**: ```release-note None ```
This commit is contained in:
commit
f70410959d
@ -22,6 +22,7 @@ import (
|
||||
"net"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -57,6 +58,7 @@ type FakeCloud struct {
|
||||
|
||||
Calls []string
|
||||
Addresses []v1.NodeAddress
|
||||
addressesMux sync.Mutex
|
||||
ExtID map[types.NodeName]string
|
||||
InstanceTypes map[types.NodeName]string
|
||||
Machines []types.NodeName
|
||||
@ -72,6 +74,8 @@ type FakeCloud struct {
|
||||
addCallLock sync.Mutex
|
||||
cloudprovider.Zone
|
||||
VolumeLabelMap map[string]map[string]string
|
||||
|
||||
RequestDelay time.Duration
|
||||
}
|
||||
|
||||
type FakeRoute struct {
|
||||
@ -82,6 +86,9 @@ type FakeRoute struct {
|
||||
func (f *FakeCloud) addCall(desc string) {
|
||||
f.addCallLock.Lock()
|
||||
defer f.addCallLock.Unlock()
|
||||
|
||||
time.Sleep(f.RequestDelay)
|
||||
|
||||
f.Calls = append(f.Calls, desc)
|
||||
}
|
||||
|
||||
@ -200,9 +207,17 @@ func (f *FakeCloud) CurrentNodeName(ctx context.Context, hostname string) (types
|
||||
// It adds an entry "node-addresses" into the internal method call record.
|
||||
func (f *FakeCloud) NodeAddresses(ctx context.Context, instance types.NodeName) ([]v1.NodeAddress, error) {
|
||||
f.addCall("node-addresses")
|
||||
f.addressesMux.Lock()
|
||||
defer f.addressesMux.Unlock()
|
||||
return f.Addresses, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeCloud) SetNodeAddresses(nodeAddresses []v1.NodeAddress) {
|
||||
f.addressesMux.Lock()
|
||||
defer f.addressesMux.Unlock()
|
||||
f.Addresses = nodeAddresses
|
||||
}
|
||||
|
||||
// NodeAddressesByProviderID is a test-spy implementation of Instances.NodeAddressesByProviderID.
|
||||
// It adds an entry "node-addresses-by-provider-id" into the internal method call record.
|
||||
func (f *FakeCloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
|
||||
|
@ -10,6 +10,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"active_deadline.go",
|
||||
"cloud_request_manager.go",
|
||||
"doc.go",
|
||||
"kubelet.go",
|
||||
"kubelet_getters.go",
|
||||
@ -147,6 +148,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"active_deadline_test.go",
|
||||
"cloud_request_manager_test.go",
|
||||
"kubelet_getters_test.go",
|
||||
"kubelet_network_test.go",
|
||||
"kubelet_node_status_test.go",
|
||||
|
116
pkg/kubelet/cloud_request_manager.go
Normal file
116
pkg/kubelet/cloud_request_manager.go
Normal file
@ -0,0 +1,116 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
var nodeAddressesRetryPeriod = 5 * time.Second
|
||||
|
||||
type cloudResourceSyncManager struct {
|
||||
// Cloud provider interface.
|
||||
cloud cloudprovider.Interface
|
||||
// Sync period
|
||||
syncPeriod time.Duration
|
||||
|
||||
nodeAddressesMux sync.Mutex
|
||||
nodeAddressesErr error
|
||||
nodeAddresses []v1.NodeAddress
|
||||
|
||||
nodeName types.NodeName
|
||||
}
|
||||
|
||||
// NewCloudResourceSyncManager creates a manager responsible for collecting resources
|
||||
// from a cloud provider through requests that are sensitive to timeouts and hanging
|
||||
func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager {
|
||||
return &cloudResourceSyncManager{
|
||||
cloud: cloud,
|
||||
syncPeriod: syncPeriod,
|
||||
nodeName: nodeName,
|
||||
}
|
||||
}
|
||||
|
||||
func (manager *cloudResourceSyncManager) getNodeAddressSafe() ([]v1.NodeAddress, error) {
|
||||
manager.nodeAddressesMux.Lock()
|
||||
defer manager.nodeAddressesMux.Unlock()
|
||||
|
||||
return manager.nodeAddresses, manager.nodeAddressesErr
|
||||
}
|
||||
|
||||
func (manager *cloudResourceSyncManager) setNodeAddressSafe(nodeAddresses []v1.NodeAddress, err error) {
|
||||
manager.nodeAddressesMux.Lock()
|
||||
defer manager.nodeAddressesMux.Unlock()
|
||||
|
||||
manager.nodeAddresses = nodeAddresses
|
||||
manager.nodeAddressesErr = err
|
||||
}
|
||||
|
||||
// NodeAddresses does not wait for cloud provider to return a node addresses.
|
||||
// It always returns node addresses or an error.
|
||||
func (manager *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
|
||||
// wait until there is something
|
||||
for {
|
||||
nodeAddresses, err := manager.getNodeAddressSafe()
|
||||
if len(nodeAddresses) == 0 && err == nil {
|
||||
glog.V(5).Infof("Waiting for %v for cloud provider to provide node addresses", nodeAddressesRetryPeriod)
|
||||
time.Sleep(nodeAddressesRetryPeriod)
|
||||
continue
|
||||
}
|
||||
return nodeAddresses, err
|
||||
}
|
||||
}
|
||||
|
||||
func (manager *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nodeName types.NodeName) {
|
||||
glog.V(2).Infof("Requesting node addresses from cloud provider for node %q", nodeName)
|
||||
|
||||
instances, ok := manager.cloud.Instances()
|
||||
if !ok {
|
||||
manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get instances from cloud provider"))
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(roberthbailey): Can we do this without having credentials to talk
|
||||
// to the cloud provider?
|
||||
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
|
||||
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
|
||||
|
||||
nodeAddresses, err := instances.NodeAddresses(ctx, nodeName)
|
||||
if err != nil {
|
||||
manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err))
|
||||
glog.V(2).Infof("Node addresses from cloud provider for node %q not collected", nodeName)
|
||||
} else {
|
||||
manager.setNodeAddressSafe(nodeAddresses, nil)
|
||||
glog.V(2).Infof("Node addresses from cloud provider for node %q collected", nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
func (manager *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
|
||||
wait.Until(func() {
|
||||
manager.collectNodeAddresses(context.TODO(), manager.nodeName)
|
||||
}, manager.syncPeriod, stopCh)
|
||||
}
|
99
pkg/kubelet/cloud_request_manager_test.go
Normal file
99
pkg/kubelet/cloud_request_manager_test.go
Normal file
@ -0,0 +1,99 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||
)
|
||||
|
||||
func collectNodeAddresses(manager *cloudResourceSyncManager) ([]v1.NodeAddress, error) {
|
||||
var nodeAddresses []v1.NodeAddress
|
||||
var err error
|
||||
|
||||
collected := make(chan struct{}, 1)
|
||||
go func() {
|
||||
nodeAddresses, err = manager.NodeAddresses()
|
||||
close(collected)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-collected:
|
||||
return nodeAddresses, err
|
||||
case <-time.Tick(2 * nodeAddressesRetryPeriod):
|
||||
return nil, fmt.Errorf("Timeout after %v waiting for address to appear", 2*nodeAddressesRetryPeriod)
|
||||
}
|
||||
}
|
||||
|
||||
func createNodeInternalIPAddress(address string) []v1.NodeAddress {
|
||||
return []v1.NodeAddress{
|
||||
{
|
||||
Type: v1.NodeInternalIP,
|
||||
Address: address,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeAddressesRequest(t *testing.T) {
|
||||
syncPeriod := 300 * time.Millisecond
|
||||
maxRetry := 5
|
||||
cloud := &fake.FakeCloud{
|
||||
Addresses: createNodeInternalIPAddress("10.0.1.12"),
|
||||
// Set the request delay so the manager timeouts and collects the node addresses later
|
||||
RequestDelay: 400 * time.Millisecond,
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
manager := NewCloudResourceSyncManager(cloud, "defaultNode", syncPeriod)
|
||||
go manager.Run(stopCh)
|
||||
|
||||
nodeAddresses, err := collectNodeAddresses(manager)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected err: %q\n", err)
|
||||
}
|
||||
if !reflect.DeepEqual(nodeAddresses, cloud.Addresses) {
|
||||
t.Errorf("Unexpected list of node addresses %#v, expected %#v: %v", nodeAddresses, cloud.Addresses, err)
|
||||
}
|
||||
|
||||
// Change the IP address
|
||||
cloud.SetNodeAddresses(createNodeInternalIPAddress("10.0.1.13"))
|
||||
|
||||
// Wait until the IP address changes
|
||||
for i := 0; i < maxRetry; i++ {
|
||||
nodeAddresses, err := collectNodeAddresses(manager)
|
||||
t.Logf("nodeAddresses: %#v, err: %v", nodeAddresses, err)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected err: %q\n", err)
|
||||
}
|
||||
// It is safe to read cloud.Addresses since no routine is changing the value at the same time
|
||||
if err == nil && nodeAddresses[0].Address != cloud.Addresses[0].Address {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected err: %q\n", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
t.Errorf("Timeout waiting for %q address to appear", cloud.Addresses[0].Address)
|
||||
}
|
@ -545,10 +545,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
}
|
||||
|
||||
if klet.cloud != nil {
|
||||
klet.cloudproviderRequestParallelism = make(chan int, 1)
|
||||
klet.cloudproviderRequestSync = make(chan int)
|
||||
// TODO(jchaloup): Make it configurable via --cloud-provider-request-timeout
|
||||
klet.cloudproviderRequestTimeout = 10 * time.Second
|
||||
klet.cloudResourceSyncManager = NewCloudResourceSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
|
||||
}
|
||||
|
||||
var secretManager secret.Manager
|
||||
@ -999,14 +996,8 @@ type Kubelet struct {
|
||||
|
||||
// Cloud provider interface.
|
||||
cloud cloudprovider.Interface
|
||||
// To keep exclusive access to the cloudproviderRequestParallelism
|
||||
cloudproviderRequestMux sync.Mutex
|
||||
// Keep the count of requests processed in parallel (expected to be 1 at most at a given time)
|
||||
cloudproviderRequestParallelism chan int
|
||||
// Sync with finished requests
|
||||
cloudproviderRequestSync chan int
|
||||
// Request timeout
|
||||
cloudproviderRequestTimeout time.Duration
|
||||
// Handles requests to cloud provider with timeout
|
||||
cloudResourceSyncManager *cloudResourceSyncManager
|
||||
|
||||
// Indicates that the node initialization happens in an external cloud controller
|
||||
externalCloudProvider bool
|
||||
@ -1393,6 +1384,11 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
// handled by pod workers).
|
||||
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
|
||||
|
||||
// Start the cloud provider sync manager
|
||||
if kl.cloudResourceSyncManager != nil {
|
||||
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
|
||||
}
|
||||
|
||||
// Start component sync loops.
|
||||
kl.statusManager.Start()
|
||||
kl.probeManager.Start()
|
||||
|
@ -463,47 +463,11 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
|
||||
return nil
|
||||
}
|
||||
if kl.cloud != nil {
|
||||
instances, ok := kl.cloud.Instances()
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get instances from cloud provider")
|
||||
}
|
||||
// TODO(roberthbailey): Can we do this without having credentials to talk
|
||||
// to the cloud provider?
|
||||
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
|
||||
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
|
||||
var nodeAddresses []v1.NodeAddress
|
||||
var err error
|
||||
|
||||
// Make sure the instances.NodeAddresses returns even if the cloud provider API hangs for a long time
|
||||
func() {
|
||||
kl.cloudproviderRequestMux.Lock()
|
||||
if len(kl.cloudproviderRequestParallelism) > 0 {
|
||||
kl.cloudproviderRequestMux.Unlock()
|
||||
return
|
||||
}
|
||||
kl.cloudproviderRequestParallelism <- 0
|
||||
kl.cloudproviderRequestMux.Unlock()
|
||||
|
||||
go func() {
|
||||
nodeAddresses, err = instances.NodeAddresses(context.TODO(), kl.nodeName)
|
||||
|
||||
kl.cloudproviderRequestMux.Lock()
|
||||
<-kl.cloudproviderRequestParallelism
|
||||
kl.cloudproviderRequestMux.Unlock()
|
||||
|
||||
kl.cloudproviderRequestSync <- 0
|
||||
}()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-kl.cloudproviderRequestSync:
|
||||
case <-time.After(kl.cloudproviderRequestTimeout):
|
||||
err = fmt.Errorf("Timeout after %v", kl.cloudproviderRequestTimeout)
|
||||
}
|
||||
|
||||
nodeAddresses, err := kl.cloudResourceSyncManager.NodeAddresses()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if kl.nodeIP != nil {
|
||||
enforcedNodeAddresses := []v1.NodeAddress{}
|
||||
|
||||
|
@ -260,15 +260,16 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
|
||||
Err: nil,
|
||||
}
|
||||
kubelet.cloud = fakeCloud
|
||||
kubelet.cloudproviderRequestParallelism = make(chan int, 1)
|
||||
kubelet.cloudproviderRequestSync = make(chan int)
|
||||
kubelet.cloudproviderRequestTimeout = 10 * time.Second
|
||||
kubelet.cloudResourceSyncManager = NewCloudResourceSyncManager(kubelet.cloud, kubelet.nodeName, kubelet.nodeStatusUpdateFrequency)
|
||||
stopCh := make(chan struct{})
|
||||
go kubelet.cloudResourceSyncManager.Run(stopCh)
|
||||
kubelet.nodeIPValidator = func(nodeIP net.IP) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// execute method
|
||||
err := kubelet.setNodeAddress(&existingNode)
|
||||
close(stopCh)
|
||||
if err != nil && !testCase.shouldError {
|
||||
t.Errorf("Unexpected error for test %s: %q", testCase.name, err)
|
||||
continue
|
||||
@ -1664,9 +1665,6 @@ func TestSetVolumeLimits(t *testing.T) {
|
||||
Err: nil,
|
||||
}
|
||||
kubelet.cloud = fakeCloud
|
||||
kubelet.cloudproviderRequestParallelism = make(chan int, 1)
|
||||
kubelet.cloudproviderRequestSync = make(chan int)
|
||||
kubelet.cloudproviderRequestTimeout = 10 * time.Second
|
||||
} else {
|
||||
kubelet.cloud = nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user