diff --git a/pkg/watch/json/decoder.go b/pkg/watch/json/decoder.go index bb087b88003..416ef110953 100644 --- a/pkg/watch/json/decoder.go +++ b/pkg/watch/json/decoder.go @@ -51,7 +51,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { return "", nil, err } switch got.Type { - case watch.Added, watch.Modified, watch.Deleted: + case watch.Added, watch.Modified, watch.Deleted, watch.Error: default: return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) } diff --git a/pkg/watch/json/decoder_test.go b/pkg/watch/json/decoder_test.go index 2b35b1dcf2f..7571819d4db 100644 --- a/pkg/watch/json/decoder_test.go +++ b/pkg/watch/json/decoder_test.go @@ -24,61 +24,65 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestDecoder(t *testing.T) { - out, in := io.Pipe() - decoder := NewDecoder(out, v1beta1.Codec) + table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error} - expect := &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}} - encoder := json.NewEncoder(in) - go func() { - data, err := v1beta1.Codec.Encode(expect) - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - if err := encoder.Encode(&watchEvent{watch.Added, runtime.RawExtension{json.RawMessage(data)}}); err != nil { - t.Errorf("Unexpected error %v", err) - } - in.Close() - }() + for _, eventType := range table { + out, in := io.Pipe() + decoder := NewDecoder(out, testapi.Codec()) - done := make(chan struct{}) - go func() { - action, got, err := decoder.Decode() - if err != nil { - t.Fatalf("Unexpected error %v", err) - } - if e, a := watch.Added, action; e != a { - t.Errorf("Expected %v, got %v", e, a) - } - if e, a := expect, got; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - t.Logf("Exited read") - close(done) - }() - <-done + expect := &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}} + encoder := json.NewEncoder(in) + go func() { + data, err := testapi.Codec().Encode(expect) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if err := encoder.Encode(&watchEvent{eventType, runtime.RawExtension{json.RawMessage(data)}}); err != nil { + t.Errorf("Unexpected error %v", err) + } + in.Close() + }() - done = make(chan struct{}) - go func() { - _, _, err := decoder.Decode() - if err == nil { - t.Errorf("Unexpected nil error") - } - close(done) - }() - <-done + done := make(chan struct{}) + go func() { + action, got, err := decoder.Decode() + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := eventType, action; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + t.Logf("Exited read") + close(done) + }() + <-done - decoder.Close() + done = make(chan struct{}) + go func() { + _, _, err := decoder.Decode() + if err == nil { + t.Errorf("Unexpected nil error") + } + close(done) + }() + <-done + + decoder.Close() + } } func TestDecoder_SourceClose(t *testing.T) { out, in := io.Pipe() - decoder := NewDecoder(out, v1beta1.Codec) + decoder := NewDecoder(out, testapi.Codec()) done := make(chan struct{})