mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #2662 from ddysher/node-watch
Enable watch on node labels.
This commit is contained in:
commit
61e8baeef9
@ -42,12 +42,15 @@ const (
|
|||||||
ServicePath string = "/registry/services/specs"
|
ServicePath string = "/registry/services/specs"
|
||||||
// ServiceEndpointPath is the path to service endpoints resources in etcd
|
// ServiceEndpointPath is the path to service endpoints resources in etcd
|
||||||
ServiceEndpointPath string = "/registry/services/endpoints"
|
ServiceEndpointPath string = "/registry/services/endpoints"
|
||||||
|
// NodePath is the path to node resources in etcd
|
||||||
|
NodePath string = "/registry/minions"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
|
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
|
||||||
// kubelet (and vice versa)
|
// kubelet (and vice versa)
|
||||||
|
|
||||||
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd.
|
// Registry implements BindingRegistry, ControllerRegistry, EndpointRegistry,
|
||||||
|
// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd.
|
||||||
type Registry struct {
|
type Registry struct {
|
||||||
tools.EtcdHelper
|
tools.EtcdHelper
|
||||||
boundPodFactory pod.BoundPodFactory
|
boundPodFactory pod.BoundPodFactory
|
||||||
@ -570,31 +573,35 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector,
|
|||||||
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
|
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeMinionKey(minionID string) string {
|
func makeNodeKey(nodeID string) string {
|
||||||
return "/registry/minions/" + minionID
|
return NodePath + "/" + nodeID
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeNodeListKey() string {
|
||||||
|
return NodePath
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) ListMinions(ctx api.Context) (*api.NodeList, error) {
|
func (r *Registry) ListMinions(ctx api.Context) (*api.NodeList, error) {
|
||||||
minions := &api.NodeList{}
|
minions := &api.NodeList{}
|
||||||
err := r.ExtractToList("/registry/minions", minions)
|
err := r.ExtractToList(makeNodeListKey(), minions)
|
||||||
return minions, err
|
return minions, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error {
|
func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error {
|
||||||
// TODO: Add some validations.
|
// TODO: Add some validations.
|
||||||
err := r.CreateObj(makeMinionKey(minion.Name), minion, 0)
|
err := r.CreateObj(makeNodeKey(minion.Name), minion, 0)
|
||||||
return etcderr.InterpretCreateError(err, "minion", minion.Name)
|
return etcderr.InterpretCreateError(err, "minion", minion.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error {
|
func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error {
|
||||||
// TODO: Add some validations.
|
// TODO: Add some validations.
|
||||||
err := r.SetObj(makeMinionKey(minion.Name), minion)
|
err := r.SetObj(makeNodeKey(minion.Name), minion)
|
||||||
return etcderr.InterpretUpdateError(err, "minion", minion.Name)
|
return etcderr.InterpretUpdateError(err, "minion", minion.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
|
func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
|
||||||
var minion api.Node
|
var minion api.Node
|
||||||
key := makeMinionKey(minionID)
|
key := makeNodeKey(minionID)
|
||||||
err := r.ExtractObj(key, &minion, false)
|
err := r.ExtractObj(key, &minion, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, etcderr.InterpretGetError(err, "minion", minion.Name)
|
return nil, etcderr.InterpretGetError(err, "minion", minion.Name)
|
||||||
@ -603,10 +610,27 @@ func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error {
|
func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error {
|
||||||
key := makeMinionKey(minionID)
|
key := makeNodeKey(minionID)
|
||||||
err := r.Delete(key, true)
|
err := r.Delete(key, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return etcderr.InterpretDeleteError(err, "minion", minionID)
|
return etcderr.InterpretDeleteError(err, "minion", minionID)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Registry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
version, err := tools.ParseWatchResourceVersion(resourceVersion, "node")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
key := makeNodeListKey()
|
||||||
|
return r.WatchList(key, version, func(obj runtime.Object) bool {
|
||||||
|
minionObj, ok := obj.(*api.Node)
|
||||||
|
if !ok {
|
||||||
|
// Must be an error: return true to propagate to upper level.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// TODO: Add support for filtering based on field, once NodeStatus is defined.
|
||||||
|
return label.Matches(labels.Set(minionObj.Labels))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -1681,6 +1681,112 @@ func TestEtcdDeleteMinion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatchMinion(t *testing.T) {
|
||||||
|
ctx := api.NewDefaultContext()
|
||||||
|
fakeClient := tools.NewFakeEtcdClient(t)
|
||||||
|
registry := NewTestEtcdRegistry(fakeClient)
|
||||||
|
watching, err := registry.WatchMinions(ctx,
|
||||||
|
labels.Everything(),
|
||||||
|
labels.Everything(),
|
||||||
|
"1",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, ok := <-watching.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watching channel should be open")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
fakeClient.WatchInjectError <- nil
|
||||||
|
if _, ok := <-watching.ResultChan(); ok {
|
||||||
|
t.Errorf("watching channel should be closed")
|
||||||
|
}
|
||||||
|
watching.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatchMinionsMatch(t *testing.T) {
|
||||||
|
ctx := api.NewDefaultContext()
|
||||||
|
fakeClient := tools.NewFakeEtcdClient(t)
|
||||||
|
registry := NewTestEtcdRegistry(fakeClient)
|
||||||
|
watching, err := registry.WatchMinions(ctx,
|
||||||
|
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
||||||
|
labels.Everything(),
|
||||||
|
"1",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
|
||||||
|
node := &api.Node{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"name": "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
nodeBytes, _ := latest.Codec.Encode(node)
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: "create",
|
||||||
|
Node: &etcd.Node{
|
||||||
|
Value: string(nodeBytes),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case _, ok := <-watching.ResultChan():
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watching channel should be open")
|
||||||
|
}
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
t.Error("unexpected timeout from result channel")
|
||||||
|
}
|
||||||
|
watching.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatchMinionsNotMatch(t *testing.T) {
|
||||||
|
ctx := api.NewDefaultContext()
|
||||||
|
fakeClient := tools.NewFakeEtcdClient(t)
|
||||||
|
registry := NewTestEtcdRegistry(fakeClient)
|
||||||
|
watching, err := registry.WatchMinions(ctx,
|
||||||
|
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
||||||
|
labels.Everything(),
|
||||||
|
"1",
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
|
||||||
|
node := &api.Node{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "bar",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"name": "bar",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
nodeBytes, _ := latest.Codec.Encode(node)
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: "create",
|
||||||
|
Node: &etcd.Node{
|
||||||
|
Value: string(nodeBytes),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-watching.ResultChan():
|
||||||
|
t.Error("unexpected result from result channel")
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
// expected case
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO We need a test for the compare and swap behavior. This basically requires two things:
|
// TODO We need a test for the compare and swap behavior. This basically requires two things:
|
||||||
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
|
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
|
||||||
// channel, this will enable us to orchestrate the flow of etcd requests in the test.
|
// channel, this will enable us to orchestrate the flow of etcd requests in the test.
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@ -86,3 +88,7 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Node
|
|||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return r.delegate.WatchMinions(ctx, label, field, resourceVersion)
|
||||||
|
}
|
||||||
|
@ -16,7 +16,11 @@ limitations under the License.
|
|||||||
|
|
||||||
package minion
|
package minion
|
||||||
|
|
||||||
import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
import (
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
// MinionRegistry is an interface for things that know how to store minions.
|
// MinionRegistry is an interface for things that know how to store minions.
|
||||||
type Registry interface {
|
type Registry interface {
|
||||||
@ -25,4 +29,5 @@ type Registry interface {
|
|||||||
UpdateMinion(ctx api.Context, minion *api.Node) error
|
UpdateMinion(ctx api.Context, minion *api.Node) error
|
||||||
GetMinion(ctx api.Context, minionID string) (*api.Node, error)
|
GetMinion(ctx api.Context, minionID string) (*api.Node, error)
|
||||||
DeleteMinion(ctx api.Context, minionID string) error
|
DeleteMinion(ctx api.Context, minionID string) error
|
||||||
|
WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
|
||||||
}
|
}
|
||||||
|
@ -29,14 +29,15 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
// REST implements the RESTStorage interface, backed by a MinionRegistry.
|
// REST adapts minion into apiserver's RESTStorage model.
|
||||||
type REST struct {
|
type REST struct {
|
||||||
registry Registry
|
registry Registry
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewREST returns a new REST.
|
// NewREST returns a new apiserver.RESTStorage implementation for minion.
|
||||||
func NewREST(m Registry) *REST {
|
func NewREST(m Registry) *REST {
|
||||||
return &REST{
|
return &REST{
|
||||||
registry: m,
|
registry: m,
|
||||||
@ -46,6 +47,7 @@ func NewREST(m Registry) *REST {
|
|||||||
var ErrDoesNotExist = errors.New("The requested resource does not exist.")
|
var ErrDoesNotExist = errors.New("The requested resource does not exist.")
|
||||||
var ErrNotHealty = errors.New("The requested minion is not healthy.")
|
var ErrNotHealty = errors.New("The requested minion is not healthy.")
|
||||||
|
|
||||||
|
// Create satisfies the RESTStorage interface.
|
||||||
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) {
|
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) {
|
||||||
minion, ok := obj.(*api.Node)
|
minion, ok := obj.(*api.Node)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -67,6 +69,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
|
|||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete satisfies the RESTStorage interface.
|
||||||
func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) {
|
func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) {
|
||||||
minion, err := rs.registry.GetMinion(ctx, id)
|
minion, err := rs.registry.GetMinion(ctx, id)
|
||||||
if minion == nil {
|
if minion == nil {
|
||||||
@ -80,6 +83,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult,
|
|||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get satisfies the RESTStorage interface.
|
||||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||||
minion, err := rs.registry.GetMinion(ctx, id)
|
minion, err := rs.registry.GetMinion(ctx, id)
|
||||||
if minion == nil {
|
if minion == nil {
|
||||||
@ -88,6 +92,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
|||||||
return minion, err
|
return minion, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List satisfies the RESTStorage interface.
|
||||||
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
|
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
|
||||||
return rs.registry.ListMinions(ctx)
|
return rs.registry.ListMinions(ctx)
|
||||||
}
|
}
|
||||||
@ -96,6 +101,7 @@ func (rs *REST) New() runtime.Object {
|
|||||||
return &api.Node{}
|
return &api.Node{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update satisfies the RESTStorage interface.
|
||||||
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) {
|
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) {
|
||||||
minion, ok := obj.(*api.Node)
|
minion, ok := obj.(*api.Node)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -121,8 +127,10 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
|
|||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) toApiMinion(name string) *api.Node {
|
// Watch returns Minions events via a watch.Interface.
|
||||||
return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}}
|
// It implements apiserver.ResourceWatcher.
|
||||||
|
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return rs.registry.WatchMinions(ctx, label, field, resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResourceLocation returns a URL to which one can send traffic for the specified minion.
|
// ResourceLocation returns a URL to which one can send traffic for the specified minion.
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMinionREST(t *testing.T) {
|
func TestMinionRegistryREST(t *testing.T) {
|
||||||
ms := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
ms := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" {
|
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" {
|
||||||
@ -87,7 +87,7 @@ func TestMinionREST(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMinionStorageWithHealthCheck(t *testing.T) {
|
func TestMinionRegistryHealthCheck(t *testing.T) {
|
||||||
minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{})
|
minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{})
|
||||||
minionHealthRegistry := HealthyRegistry{
|
minionHealthRegistry := HealthyRegistry{
|
||||||
delegate: minionRegistry,
|
delegate: minionRegistry,
|
||||||
@ -119,7 +119,7 @@ func contains(nodes *api.NodeList, nodeID string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMinionStorageInvalidUpdate(t *testing.T) {
|
func TestMinionRegistryInvalidUpdate(t *testing.T) {
|
||||||
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
obj, err := storage.Get(ctx, "foo")
|
obj, err := storage.Get(ctx, "foo")
|
||||||
@ -136,7 +136,7 @@ func TestMinionStorageInvalidUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMinionStorageValidUpdate(t *testing.T) {
|
func TestMinionRegistryValidUpdate(t *testing.T) {
|
||||||
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
obj, err := storage.Get(ctx, "foo")
|
obj, err := storage.Get(ctx, "foo")
|
||||||
@ -156,7 +156,7 @@ func TestMinionStorageValidUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMinionStorageValidatesCreate(t *testing.T) {
|
func TestMinionRegistryValidatesCreate(t *testing.T) {
|
||||||
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
storage := NewREST(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}))
|
||||||
ctx := api.NewContext()
|
ctx := api.NewContext()
|
||||||
validSelector := map[string]string{"a": "b"}
|
validSelector := map[string]string{"a": "b"}
|
||||||
|
@ -20,15 +20,20 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MinionRegistry implements minion.Registry interface.
|
||||||
type MinionRegistry struct {
|
type MinionRegistry struct {
|
||||||
Err error
|
Err error
|
||||||
Minion string
|
Minion string
|
||||||
Minions api.NodeList
|
Minions api.NodeList
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MakeMinionList constructs api.MinionList from list of minion names and a NodeResource.
|
||||||
func MakeMinionList(minions []string, nodeResources api.NodeResources) *api.NodeList {
|
func MakeMinionList(minions []string, nodeResources api.NodeResources) *api.NodeList {
|
||||||
list := api.NodeList{
|
list := api.NodeList{
|
||||||
Items: make([]api.Node, len(minions)),
|
Items: make([]api.Node, len(minions)),
|
||||||
@ -95,3 +100,7 @@ func (r *MinionRegistry) DeleteMinion(ctx api.Context, minionID string) error {
|
|||||||
r.Minions.Items = newList
|
r.Minions.Items = newList
|
||||||
return r.Err
|
return r.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *MinionRegistry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
|
return nil, r.Err
|
||||||
|
}
|
||||||
|
@ -65,7 +65,6 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.
|
|||||||
|
|
||||||
func (r *PodRegistry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
func (r *PodRegistry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||||
return r.broadcaster.Watch(), nil
|
return r.broadcaster.Watch(), nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {
|
func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user