Merge pull request #50440 from bskiba/kubemark_e2e_open

Automatic merge from submit-queue (batch tested with PRs 45186, 50440)

Add functionality needed by Cluster Autoscaler to Kubemark Provider.

Make adding nodes asynchronous. Add a method for getting the target size of a node group, and a method for getting the node group a given node belongs to. Factor out some common code.

**Release note**:
```
NONE
```
Kubernetes Submit Queue, 2017-08-10 07:31:01 -07:00 (committed by GitHub)
commit eb700d86c5
3 changed files with 123 additions and 60 deletions
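
For context, the sketch below shows how a consumer such as Cluster Autoscaler's kubemark cloud provider might drive the new API: request a scale-up (which only enqueues work) and then poll the observed size until the asynchronous worker catches up. This is a minimal illustration, not code from this PR; the `scaleUp` helper, the import path of the kubemark package, and the polling parameters are assumptions.

```go
package example

import (
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/kubernetes/pkg/kubemark" // import path assumed for this sketch
)

// scaleUp asks the controller for delta extra nodes in nodeGroup and waits
// until the asynchronously created hollow nodes are observed.
func scaleUp(controller *kubemark.KubemarkController, nodeGroup string, delta int) error {
    target, err := controller.GetNodeGroupTargetSize(nodeGroup)
    if err != nil {
        return err
    }
    // SetNodeGroupSize only enqueues creation requests; the worker started by
    // Run creates the nodes in the background.
    if err := controller.SetNodeGroupSize(nodeGroup, target+delta); err != nil {
        return err
    }
    // Poll the observed size until it reaches the requested target.
    return wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
        size, err := controller.GetNodeGroupSize(nodeGroup)
        if err != nil {
            return false, err
        }
        return size >= target+delta, nil
    })
}
```

On the scale-down path, `GetNodeGroupForNode` provides the reverse mapping: given a node name, it returns the node group whose size should shrink.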


```diff
@@ -39,7 +39,6 @@ import (
 const (
     namespaceKubemark = "kubemark"
-    hollowNodeName    = "hollow-node"
     nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
     numRetries        = 3
 )
@@ -52,6 +51,9 @@ type KubemarkController struct {
     externalCluster externalCluster
     kubemarkCluster kubemarkCluster
     rand            *rand.Rand
+    createNodeQueue        chan string
+    nodeGroupQueueSize     map[string]int
+    nodeGroupQueueSizeLock sync.Mutex
 }

 // externalCluster is used to communicate with the external cluster that hosts
@@ -99,6 +101,9 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
             nodesToDeleteLock: sync.Mutex{},
         },
         rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+        createNodeQueue:        make(chan string, 1000),
+        nodeGroupQueueSize:     make(map[string]int),
+        nodeGroupQueueSizeLock: sync.Mutex{},
     }

     kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -108,27 +113,29 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
     return controller, nil
 }

-// Init waits for population of caches and populates the node template needed
-// for creation of kubemark nodes.
-func (kubemarkController *KubemarkController) Init(stopCh chan struct{}) {
-    if !controller.WaitForCacheSync("kubemark", stopCh,
+// WaitForCacheSync waits until all caches in the controller are populated.
+func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
+    return controller.WaitForCacheSync("kubemark", stopCh,
         kubemarkController.externalCluster.rcSynced,
         kubemarkController.externalCluster.podSynced,
-        kubemarkController.kubemarkCluster.nodeSynced) {
-        return
-    }
-    // Get hollow node template from an existing hollow node to be able to create
-    // new nodes based on it.
-    nodeTemplate, err := kubemarkController.getNodeTemplate()
-    if err != nil {
-        glog.Fatalf("Failed to get node template: %s", err)
-    }
-    kubemarkController.nodeTemplate = nodeTemplate
+        kubemarkController.kubemarkCluster.nodeSynced)
 }

-// GetNodesForNodegroup returns list of the nodes in the node group.
-func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup string) ([]string, error) {
+// Run populates the node template needed for creation of kubemark nodes and
+// starts the worker routine for creating new nodes.
+func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
+    nodeTemplate, err := kubemarkController.getNodeTemplate()
+    if err != nil {
+        glog.Fatalf("failed to get node template: %s", err)
+    }
+    kubemarkController.nodeTemplate = nodeTemplate
+
+    go kubemarkController.runNodeCreation(stopCh)
+    <-stopCh
+}
+
+// GetNodeNamesForNodeGroup returns list of the nodes in the node group.
+func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
     selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
     pods, err := kubemarkController.externalCluster.podLister.List(selector)
     if err != nil {
@@ -141,7 +148,7 @@ func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup
     return result, nil
 }

-// GetNodeGroupSize returns the current size for the node group.
+// GetNodeGroupSize returns the current size for the node group as observed.
 func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
     selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
     nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
@@ -151,21 +158,33 @@ func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string)
     return len(nodes), nil
 }

+// GetNodeGroupTargetSize returns the size of the node group as a sum of current
+// observed size and number of upcoming nodes.
+func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
+    kubemarkController.nodeGroupQueueSizeLock.Lock()
+    defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
+    realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+    if err != nil {
+        return realSize, err
+    }
+    return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
+}
+
 // SetNodeGroupSize changes the size of node group by adding or removing nodes.
 func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
-    currSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+    currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
     if err != nil {
         return err
     }
     switch delta := size - currSize; {
     case delta < 0:
         absDelta := -delta
-        nodes, err := kubemarkController.GetNodeNamesForNodegroup(nodeGroup)
+        nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
         if err != nil {
             return err
         }
-        if len(nodes) > absDelta {
-            return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes", absDelta, nodeGroup)
+        if len(nodes) < absDelta {
+            return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
         }
         for i, node := range nodes {
             if i == absDelta {
@@ -176,16 +195,31 @@ func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string,
             }
         }
     case delta > 0:
+        kubemarkController.nodeGroupQueueSizeLock.Lock()
         for i := 0; i < delta; i++ {
-            if err := kubemarkController.addNodeToNodeGroup(nodeGroup); err != nil {
-                return err
-            }
+            kubemarkController.nodeGroupQueueSize[nodeGroup]++
+            kubemarkController.createNodeQueue <- nodeGroup
         }
+        kubemarkController.nodeGroupQueueSizeLock.Unlock()
     }
     return nil
 }

+// GetNodeGroupForNode returns the name of the node group to which the node
+// belongs.
+func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
+    pod := kubemarkController.getPodByName(node)
+    if pod == nil {
+        return "", fmt.Errorf("node %s does not exist", node)
+    }
+    nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
+    if ok {
+        return nodeGroup, nil
+    }
+    return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
+}
+
 func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
     templateCopy, err := api.Scheme.Copy(kubemarkController.nodeTemplate)
     if err != nil {
@@ -207,18 +241,18 @@ func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup strin
 }

 func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup string, node string) error {
-    pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
-    if err != nil {
-        return err
+    pod := kubemarkController.getPodByName(node)
+    if pod == nil {
+        glog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
+        return nil
     }
-    for _, pod := range pods {
-        if pod.ObjectMeta.Name == node {
     if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
         return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
     }
     policy := metav1.DeletePropagationForeground
+    var err error
     for i := 0; i < numRetries; i++ {
-        err := kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
+        err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
             pod.ObjectMeta.Labels["name"],
             &metav1.DeleteOptions{PropagationPolicy: &policy})
         if err == nil {
@@ -232,10 +266,7 @@ func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup
             return nil
         }
     }
-        }
-    }
-    return fmt.Errorf("can't delete node %s from nodegroup %s. Node does not exist", node, nodeGroup)
+    return fmt.Errorf("Failed to delete node %s: %v", node, err)
 }

 func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
@@ -251,6 +282,19 @@ func (kubemarkController *KubemarkController) getReplicationControllerByName(nam
     return nil
 }

+func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
+    pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
+    if err != nil {
+        return nil
+    }
+    for _, pod := range pods {
+        if pod.ObjectMeta.Name == name {
+            return pod
+        }
+    }
+    return nil
+}
+
 func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
     pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
     if err != nil {
@@ -293,6 +337,24 @@ func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.Replicat
     return nil, fmt.Errorf("can't get hollow node template")
 }

+func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
+    for {
+        select {
+        case nodeGroup := <-kubemarkController.createNodeQueue:
+            kubemarkController.nodeGroupQueueSizeLock.Lock()
+            err := kubemarkController.addNodeToNodeGroup(nodeGroup)
+            if err != nil {
+                glog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
+            } else {
+                kubemarkController.nodeGroupQueueSize[nodeGroup]--
+            }
+            kubemarkController.nodeGroupQueueSizeLock.Unlock()
+        case <-stop:
+            return
+        }
+    }
+}
+
 func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
     nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
     if err != nil {
@@ -318,7 +380,7 @@ func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{},
         if kubemarkCluster.nodesToDelete[node.Name] {
             kubemarkCluster.nodesToDelete[node.Name] = false
             if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
-                glog.Errorf("failed to delete node %s from kubemark cluster", node.Name)
+                glog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
             }
         }
         return
```
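
The controller change above boils down to a small concurrency pattern: scale-up requests go into a buffered channel, a mutex-guarded per-group counter tracks nodes that have been requested but not yet created, and the reported target size is the observed size plus that counter. Here is a self-contained sketch of just that pattern; the `pendingScaler` type and its names are illustrative and not part of the Kubemark code.

```go
package example

import (
    "log"
    "sync"
)

// pendingScaler reduces the pattern above to its essentials: scale-up
// requests are queued, and a per-group counter tracks nodes that have been
// requested but not yet created.
type pendingScaler struct {
    queue   chan string    // node-group names awaiting creation
    pending map[string]int // requested-but-not-yet-created nodes per group
    mu      sync.Mutex
    create  func(group string) error // does the real work, e.g. creating one node
    observe func(group string) int   // currently observed size of a group
}

func newPendingScaler(create func(string) error, observe func(string) int) *pendingScaler {
    return &pendingScaler{
        queue:   make(chan string, 1000), // same capacity as the PR uses
        pending: make(map[string]int),
        create:  create,
        observe: observe,
    }
}

// ScaleUp enqueues n creation requests and returns without waiting for them.
func (s *pendingScaler) ScaleUp(group string, n int) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for i := 0; i < n; i++ {
        s.pending[group]++
        s.queue <- group
    }
}

// TargetSize is the observed size plus the nodes still waiting to be created.
func (s *pendingScaler) TargetSize(group string) int {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.observe(group) + s.pending[group]
}

// Run drains the queue, decrementing the pending counter only on success.
func (s *pendingScaler) Run(stop <-chan struct{}) {
    for {
        select {
        case group := <-s.queue:
            s.mu.Lock()
            if err := s.create(group); err != nil {
                log.Printf("failed to create node in group %s: %v", group, err)
            } else {
                s.pending[group]--
            }
            s.mu.Unlock()
        case <-stop:
            return
        }
    }
}
```

Note that, as in the PR, a creation failure only logs and leaves the pending count in place, so the target size continues to include the node that was requested but never came up.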


```diff
@@ -215,7 +215,8 @@ func (f *Framework) BeforeEach() {
         TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(f.KubemarkExternalClusterClientSet, externalInformerFactory, f.ClientSet, kubemarkNodeInformer)
         Expect(err).NotTo(HaveOccurred())
         externalInformerFactory.Start(f.kubemarkControllerCloseChannel)
-        TestContext.CloudConfig.KubemarkController.Init(f.kubemarkControllerCloseChannel)
+        Expect(TestContext.CloudConfig.KubemarkController.WaitForCacheSync(f.kubemarkControllerCloseChannel)).To(BeTrue())
+        go TestContext.CloudConfig.KubemarkController.Run(f.kubemarkControllerCloseChannel)
     }
 }
```


```diff
@@ -75,7 +75,7 @@ func GetGroupNodes(group string) ([]string, error) {
         }
         return lines, nil
     } else if TestContext.Provider == "kubemark" {
-        return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodegroup(group)
+        return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodeGroup(group)
     } else {
         return nil, fmt.Errorf("provider does not support InstanceGroups")
     }
```