mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-16 14:45:28 +00:00
Enable watch on node changes.
This commit is contained in:
@@ -42,12 +42,15 @@ const (
|
||||
ServicePath string = "/registry/services/specs"
|
||||
// ServiceEndpointPath is the path to service endpoints resources in etcd
|
||||
ServiceEndpointPath string = "/registry/services/endpoints"
|
||||
// NodePath is the path to node resources in etcd
|
||||
NodePath string = "/registry/minions"
|
||||
)
|
||||
|
||||
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
|
||||
// kubelet (and vice versa)
|
||||
|
||||
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd.
|
||||
// Registry implements BindingRegistry, ControllerRegistry, EndpointRegistry,
|
||||
// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd.
|
||||
type Registry struct {
|
||||
tools.EtcdHelper
|
||||
boundPodFactory pod.BoundPodFactory
|
||||
@@ -570,31 +573,35 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector,
|
||||
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
|
||||
}
|
||||
|
||||
func makeMinionKey(minionID string) string {
|
||||
return "/registry/minions/" + minionID
|
||||
func makeNodeKey(nodeID string) string {
|
||||
return NodePath + "/" + nodeID
|
||||
}
|
||||
|
||||
func makeNodeListKey() string {
|
||||
return NodePath
|
||||
}
|
||||
|
||||
func (r *Registry) ListMinions(ctx api.Context) (*api.NodeList, error) {
|
||||
minions := &api.NodeList{}
|
||||
err := r.ExtractToList("/registry/minions", minions)
|
||||
err := r.ExtractToList(makeNodeListKey(), minions)
|
||||
return minions, err
|
||||
}
|
||||
|
||||
func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error {
|
||||
// TODO: Add some validations.
|
||||
err := r.CreateObj(makeMinionKey(minion.Name), minion, 0)
|
||||
err := r.CreateObj(makeNodeKey(minion.Name), minion, 0)
|
||||
return etcderr.InterpretCreateError(err, "minion", minion.Name)
|
||||
}
|
||||
|
||||
func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error {
|
||||
// TODO: Add some validations.
|
||||
err := r.SetObj(makeMinionKey(minion.Name), minion)
|
||||
err := r.SetObj(makeNodeKey(minion.Name), minion)
|
||||
return etcderr.InterpretUpdateError(err, "minion", minion.Name)
|
||||
}
|
||||
|
||||
func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
|
||||
var minion api.Node
|
||||
key := makeMinionKey(minionID)
|
||||
key := makeNodeKey(minionID)
|
||||
err := r.ExtractObj(key, &minion, false)
|
||||
if err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "minion", minion.Name)
|
||||
@@ -603,10 +610,27 @@ func (r *Registry) GetMinion(ctx api.Context, minionID string) (*api.Node, error
|
||||
}
|
||||
|
||||
func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error {
|
||||
key := makeMinionKey(minionID)
|
||||
key := makeNodeKey(minionID)
|
||||
err := r.Delete(key, true)
|
||||
if err != nil {
|
||||
return etcderr.InterpretDeleteError(err, "minion", minionID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Registry) WatchMinions(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
version, err := tools.ParseWatchResourceVersion(resourceVersion, "node")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key := makeNodeListKey()
|
||||
return r.WatchList(key, version, func(obj runtime.Object) bool {
|
||||
minionObj, ok := obj.(*api.Node)
|
||||
if !ok {
|
||||
// Must be an error: return true to propagate to upper level.
|
||||
return true
|
||||
}
|
||||
// TODO: Add support for filtering based on field, once NodeStatus is defined.
|
||||
return label.Matches(labels.Set(minionObj.Labels))
|
||||
})
|
||||
}
|
||||
|
@@ -1681,6 +1681,112 @@ func TestEtcdDeleteMinion(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdWatchMinion(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchMinions(ctx,
|
||||
labels.Everything(),
|
||||
labels.Everything(),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
select {
|
||||
case _, ok := <-watching.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("watching channel should be open")
|
||||
}
|
||||
default:
|
||||
}
|
||||
fakeClient.WatchInjectError <- nil
|
||||
if _, ok := <-watching.ResultChan(); ok {
|
||||
t.Errorf("watching channel should be closed")
|
||||
}
|
||||
watching.Stop()
|
||||
}
|
||||
|
||||
func TestEtcdWatchMinionsMatch(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchMinions(ctx,
|
||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
||||
labels.Everything(),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
node := &api.Node{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Labels: map[string]string{
|
||||
"name": "foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
nodeBytes, _ := latest.Codec.Encode(node)
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(nodeBytes),
|
||||
},
|
||||
}
|
||||
select {
|
||||
case _, ok := <-watching.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("watching channel should be open")
|
||||
}
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
t.Error("unexpected timeout from result channel")
|
||||
}
|
||||
watching.Stop()
|
||||
}
|
||||
|
||||
func TestEtcdWatchMinionsNotMatch(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchMinions(ctx,
|
||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
||||
labels.Everything(),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
node := &api.Node{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "bar",
|
||||
Labels: map[string]string{
|
||||
"name": "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
nodeBytes, _ := latest.Codec.Encode(node)
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(nodeBytes),
|
||||
},
|
||||
}
|
||||
|
||||
select {
|
||||
case <-watching.ResultChan():
|
||||
t.Error("unexpected result from result channel")
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
// expected case
|
||||
}
|
||||
}
|
||||
|
||||
// TODO We need a test for the compare and swap behavior. This basically requires two things:
|
||||
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
|
||||
// channel, this will enable us to orchestrate the flow of etcd requests in the test.
|
||||
|
Reference in New Issue
Block a user