diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 31c4f669663..707668a1432 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -77,6 +77,19 @@ func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filteredPods, nil } +// WatchPods begins watching for new, changed, or deleted pods. +func (r *Registry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { + pod := obj.(*api.Pod) + fields := labels.Set{ + "ID": pod.ID, + "CurrentState.Status": string(pod.CurrentState.Status), + "CurrentState.Host": pod.CurrentState.Host, + } + return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields) + }) +} + // GetPod gets a specific pod specified by its ID. func (r *Registry) GetPod(podID string) (*api.Pod, error) { var pod api.Pod diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index 12440353ad5..8e027119991 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -19,12 +19,15 @@ package pod import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface implemented by things that know how to store Pod objects. type Registry interface { // ListPods obtains a list of pods that match selector. ListPods(selector labels.Selector) ([]api.Pod, error) + // Watch for new/changed/deleted pods + WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification, schedule it onto a specific machine. diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 1dc1ea01c0e..ea665874f53 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "code.google.com/p/go-uuid/uuid" "github.com/golang/glog" @@ -122,6 +123,11 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { return result, err } +// Watch begins watching for new, changed, or deleted pods. +func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchPods(label, field, resourceVersion) +} + func (rs RegistryStorage) New() interface{} { return &api.Pod{} } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index b2803328a46..ffdd69667f3 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -17,10 +17,12 @@ limitations under the License. package registrytest import ( + "errors" "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) type PodRegistry struct { @@ -51,6 +53,10 @@ func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { return filtered, nil } +func (r *PodRegistry) WatchPods(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, errors.New("unimplemented") +} + func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) { r.Lock() defer r.Unlock()