Merge pull request #28966 from liggitt/cache-filter

Automatic merge from submit-queue

Fix watch cache filtering

When serving watch events for a particular namespace, the watch cache filters out events from other namespaces by checking the etcd key of the event's object, and making sure it is prefixed with the root key for the namespace being watched.

The prefix check does not ensure the match occurs on a path segment boundary, so a watch on  namespace `test` is delivered watch events for namespace `test1`.

This tightens the check to ensure the prefix match occurs on a path segment boundary.
This commit is contained in:
k8s-merge-robot 2016-07-14 11:22:10 -07:00 committed by GitHub
commit 4c58621413
4 changed files with 99 additions and 3 deletions

View File

@ -21,7 +21,6 @@ import (
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"
@ -479,7 +478,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
glog.Errorf("invalid object for filter: %v", obj)
return false
}
if !strings.HasPrefix(objKey, key) {
if !hasPathPrefix(objKey, key) {
return false
}
return filter.Filter(obj)

View File

@ -79,7 +79,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
}
return newObj.(*api.Pod), nil, nil
}
key := etcdtest.AddPrefix("pods/ns/" + obj.Name)
key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name)
if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -110,6 +110,12 @@ func TestList(t *testing.T) {
_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
// Create a pod in a namespace that contains "ns" as a prefix
// Make sure it is not returned in a watch of "ns"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
updatePod(t, etcdStorage, podFooNS2, nil)
deleted := api.Pod{}
if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
@ -147,6 +153,10 @@ func TestList(t *testing.T) {
item.ResourceVersion = ""
item.CreationTimestamp = unversioned.Time{}
if item.Namespace != "ns" {
t.Errorf("Unexpected namespace: %s", item.Namespace)
}
var expected *api.Pod
switch item.Name {
case "foo":
@ -210,6 +220,9 @@ func TestWatch(t *testing.T) {
podFooBis := makeTestPod("foo")
podFooBis.Spec.NodeName = "anotherFakeNode"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
// initialVersion is used to initate the watcher at the beginning of the world,
// which is not defined precisely in etcd.
initialVersion, err := cacher.LastSyncResourceVersion()
@ -225,6 +238,9 @@ func TestWatch(t *testing.T) {
}
defer watcher.Stop()
// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
_ = updatePod(t, etcdStorage, podBar, nil)
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
@ -320,6 +336,13 @@ func TestFiltering(t *testing.T) {
podFooPrime.Labels = map[string]string{"filter": "foo"}
podFooPrime.Spec.NodeName = "fakeNode"
podFooNS2 := makeTestPod("foo")
podFooNS2.Namespace += "2"
podFooNS2.Labels = map[string]string{"filter": "foo"}
// Create in another namespace first to make sure events from other namespaces don't get delivered
updatePod(t, etcdStorage, podFooNS2, nil)
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)

View File

@ -19,6 +19,7 @@ package storage
import (
"fmt"
"strconv"
"strings"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/validation"
@ -123,3 +124,28 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
}
return prefix + "/" + name, nil
}
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}

View File

@ -51,3 +51,51 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
}
}
}
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}