diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index b7bfc03a5b8..9227a9dd75c 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -38,6 +38,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" + "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) @@ -64,7 +67,7 @@ func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) { Port: 10251, } default: - glog.Fatalf("Can't get info for: %v, %v", host, podID) + glog.Fatalf("Can't get info for: '%v', '%v'", host, podID) } return c.GetPodInfo("localhost", podID) } @@ -106,6 +109,9 @@ func startComponents(manifestURL string) (apiServerURL string) { storage, codec := m.API_v1beta1() handler.delegate = apiserver.Handle(storage, codec, "/api/v1beta1") + // Scheduler + scheduler.New((&factory.ConfigFactory{cl}).Create()).Run() + controllerManager := controller.NewReplicationManager(cl) // Prove that controllerManager's watch works by making it not sync until after this diff --git a/pkg/constraint/constraint.go b/pkg/constraint/constraint.go new file mode 100644 index 00000000000..0e08162792f --- /dev/null +++ b/pkg/constraint/constraint.go @@ -0,0 +1,27 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package constraint + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// Allowed returns true if manifests is a collection of manifests +// which can run without conflict on a single minion. +func Allowed(manifests []api.ContainerManifest) bool { + return !PortsConflict(manifests) +} diff --git a/pkg/constraint/constraint_test.go b/pkg/constraint/constraint_test.go new file mode 100644 index 00000000000..4d168551fc7 --- /dev/null +++ b/pkg/constraint/constraint_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package constraint + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +func containerWithHostPorts(ports ...int) api.Container { + c := api.Container{} + for _, p := range ports { + c.Ports = append(c.Ports, api.Port{HostPort: p}) + } + return c +} + +func manifestWithContainers(containers ...api.Container) api.ContainerManifest { + m := api.ContainerManifest{} + for _, c := range containers { + m.Containers = append(m.Containers, c) + } + return m +} + +func TestAllowed(t *testing.T) { + table := []struct { + allowed bool + manifests []api.ContainerManifest + }{ + { + allowed: true, + manifests: []api.ContainerManifest{ + manifestWithContainers( + containerWithHostPorts(1, 2, 3), + containerWithHostPorts(4, 5, 6), + ), + manifestWithContainers( + containerWithHostPorts(7, 8, 9), + containerWithHostPorts(10, 11, 12), + ), + }, + }, + { + allowed: true, + manifests: []api.ContainerManifest{ + manifestWithContainers( + containerWithHostPorts(0, 0), + containerWithHostPorts(0, 0), + ), + manifestWithContainers( + containerWithHostPorts(0, 0), + containerWithHostPorts(0, 0), + ), + }, + }, + { + allowed: false, + manifests: []api.ContainerManifest{ + manifestWithContainers( + containerWithHostPorts(3, 3), + ), + }, + }, + { + allowed: false, + manifests: []api.ContainerManifest{ + manifestWithContainers( + containerWithHostPorts(6), + ), + manifestWithContainers( + containerWithHostPorts(6), + ), + }, + }, + } + + for _, item := range table { + if e, a := item.allowed, Allowed(item.manifests); e != a { + t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests) + } + } +} diff --git a/pkg/constraint/doc.go b/pkg/constraint/doc.go new file mode 100644 index 00000000000..28d7218a43d --- /dev/null +++ b/pkg/constraint/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package constraint has functions for ensuring that collections of +// containers are allowed to run together on a single host. +// +// TODO: Add resource math. Phrase this code in a way that makes it easy +// to call from schedulers as well as from the feasiblity check that +// apiserver performs. +package constraint diff --git a/pkg/constraint/ports.go b/pkg/constraint/ports.go new file mode 100644 index 00000000000..92622469a2c --- /dev/null +++ b/pkg/constraint/ports.go @@ -0,0 +1,41 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package constraint + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) + +// PortsConflict returns true iff two containers attempt to expose +// the same host port. +func PortsConflict(manifests []api.ContainerManifest) bool { + hostPorts := map[int]struct{}{} + for _, manifest := range manifests { + for _, container := range manifest.Containers { + for _, port := range container.Ports { + if port.HostPort == 0 { + continue + } + if _, exists := hostPorts[port.HostPort]; exists { + return true + } + hostPorts[port.HostPort] = struct{}{} + } + } + } + return false +} diff --git a/pkg/master/master.go b/pkg/master/master.go index 174d70b0781..626291c6185 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -17,7 +17,6 @@ limitations under the License. package master import ( - "math/rand" "net/http" "time" @@ -32,7 +31,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" - "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" goetcd "github.com/coreos/go-etcd/etcd" @@ -111,16 +109,12 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) - random := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) - s := scheduler.NewRandomFitScheduler(m.podRegistry, random) m.storage = map[string]apiserver.RESTStorage{ "pods": pod.NewRegistryStorage(&pod.RegistryStorageConfig{ CloudProvider: cloud, - MinionLister: m.minionRegistry, PodCache: podCache, PodInfoGetter: podInfoGetter, Registry: m.podRegistry, - Scheduler: s, }), "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 085cbcf8e64..674c6c8b9c6 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -48,6 +48,7 @@ func NewPodCache(info client.PodInfoGetter, pods pod.Registry) *PodCache { // GetPodInfo Implements the PodInfoGetter.GetPodInfo. // The returned value should be treated as read-only. +// TODO: Remove the host from this call, it's totally unnecessary. func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { p.podLock.Lock() defer p.podLock.Unlock() diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 1f54041c8c2..371454a9ba4 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -21,6 +21,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/constraint" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" @@ -99,19 +100,15 @@ func makeContainerKey(machine string) string { return "/registry/hosts/" + machine + "/kubelet" } -// CreatePod creates a pod based on a specification, schedule it onto a specific machine. -func (r *Registry) CreatePod(machine string, pod api.Pod) error { +// CreatePod creates a pod based on a specification. +func (r *Registry) CreatePod(pod api.Pod) error { // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. pod.DesiredState.Status = api.PodRunning pod.DesiredState.Host = "" - if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil { - return err - } - // TODO: Until scheduler separation is completed, just assign here. - return r.assignPod(pod.ID, machine) + return r.CreateObj(makePodKey(pod.ID), &pod) } // ApplyBinding implements binding's registry @@ -119,23 +116,28 @@ func (r *Registry) ApplyBinding(binding *api.Binding) error { return r.assignPod(binding.PodID, binding.Host) } -// assignPod assigns the given pod to the given machine. -// TODO: hook this up via apiserver, not by calling it from CreatePod(). -func (r *Registry) assignPod(podID string, machine string) error { +// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'. +// Returns the current state of the pod, or an error. +func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) { podKey := makePodKey(podID) - var finalPod *api.Pod - err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { + err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) } - if pod.DesiredState.Host != "" { + if pod.DesiredState.Host != oldMachine { return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.ID, pod.DesiredState.Host) } pod.DesiredState.Host = machine finalPod = pod return pod, nil }) + return finalPod, err +} + +// assignPod assigns the given pod to the given machine. +func (r *Registry) assignPod(podID string, machine string) error { + finalPod, err := r.setPodHostTo(podID, "", machine) if err != nil { return err } @@ -148,14 +150,16 @@ func (r *Registry) assignPod(podID string, machine string) error { err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { manifests := *in.(*api.ContainerManifestList) manifests.Items = append(manifests.Items, manifest) + if !constraint.Allowed(manifests.Items) { + return nil, fmt.Errorf("The assignment would cause a constraint violation") + } return manifests, nil }) if err != nil { - // Don't strand stuff. This is a terrible hack that won't be needed - // when the above TODO is fixed. - err2 := r.Delete(podKey, false) - if err2 != nil { - glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) + // Put the pod's host back the way it was. This is a terrible hack that + // won't be needed if we convert this to a rectification loop. + if _, err2 := r.setPodHostTo(podID, machine, ""); err2 != nil { + glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2) } } return err diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 0e10c226315..1bf44bd501e 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -78,7 +78,7 @@ func TestEtcdCreatePod(t *testing.T) { } fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -93,7 +93,13 @@ func TestEtcdCreatePod(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) } resp, err := fakeClient.Get("/registry/pods/foo", false, false) @@ -132,7 +138,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { E: nil, } registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -158,20 +164,27 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { E: tools.EtcdErrorValueRequired, } registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, }) - if err == nil { - t.Fatalf("Unexpected non-error") + if err != nil { + t.Fatalf("Unexpected error: %v", err) } - _, err = fakeClient.Get("/registry/pods/foo", false, false) + + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) if err == nil { - t.Error("Unexpected non-error") + t.Fatalf("Unexpected non error.") } - if !tools.IsEtcdNotFound(err) { - t.Errorf("Unexpected error: %#v", err) + + existingPod, err := registry.GetPod("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if existingPod.DesiredState.Host == "machine" { + t.Fatal("Pod's host changed in response to an unappliable binding.") } } @@ -191,7 +204,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { E: tools.EtcdErrorNotFound, } registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -210,6 +223,12 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + resp, err := fakeClient.Get("/registry/pods/foo", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -250,7 +269,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, }), 0) registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) - err := registry.CreatePod("machine", api.Pod{ + err := registry.CreatePod(api.Pod{ JSONBase: api.JSONBase{ ID: "foo", }, @@ -266,7 +285,13 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + + // Suddenly, a wild scheduler appears: + err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) } resp, err := fakeClient.Get("/registry/pods/foo", false, false) diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 0a2f0822be7..d65f82a080b 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -30,8 +30,8 @@ type Registry interface { WatchPods(resourceVersion uint64) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) - // Create a pod based on a specification, schedule it onto a specific machine. - CreatePod(machine string, pod api.Pod) error + // Create a pod based on a specification. + CreatePod(pod api.Pod) error // Update an existing pod UpdatePod(pod api.Pod) error // Delete an existing pod diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 0168ae42e32..2f9410c8d8c 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -27,7 +27,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -39,33 +38,27 @@ import ( type RegistryStorage struct { cloudProvider cloudprovider.Interface mu sync.Mutex - minionLister scheduler.MinionLister podCache client.PodInfoGetter podInfoGetter client.PodInfoGetter podPollPeriod time.Duration registry Registry - scheduler scheduler.Scheduler } type RegistryStorageConfig struct { CloudProvider cloudprovider.Interface - MinionLister scheduler.MinionLister PodCache client.PodInfoGetter PodInfoGetter client.PodInfoGetter Registry Registry - Scheduler scheduler.Scheduler } // NewRegistryStorage returns a new RegistryStorage. func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage { return &RegistryStorage{ cloudProvider: config.CloudProvider, - minionLister: config.MinionLister, podCache: config.PodCache, podInfoGetter: config.PodInfoGetter, podPollPeriod: time.Second * 10, registry: config.Registry, - scheduler: config.Scheduler, } } @@ -82,7 +75,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { pod.CreationTimestamp = util.Now() return apiserver.MakeAsync(func() (interface{}, error) { - if err := rs.scheduleAndCreatePod(*pod); err != nil { + if err := rs.registry.CreatePod(*pod); err != nil { return nil, err } return rs.registry.GetPod(pod.ID) @@ -133,10 +126,11 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u pod := e.Object.(*api.Pod) fields := labels.Set{ "ID": pod.ID, - "DesiredState.Status": string(pod.CurrentState.Status), - "DesiredState.Host": pod.CurrentState.Host, + "DesiredState.Status": string(pod.DesiredState.Status), + "DesiredState.Host": pod.DesiredState.Host, } - return e, label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + passesFilter := label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + return e, passesFilter }), nil } @@ -158,6 +152,10 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { } func (rs *RegistryStorage) fillPodInfo(pod *api.Pod) { + pod.CurrentState.Host = pod.DesiredState.Host + if pod.CurrentState.Host == "" { + return + } // Get cached info for the list currently. // TODO: Optionally use fresh info if rs.podCache != nil { @@ -240,17 +238,6 @@ func getPodStatus(pod *api.Pod) api.PodStatus { } } -func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error { - rs.mu.Lock() - defer rs.mu.Unlock() - // TODO(lavalamp): Separate scheduler more cleanly. - machine, err := rs.scheduler.Schedule(pod, rs.minionLister) - if err != nil { - return err - } - return rs.registry.CreatePod(machine, pod) -} - func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { for { podObj, err := rs.Get(pod.ID) diff --git a/pkg/registry/pod/storage_test.go b/pkg/registry/pod/storage_test.go index 8c4e81f72de..24ad2c46e95 100644 --- a/pkg/registry/pod/storage_test.go +++ b/pkg/registry/pod/storage_test.go @@ -25,9 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/fsouza/go-dockerclient" ) @@ -58,8 +56,7 @@ func TestCreatePodRegistryError(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ - scheduler: ®istrytest.Scheduler{}, - registry: podRegistry, + registry: podRegistry, } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -74,32 +71,11 @@ func TestCreatePodRegistryError(t *testing.T) { expectApiStatusError(t, ch, podRegistry.Err.Error()) } -func TestCreatePodSchedulerError(t *testing.T) { - mockScheduler := registrytest.Scheduler{ - Err: fmt.Errorf("test error"), - } - storage := RegistryStorage{ - scheduler: &mockScheduler, - } - desiredState := api.PodState{ - Manifest: api.ContainerManifest{ - Version: "v1beta1", - }, - } - pod := &api.Pod{DesiredState: desiredState} - ch, err := storage.Create(pod) - if err != nil { - t.Errorf("Expected %#v, Got %#v", nil, err) - } - expectApiStatusError(t, ch, mockScheduler.Err.Error()) -} - func TestCreatePodSetsIds(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ - scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: podRegistry, + registry: podRegistry, } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -347,8 +323,7 @@ func TestPodStorageValidatesCreate(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ - scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: podRegistry, + registry: podRegistry, } pod := &api.Pod{} c, err := storage.Create(pod) @@ -364,8 +339,7 @@ func TestPodStorageValidatesUpdate(t *testing.T) { podRegistry := registrytest.NewPodRegistry(nil) podRegistry.Err = fmt.Errorf("test error") storage := RegistryStorage{ - scheduler: ®istrytest.Scheduler{Machine: "test"}, - registry: podRegistry, + registry: podRegistry, } pod := &api.Pod{} c, err := storage.Update(pod) @@ -388,8 +362,6 @@ func TestCreatePod(t *testing.T) { storage := RegistryStorage{ registry: podRegistry, podPollPeriod: time.Millisecond * 100, - scheduler: scheduler.NewRoundRobinScheduler(), - minionLister: minion.NewRegistry([]string{"machine"}), } desiredState := api.PodState{ Manifest: api.ContainerManifest{ @@ -438,7 +410,7 @@ func TestFillPodInfo(t *testing.T) { storage := RegistryStorage{ podCache: &fakeGetter, } - pod := api.Pod{} + pod := api.Pod{DesiredState: api.PodState{Host: "foo"}} storage.fillPodInfo(&pod) if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) @@ -461,7 +433,7 @@ func TestFillPodInfoNoData(t *testing.T) { storage := RegistryStorage{ podCache: &fakeGetter, } - pod := api.Pod{} + pod := api.Pod{DesiredState: api.PodState{Host: "foo"}} storage.fillPodInfo(&pod) if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) { t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info) diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index f702e8743a8..6efaaf7fa87 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -25,10 +25,9 @@ import ( ) type PodRegistry struct { - Err error - Machine string - Pod *api.Pod - Pods []api.Pod + Err error + Pod *api.Pod + Pods []api.Pod sync.Mutex mux *watch.Mux @@ -66,10 +65,9 @@ func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { return r.Pod, r.Err } -func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error { +func (r *PodRegistry) CreatePod(pod api.Pod) error { r.Lock() defer r.Unlock() - r.Machine = machine r.Pod = &pod r.mux.Action(watch.Added, &pod) return r.Err diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index d140b1904fd..24bc0db2b21 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -69,7 +69,14 @@ func (factory *ConfigFactory) Create() *scheduler.Config { Algorithm: algo, Binder: &binder{factory.Client}, NextPod: func() *api.Pod { - return podQueue.Pop().(*api.Pod) + pod := podQueue.Pop().(*api.Pod) + // TODO: Remove or reduce verbosity by sep 6th, 2014. Leave until then to + // make it easy to find scheduling problems. + glog.Infof("About to try and schedule pod %v\n"+ + "\tknown minions: %v\n"+ + "\tknown scheduled pods: %v\n", + pod.ID, minionCache.Contains(), podCache.Contains()) + return pod }, Error: factory.makeDefaultErrorFunc(podQueue), } @@ -193,5 +200,8 @@ type binder struct { // Bind just does a POST binding RPC. func (b *binder) Bind(binding *api.Binding) error { + // TODO: Remove or reduce verbosity by sep 6th, 2014. Leave until then to + // make it easy to find scheduling problems. + glog.Infof("Attempting to bind %v to %v", binding.PodID, binding.Host) return b.Post().Path("bindings").Body(binding).Do().Error() }