Compare commits

..

1 Commits

Author SHA1 Message Date
Kubernetes Publisher
78d2af792b Fix dependencies to point to kubernetes-1.15.0 tag 2019-06-20 08:51:01 +00:00
12 changed files with 105 additions and 273 deletions

8
Godeps/Godeps.json generated
View File

@@ -136,7 +136,7 @@
},
{
"ImportPath": "golang.org/x/net",
"Rev": "cdfb69ac37fc"
"Rev": "65e2d4e15006"
},
{
"ImportPath": "golang.org/x/oauth2",
@@ -184,15 +184,15 @@
},
{
"ImportPath": "gopkg.in/yaml.v2",
"Rev": "v2.2.8"
"Rev": "v2.2.1"
},
{
"ImportPath": "k8s.io/api",
"Rev": "v0.15.11"
"Rev": "7cf5895f2711"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "v0.15.11"
"Rev": "1799e75a0719"
},
{
"ImportPath": "k8s.io/klog",

10
go.mod
View File

@@ -22,12 +22,12 @@ require (
github.com/spf13/pflag v1.0.1
github.com/stretchr/testify v1.2.2
golang.org/x/crypto v0.0.0-20181025213731-e84da0312774
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.15.11
k8s.io/apimachinery v0.15.11
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/klog v0.3.1
k8s.io/utils v0.0.0-20190221042446-c2654d5206da
sigs.k8s.io/yaml v1.1.0
@@ -37,6 +37,6 @@ replace (
golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9
k8s.io/api => k8s.io/api v0.15.11
k8s.io/apimachinery => k8s.io/apimachinery v0.15.11
k8s.io/api => k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
)

10
go.sum
View File

@@ -64,8 +64,8 @@ golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc h1:gkKoSkUmnU6bpS/VhkuO27bzQeSA51uaEfbOW5dNb68=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a h1:tImsplftrFpALCYumobsd0K86vlAs/eXGFms2txfJfA=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
@@ -91,10 +91,8 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
k8s.io/api v0.15.11/go.mod h1:DI3kWWWBG0byhZ4druNYQvleDRhbocPrm+Glq4xVpkM=
k8s.io/apimachinery v0.15.11/go.mod h1:ZRw+v83FjgEqlzqaBkxL3XB21MSLYdzjsY9Bgxclhdw=
k8s.io/api v0.0.0-20190620084959-7cf5895f2711/go.mod h1:TBhBqb1AWbBQbW3XRusr7n7E4v2+5ZY8r8sAMnyFC5A=
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=

View File

@@ -295,6 +295,13 @@ func isDeletionDup(a, b *Delta) *Delta {
return b
}
// willObjectBeDeletedLocked returns true only if the last delta for the
// given object is Delete. Caller must lock first.
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
deltas := f.items[id]
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
@@ -303,6 +310,13 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
return KeyError{obj, err}
}
// If object is supposed to be deleted (last event is Deleted),
// then we should ignore Sync events, because it would result in
// recreation of this object.
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)

View File

@@ -85,33 +85,6 @@ func TestDeltaFIFO_basic(t *testing.T) {
}
}
// TestDeltaFIFO_replaceWithDeleteDeltaIn tests that a `Sync` delta for an
// object `O` with ID `X` is added when .Replace is called and `O` is among the
// replacement objects even if the DeltaFIFO already stores in terminal position
// a delta of type `Delete` for ID `X`. Not adding the `Sync` delta causes
// SharedIndexInformers to miss `O`'s create notification, see https://github.com/kubernetes/kubernetes/issues/83810
// for more details.
func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
oldObj := mkFifoObj("foo", 1)
newObj := mkFifoObj("foo", 2)
f := NewDeltaFIFO(testFifoObjectKeyFunc, keyLookupFunc(func() []testFifoObject {
return []testFifoObject{oldObj}
}))
f.Delete(oldObj)
f.Replace([]interface{}{newObj}, "")
actualDeltas := Pop(f)
expectedDeltas := Deltas{
Delta{Type: Deleted, Object: oldObj},
Delta{Type: Sync, Object: newObj},
}
if !reflect.DeepEqual(expectedDeltas, actualDeltas) {
t.Errorf("expected %#v, got %#v", expectedDeltas, actualDeltas)
}
}
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)

View File

@@ -292,13 +292,6 @@ func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
set := index[indexValue]
if set != nil {
set.Delete(key)
// If we don't delete the set when zero, indices with high cardinality
// short lived resources can cause memory to increase over time from
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, indexValue)
}
}
}
}

View File

@@ -1,92 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
)
func TestThreadSafeStoreDeleteRemovesEmptySetsFromIndex(t *testing.T) {
testIndexer := "testIndexer"
indexers := Indexers{
testIndexer: func(obj interface{}) (strings []string, e error) {
indexes := []string{obj.(string)}
return indexes, nil
},
}
indices := Indices{}
store := NewThreadSafeStore(indexers, indices).(*threadSafeMap)
testKey := "testKey"
store.Add(testKey, testKey)
// Assumption check, there should be a set for the `testKey` with one element in the added index
set := store.indices[testIndexer][testKey]
if len(set) != 1 {
t.Errorf("Initial assumption of index backing string set having 1 element failed. Actual elements: %d", len(set))
return
}
store.Delete(testKey)
set, present := store.indices[testIndexer][testKey]
if present {
t.Errorf("Index backing string set not deleted from index. Set length: %d", len(set))
}
}
func TestThreadSafeStoreAddKeepsNonEmptySetPostDeleteFromIndex(t *testing.T) {
testIndexer := "testIndexer"
testIndex := "testIndex"
indexers := Indexers{
testIndexer: func(obj interface{}) (strings []string, e error) {
indexes := []string{testIndex}
return indexes, nil
},
}
indices := Indices{}
store := NewThreadSafeStore(indexers, indices).(*threadSafeMap)
store.Add("retain", "retain")
store.Add("delete", "delete")
// Assumption check, there should be a set for the `testIndex` with two elements
set := store.indices[testIndexer][testIndex]
if len(set) != 2 {
t.Errorf("Initial assumption of index backing string set having 2 elements failed. Actual elements: %d", len(set))
return
}
store.Delete("delete")
set, present := store.indices[testIndexer][testIndex]
if !present {
t.Errorf("Index backing string set erroneously deleted from index.")
return
}
if len(set) != 1 {
t.Errorf("Index backing string set has incorrect length, expect 1. Set length: %d", len(set))
}
}

View File

@@ -153,7 +153,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -162,7 +162,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -181,7 +181,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
},
Reason: "Killed",
Message: "some other verbose message: 1",
@@ -189,7 +189,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectUpdate: false,
},
{
@@ -208,7 +208,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -217,7 +217,7 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -236,7 +236,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[3]",
},
Reason: "Started",
@@ -245,7 +245,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -264,7 +264,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -273,7 +273,7 @@ func TestEventf(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -292,7 +292,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -301,7 +301,7 @@ func TestEventf(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -320,7 +320,7 @@ func TestEventf(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -329,7 +329,7 @@ func TestEventf(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: true,
},
}
@@ -509,7 +509,7 @@ func TestLotsOfEvents(t *testing.T) {
Name: fmt.Sprintf("foo-%v", i),
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
}
// we need to vary the reason to prevent aggregation
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
@@ -567,7 +567,7 @@ func TestEventfNoNamespace(t *testing.T) {
Name: "foo",
Namespace: "",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -576,7 +576,7 @@ func TestEventfNoNamespace(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
}
@@ -677,7 +677,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -686,7 +686,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -705,7 +705,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
},
Reason: "Killed",
Message: "some other verbose message: 1",
@@ -713,7 +713,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectUpdate: false,
},
{
@@ -732,7 +732,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -741,7 +741,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -760,7 +760,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[3]",
},
Reason: "Started",
@@ -769,7 +769,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -788,7 +788,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
@@ -797,7 +797,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
},
{
@@ -816,7 +816,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -825,7 +825,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: false,
},
{
@@ -844,7 +844,7 @@ func TestMultiSinkCache(t *testing.T) {
Name: "foo",
Namespace: "baz",
UID: "differentUid",
APIVersion: "v1",
APIVersion: "version",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
@@ -853,7 +853,7 @@ func TestMultiSinkCache(t *testing.T) {
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"version", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: true,
},
}

View File

@@ -19,6 +19,8 @@ package reference
import (
"errors"
"fmt"
"net/url"
"strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
@@ -28,7 +30,8 @@ import (
var (
// Errors that could be returned by GetReference.
ErrNilObject = errors.New("can't reference a nil object")
ErrNilObject = errors.New("can't reference a nil object")
ErrNoSelfLink = errors.New("selfLink was empty, can't make reference")
)
// GetReference returns an ObjectReference which refers to the given
@@ -44,6 +47,20 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
return ref, nil
}
gvk := obj.GetObjectKind().GroupVersionKind()
// if the object referenced is actually persisted, we can just get kind from meta
// if we are building an object reference to something not yet persisted, we should fallback to scheme
kind := gvk.Kind
if len(kind) == 0 {
// TODO: this is wrong
gvks, _, err := scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
kind = gvks[0].Kind
}
// An object that implements only List has enough metadata to build a reference
var listMeta metav1.Common
objectMeta, err := meta.Accessor(obj)
@@ -56,29 +73,29 @@ func GetReference(scheme *runtime.Scheme, obj runtime.Object) (*v1.ObjectReferen
listMeta = objectMeta
}
gvk := obj.GetObjectKind().GroupVersionKind()
// If object meta doesn't contain data about kind and/or version,
// we are falling back to scheme.
//
// TODO: This doesn't work for CRDs, which are not registered in scheme.
if gvk.Empty() {
gvks, _, err := scheme.ObjectKinds(obj)
// if the object referenced is actually persisted, we can also get version from meta
version := gvk.GroupVersion().String()
if len(version) == 0 {
selfLink := listMeta.GetSelfLink()
if len(selfLink) == 0 {
return nil, ErrNoSelfLink
}
selfLinkUrl, err := url.Parse(selfLink)
if err != nil {
return nil, err
}
if len(gvks) == 0 || gvks[0].Empty() {
return nil, fmt.Errorf("unexpected gvks registered for object %T: %v", obj, gvks)
// example paths: /<prefix>/<version>/*
parts := strings.Split(selfLinkUrl.Path, "/")
if len(parts) < 4 {
return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", selfLink, version)
}
if parts[1] == "api" {
version = parts[2]
} else {
version = parts[2] + "/" + parts[3]
}
// TODO: The same object can be registered for multiple group versions
// (although in practise this doesn't seem to be used).
// In such case, the version set may not be correct.
gvk = gvks[0]
}
kind := gvk.Kind
version := gvk.GroupVersion().String()
// only has list metadata
if objectMeta == nil {
return &v1.ObjectReference{

View File

@@ -37,31 +37,29 @@ func TestGetReferenceRefVersion(t *testing.T) {
tests := []struct {
name string
input *TestRuntimeObj
groupVersion schema.GroupVersion
expectedRefVersion string
}{
{
name: "v1 GV from scheme",
name: "api from selflink",
input: &TestRuntimeObj{
ObjectMeta: metav1.ObjectMeta{SelfLink: "/bad-selflink/unused"},
ObjectMeta: metav1.ObjectMeta{SelfLink: "/api/v1/namespaces"},
},
groupVersion: schema.GroupVersion{Group: "", Version: "v1"},
expectedRefVersion: "v1",
},
{
name: "foo.group/v3 GV from scheme",
name: "foo.group/v3 from selflink",
input: &TestRuntimeObj{
ObjectMeta: metav1.ObjectMeta{SelfLink: "/bad-selflink/unused"},
ObjectMeta: metav1.ObjectMeta{SelfLink: "/apis/foo.group/v3/namespaces"},
},
groupVersion: schema.GroupVersion{Group: "foo.group", Version: "v3"},
expectedRefVersion: "foo.group/v3",
},
}
scheme := runtime.NewScheme()
scheme.AddKnownTypes(schema.GroupVersion{Group: "this", Version: "is ignored"}, &TestRuntimeObj{})
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(test.groupVersion, &TestRuntimeObj{})
ref, err := GetReference(scheme, test.input)
if err != nil {
t.Fatal(err)

View File

@@ -77,6 +77,11 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
closable := &closableConn{Conn: conn}
// Start tracking the connection
d.mu.Lock()
d.conns[closable] = struct{}{}
d.mu.Unlock()
// When the connection is closed, remove it from the map. This will
// be no-op if the connection isn't in the map, e.g. if CloseAll()
// is called.
@@ -86,11 +91,6 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
d.mu.Unlock()
}
// Start tracking the connection
d.mu.Lock()
d.conns[closable] = struct{}{}
d.mu.Unlock()
return closable, nil
}

View File

@@ -19,8 +19,6 @@ package connrotation
import (
"context"
"net"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -52,73 +50,6 @@ func TestCloseAll(t *testing.T) {
}
}
// TestCloseAllRace ensures CloseAll works with connections being simultaneously dialed
func TestCloseAllRace(t *testing.T) {
conns := int64(0)
dialer := NewDialer(func(ctx context.Context, network, address string) (net.Conn, error) {
return closeOnlyConn{onClose: func() { atomic.AddInt64(&conns, -1) }}, nil
})
done := make(chan struct{})
wg := &sync.WaitGroup{}
// Close all as fast as we can
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
dialer.CloseAll()
}
}
}()
// Dial as fast as we can
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
if _, err := dialer.Dial("", ""); err != nil {
t.Error(err)
return
}
atomic.AddInt64(&conns, 1)
}
}
}()
// Soak to ensure no races
time.Sleep(time.Second)
// Signal completion
close(done)
// Wait for goroutines
wg.Wait()
// Ensure CloseAll ran after all dials
dialer.CloseAll()
// Expect all connections to close within 5 seconds
for start := time.Now(); time.Now().Sub(start) < 5*time.Second; time.Sleep(10 * time.Millisecond) {
// Ensure all connections were closed
if c := atomic.LoadInt64(&conns); c == 0 {
break
} else {
t.Logf("got %d open connections, want 0, will retry", c)
}
}
// Ensure all connections were closed
if c := atomic.LoadInt64(&conns); c != 0 {
t.Fatalf("got %d open connections, want 0", c)
}
}
type closeOnlyConn struct {
net.Conn
onClose func()