mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-04 07:49:35 +00:00 
			
		
		
		
	It's possible to watch /foo/bar, get events for /foo/bar/baz, and then if someone deletes /foo/bar you'll get an event for /foo/bar without a value. This results in an error being printed in the logs because /foo/bar has value "" and we can't decode that. This commit excludes the parent directory from watch events for now.
		
			
				
	
	
		
			685 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			685 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 Google Inc. All rights reserved.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package tools
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"reflect"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
						|
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 | 
						|
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
 | 
						|
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 | 
						|
	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 | 
						|
	"github.com/coreos/go-etcd/etcd"
 | 
						|
)
 | 
						|
 | 
						|
func TestWatchInterpretations(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	// Declare some pods to make the test cases compact.
 | 
						|
	podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
						|
	podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
 | 
						|
	podBaz := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz"}}
 | 
						|
	firstLetterIsB := func(obj runtime.Object) bool {
 | 
						|
		return obj.(*api.Pod).Name[0] == 'b'
 | 
						|
	}
 | 
						|
 | 
						|
	// All of these test cases will be run with the firstLetterIsB FilterFunc.
 | 
						|
	table := map[string]struct {
 | 
						|
		actions       []string // Run this test item for every action here.
 | 
						|
		prevNodeValue string
 | 
						|
		nodeValue     string
 | 
						|
		expectEmit    bool
 | 
						|
		expectType    watch.EventType
 | 
						|
		expectObject  runtime.Object
 | 
						|
	}{
 | 
						|
		"create": {
 | 
						|
			actions:      []string{"create", "get"},
 | 
						|
			nodeValue:    runtime.EncodeOrDie(codec, podBar),
 | 
						|
			expectEmit:   true,
 | 
						|
			expectType:   watch.Added,
 | 
						|
			expectObject: podBar,
 | 
						|
		},
 | 
						|
		"create but filter blocks": {
 | 
						|
			actions:    []string{"create", "get"},
 | 
						|
			nodeValue:  runtime.EncodeOrDie(codec, podFoo),
 | 
						|
			expectEmit: false,
 | 
						|
		},
 | 
						|
		"delete": {
 | 
						|
			actions:       []string{"delete"},
 | 
						|
			prevNodeValue: runtime.EncodeOrDie(codec, podBar),
 | 
						|
			expectEmit:    true,
 | 
						|
			expectType:    watch.Deleted,
 | 
						|
			expectObject:  podBar,
 | 
						|
		},
 | 
						|
		"delete but filter blocks": {
 | 
						|
			actions:    []string{"delete"},
 | 
						|
			nodeValue:  runtime.EncodeOrDie(codec, podFoo),
 | 
						|
			expectEmit: false,
 | 
						|
		},
 | 
						|
		"modify appears to create 1": {
 | 
						|
			actions:      []string{"set", "compareAndSwap"},
 | 
						|
			nodeValue:    runtime.EncodeOrDie(codec, podBar),
 | 
						|
			expectEmit:   true,
 | 
						|
			expectType:   watch.Added,
 | 
						|
			expectObject: podBar,
 | 
						|
		},
 | 
						|
		"modify appears to create 2": {
 | 
						|
			actions:       []string{"set", "compareAndSwap"},
 | 
						|
			prevNodeValue: runtime.EncodeOrDie(codec, podFoo),
 | 
						|
			nodeValue:     runtime.EncodeOrDie(codec, podBar),
 | 
						|
			expectEmit:    true,
 | 
						|
			expectType:    watch.Added,
 | 
						|
			expectObject:  podBar,
 | 
						|
		},
 | 
						|
		"modify appears to delete": {
 | 
						|
			actions:       []string{"set", "compareAndSwap"},
 | 
						|
			prevNodeValue: runtime.EncodeOrDie(codec, podBar),
 | 
						|
			nodeValue:     runtime.EncodeOrDie(codec, podFoo),
 | 
						|
			expectEmit:    true,
 | 
						|
			expectType:    watch.Deleted,
 | 
						|
			expectObject:  podBar, // Should return last state that passed the filter!
 | 
						|
		},
 | 
						|
		"modify modifies": {
 | 
						|
			actions:       []string{"set", "compareAndSwap"},
 | 
						|
			prevNodeValue: runtime.EncodeOrDie(codec, podBar),
 | 
						|
			nodeValue:     runtime.EncodeOrDie(codec, podBaz),
 | 
						|
			expectEmit:    true,
 | 
						|
			expectType:    watch.Modified,
 | 
						|
			expectObject:  podBaz,
 | 
						|
		},
 | 
						|
		"modify ignores": {
 | 
						|
			actions:    []string{"set", "compareAndSwap"},
 | 
						|
			nodeValue:  runtime.EncodeOrDie(codec, podFoo),
 | 
						|
			expectEmit: false,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for name, item := range table {
 | 
						|
		for _, action := range item.actions {
 | 
						|
			w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil)
 | 
						|
			emitCalled := false
 | 
						|
			w.emit = func(event watch.Event) {
 | 
						|
				emitCalled = true
 | 
						|
				if !item.expectEmit {
 | 
						|
					return
 | 
						|
				}
 | 
						|
				if e, a := item.expectType, event.Type; e != a {
 | 
						|
					t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
 | 
						|
				}
 | 
						|
				if e, a := item.expectObject, event.Object; !reflect.DeepEqual(e, a) {
 | 
						|
					t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			var n, pn *etcd.Node
 | 
						|
			if item.nodeValue != "" {
 | 
						|
				n = &etcd.Node{Value: item.nodeValue}
 | 
						|
			}
 | 
						|
			if item.prevNodeValue != "" {
 | 
						|
				pn = &etcd.Node{Value: item.prevNodeValue}
 | 
						|
			}
 | 
						|
 | 
						|
			w.sendResult(&etcd.Response{
 | 
						|
				Action:   action,
 | 
						|
				Node:     n,
 | 
						|
				PrevNode: pn,
 | 
						|
			})
 | 
						|
 | 
						|
			if e, a := item.expectEmit, emitCalled; e != a {
 | 
						|
				t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
 | 
						|
			}
 | 
						|
			w.Stop()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
 | 
						|
	w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
 | 
						|
	w.emit = func(e watch.Event) {
 | 
						|
		t.Errorf("Unexpected emit: %v", e)
 | 
						|
	}
 | 
						|
 | 
						|
	w.sendResult(&etcd.Response{
 | 
						|
		Action: "update",
 | 
						|
	})
 | 
						|
	w.Stop()
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
 | 
						|
	actions := []string{"create", "set", "compareAndSwap", "delete"}
 | 
						|
	for _, action := range actions {
 | 
						|
		w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
 | 
						|
		w.emit = func(e watch.Event) {
 | 
						|
			t.Errorf("Unexpected emit: %v", e)
 | 
						|
		}
 | 
						|
		w.sendResult(&etcd.Response{
 | 
						|
			Action: action,
 | 
						|
		})
 | 
						|
		w.Stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
 | 
						|
	actions := []string{"create", "set", "compareAndSwap", "delete"}
 | 
						|
	for _, action := range actions {
 | 
						|
		w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
 | 
						|
		w.emit = func(e watch.Event) {
 | 
						|
			t.Errorf("Unexpected emit: %v", e)
 | 
						|
		}
 | 
						|
		w.sendResult(&etcd.Response{
 | 
						|
			Action: action,
 | 
						|
			Node: &etcd.Node{
 | 
						|
				Value: "foobar",
 | 
						|
			},
 | 
						|
		})
 | 
						|
		w.sendResult(&etcd.Response{
 | 
						|
			Action: action,
 | 
						|
			PrevNode: &etcd.Node{
 | 
						|
				Value: "foobar",
 | 
						|
			},
 | 
						|
		})
 | 
						|
		w.Stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchEtcdError(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
 | 
						|
	fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
	got := <-h.Watch("/some/key", 4).ResultChan()
 | 
						|
	if got.Type != watch.Error {
 | 
						|
		t.Fatalf("Unexpected non-error")
 | 
						|
	}
 | 
						|
	status, ok := got.Object.(*api.Status)
 | 
						|
	if !ok {
 | 
						|
		t.Fatalf("Unexpected non-error object type")
 | 
						|
	}
 | 
						|
	if status.Message != "immediate error" {
 | 
						|
		t.Errorf("Unexpected wrong error")
 | 
						|
	}
 | 
						|
	if status.Status != api.StatusFailure {
 | 
						|
		t.Errorf("Unexpected wrong error status")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatch(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
	watching := h.Watch("/some/key", 0)
 | 
						|
 | 
						|
	fakeClient.WaitForWatchCompletion()
 | 
						|
	// when server returns not found, the watch index starts at the next value (1)
 | 
						|
	if fakeClient.WatchIndex != 1 {
 | 
						|
		t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
 | 
						|
	}
 | 
						|
 | 
						|
	// Test normal case
 | 
						|
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
						|
	podBytes, _ := codec.Encode(pod)
 | 
						|
	fakeClient.WatchResponse <- &etcd.Response{
 | 
						|
		Action: "set",
 | 
						|
		Node: &etcd.Node{
 | 
						|
			Value: string(podBytes),
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	event := <-watching.ResultChan()
 | 
						|
	if e, a := watch.Added, event.Type; e != a {
 | 
						|
		t.Errorf("Expected %v, got %v", e, a)
 | 
						|
	}
 | 
						|
	if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
 | 
						|
		t.Errorf("Expected %v, got %v", e, a)
 | 
						|
	}
 | 
						|
 | 
						|
	// Test error case
 | 
						|
	fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
 | 
						|
 | 
						|
	if errEvent, ok := <-watching.ResultChan(); !ok {
 | 
						|
		t.Errorf("no error result?")
 | 
						|
	} else {
 | 
						|
		if e, a := watch.Error, errEvent.Type; e != a {
 | 
						|
			t.Errorf("Expected %v, got %v", e, a)
 | 
						|
		}
 | 
						|
		if e, a := "Injected error", errEvent.Object.(*api.Status).Message; e != a {
 | 
						|
			t.Errorf("Expected %v, got %v", e, a)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Did everything shut down?
 | 
						|
	if _, open := <-fakeClient.WatchResponse; open {
 | 
						|
		t.Errorf("An injected error did not cause a graceful shutdown")
 | 
						|
	}
 | 
						|
	if _, open := <-watching.ResultChan(); open {
 | 
						|
		t.Errorf("An injected error did not cause a graceful shutdown")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchEtcdState(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	type T struct {
 | 
						|
		Type      watch.EventType
 | 
						|
		Endpoints []string
 | 
						|
	}
 | 
						|
	testCases := map[string]struct {
 | 
						|
		Initial   map[string]EtcdResponseWithError
 | 
						|
		Responses []*etcd.Response
 | 
						|
		From      uint64
 | 
						|
		Expected  []*T
 | 
						|
	}{
 | 
						|
		"from not found": {
 | 
						|
			Initial: map[string]EtcdResponseWithError{},
 | 
						|
			Responses: []*etcd.Response{
 | 
						|
				{
 | 
						|
					Action: "create",
 | 
						|
					Node: &etcd.Node{
 | 
						|
						Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			From: 1,
 | 
						|
			Expected: []*T{
 | 
						|
				{watch.Added, nil},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		"from version 1": {
 | 
						|
			Responses: []*etcd.Response{
 | 
						|
				{
 | 
						|
					Action: "compareAndSwap",
 | 
						|
					Node: &etcd.Node{
 | 
						|
						Value:         string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 2,
 | 
						|
					},
 | 
						|
					PrevNode: &etcd.Node{
 | 
						|
						Value:         string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 1,
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			From: 1,
 | 
						|
			Expected: []*T{
 | 
						|
				{watch.Modified, []string{"127.0.0.1:9000"}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		"from initial state": {
 | 
						|
			Initial: map[string]EtcdResponseWithError{
 | 
						|
				"/somekey/foo": {
 | 
						|
					R: &etcd.Response{
 | 
						|
						Action: "get",
 | 
						|
						Node: &etcd.Node{
 | 
						|
							Value:         string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
 | 
						|
							CreatedIndex:  1,
 | 
						|
							ModifiedIndex: 1,
 | 
						|
						},
 | 
						|
						EtcdIndex: 1,
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			Responses: []*etcd.Response{
 | 
						|
				nil,
 | 
						|
				{
 | 
						|
					Action: "compareAndSwap",
 | 
						|
					Node: &etcd.Node{
 | 
						|
						Value:         string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 2,
 | 
						|
					},
 | 
						|
					PrevNode: &etcd.Node{
 | 
						|
						Value:         string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 1,
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			Expected: []*T{
 | 
						|
				{watch.Added, nil},
 | 
						|
				{watch.Modified, []string{"127.0.0.1:9000"}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for k, testCase := range testCases {
 | 
						|
		fakeClient := NewFakeEtcdClient(t)
 | 
						|
		for key, value := range testCase.Initial {
 | 
						|
			fakeClient.Data[key] = value
 | 
						|
		}
 | 
						|
		h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
		watching := h.Watch("/somekey/foo", testCase.From)
 | 
						|
		fakeClient.WaitForWatchCompletion()
 | 
						|
 | 
						|
		t.Logf("Testing %v", k)
 | 
						|
		for i := range testCase.Responses {
 | 
						|
			if testCase.Responses[i] != nil {
 | 
						|
				fakeClient.WatchResponse <- testCase.Responses[i]
 | 
						|
			}
 | 
						|
			event := <-watching.ResultChan()
 | 
						|
			if e, a := testCase.Expected[i].Type, event.Type; e != a {
 | 
						|
				t.Errorf("%s: expected type %v, got %v", k, e, a)
 | 
						|
				break
 | 
						|
			}
 | 
						|
			if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Endpoints; !reflect.DeepEqual(e, a) {
 | 
						|
				t.Errorf("%s: expected type %v, got %v", k, e, a)
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
		watching.Stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchFromZeroIndex(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
						|
 | 
						|
	testCases := map[string]struct {
 | 
						|
		Response        EtcdResponseWithError
 | 
						|
		ExpectedVersion string
 | 
						|
		ExpectedType    watch.EventType
 | 
						|
	}{
 | 
						|
		"get value created": {
 | 
						|
			EtcdResponseWithError{
 | 
						|
				R: &etcd.Response{
 | 
						|
					Node: &etcd.Node{
 | 
						|
						Value:         runtime.EncodeOrDie(codec, pod),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 1,
 | 
						|
					},
 | 
						|
					Action:    "get",
 | 
						|
					EtcdIndex: 2,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			"1",
 | 
						|
			watch.Added,
 | 
						|
		},
 | 
						|
		"get value modified": {
 | 
						|
			EtcdResponseWithError{
 | 
						|
				R: &etcd.Response{
 | 
						|
					Node: &etcd.Node{
 | 
						|
						Value:         runtime.EncodeOrDie(codec, pod),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 2,
 | 
						|
					},
 | 
						|
					Action:    "get",
 | 
						|
					EtcdIndex: 3,
 | 
						|
				},
 | 
						|
			},
 | 
						|
			"2",
 | 
						|
			watch.Modified,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for k, testCase := range testCases {
 | 
						|
		fakeClient := NewFakeEtcdClient(t)
 | 
						|
		fakeClient.Data["/some/key"] = testCase.Response
 | 
						|
		h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
		watching := h.Watch("/some/key", 0)
 | 
						|
 | 
						|
		fakeClient.WaitForWatchCompletion()
 | 
						|
		if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
 | 
						|
			t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
 | 
						|
		}
 | 
						|
 | 
						|
		// the existing node is detected and the index set
 | 
						|
		event := <-watching.ResultChan()
 | 
						|
		if e, a := testCase.ExpectedType, event.Type; e != a {
 | 
						|
			t.Errorf("%s: expected %v, got %v", k, e, a)
 | 
						|
		}
 | 
						|
		actualPod, ok := event.Object.(*api.Pod)
 | 
						|
		if !ok {
 | 
						|
			t.Fatalf("%s: expected a pod, got %#v", k, event.Object)
 | 
						|
		}
 | 
						|
		if actualPod.ResourceVersion != testCase.ExpectedVersion {
 | 
						|
			t.Errorf("%s: expected pod with resource version %v, Got %#v", k, testCase.ExpectedVersion, actualPod)
 | 
						|
		}
 | 
						|
		pod.ResourceVersion = testCase.ExpectedVersion
 | 
						|
		if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
 | 
						|
			t.Errorf("%s: expected %v, got %v", k, e, a)
 | 
						|
		}
 | 
						|
		watching.Stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchListFromZeroIndex(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
						|
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	fakeClient.Data["/some/key"] = EtcdResponseWithError{
 | 
						|
		R: &etcd.Response{
 | 
						|
			Node: &etcd.Node{
 | 
						|
				Dir: true,
 | 
						|
				Nodes: etcd.Nodes{
 | 
						|
					&etcd.Node{
 | 
						|
						Value:         runtime.EncodeOrDie(codec, pod),
 | 
						|
						CreatedIndex:  1,
 | 
						|
						ModifiedIndex: 1,
 | 
						|
						Nodes:         etcd.Nodes{},
 | 
						|
					},
 | 
						|
					&etcd.Node{
 | 
						|
						Value:         runtime.EncodeOrDie(codec, pod),
 | 
						|
						CreatedIndex:  2,
 | 
						|
						ModifiedIndex: 2,
 | 
						|
						Nodes:         etcd.Nodes{},
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			Action:    "get",
 | 
						|
			EtcdIndex: 3,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
	watching, err := h.WatchList("/some/key", 0, Everything)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Unexpected error: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// the existing node is detected and the index set
 | 
						|
	event, open := <-watching.ResultChan()
 | 
						|
	if !open {
 | 
						|
		t.Fatalf("unexpected channel close")
 | 
						|
	}
 | 
						|
	for i := 0; i < 2; i++ {
 | 
						|
		if e, a := watch.Added, event.Type; e != a {
 | 
						|
			t.Errorf("Expected %v, got %v", e, a)
 | 
						|
		}
 | 
						|
		actualPod, ok := event.Object.(*api.Pod)
 | 
						|
		if !ok {
 | 
						|
			t.Fatalf("expected a pod, got %#v", event.Object)
 | 
						|
		}
 | 
						|
		if actualPod.ResourceVersion != "1" {
 | 
						|
			t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
 | 
						|
		}
 | 
						|
		pod.ResourceVersion = "1"
 | 
						|
		if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
 | 
						|
			t.Errorf("Expected %v, got %v", e, a)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	fakeClient.WaitForWatchCompletion()
 | 
						|
	watching.Stop()
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchListIgnoresRootKey(t *testing.T) {
 | 
						|
	codec := latest.Codec
 | 
						|
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
						|
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
	watching, err := h.WatchList("/some/key", 1, Everything)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("Unexpected error: %v", err)
 | 
						|
	}
 | 
						|
	fakeClient.WaitForWatchCompletion()
 | 
						|
 | 
						|
	// This is the root directory of the watch, which happens to have a value encoded
 | 
						|
	fakeClient.WatchResponse <- &etcd.Response{
 | 
						|
		Action: "delete",
 | 
						|
		PrevNode: &etcd.Node{
 | 
						|
			Key:           "/some/key",
 | 
						|
			Value:         runtime.EncodeOrDie(codec, pod),
 | 
						|
			CreatedIndex:  1,
 | 
						|
			ModifiedIndex: 1,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	// Delete of the parent directory of a key is an event that a list watch would receive,
 | 
						|
	// but will have no value so the decode will fail.
 | 
						|
	fakeClient.WatchResponse <- &etcd.Response{
 | 
						|
		Action: "delete",
 | 
						|
		PrevNode: &etcd.Node{
 | 
						|
			Key:           "/some/key",
 | 
						|
			Value:         "",
 | 
						|
			CreatedIndex:  1,
 | 
						|
			ModifiedIndex: 1,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	close(fakeClient.WatchStop)
 | 
						|
 | 
						|
	// the existing node is detected and the index set
 | 
						|
	_, open := <-watching.ResultChan()
 | 
						|
	if open {
 | 
						|
		t.Fatalf("unexpected channel open")
 | 
						|
	}
 | 
						|
 | 
						|
	watching.Stop()
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchFromNotFound(t *testing.T) {
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	fakeClient.Data["/some/key"] = EtcdResponseWithError{
 | 
						|
		R: &etcd.Response{
 | 
						|
			Node: nil,
 | 
						|
		},
 | 
						|
		E: &etcd.EtcdError{
 | 
						|
			Index:     2,
 | 
						|
			ErrorCode: 100,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
	watching := h.Watch("/some/key", 0)
 | 
						|
 | 
						|
	fakeClient.WaitForWatchCompletion()
 | 
						|
	if fakeClient.WatchIndex != 3 {
 | 
						|
		t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
 | 
						|
	}
 | 
						|
 | 
						|
	watching.Stop()
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchFromOtherError(t *testing.T) {
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	fakeClient.Data["/some/key"] = EtcdResponseWithError{
 | 
						|
		R: &etcd.Response{
 | 
						|
			Node: nil,
 | 
						|
		},
 | 
						|
		E: &etcd.EtcdError{
 | 
						|
			Index:     2,
 | 
						|
			ErrorCode: 101,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
 | 
						|
	watching := h.Watch("/some/key", 0)
 | 
						|
 | 
						|
	errEvent := <-watching.ResultChan()
 | 
						|
	if e, a := watch.Error, errEvent.Type; e != a {
 | 
						|
		t.Errorf("Expected %v, got %v", e, a)
 | 
						|
	}
 | 
						|
	if e, a := "101:  () [2]", errEvent.Object.(*api.Status).Message; e != a {
 | 
						|
		t.Errorf("Expected %v, got %v", e, a)
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	case _, ok := <-watching.ResultChan():
 | 
						|
		if ok {
 | 
						|
			t.Fatalf("expected result channel to be closed")
 | 
						|
		}
 | 
						|
	case <-time.After(1 * time.Second):
 | 
						|
		t.Fatalf("watch should have closed channel: %#v", watching)
 | 
						|
	}
 | 
						|
 | 
						|
	if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 {
 | 
						|
		t.Fatalf("Watch should not have been invoked: %#v", fakeClient)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestWatchPurposefulShutdown(t *testing.T) {
 | 
						|
	fakeClient := NewFakeEtcdClient(t)
 | 
						|
	h := EtcdHelper{fakeClient, codec, versioner}
 | 
						|
	fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
 | 
						|
 | 
						|
	// Test purposeful shutdown
 | 
						|
	watching := h.Watch("/some/key", 0)
 | 
						|
	fakeClient.WaitForWatchCompletion()
 | 
						|
	watching.Stop()
 | 
						|
 | 
						|
	// Did everything shut down?
 | 
						|
	if _, open := <-fakeClient.WatchResponse; open {
 | 
						|
		t.Errorf("A stop did not cause a graceful shutdown")
 | 
						|
	}
 | 
						|
	if _, open := <-watching.ResultChan(); open {
 | 
						|
		t.Errorf("An injected error did not cause a graceful shutdown")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestEtcdParseWatchResourceVersion(t *testing.T) {
 | 
						|
	testCases := []struct {
 | 
						|
		Version       string
 | 
						|
		Kind          string
 | 
						|
		ExpectVersion uint64
 | 
						|
		Err           bool
 | 
						|
	}{
 | 
						|
		{Version: "", ExpectVersion: 0},
 | 
						|
		{Version: "a", Err: true},
 | 
						|
		{Version: " ", Err: true},
 | 
						|
		{Version: "1", ExpectVersion: 2},
 | 
						|
		{Version: "10", ExpectVersion: 11},
 | 
						|
	}
 | 
						|
	for _, testCase := range testCases {
 | 
						|
		version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
 | 
						|
		switch {
 | 
						|
		case testCase.Err:
 | 
						|
			if err == nil {
 | 
						|
				t.Errorf("%s: unexpected non-error", testCase.Version)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			if !errors.IsInvalid(err) {
 | 
						|
				t.Errorf("%s: unexpected error: %v", testCase.Version, err)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		case !testCase.Err && err != nil:
 | 
						|
			t.Errorf("%s: unexpected error: %v", testCase.Version, err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if version != testCase.ExpectVersion {
 | 
						|
			t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |