mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-13 11:25:19 +00:00
Move internal types of job from pkg/apis/extensions to pkg/apis/batch
This commit is contained in:
@@ -25,7 +25,7 @@ import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
||||
@@ -53,7 +53,7 @@ type JobController struct {
|
||||
internalPodInformer framework.SharedInformer
|
||||
|
||||
// To allow injection of updateJobStatus for testing.
|
||||
updateHandler func(job *extensions.Job) error
|
||||
updateHandler func(job *batch.Job) error
|
||||
syncHandler func(jobKey string) error
|
||||
// podStoreSynced returns true if the pod store has been synced at least once.
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
@@ -96,19 +96,19 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset
|
||||
jm.jobStore.Store, jm.jobController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).List(options)
|
||||
return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).Watch(options)
|
||||
return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
},
|
||||
&extensions.Job{},
|
||||
&batch.Job{},
|
||||
// TODO: Can we have much longer period here?
|
||||
replicationcontroller.FullControllerResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: jm.enqueueController,
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if job := cur.(*extensions.Job); !isJobFinished(job) {
|
||||
if job := cur.(*batch.Job); !isJobFinished(job) {
|
||||
jm.enqueueController(job)
|
||||
}
|
||||
},
|
||||
@@ -155,7 +155,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
|
||||
}
|
||||
|
||||
// getPodJob returns the job managing the given pod.
|
||||
func (jm *JobController) getPodJob(pod *api.Pod) *extensions.Job {
|
||||
func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
|
||||
jobs, err := jm.jobStore.GetPodJobs(pod)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
|
||||
@@ -251,7 +251,7 @@ func (jm *JobController) deletePod(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// obj could be an *extensions.Job, or a DeletionFinalStateUnknown marker item.
|
||||
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
|
||||
func (jm *JobController) enqueueController(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
@@ -314,7 +314,7 @@ func (jm *JobController) syncJob(key string) error {
|
||||
jm.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
job := *obj.(*extensions.Job)
|
||||
job := *obj.(*batch.Job)
|
||||
|
||||
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
|
||||
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
||||
@@ -366,7 +366,7 @@ func (jm *JobController) syncJob(key string) error {
|
||||
// update status values accordingly
|
||||
failed += active
|
||||
active = 0
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
|
||||
jm.recorder.Event(&job, api.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
|
||||
} else {
|
||||
if jobNeedsSync {
|
||||
@@ -400,7 +400,7 @@ func (jm *JobController) syncJob(key string) error {
|
||||
}
|
||||
}
|
||||
if complete {
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobComplete, "", ""))
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
|
||||
now := unversioned.Now()
|
||||
job.Status.CompletionTime = &now
|
||||
}
|
||||
@@ -421,7 +421,7 @@ func (jm *JobController) syncJob(key string) error {
|
||||
}
|
||||
|
||||
// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
|
||||
func pastActiveDeadline(job *extensions.Job) bool {
|
||||
func pastActiveDeadline(job *batch.Job) bool {
|
||||
if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
|
||||
return false
|
||||
}
|
||||
@@ -432,8 +432,8 @@ func pastActiveDeadline(job *extensions.Job) bool {
|
||||
return duration >= allowedDuration
|
||||
}
|
||||
|
||||
func newCondition(conditionType extensions.JobConditionType, reason, message string) extensions.JobCondition {
|
||||
return extensions.JobCondition{
|
||||
func newCondition(conditionType batch.JobConditionType, reason, message string) batch.JobCondition {
|
||||
return batch.JobCondition{
|
||||
Type: conditionType,
|
||||
Status: api.ConditionTrue,
|
||||
LastProbeTime: unversioned.Now(),
|
||||
@@ -452,7 +452,7 @@ func getStatus(pods []api.Pod) (succeeded, failed int) {
|
||||
|
||||
// manageJob is the core method responsible for managing the number of running
|
||||
// pods according to what is specified in the job.Spec.
|
||||
func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *extensions.Job) int {
|
||||
func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *batch.Job) int {
|
||||
var activeLock sync.Mutex
|
||||
active := len(activePods)
|
||||
parallelism := *job.Spec.Parallelism
|
||||
@@ -538,8 +538,8 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex
|
||||
return active
|
||||
}
|
||||
|
||||
func (jm *JobController) updateJobStatus(job *extensions.Job) error {
|
||||
_, err := jm.kubeClient.Extensions().Jobs(job.Namespace).UpdateStatus(job)
|
||||
func (jm *JobController) updateJobStatus(job *batch.Job) error {
|
||||
_, err := jm.kubeClient.Batch().Jobs(job.Namespace).UpdateStatus(job)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -554,9 +554,9 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int {
|
||||
return result
|
||||
}
|
||||
|
||||
func isJobFinished(j *extensions.Job) bool {
|
||||
func isJobFinished(j *batch.Job) bool {
|
||||
for _, c := range j.Status.Conditions {
|
||||
if (c.Type == extensions.JobComplete || c.Type == extensions.JobFailed) && c.Status == api.ConditionTrue {
|
||||
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@@ -564,7 +564,7 @@ func isJobFinished(j *extensions.Job) bool {
|
||||
}
|
||||
|
||||
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
|
||||
type byCreationTimestamp []extensions.Job
|
||||
type byCreationTimestamp []batch.Job
|
||||
|
||||
func (o byCreationTimestamp) Len() int { return len(o) }
|
||||
func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
|
||||
|
||||
@@ -24,7 +24,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
@@ -37,13 +37,13 @@ import (
|
||||
|
||||
var alwaysReady = func() bool { return true }
|
||||
|
||||
func newJob(parallelism, completions int) *extensions.Job {
|
||||
j := &extensions.Job{
|
||||
func newJob(parallelism, completions int) *batch.Job {
|
||||
j := &batch.Job{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foobar",
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
Spec: extensions.JobSpec{
|
||||
Spec: batch.JobSpec{
|
||||
Selector: &unversioned.LabelSelector{
|
||||
MatchLabels: map[string]string{"foo": "bar"},
|
||||
},
|
||||
@@ -76,7 +76,7 @@ func newJob(parallelism, completions int) *extensions.Job {
|
||||
return j
|
||||
}
|
||||
|
||||
func getKey(job *extensions.Job, t *testing.T) string {
|
||||
func getKey(job *batch.Job, t *testing.T) string {
|
||||
if key, err := controller.KeyFunc(job); err != nil {
|
||||
t.Errorf("Unexpected error getting key for job %v: %v", job.Name, err)
|
||||
return ""
|
||||
@@ -86,7 +86,7 @@ func getKey(job *extensions.Job, t *testing.T) string {
|
||||
}
|
||||
|
||||
// create count pods with the given phase for the given job
|
||||
func newPodList(count int, status api.PodPhase, job *extensions.Job) []api.Pod {
|
||||
func newPodList(count int, status api.PodPhase, job *batch.Job) []api.Pod {
|
||||
pods := []api.Pod{}
|
||||
for i := 0; i < count; i++ {
|
||||
newPod := api.Pod{
|
||||
@@ -211,8 +211,8 @@ func TestControllerSyncJob(t *testing.T) {
|
||||
fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
|
||||
manager.podControl = &fakePodControl
|
||||
manager.podStoreSynced = alwaysReady
|
||||
var actual *extensions.Job
|
||||
manager.updateHandler = func(job *extensions.Job) error {
|
||||
var actual *batch.Job
|
||||
manager.updateHandler = func(job *batch.Job) error {
|
||||
actual = job
|
||||
return nil
|
||||
}
|
||||
@@ -257,7 +257,7 @@ func TestControllerSyncJob(t *testing.T) {
|
||||
t.Errorf("%s: .status.startTime was not set", name)
|
||||
}
|
||||
// validate conditions
|
||||
if tc.expectedComplete && !getCondition(actual, extensions.JobComplete) {
|
||||
if tc.expectedComplete && !getCondition(actual, batch.JobComplete) {
|
||||
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
|
||||
}
|
||||
}
|
||||
@@ -306,8 +306,8 @@ func TestSyncJobPastDeadline(t *testing.T) {
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
manager.podStoreSynced = alwaysReady
|
||||
var actual *extensions.Job
|
||||
manager.updateHandler = func(job *extensions.Job) error {
|
||||
var actual *batch.Job
|
||||
manager.updateHandler = func(job *batch.Job) error {
|
||||
actual = job
|
||||
return nil
|
||||
}
|
||||
@@ -355,13 +355,13 @@ func TestSyncJobPastDeadline(t *testing.T) {
|
||||
t.Errorf("%s: .status.startTime was not set", name)
|
||||
}
|
||||
// validate conditions
|
||||
if !getCondition(actual, extensions.JobFailed) {
|
||||
if !getCondition(actual, batch.JobFailed) {
|
||||
t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getCondition(job *extensions.Job, condition extensions.JobConditionType) bool {
|
||||
func getCondition(job *batch.Job, condition batch.JobConditionType) bool {
|
||||
for _, v := range job.Status.Conditions {
|
||||
if v.Type == condition && v.Status == api.ConditionTrue {
|
||||
return true
|
||||
@@ -376,8 +376,8 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
manager.podStoreSynced = alwaysReady
|
||||
var actual *extensions.Job
|
||||
manager.updateHandler = func(job *extensions.Job) error {
|
||||
var actual *batch.Job
|
||||
manager.updateHandler = func(job *batch.Job) error {
|
||||
actual = job
|
||||
return nil
|
||||
}
|
||||
@@ -387,7 +387,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
|
||||
job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
|
||||
start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0)
|
||||
job.Status.StartTime = &start
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
|
||||
manager.jobStore.Store.Add(job)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
if err != nil {
|
||||
@@ -412,7 +412,7 @@ func TestSyncJobComplete(t *testing.T) {
|
||||
manager.podStoreSynced = alwaysReady
|
||||
|
||||
job := newJob(1, 1)
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobComplete, "", ""))
|
||||
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
|
||||
manager.jobStore.Store.Add(job)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
if err != nil {
|
||||
@@ -422,7 +422,7 @@ func TestSyncJobComplete(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
|
||||
}
|
||||
actual := uncastJob.(*extensions.Job)
|
||||
actual := uncastJob.(*batch.Job)
|
||||
// Verify that after syncing a complete job, the conditions are the same.
|
||||
if got, expected := len(actual.Status.Conditions), 1; got != expected {
|
||||
t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
|
||||
@@ -435,7 +435,7 @@ func TestSyncJobDeleted(t *testing.T) {
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
manager.podStoreSynced = alwaysReady
|
||||
manager.updateHandler = func(job *extensions.Job) error { return nil }
|
||||
manager.updateHandler = func(job *batch.Job) error { return nil }
|
||||
job := newJob(2, 2)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
if err != nil {
|
||||
@@ -455,7 +455,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
manager.podStoreSynced = alwaysReady
|
||||
manager.updateHandler = func(job *extensions.Job) error { return fmt.Errorf("Fake error") }
|
||||
manager.updateHandler = func(job *batch.Job) error { return fmt.Errorf("Fake error") }
|
||||
job := newJob(2, 2)
|
||||
manager.jobStore.Store.Add(job)
|
||||
err := manager.syncJob(getKey(job, t))
|
||||
@@ -475,14 +475,14 @@ func TestJobPodLookup(t *testing.T) {
|
||||
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||
manager.podStoreSynced = alwaysReady
|
||||
testCases := []struct {
|
||||
job *extensions.Job
|
||||
job *batch.Job
|
||||
pod *api.Pod
|
||||
|
||||
expectedName string
|
||||
}{
|
||||
// pods without labels don't match any job
|
||||
{
|
||||
job: &extensions.Job{
|
||||
job: &batch.Job{
|
||||
ObjectMeta: api.ObjectMeta{Name: "basic"},
|
||||
},
|
||||
pod: &api.Pod{
|
||||
@@ -492,9 +492,9 @@ func TestJobPodLookup(t *testing.T) {
|
||||
},
|
||||
// matching labels, different namespace
|
||||
{
|
||||
job: &extensions.Job{
|
||||
job: &batch.Job{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Spec: extensions.JobSpec{
|
||||
Spec: batch.JobSpec{
|
||||
Selector: &unversioned.LabelSelector{
|
||||
MatchLabels: map[string]string{"foo": "bar"},
|
||||
},
|
||||
@@ -511,9 +511,9 @@ func TestJobPodLookup(t *testing.T) {
|
||||
},
|
||||
// matching ns and labels returns
|
||||
{
|
||||
job: &extensions.Job{
|
||||
job: &batch.Job{
|
||||
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
|
||||
Spec: extensions.JobSpec{
|
||||
Spec: batch.JobSpec{
|
||||
Selector: &unversioned.LabelSelector{
|
||||
MatchExpressions: []unversioned.LabelSelectorRequirement{
|
||||
{
|
||||
@@ -566,7 +566,7 @@ func TestSyncJobExpectations(t *testing.T) {
|
||||
fakePodControl := controller.FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
manager.podStoreSynced = alwaysReady
|
||||
manager.updateHandler = func(job *extensions.Job) error { return nil }
|
||||
manager.updateHandler = func(job *batch.Job) error { return nil }
|
||||
|
||||
job := newJob(2, 2)
|
||||
manager.jobStore.Store.Add(job)
|
||||
@@ -602,7 +602,7 @@ func TestWatchJobs(t *testing.T) {
|
||||
manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||
manager.podStoreSynced = alwaysReady
|
||||
|
||||
var testJob extensions.Job
|
||||
var testJob batch.Job
|
||||
received := make(chan struct{})
|
||||
|
||||
// The update sent through the fakeWatcher should make its way into the workqueue,
|
||||
@@ -613,7 +613,7 @@ func TestWatchJobs(t *testing.T) {
|
||||
if !exists || err != nil {
|
||||
t.Errorf("Expected to find job under key %v", key)
|
||||
}
|
||||
job, ok := obj.(*extensions.Job)
|
||||
job, ok := obj.(*batch.Job)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
|
||||
}
|
||||
@@ -637,10 +637,10 @@ func TestWatchJobs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIsJobFinished(t *testing.T) {
|
||||
job := &extensions.Job{
|
||||
Status: extensions.JobStatus{
|
||||
Conditions: []extensions.JobCondition{{
|
||||
Type: extensions.JobComplete,
|
||||
job := &batch.Job{
|
||||
Status: batch.JobStatus{
|
||||
Conditions: []batch.JobCondition{{
|
||||
Type: batch.JobComplete,
|
||||
Status: api.ConditionTrue,
|
||||
}},
|
||||
},
|
||||
@@ -681,7 +681,7 @@ func TestWatchPods(t *testing.T) {
|
||||
close(received)
|
||||
return nil
|
||||
}
|
||||
job, ok := obj.(*extensions.Job)
|
||||
job, ok := obj.(*batch.Job)
|
||||
if !ok {
|
||||
t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
|
||||
close(received)
|
||||
|
||||
Reference in New Issue
Block a user