Changed scheduler to use patch when updating pod status to avoid potential conflicts

This commit is contained in:
Brian Pursley
2020-05-14 15:17:53 -04:00
parent 5bda0c1b3b
commit 9eb8e7a6d6
4 changed files with 263 additions and 8 deletions

View File

@@ -18,6 +18,7 @@ package scheduler
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
@@ -27,6 +28,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -718,11 +721,23 @@ type podConditionUpdaterImpl struct {
func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
if podutil.UpdatePodCondition(&pod.Status, condition) {
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
oldData, err := json.Marshal(pod)
if err != nil {
return err
}
return nil
if !podutil.UpdatePodCondition(&pod.Status, condition) {
return nil
}
newData, err := json.Marshal(pod)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
}
_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
type podPreemptorImpl struct {
@@ -738,9 +753,25 @@ func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
}
func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
klog.V(3).Infof("Setting nominated node name for %s/%s to \"%s\"", pod.Namespace, pod.Name, nominatedNodeName)
if pod.Status.NominatedNodeName == nominatedNodeName {
return nil
}
podCopy := pod.DeepCopy()
oldData, err := json.Marshal(podCopy)
if err != nil {
return err
}
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), podCopy, metav1.UpdateOptions{})
newData, err := json.Marshal(podCopy)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
}
_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}