mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-31 05:40:42 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			117 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			117 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2018 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package kubelet
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"k8s.io/api/core/v1"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	"k8s.io/kubernetes/pkg/cloudprovider"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| )
 | |
| 
 | |
// nodeAddressesRetryPeriod is how long NodeAddresses sleeps between two
// consecutive looks at the cached node addresses while the cache is empty.
var nodeAddressesRetryPeriod = time.Second * 5
 | |
| 
 | |
| type cloudResourceSyncManager struct {
 | |
| 	// Cloud provider interface.
 | |
| 	cloud cloudprovider.Interface
 | |
| 	// Sync period
 | |
| 	syncPeriod time.Duration
 | |
| 
 | |
| 	nodeAddressesMux sync.Mutex
 | |
| 	nodeAddressesErr error
 | |
| 	nodeAddresses    []v1.NodeAddress
 | |
| 
 | |
| 	nodeName types.NodeName
 | |
| }
 | |
| 
 | |
| // NewCloudResourceSyncManager creates a manager responsible for collecting resources
 | |
| // from a cloud provider through requests that are sensitive to timeouts and hanging
 | |
| func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager {
 | |
| 	return &cloudResourceSyncManager{
 | |
| 		cloud:      cloud,
 | |
| 		syncPeriod: syncPeriod,
 | |
| 		nodeName:   nodeName,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (manager *cloudResourceSyncManager) getNodeAddressSafe() ([]v1.NodeAddress, error) {
 | |
| 	manager.nodeAddressesMux.Lock()
 | |
| 	defer manager.nodeAddressesMux.Unlock()
 | |
| 
 | |
| 	return manager.nodeAddresses, manager.nodeAddressesErr
 | |
| }
 | |
| 
 | |
| func (manager *cloudResourceSyncManager) setNodeAddressSafe(nodeAddresses []v1.NodeAddress, err error) {
 | |
| 	manager.nodeAddressesMux.Lock()
 | |
| 	defer manager.nodeAddressesMux.Unlock()
 | |
| 
 | |
| 	manager.nodeAddresses = nodeAddresses
 | |
| 	manager.nodeAddressesErr = err
 | |
| }
 | |
| 
 | |
| // NodeAddresses does not wait for cloud provider to return a node addresses.
 | |
| // It always returns node addresses or an error.
 | |
| func (manager *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
 | |
| 	// wait until there is something
 | |
| 	for {
 | |
| 		nodeAddresses, err := manager.getNodeAddressSafe()
 | |
| 		if len(nodeAddresses) == 0 && err == nil {
 | |
| 			glog.V(5).Infof("Waiting for %v for cloud provider to provide node addresses", nodeAddressesRetryPeriod)
 | |
| 			time.Sleep(nodeAddressesRetryPeriod)
 | |
| 			continue
 | |
| 		}
 | |
| 		return nodeAddresses, err
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (manager *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nodeName types.NodeName) {
 | |
| 	glog.V(2).Infof("Requesting node addresses from cloud provider for node %q", nodeName)
 | |
| 
 | |
| 	instances, ok := manager.cloud.Instances()
 | |
| 	if !ok {
 | |
| 		manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get instances from cloud provider"))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// TODO(roberthbailey): Can we do this without having credentials to talk
 | |
| 	// to the cloud provider?
 | |
| 	// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
 | |
| 	// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
 | |
| 
 | |
| 	nodeAddresses, err := instances.NodeAddresses(ctx, nodeName)
 | |
| 	if err != nil {
 | |
| 		manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err))
 | |
| 		glog.V(2).Infof("Node addresses from cloud provider for node %q not collected", nodeName)
 | |
| 	} else {
 | |
| 		manager.setNodeAddressSafe(nodeAddresses, nil)
 | |
| 		glog.V(2).Infof("Node addresses from cloud provider for node %q collected", nodeName)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (manager *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
 | |
| 	wait.Until(func() {
 | |
| 		manager.collectNodeAddresses(context.TODO(), manager.nodeName)
 | |
| 	}, manager.syncPeriod, stopCh)
 | |
| }
 |