mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #5971 from bprashanth/rc_watch_fields
Add the ability to watch fields of a replication controller
This commit is contained in:
commit
768c733bed
@ -1514,6 +1514,21 @@ func init() {
|
|||||||
// If one of the conversion functions is malformed, detect it immediately.
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "replicationControllers",
|
||||||
|
func(label, value string) (string, string, error) {
|
||||||
|
switch label {
|
||||||
|
case "name":
|
||||||
|
return "name", value, nil
|
||||||
|
case "currentState.replicas":
|
||||||
|
return "status.replicas", value, nil
|
||||||
|
default:
|
||||||
|
return "", "", fmt.Errorf("field label not supported: %s", label)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "events",
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "events",
|
||||||
func(label, value string) (string, string, error) {
|
func(label, value string) (string, string, error) {
|
||||||
switch label {
|
switch label {
|
||||||
|
@ -1440,6 +1440,21 @@ func init() {
|
|||||||
// If one of the conversion functions is malformed, detect it immediately.
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "replicationControllers",
|
||||||
|
func(label, value string) (string, string, error) {
|
||||||
|
switch label {
|
||||||
|
case "name":
|
||||||
|
return "name", value, nil
|
||||||
|
case "currentState.replicas":
|
||||||
|
return "status.replicas", value, nil
|
||||||
|
default:
|
||||||
|
return "", "", fmt.Errorf("field label not supported: %s", label)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "events",
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "events",
|
||||||
func(label, value string) (string, string, error) {
|
func(label, value string) (string, string, error) {
|
||||||
switch label {
|
switch label {
|
||||||
|
@ -39,6 +39,21 @@ func init() {
|
|||||||
// If one of the conversion functions is malformed, detect it immediately.
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "replicationControllers",
|
||||||
|
func(label, value string) (string, string, error) {
|
||||||
|
switch label {
|
||||||
|
case "name":
|
||||||
|
return "name", value, nil
|
||||||
|
case "status.replicas":
|
||||||
|
return "status.replicas", value, nil
|
||||||
|
default:
|
||||||
|
return "", "", fmt.Errorf("field label not supported: %s", label)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If one of the conversion functions is malformed, detect it immediately.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "events",
|
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "events",
|
||||||
func(label, value string) (string, string, error) {
|
func(label, value string) (string, string, error) {
|
||||||
switch label {
|
switch label {
|
||||||
|
@ -34,6 +34,11 @@ import (
|
|||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
PASS = iota
|
||||||
|
FAIL
|
||||||
|
)
|
||||||
|
|
||||||
// newStorage creates a REST storage backed by etcd helpers
|
// newStorage creates a REST storage backed by etcd helpers
|
||||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||||
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
fakeEtcdClient := tools.NewFakeEtcdClient(t)
|
||||||
@ -531,6 +536,86 @@ func TestEtcdWatchControllersMatch(t *testing.T) {
|
|||||||
watching.Stop()
|
watching.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatchControllersFields(t *testing.T) {
|
||||||
|
ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace)
|
||||||
|
storage, fakeClient := newStorage(t)
|
||||||
|
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
||||||
|
|
||||||
|
testFieldMap := map[int][]fields.Set{
|
||||||
|
PASS: {
|
||||||
|
{"status.replicas": "0"},
|
||||||
|
{"name": "foo"},
|
||||||
|
{"status.replicas": "0", "name": "foo"},
|
||||||
|
},
|
||||||
|
FAIL: {
|
||||||
|
{"status.replicas": "10"},
|
||||||
|
{"name": "bar"},
|
||||||
|
{"status.replicas": "10", "name": "foo"},
|
||||||
|
{"status.replicas": "0", "name": "bar"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
testEtcdActions := []string{
|
||||||
|
tools.EtcdCreate,
|
||||||
|
tools.EtcdSet,
|
||||||
|
tools.EtcdCAS,
|
||||||
|
tools.EtcdDelete}
|
||||||
|
|
||||||
|
controller := &api.ReplicationController{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Labels: validController.Spec.Selector,
|
||||||
|
Namespace: "default",
|
||||||
|
},
|
||||||
|
Status: api.ReplicationControllerStatus{
|
||||||
|
Replicas: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
controllerBytes, _ := latest.Codec.Encode(controller)
|
||||||
|
|
||||||
|
for expectedResult, fieldSet := range testFieldMap {
|
||||||
|
for _, field := range fieldSet {
|
||||||
|
for _, action := range testEtcdActions {
|
||||||
|
watching, err := storage.Watch(ctx,
|
||||||
|
labels.Everything(),
|
||||||
|
field.AsSelector(),
|
||||||
|
"1",
|
||||||
|
)
|
||||||
|
var prevNode *etcd.Node = nil
|
||||||
|
node := &etcd.Node{
|
||||||
|
Value: string(controllerBytes),
|
||||||
|
}
|
||||||
|
if action == tools.EtcdDelete {
|
||||||
|
prevNode = node
|
||||||
|
}
|
||||||
|
fakeClient.WaitForWatchCompletion()
|
||||||
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
|
Action: action,
|
||||||
|
Node: node,
|
||||||
|
PrevNode: prevNode,
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case r, ok := <-watching.ResultChan():
|
||||||
|
if expectedResult == FAIL {
|
||||||
|
t.Errorf("Unexpected result from channel %#v", r)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("watching channel should be open")
|
||||||
|
}
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
if expectedResult == PASS {
|
||||||
|
t.Error("unexpected timeout from result channel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
watching.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEtcdWatchControllersNotMatch(t *testing.T) {
|
func TestEtcdWatchControllersNotMatch(t *testing.T) {
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
storage, fakeClient := newStorage(t)
|
storage, fakeClient := newStorage(t)
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
// rcStrategy implements verification logic for Replication Controllers.
|
// rcStrategy implements verification logic for Replication Controllers.
|
||||||
@ -73,19 +74,25 @@ func (rcStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.Validation
|
|||||||
return validation.ValidateReplicationControllerUpdate(old.(*api.ReplicationController), obj.(*api.ReplicationController))
|
return validation.ValidateReplicationControllerUpdate(old.(*api.ReplicationController), obj.(*api.ReplicationController))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ControllerToSelectableFields returns a label set that represents the object.
|
||||||
|
func ControllerToSelectableFields(controller *api.ReplicationController) labels.Set {
|
||||||
|
return labels.Set{
|
||||||
|
"name": controller.Name,
|
||||||
|
"status.replicas": strconv.Itoa(controller.Status.Replicas),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MatchController is the filter used by the generic etcd backend to route
|
// MatchController is the filter used by the generic etcd backend to route
|
||||||
// watch events from etcd to clients of the apiserver only interested in specific
|
// watch events from etcd to clients of the apiserver only interested in specific
|
||||||
// labels/fields.
|
// labels/fields.
|
||||||
func MatchController(label labels.Selector, field fields.Selector) generic.Matcher {
|
func MatchController(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||||
return generic.MatcherFunc(
|
return generic.MatcherFunc(
|
||||||
func(obj runtime.Object) (bool, error) {
|
func(obj runtime.Object) (bool, error) {
|
||||||
if !field.Empty() {
|
|
||||||
return false, fmt.Errorf("field selector not supported yet")
|
|
||||||
}
|
|
||||||
controllerObj, ok := obj.(*api.ReplicationController)
|
controllerObj, ok := obj.(*api.ReplicationController)
|
||||||
if !ok {
|
if !ok {
|
||||||
return false, fmt.Errorf("Given object is not a replication controller.")
|
return false, fmt.Errorf("Given object is not a replication controller.")
|
||||||
}
|
}
|
||||||
return label.Matches(labels.Set(controllerObj.Labels)), nil
|
fields := ControllerToSelectableFields(controllerObj)
|
||||||
|
return label.Matches(labels.Set(controllerObj.Labels)) && field.Matches(fields), nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,15 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Etcd watch event actions
|
||||||
|
const (
|
||||||
|
EtcdCreate = "create"
|
||||||
|
EtcdGet = "get"
|
||||||
|
EtcdSet = "set"
|
||||||
|
EtcdCAS = "compareAndSwap"
|
||||||
|
EtcdDelete = "delete"
|
||||||
|
)
|
||||||
|
|
||||||
// FilterFunc is a predicate which takes an API object and returns true
|
// FilterFunc is a predicate which takes an API object and returns true
|
||||||
// iff the object should remain in the set.
|
// iff the object should remain in the set.
|
||||||
type FilterFunc func(obj runtime.Object) bool
|
type FilterFunc func(obj runtime.Object) bool
|
||||||
@ -377,11 +386,11 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
|||||||
|
|
||||||
func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
||||||
switch res.Action {
|
switch res.Action {
|
||||||
case "create", "get":
|
case EtcdCreate, EtcdGet:
|
||||||
w.sendAdd(res)
|
w.sendAdd(res)
|
||||||
case "set", "compareAndSwap":
|
case EtcdSet, EtcdCAS:
|
||||||
w.sendModify(res)
|
w.sendModify(res)
|
||||||
case "delete":
|
case EtcdDelete:
|
||||||
w.sendDelete(res)
|
w.sendDelete(res)
|
||||||
default:
|
default:
|
||||||
glog.Errorf("unknown action: %v", res.Action)
|
glog.Errorf("unknown action: %v", res.Action)
|
||||||
|
Loading…
Reference in New Issue
Block a user