mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 02:34:03 +00:00
Job: Use ControllerRefManager to adopt/orphan.
This commit is contained in:
parent
bc423ac39d
commit
424de52779
@ -30,6 +30,7 @@ go_library(
|
|||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||||
@ -59,6 +60,7 @@ go_test(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
@ -284,6 +285,24 @@ func (jm *JobController) processNextWorkItem() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPodsForJob returns the set of pods that this Job should manage.
|
||||||
|
// It also reconciles ControllerRef by adopting/orphaning.
|
||||||
|
// Note that the returned Pods are pointers into the cache.
|
||||||
|
func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
|
||||||
|
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
|
||||||
|
}
|
||||||
|
// List all pods to include those that don't match the selector anymore
|
||||||
|
// but have a ControllerRef pointing to this controller.
|
||||||
|
pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind)
|
||||||
|
return cm.ClaimPods(pods)
|
||||||
|
}
|
||||||
|
|
||||||
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
|
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
|
||||||
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
|
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
|
||||||
// concurrently with the same key.
|
// concurrently with the same key.
|
||||||
@ -315,8 +334,8 @@ func (jm *JobController) syncJob(key string) error {
|
|||||||
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
||||||
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
|
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
|
||||||
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
|
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
|
||||||
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
|
|
||||||
pods, err := jm.podStore.Pods(job.Namespace).List(selector)
|
pods, err := jm.getPodsForJob(&job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/rand"
|
"k8s.io/apimachinery/pkg/util/rand"
|
||||||
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
@ -44,6 +45,7 @@ func newJob(parallelism, completions int32) *batch.Job {
|
|||||||
j := &batch.Job{
|
j := &batch.Job{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "foobar",
|
Name: "foobar",
|
||||||
|
UID: uuid.NewUUID(),
|
||||||
Namespace: metav1.NamespaceDefault,
|
Namespace: metav1.NamespaceDefault,
|
||||||
},
|
},
|
||||||
Spec: batch.JobSpec{
|
Spec: batch.JobSpec{
|
||||||
@ -91,6 +93,7 @@ func getKey(job *batch.Job, t *testing.T) string {
|
|||||||
func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
|
func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*JobController, informers.SharedInformerFactory) {
|
||||||
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
|
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
|
||||||
jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
|
jm := NewJobController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
|
||||||
|
jm.podControl = &controller.FakePodControl{}
|
||||||
|
|
||||||
return jm, sharedInformers
|
return jm, sharedInformers
|
||||||
}
|
}
|
||||||
@ -605,6 +608,118 @@ func TestJobPodLookup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newPod(name string, job *batch.Job) *v1.Pod {
|
||||||
|
return &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Labels: job.Spec.Selector.MatchLabels,
|
||||||
|
Namespace: job.Namespace,
|
||||||
|
OwnerReferences: []metav1.OwnerReference{*newControllerRef(job)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPodsForJob(t *testing.T) {
|
||||||
|
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
|
||||||
|
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||||
|
jm.podStoreSynced = alwaysReady
|
||||||
|
jm.jobStoreSynced = alwaysReady
|
||||||
|
|
||||||
|
job1 := newJob(1, 1)
|
||||||
|
job1.Name = "job1"
|
||||||
|
job2 := newJob(1, 1)
|
||||||
|
job2.Name = "job2"
|
||||||
|
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||||
|
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
|
||||||
|
|
||||||
|
pod1 := newPod("pod1", job1)
|
||||||
|
pod2 := newPod("pod2", job2)
|
||||||
|
pod3 := newPod("pod3", job1)
|
||||||
|
// Make pod3 an orphan that doesn't match. It should be ignored.
|
||||||
|
pod3.OwnerReferences = nil
|
||||||
|
pod3.Labels = nil
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod3)
|
||||||
|
|
||||||
|
pods, err := jm.getPodsForJob(job1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getPodsForJob() error: %v", err)
|
||||||
|
}
|
||||||
|
if got, want := len(pods), 1; got != want {
|
||||||
|
t.Errorf("len(pods) = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := pods[0].Name, "pod1"; got != want {
|
||||||
|
t.Errorf("pod.Name = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
pods, err = jm.getPodsForJob(job2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getPodsForJob() error: %v", err)
|
||||||
|
}
|
||||||
|
if got, want := len(pods), 1; got != want {
|
||||||
|
t.Errorf("len(pods) = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := pods[0].Name, "pod2"; got != want {
|
||||||
|
t.Errorf("pod.Name = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPodsForJobAdopt(t *testing.T) {
|
||||||
|
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
|
||||||
|
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||||
|
jm.podStoreSynced = alwaysReady
|
||||||
|
jm.jobStoreSynced = alwaysReady
|
||||||
|
|
||||||
|
job1 := newJob(1, 1)
|
||||||
|
job1.Name = "job1"
|
||||||
|
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||||
|
|
||||||
|
pod1 := newPod("pod1", job1)
|
||||||
|
pod2 := newPod("pod2", job1)
|
||||||
|
// Make this pod an orphan. It should still be returned because it's adopted.
|
||||||
|
pod2.OwnerReferences = nil
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
|
||||||
|
|
||||||
|
pods, err := jm.getPodsForJob(job1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getPodsForJob() error: %v", err)
|
||||||
|
}
|
||||||
|
if got, want := len(pods), 2; got != want {
|
||||||
|
t.Errorf("len(pods) = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPodsForJobRelease(t *testing.T) {
|
||||||
|
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
|
||||||
|
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||||
|
jm.podStoreSynced = alwaysReady
|
||||||
|
jm.jobStoreSynced = alwaysReady
|
||||||
|
|
||||||
|
job1 := newJob(1, 1)
|
||||||
|
job1.Name = "job1"
|
||||||
|
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
|
||||||
|
|
||||||
|
pod1 := newPod("pod1", job1)
|
||||||
|
pod2 := newPod("pod2", job1)
|
||||||
|
// Make this pod not match, even though it's owned. It should be released.
|
||||||
|
pod2.Labels = nil
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||||
|
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
|
||||||
|
|
||||||
|
pods, err := jm.getPodsForJob(job1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getPodsForJob() error: %v", err)
|
||||||
|
}
|
||||||
|
if got, want := len(pods), 1; got != want {
|
||||||
|
t.Errorf("len(pods) = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := pods[0].Name, "pod1"; got != want {
|
||||||
|
t.Errorf("pod.Name = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type FakeJobExpectations struct {
|
type FakeJobExpectations struct {
|
||||||
*controller.ControllerExpectations
|
*controller.ControllerExpectations
|
||||||
satisfied bool
|
satisfied bool
|
||||||
|
Loading…
Reference in New Issue
Block a user