Reconcile extended resource capacity after kubelet restart.

This commit is contained in:
Jiaying Zhang 2018-06-04 16:01:16 -07:00
parent 46d2b47156
commit 35efc4f96a
2 changed files with 112 additions and 0 deletions

View File

@ -127,6 +127,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
// annotation.
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
@ -137,6 +138,19 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return true
}
// Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
requiresUpdate := false
for k := range node.Status.Capacity {
if v1helper.IsExtendedResourceName(k) {
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
requiresUpdate = true
}
}
return requiresUpdate
}
// updateDefaultLabels will set the default labels on the node
func (kl *Kubelet) updateDefaultLabels(initialNode, existingNode *v1.Node) bool {
defaultLabels := []string{

View File

@ -1526,6 +1526,104 @@ func TestUpdateDefaultLabels(t *testing.T) {
}
}
func TestReconcileExtendedResource(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
extendedResourceName1 := v1.ResourceName("test.com/resource1")
extendedResourceName2 := v1.ResourceName("test.com/resource2")
cases := []struct {
name string
existingNode *v1.Node
expectedNode *v1.Node
needsUpdate bool
}{
{
name: "no update needed without extended resource",
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
},
},
expectedNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
},
},
needsUpdate: false,
},
{
name: "extended resource capacity is zeroed",
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI),
extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
extendedResourceName1: *resource.NewQuantity(int64(2), resource.DecimalSI),
extendedResourceName2: *resource.NewQuantity(int64(10), resource.DecimalSI),
},
},
},
expectedNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
extendedResourceName1: *resource.NewQuantity(int64(0), resource.DecimalSI),
extendedResourceName2: *resource.NewQuantity(int64(0), resource.DecimalSI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
extendedResourceName1: *resource.NewQuantity(int64(0), resource.DecimalSI),
extendedResourceName2: *resource.NewQuantity(int64(0), resource.DecimalSI),
},
},
},
needsUpdate: true,
},
}
for _, tc := range cases {
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
initialNode := &v1.Node{}
needsUpdate := kubelet.reconcileExtendedResource(initialNode, tc.existingNode)
assert.Equal(t, tc.needsUpdate, needsUpdate, tc.name)
assert.Equal(t, tc.expectedNode, tc.existingNode, tc.name)
}
}
func TestValidateNodeIPParam(t *testing.T) {
type test struct {
nodeIP string