Merge pull request #90978 from brianpursley/kubernetes-89259

Changed scheduler to use patch when updating pod status
This commit is contained in:
Kubernetes Prow Robot 2020-05-15 02:47:53 -07:00 committed by GitHub
commit 78abe8b270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 263 additions and 8 deletions

View File

@ -37,6 +37,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",

View File

@ -18,6 +18,7 @@ package scheduler
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
@ -27,6 +28,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -718,11 +721,23 @@ type podConditionUpdaterImpl struct {
func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
if podutil.UpdatePodCondition(&pod.Status, condition) {
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
oldData, err := json.Marshal(pod)
if err != nil {
return err
}
return nil
if !podutil.UpdatePodCondition(&pod.Status, condition) {
return nil
}
newData, err := json.Marshal(pod)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
}
_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
type podPreemptorImpl struct {
@ -738,9 +753,25 @@ func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
}
func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
klog.V(3).Infof("Setting nominated node name for %s/%s to \"%s\"", pod.Namespace, pod.Name, nominatedNodeName)
if pod.Status.NominatedNodeName == nominatedNodeName {
return nil
}
podCopy := pod.DeepCopy()
oldData, err := json.Marshal(podCopy)
if err != nil {
return err
}
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), podCopy, metav1.UpdateOptions{})
newData, err := json.Marshal(podCopy)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
}
_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}

View File

@ -24,6 +24,7 @@ import (
"os"
"path"
"reflect"
"regexp"
"sort"
"strings"
"sync"
@ -1316,3 +1317,227 @@ func TestInjectingPluginConfigForVolumeBinding(t *testing.T) {
}
}
}
func TestSetNominatedNodeName(t *testing.T) {
tests := []struct {
name string
currentNominatedNodeName string
newNominatedNodeName string
expectedPatchRequests int
expectedPatchData string
}{
{
name: "Should make patch request to set node name",
currentNominatedNodeName: "",
newNominatedNodeName: "node1",
expectedPatchRequests: 1,
expectedPatchData: `{"status":{"nominatedNodeName":"node1"}}`,
},
{
name: "Should make patch request to clear node name",
currentNominatedNodeName: "node1",
newNominatedNodeName: "",
expectedPatchRequests: 1,
expectedPatchData: `{"status":{"nominatedNodeName":null}}`,
},
{
name: "Should not make patch request if nominated node is already set to the specified value",
currentNominatedNodeName: "node1",
newNominatedNodeName: "node1",
expectedPatchRequests: 0,
},
{
name: "Should not make patch request if nominated node is already cleared",
currentNominatedNodeName: "",
newNominatedNodeName: "",
expectedPatchRequests: 0,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actualPatchRequests := 0
var actualPatchData string
cs := &clientsetfake.Clientset{}
cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
actualPatchRequests++
patch := action.(clienttesting.PatchAction)
actualPatchData = string(patch.GetPatch())
// For this test, we don't care about the result of the patched pod, just that we got the expected
// patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
return true, &v1.Pod{}, nil
})
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
}
preemptor := &podPreemptorImpl{Client: cs}
if err := preemptor.setNominatedNodeName(pod, test.newNominatedNodeName); err != nil {
t.Fatalf("Error calling setNominatedNodeName: %v", err)
}
if actualPatchRequests != test.expectedPatchRequests {
t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
}
if test.expectedPatchRequests > 0 && actualPatchData != test.expectedPatchData {
t.Fatalf("Patch data mismatch: Actual was %v, but expected %v", actualPatchData, test.expectedPatchData)
}
})
}
}
func TestUpdatePodCondition(t *testing.T) {
tests := []struct {
name string
currentPodConditions []v1.PodCondition
newPodCondition *v1.PodCondition
expectedPatchRequests int
expectedPatchDataPattern string
}{
{
name: "Should make patch request to add pod condition when there are none currently",
currentPodConditions: []v1.PodCondition{},
newPodCondition: &v1.PodCondition{
Type: "newType",
Status: "newStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
Reason: "newReason",
Message: "newMessage",
},
expectedPatchRequests: 1,
expectedPatchDataPattern: `{"status":{"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
},
{
name: "Should make patch request to add a new pod condition when there is already one with another type",
currentPodConditions: []v1.PodCondition{
{
Type: "someOtherType",
Status: "someOtherTypeStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 11, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 10, 0, 0, 0, 0, time.UTC)),
Reason: "someOtherTypeReason",
Message: "someOtherTypeMessage",
},
},
newPodCondition: &v1.PodCondition{
Type: "newType",
Status: "newStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
Reason: "newReason",
Message: "newMessage",
},
expectedPatchRequests: 1,
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"someOtherType"},{"type":"newType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
},
{
name: "Should make patch request to update an existing pod condition",
currentPodConditions: []v1.PodCondition{
{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "currentReason",
Message: "currentMessage",
},
},
newPodCondition: &v1.PodCondition{
Type: "currentType",
Status: "newStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
Reason: "newReason",
Message: "newMessage",
},
expectedPatchRequests: 1,
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"currentType"}]}}`,
},
{
name: "Should make patch request to update an existing pod condition, but the transition time should remain unchanged because the status is the same",
currentPodConditions: []v1.PodCondition{
{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "currentReason",
Message: "currentMessage",
},
},
newPodCondition: &v1.PodCondition{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "newReason",
Message: "newMessage",
},
expectedPatchRequests: 1,
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
},
{
name: "Should not make patch request if pod condition already exists and is identical",
currentPodConditions: []v1.PodCondition{
{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "currentReason",
Message: "currentMessage",
},
},
newPodCondition: &v1.PodCondition{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "currentReason",
Message: "currentMessage",
},
expectedPatchRequests: 0,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actualPatchRequests := 0
var actualPatchData string
cs := &clientsetfake.Clientset{}
cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
actualPatchRequests++
patch := action.(clienttesting.PatchAction)
actualPatchData = string(patch.GetPatch())
// For this test, we don't care about the result of the patched pod, just that we got the expected
// patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
return true, &v1.Pod{}, nil
})
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Status: v1.PodStatus{Conditions: test.currentPodConditions},
}
updater := &podConditionUpdaterImpl{Client: cs}
if err := updater.update(pod, test.newPodCondition); err != nil {
t.Fatalf("Error calling update: %v", err)
}
if actualPatchRequests != test.expectedPatchRequests {
t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
}
regex, err := regexp.Compile(test.expectedPatchDataPattern)
if err != nil {
t.Fatalf("Error compiling regexp for %v: %v", test.expectedPatchDataPattern, err)
}
if test.expectedPatchRequests > 0 && !regex.MatchString(actualPatchData) {
t.Fatalf("Patch data mismatch: Actual was %v, but expected to match regexp %v", actualPatchData, test.expectedPatchDataPattern)
}
})
}
}

View File

@ -45,7 +45,7 @@ import (
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
testutils "k8s.io/kubernetes/test/integration/util"
utils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils"
)
var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
@ -629,8 +629,6 @@ func mkPriorityPodWithGrace(tc *testutils.TestContext, name string, priority int
Labels: map[string]string{"pod": name},
Resources: defaultPodRes,
})
// Setting grace period to zero. Otherwise, we may never see the actual deletion
// of the pods in integration tests.
pod.Spec.TerminationGracePeriodSeconds = &grace
return pod
}
@ -924,7 +922,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
}
// Step 5. Check that nominated node name of the high priority pod is set.
if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
t.Errorf("NominatedNodeName annotation was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err)
t.Errorf("NominatedNodeName annotation was not set for pod %v/%v: %v", highPriPod.Namespace, highPriPod.Name, err)
}
// And the nominated node name of the medium priority pod is cleared.
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {