Compare commits

..

22 Commits

Author SHA1 Message Date
Kubernetes Publisher
30088cdd7e Update dependencies to v0.26.7 tag 2023-07-19 22:39:30 +00:00
Kubernetes Publisher
cea5ee961b Merge pull request #118970 from champtar/automated-cherry-pick-of-#117791-upstream-release-1.26
Automated cherry pick of #117791: update serial number to a valid non-zero number in ca

Kubernetes-commit: 76fa65bdfbb2fa4d38606c1c4cf524124240b359
2023-07-05 09:40:58 -07:00
Min Ni
c4c506ff2e update serial number to a valid non-zero number in ca certificate
Kubernetes-commit: 26285c1d6685720002464ce3660b44094568c21d
2023-05-08 16:39:35 -07:00
Kubernetes Publisher
cb4594adb3 Merge pull request #118555 from puerco/bump-1.26-go-1.19.10
[release-1.26] releng/go: Update images, deps and version to go 1.19.10

Kubernetes-commit: afe3fae16cb7c3625bc61796e03d6b3eeced889b
2023-06-12 17:41:53 +00:00
Adolfo García Veytia (Puerco)
5023c62056 update-vendor: update vendored go.sums
Run of ./hack/update-vendor.sh

Signed-off-by: Adolfo García Veytia (Puerco) <adolfo.garcia@uservers.net>

Kubernetes-commit: b204a2b57f12800767b5c06069d518513363dbce
2023-06-08 00:14:59 -06:00
Kubernetes Publisher
a3a549a55a Merge pull request #115051 from MadhavJivrajani/release-1.26
[1.26] Cherry Pick of #114766: [Prepare for go1.20] *: Bump versions and fix tests

Kubernetes-commit: 4dfe380379bc9b3c39f8b041a95d12b4e6ac0cf3
2023-05-23 18:45:31 +00:00
Kubernetes Publisher
038b381bf6 Merge pull request #117691 from dims/re-do-of-117242-on-release-1.26
[1.26] Bump runc/libcontainer to 1.1.6

Kubernetes-commit: 7970b8a1efee7a08e64bb272438871dca021166a
2023-05-04 16:32:59 +00:00
Davanum Srinivas
cd83e43d17 Bump runc go module v1.1.4 -> v1.1.6
Signed-off-by: Davanum Srinivas <davanum@gmail.com>

Kubernetes-commit: 808ce27cae46bd31b2978875069eb7c77af21886
2023-04-29 22:27:41 -04:00
Kubernetes Publisher
dbfbc039f8 Merge pull request #117686 from ardaguclu/automated-cherry-pick-of-#117495-upstream-release-1.26
Automated cherry pick of #117495: Use absolute path instead requestURI in openapiv3 discovery

Kubernetes-commit: 8a1ec6f79b3621043412d227b251e94fcd315493
2023-04-29 18:07:49 +00:00
Arda Güçlü
d72dec4966 Use absolute path instead requestURI in openapiv3 discovery
Currently, openapiv3 discovery uses requestURI to discover resources.
However, that does not work when the rest endpoint contains prefixes
(e.g. `http://localhost/test-endpoint/`).
Because requestURI overwrites prefixes also in rest endpoint
(e.g. `http://localhost/openapiv3/apis/apps/v1`).

Since `absPath` keeps the prefixes in the rest endpoint,
this PR changes to absPath instead requestURI.

Kubernetes-commit: e2bfa0db4441c58f53c64c4e3d9eb1dc494abfd2
2023-04-20 09:53:28 +03:00
Kubernetes Publisher
a5144d412c Merge pull request #117638 from seans3/automated-cherry-pick-of-#117571-origin-release-1.26
Automated cherry pick of #117571: Refactors discovery content-type and helper functions

Kubernetes-commit: 7041b95474bcbf8c612fe2d9f648c1ebd437afc1
2023-04-27 00:14:15 -07:00
Sean Sullivan
d6f8d0460a Refactors discovery content-type and helper functions
Kubernetes-commit: 2a17b5518c6ab9a1da749c930705b9512a480f02
2023-04-24 17:40:07 -07:00
Kubernetes Publisher
2dd0093cac Merge pull request #115899 from odinuge/automated-cherry-pick-of-#115620-upstream-release-1.26
Automated cherry pick of #115620: client-go/cache: fix missing delete event on replace  (+ #116623)

Kubernetes-commit: d9a7f46fc5cac57240eaf531e15eabcacf2a2dc3
2023-04-04 18:29:50 +00:00
Kubernetes Publisher
f3ae5cbd83 Merge pull request #116666 from seans3/automated-cherry-pick-of-#116603-origin-release-1.26
Automated cherry pick of #116603: Aggregated discovery resilient to nil GVK

Kubernetes-commit: 9936142a8268ba1545a258405b7fc50fa728876d
2023-03-30 22:30:52 +00:00
Daniel Smith
fffc68d58e Change where transformers are called.
odinuge: sorted out some function signature changes during
cherry-picking that caused conflicts.

(cherry picked from commit e76dff38cf74c3c8ad9ed4d3bc6e3641d9b64565)
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: a8d2bc0ff7537bcb17e0b85333615dafd7c1e9a9
2023-03-14 23:05:20 +00:00
Sean Sullivan
5ebee1886e Aggregated discovery resilient to nil GVK
Kubernetes-commit: 67e6297764bdfc1377919b14175c3d20d97e639a
2023-03-14 17:59:19 +00:00
Odin Ugedal
8190aa4d37 client-go/cache: update Replace comment to be more clear
Since the behavior is now changed, and the old behavior leaked objects,
this adds a new comment about how Replace works.

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: cd7deae436c328085bcb50681b06e1cc275801db
2023-02-13 11:23:50 +00:00
Odin Ugedal
b667227efd client-go/cache: rewrite Replace to check queue first
This is useful to both reduce the code complexity, and to ensure clients
get the "newest" version of an object known when its deleted. This is
all best-effort, but for clients it makes more sense giving them the
newest object they observed rather than an old one.

This is especially useful when an object is recreated. eg.

Object A with key K is in the KnownObjects store;
- DELETE delta for A is queued with key K
- CREATE delta for B is queued with key K
- Replace without any object with key K in it.

In this situation its better to create a DELETE delta with
DeletedFinalStateUnknown with B (with this patch), than it is to give
the client an DeletedFinalStateUnknown with A (without this patch).

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: 4f55d416f2e6b566eb397670b451d96712e638f1
2023-02-13 11:12:37 +00:00
Odin Ugedal
30215cd5a1 client-go/cache: merge ReplaceMakesDeletionsForObjectsInQueue tests
Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: d7878cdf2d6a7ec82b589aa95fd83770ba3edf2d
2023-02-10 14:30:10 +00:00
Odin Ugedal
ba3596940d client-go/cache: fix missing delete event on replace without knownObjects
This fixes an issue where a relist could result in a DELETED delta
with an object wrapped in a DeletedFinalStateUnknown object; and then on
the next relist, it would wrap that object inside another
DeletedFinalStateUnknown, leaving the user with a "double" layer
of DeletedFinalStateUnknown's.

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: 8509d70d3c33a038f0b5111a5e5696c833f6685b
2023-02-10 14:16:26 +00:00
Odin Ugedal
97cf9cb9c2 client-go/cache: fix missing delete event on replace
This fixes a race condition when a "short lived" object
is created and the create event is still present on the queue
when a relist replaces the state. Previously that would lead in the
object being leaked.

The way this could happen is roughly;

1. new Object is added O, agent gets CREATED event for it
2. watch is terminated, and the agent runs a new list, L
3. CREATE event for O is still on the queue to be processed.
4. informer replaces the old data in store with L, and O is not in L
  - Since O is not in the store, and not in the list L, no DELETED event
    is queued
5. CREATE event for O is still on the queue to be processed.
6. CREATE event for O is processed
7. O is <leaked>; its present in the cache but not in k8s.

With this patch, on step 4. above it would create a DELETED event
ensuring that the object will be removed.

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: bd4ec0acec8844bddc7780d322f8fc215d045046
2023-02-08 14:57:23 +00:00
Madhav Jivrajani
fd7800c967 *: Bump version of vmware/govmomi
Bumping version to include changes that
better handle TLS errors. Bump nescessary
to prepare for when the version of Go is
bumped to 1.20

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>

Kubernetes-commit: 3ac70147ec3de3752b360a06bcb7d7c418619da2
2023-01-13 12:06:22 +05:30
16 changed files with 770 additions and 164 deletions

View File

@@ -92,12 +92,18 @@ func convertAPIGroup(g apidiscovery.APIGroupDiscovery) (
resourceList := &metav1.APIResourceList{}
resourceList.GroupVersion = gv.String()
for _, r := range v.Resources {
resource := convertAPIResource(r)
resourceList.APIResources = append(resourceList.APIResources, resource)
resource, err := convertAPIResource(r)
if err == nil {
resourceList.APIResources = append(resourceList.APIResources, resource)
}
// Subresources field in new format get transformed into full APIResources.
// It is possible a partial result with an error was returned to be used
// as the parent resource for the subresource.
for _, subresource := range r.Subresources {
sr := convertAPISubresource(resource, subresource)
resourceList.APIResources = append(resourceList.APIResources, sr)
sr, err := convertAPISubresource(resource, subresource)
if err == nil {
resourceList.APIResources = append(resourceList.APIResources, sr)
}
}
}
gvResources[gv] = resourceList
@@ -105,30 +111,44 @@ func convertAPIGroup(g apidiscovery.APIGroupDiscovery) (
return group, gvResources, failedGVs
}
// convertAPIResource tranforms a APIResourceDiscovery to an APIResource.
func convertAPIResource(in apidiscovery.APIResourceDiscovery) metav1.APIResource {
return metav1.APIResource{
// convertAPIResource tranforms a APIResourceDiscovery to an APIResource. We are
// resilient to missing GVK, since this resource might be the parent resource
// for a subresource. If the parent is missing a GVK, it is not returned in
// discovery, and the subresource MUST have the GVK.
func convertAPIResource(in apidiscovery.APIResourceDiscovery) (metav1.APIResource, error) {
result := metav1.APIResource{
Name: in.Resource,
SingularName: in.SingularResource,
Namespaced: in.Scope == apidiscovery.ScopeNamespace,
Group: in.ResponseKind.Group,
Version: in.ResponseKind.Version,
Kind: in.ResponseKind.Kind,
Verbs: in.Verbs,
ShortNames: in.ShortNames,
Categories: in.Categories,
}
var err error
if in.ResponseKind != nil {
result.Group = in.ResponseKind.Group
result.Version = in.ResponseKind.Version
result.Kind = in.ResponseKind.Kind
} else {
err = fmt.Errorf("discovery resource %s missing GVK", in.Resource)
}
// Can return partial result with error, which can be the parent for a
// subresource. Do not add this result to the returned discovery resources.
return result, err
}
// convertAPISubresource tranforms a APISubresourceDiscovery to an APIResource.
func convertAPISubresource(parent metav1.APIResource, in apidiscovery.APISubresourceDiscovery) metav1.APIResource {
return metav1.APIResource{
Name: fmt.Sprintf("%s/%s", parent.Name, in.Subresource),
SingularName: parent.SingularName,
Namespaced: parent.Namespaced,
Group: in.ResponseKind.Group,
Version: in.ResponseKind.Version,
Kind: in.ResponseKind.Kind,
Verbs: in.Verbs,
func convertAPISubresource(parent metav1.APIResource, in apidiscovery.APISubresourceDiscovery) (metav1.APIResource, error) {
result := metav1.APIResource{}
if in.ResponseKind == nil {
return result, fmt.Errorf("subresource %s/%s missing GVK", parent.Name, in.Subresource)
}
result.Name = fmt.Sprintf("%s/%s", parent.Name, in.Subresource)
result.SingularName = parent.SingularName
result.Namespaced = parent.Namespaced
result.Group = in.ResponseKind.Group
result.Version = in.ResponseKind.Version
result.Kind = in.ResponseKind.Kind
result.Verbs = in.Verbs
return result, nil
}

View File

@@ -541,6 +541,75 @@ func TestSplitGroupsAndResources(t *testing.T) {
},
expectedFailedGVs: map[schema.GroupVersion]error{},
},
{
name: "Aggregated discovery with single subresource and parent missing GVK",
agg: apidiscovery.APIGroupDiscoveryList{
Items: []apidiscovery.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{
Name: "external.metrics.k8s.io",
},
Versions: []apidiscovery.APIVersionDiscovery{
{
Version: "v1beta1",
Resources: []apidiscovery.APIResourceDiscovery{
{
// resilient to nil GVK for parent
Resource: "*",
Scope: apidiscovery.ScopeNamespace,
SingularResource: "",
Subresources: []apidiscovery.APISubresourceDiscovery{
{
Subresource: "other-external-metric",
ResponseKind: &metav1.GroupVersionKind{
Kind: "MetricValueList",
},
Verbs: []string{"get"},
},
},
},
},
},
},
},
},
},
expectedGroups: metav1.APIGroupList{
Groups: []metav1.APIGroup{
{
Name: "external.metrics.k8s.io",
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: "external.metrics.k8s.io/v1beta1",
Version: "v1beta1",
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "external.metrics.k8s.io/v1beta1",
Version: "v1beta1",
},
},
},
},
expectedGVResources: map[schema.GroupVersion]*metav1.APIResourceList{
{Group: "external.metrics.k8s.io", Version: "v1beta1"}: {
GroupVersion: "external.metrics.k8s.io/v1beta1",
APIResources: []metav1.APIResource{
// Since parent GVK was nil, it is NOT returned--only the subresource.
{
Name: "*/other-external-metric",
SingularName: "",
Namespaced: true,
Group: "",
Version: "",
Kind: "MetricValueList",
Verbs: []string{"get"},
},
},
},
},
expectedFailedGVs: map[schema.GroupVersion]error{},
},
{
name: "Aggregated discovery with multiple subresources",
agg: apidiscovery.APIGroupDiscoveryList{

View File

@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"mime"
"net/http"
"net/url"
"sort"
@@ -58,8 +59,9 @@ const (
defaultBurst = 300
AcceptV1 = runtime.ContentTypeJSON
// Aggregated discovery content-type (currently v2beta1). NOTE: Currently, we are assuming the order
// for "g", "v", and "as" from the server. We can only compare this string if we can make that assumption.
// Aggregated discovery content-type (v2beta1). NOTE: content-type parameters
// MUST be ordered (g, v, as) for server in "Accept" header (BUT we are resilient
// to ordering when comparing returned values in "Content-Type" header).
AcceptV2Beta1 = runtime.ContentTypeJSON + ";" + "g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"
// Prioritize aggregated discovery by placing first in the order of discovery accept types.
acceptDiscoveryFormats = AcceptV2Beta1 + "," + AcceptV1
@@ -259,8 +261,16 @@ func (d *DiscoveryClient) downloadLegacy() (
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch responseContentType {
case AcceptV1:
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
// Default is unaggregated discovery v1.
var v metav1.APIVersions
err = json.Unmarshal(body, &v)
if err != nil {
@@ -271,15 +281,6 @@ func (d *DiscoveryClient) downloadLegacy() (
apiGroup = apiVersionsToAPIGroup(&v)
}
apiGroupList.Groups = []metav1.APIGroup{apiGroup}
case AcceptV2Beta1:
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
return nil, nil, nil, fmt.Errorf("Unknown discovery response content-type: %s", responseContentType)
}
return apiGroupList, resourcesByGV, failedGVs, nil
@@ -313,13 +314,8 @@ func (d *DiscoveryClient) downloadAPIs() (
failedGVs := map[schema.GroupVersion]error{}
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch responseContentType {
case AcceptV1:
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, nil, err
}
case AcceptV2Beta1:
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
@@ -327,12 +323,38 @@ func (d *DiscoveryClient) downloadAPIs() (
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
return nil, nil, nil, fmt.Errorf("Unknown discovery response content-type: %s", responseContentType)
// Default is unaggregated discovery v1.
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, nil, err
}
}
return apiGroupList, resourcesByGV, failedGVs, nil
}
// isV2Beta1ContentType checks of the content-type string is both
// "application/json" and contains the v2beta1 content-type params.
// NOTE: This function is resilient to the ordering of the
// content-type parameters, as well as parameters added by
// intermediaries such as proxies or gateways. Examples:
//
// "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" = true
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io" = true
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8" = true
// "application/json" = false
// "application/json; charset=UTF-8" = false
func isV2Beta1ContentType(contentType string) bool {
base, params, err := mime.ParseMediaType(contentType)
if err != nil {
return false
}
return runtime.ContentTypeJSON == base &&
params["g"] == "apidiscovery.k8s.io" &&
params["v"] == "v2beta1" &&
params["as"] == "APIGroupDiscoveryList"
}
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *DiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {

View File

@@ -1400,8 +1400,9 @@ func TestAggregatedServerGroups(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-Type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(output)
}))
@@ -1990,8 +1991,9 @@ func TestAggregatedServerGroupsAndResources(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(output)
}))
@@ -2130,8 +2132,9 @@ func TestAggregatedServerGroupsAndResourcesWithErrors(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(status)
w.Write(output)
}))
@@ -2738,8 +2741,9 @@ func TestAggregatedServerPreferredResources(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(output)
}))
@@ -2763,6 +2767,58 @@ func TestAggregatedServerPreferredResources(t *testing.T) {
}
}
func TestDiscoveryContentTypeVersion(t *testing.T) {
tests := []struct {
contentType string
isV2Beta1 bool
}{
{
contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList",
isV2Beta1: true,
},
{
// content-type parameters are not in correct order, but comparison ignores order.
contentType: "application/json; v=v2beta1;as=APIGroupDiscoveryList;g=apidiscovery.k8s.io",
isV2Beta1: true,
},
{
// content-type parameters are not in correct order, but comparison ignores order.
contentType: "application/json; as=APIGroupDiscoveryList;g=apidiscovery.k8s.io;v=v2beta1",
isV2Beta1: true,
},
{
// Ignores extra parameter "charset=utf-8"
contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList;charset=utf-8",
isV2Beta1: true,
},
{
contentType: "application/json",
isV2Beta1: false,
},
{
contentType: "application/json; charset=UTF-8",
isV2Beta1: false,
},
{
contentType: "text/json",
isV2Beta1: false,
},
{
contentType: "text/html",
isV2Beta1: false,
},
{
contentType: "",
isV2Beta1: false,
},
}
for _, test := range tests {
isV2Beta1 := isV2Beta1ContentType(test.contentType)
assert.Equal(t, test.isV2Beta1, isV2Beta1)
}
}
func TestUseLegacyDiscovery(t *testing.T) {
// Default client sends aggregated discovery accept format (first) as well as legacy format.
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

18
go.mod
View File

@@ -13,19 +13,19 @@ require (
github.com/google/gnostic v0.5.7-v3refs
github.com/google/go-cmp v0.5.9
github.com/google/gofuzz v1.1.0
github.com/google/uuid v1.1.2
github.com/google/uuid v1.3.0
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
github.com/imdario/mergo v0.3.6
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
golang.org/x/net v0.8.0
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b
golang.org/x/term v0.5.0
golang.org/x/term v0.6.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/protobuf v1.28.1
k8s.io/api v0.26.3
k8s.io/apimachinery v0.26.3
k8s.io/api v0.26.7
k8s.io/apimachinery v0.26.7
k8s.io/klog/v2 v2.80.1
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d
@@ -49,8 +49,8 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -59,6 +59,6 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.26.3
k8s.io/apimachinery => k8s.io/apimachinery v0.26.3
k8s.io/api => k8s.io/api v0.26.7
k8s.io/apimachinery => k8s.io/apimachinery v0.26.7
)

28
go.sum
View File

@@ -128,8 +128,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -263,8 +263,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -309,19 +309,19 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -476,10 +476,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.26.3 h1:emf74GIQMTik01Aum9dPP0gAypL8JTLl/lHa4V9RFSU=
k8s.io/api v0.26.3/go.mod h1:PXsqwPMXBSBcL1lJ9CYDKy7kIReUydukS5JiRlxC3qE=
k8s.io/apimachinery v0.26.3 h1:dQx6PNETJ7nODU3XPtrwkfuubs6w7sX0M8n61zHIV/k=
k8s.io/apimachinery v0.26.3/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I=
k8s.io/api v0.26.7 h1:Lf4iEBEJb5OFNmawtBfSZV/UNi9riSJ0t1qdhyZqI40=
k8s.io/api v0.26.7/go.mod h1:Vk9bMadzA49UHPmHB//lX7VRCQSXGoVwfLd3Sc1SSXI=
k8s.io/apimachinery v0.26.7 h1:590jSBwaSHCAFCqltaEogY/zybFlhGsnLteLpuF2wig=
k8s.io/apimachinery v0.26.7/go.mod h1:qYzLkrQ9lhrZRh0jNKo2cfvf/R1/kQONnSiyB7NUJU0=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E=

View File

@@ -19,6 +19,7 @@ package openapi
import (
"context"
"encoding/json"
"strings"
"k8s.io/client-go/rest"
"k8s.io/kube-openapi/pkg/handler3"
@@ -58,7 +59,11 @@ func (c *client) Paths() (map[string]GroupVersion, error) {
// Create GroupVersions for each element of the result
result := map[string]GroupVersion{}
for k, v := range discoMap.Paths {
result[k] = newGroupVersion(c, v)
// If the server returned a URL rooted at /openapi/v3, preserve any additional client-side prefix.
// If the server returned a URL not rooted at /openapi/v3, treat it as an actual server-relative URL.
// See https://github.com/kubernetes/kubernetes/issues/117463 for details
useClientPrefix := strings.HasPrefix(v.ServerRelativeURL, "/openapi/v3")
result[k] = newGroupVersion(c, v, useClientPrefix)
}
return result, nil
}

View File

@@ -18,6 +18,7 @@ package openapi
import (
"context"
"net/url"
"k8s.io/kube-openapi/pkg/handler3"
)
@@ -29,18 +30,41 @@ type GroupVersion interface {
}
type groupversion struct {
client *client
item handler3.OpenAPIV3DiscoveryGroupVersion
client *client
item handler3.OpenAPIV3DiscoveryGroupVersion
useClientPrefix bool
}
func newGroupVersion(client *client, item handler3.OpenAPIV3DiscoveryGroupVersion) *groupversion {
return &groupversion{client: client, item: item}
func newGroupVersion(client *client, item handler3.OpenAPIV3DiscoveryGroupVersion, useClientPrefix bool) *groupversion {
return &groupversion{client: client, item: item, useClientPrefix: useClientPrefix}
}
func (g *groupversion) Schema(contentType string) ([]byte, error) {
return g.client.restClient.Get().
RequestURI(g.item.ServerRelativeURL).
SetHeader("Accept", contentType).
Do(context.TODO()).
Raw()
if !g.useClientPrefix {
return g.client.restClient.Get().
RequestURI(g.item.ServerRelativeURL).
SetHeader("Accept", contentType).
Do(context.TODO()).
Raw()
}
locator, err := url.Parse(g.item.ServerRelativeURL)
if err != nil {
return nil, err
}
path := g.client.restClient.Get().
AbsPath(locator.Path).
SetHeader("Accept", contentType)
// Other than root endpoints(openapiv3/apis), resources have hash query parameter to support etags.
// However, absPath does not support handling query parameters internally,
// so that hash query parameter is added manually
for k, value := range locator.Query() {
for _, v := range value {
path.Param(k, v)
}
}
return path.Do(context.TODO()).Raw()
}

View File

@@ -0,0 +1,106 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
func TestGroupVersion(t *testing.T) {
tests := []struct {
name string
prefix string
serverReturnsPrefix bool
}{
{
name: "no prefix",
prefix: "",
serverReturnsPrefix: false,
},
{
name: "prefix not in discovery",
prefix: "/test-endpoint",
serverReturnsPrefix: false,
},
{
name: "prefix in discovery",
prefix: "/test-endpoint",
serverReturnsPrefix: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == test.prefix+"/openapi/v3/apis/apps/v1" && r.URL.RawQuery == "hash=014fbff9a07c":
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"}}`))
case r.URL.Path == test.prefix+"/openapi/v3":
// return root content
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if test.serverReturnsPrefix {
w.Write([]byte(fmt.Sprintf(`{"paths":{"apis/apps/v1":{"serverRelativeURL":"%s/openapi/v3/apis/apps/v1?hash=014fbff9a07c"}}}`, test.prefix)))
} else {
w.Write([]byte(`{"paths":{"apis/apps/v1":{"serverRelativeURL":"/openapi/v3/apis/apps/v1?hash=014fbff9a07c"}}}`))
}
default:
t.Errorf("unexpected request: %s", r.URL.String())
w.WriteHeader(http.StatusNotFound)
return
}
}))
defer server.Close()
c, err := rest.RESTClientFor(&rest.Config{
Host: server.URL + test.prefix,
ContentConfig: rest.ContentConfig{
NegotiatedSerializer: scheme.Codecs,
GroupVersion: &appsv1.SchemeGroupVersion,
},
})
if err != nil {
t.Fatalf("unexpected error occurred: %v", err)
}
openapiClient := NewClient(c)
paths, err := openapiClient.Paths()
if err != nil {
t.Fatalf("unexpected error occurred: %v", err)
}
schema, err := paths["apis/apps/v1"].Schema(runtime.ContentTypeJSON)
if err != nil {
t.Fatalf("unexpected error occurred: %v", err)
}
expectedResult := `{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"}}`
if string(schema) != expectedResult {
t.Fatalf("unexpected result actual: %s expected: %s", string(schema), expectedResult)
}
})
}
}

View File

@@ -353,17 +353,6 @@ func NewIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
// TransformFunc allows for transforming an object before it will be processed
// and put into the controller cache and before the corresponding handlers will
// be called on it.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
// given controller doesn't care for them
type TransformFunc func(interface{}) (interface{}, error)
// NewTransformingInformer returns a Store and a controller for populating
// the store while also providing event notifications. You should only used
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
@@ -411,19 +400,11 @@ func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
@@ -475,6 +456,7 @@ func newInformer(
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
})
cfg := &Config{
@@ -486,7 +468,7 @@ func newInformer(
Process: func(obj interface{}) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
return processDeltas(h, clientState, deltas)
}
return errors.New("object given as Process argument is not Deltas")
},

View File

@@ -23,7 +23,7 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
fcache "k8s.io/client-go/tools/cache/testing"
"github.com/google/gofuzz"
fuzz "github.com/google/gofuzz"
)
func Example() {

View File

@@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool
// If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details.
Transformer TransformFunc
}
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -129,8 +133,32 @@ type DeltaFIFO struct {
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
// Called with every object if non-nil.
transformer TransformFunc
}
// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
// do not do anything that will take a long time.
type TransformFunc func(interface{}) (interface{}, error)
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
@@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
}
f.cond.L = &f.lock
return f
@@ -411,6 +440,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
@@ -566,12 +610,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K. If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
// object of K. The pre-existing keys are those in the union set of the keys in
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
// the one present in the last delta in `f.items`. If there is no delta for K
// in `f.items`, it is the object in `f.knownObjects`
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
@@ -595,56 +638,54 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
}
}
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items {
// Do deletion detection against objects in the queue
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if f.knownObjects != nil {
// Detect deletions for objects not present in the queue, but present in KnownObjects
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
if len(f.items[k]) > 0 {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {

View File

@@ -121,6 +121,130 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
}
}
func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) {
obj := mkFifoObj("foo", 2)
objV2 := mkFifoObj("foo", 3)
table := []struct {
name string
operations func(f *DeltaFIFO)
expectedDeltas Deltas
}{
{
name: "Added object should be deleted on Replace",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Replaced object should have only a single Delete",
operations: func(f *DeltaFIFO) {
f.emitDeltaTypeReplaced = true
f.Add(obj)
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Replaced, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Deleted object should have only a single Delete",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Delete(obj)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, obj},
},
},
{
name: "Synced objects should have a single delete",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Sync, obj},
{Sync, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Added objects should have a single delete on multiple Replaces",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{}, "0")
f.Replace([]interface{}{}, "1")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Added and deleted and added object should be deleted",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Delete(obj)
f.Add(objV2)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, obj},
{Added, objV2},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}},
},
},
}
for _, tt := range table {
tt := tt
t.Run(tt.name, func(t *testing.T) {
// Test with a DeltaFIFO with a backing KnownObjects
fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{}
}),
})
tt.operations(fWithKnownObjects)
actualDeltasWithKnownObjects := Pop(fWithKnownObjects)
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) {
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects)
}
if len(fWithKnownObjects.items) != 0 {
t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items)
}
// Test with a DeltaFIFO without a backing KnownObjects
fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
})
tt.operations(fWithoutKnownObjects)
actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects)
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) {
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects)
}
if len(fWithoutKnownObjects.items) != 0 {
t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items)
}
})
}
}
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
@@ -203,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
}
}
type rvAndXfrm struct {
rv int
xfrm int
}
func TestDeltaFIFO_transformer(t *testing.T) {
mk := func(name string, rv int) testFifoObject {
return mkFifoObj(name, &rvAndXfrm{rv, 0})
}
xfrm := TransformFunc(func(obj interface{}) (interface{}, error) {
switch v := obj.(type) {
case testFifoObject:
v.val.(*rvAndXfrm).xfrm++
case DeletedFinalStateUnknown:
if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 {
return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj)
}
default:
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
return obj, nil
})
must := func(err error) {
if err != nil {
t.Fatal(err)
}
}
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
Transformer: xfrm,
})
must(f.Add(mk("foo", 10)))
must(f.Add(mk("bar", 11)))
must(f.Update(mk("foo", 12)))
must(f.Delete(mk("foo", 15)))
must(f.Replace([]interface{}{}, ""))
must(f.Add(mk("bar", 16)))
must(f.Replace([]interface{}{}, ""))
// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
for i := 0; i < 2; i++ {
obj, err := f.Pop(func(o interface{}) error { return nil })
if err != nil {
t.Fatalf("got nothing on try %v?", i)
}
obj = obj.(Deltas).Newest().Object
switch v := obj.(type) {
case testFifoObject:
if v.name != "foo" {
t.Errorf("expected regular deletion of foo, got %q", v.name)
}
rx := v.val.(*rvAndXfrm)
if rx.rv != 15 {
t.Errorf("expected last message, got %#v", obj)
}
if rx.xfrm != 1 {
t.Errorf("obj %v transformed wrong number of times.", obj)
}
case DeletedFinalStateUnknown:
tf := v.Obj.(testFifoObject)
rx := tf.val.(*rvAndXfrm)
if tf.name != "bar" {
t.Errorf("expected tombstone deletion of bar, got %q", tf.name)
}
if rx.rv != 16 {
t.Errorf("expected last message, got %#v", obj)
}
if rx.xfrm != 1 {
t.Errorf("tombstoned obj %v transformed wrong number of times.", obj)
}
default:
t.Errorf("unknown item %#v", obj)
}
}
}
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("foo", 10))
@@ -371,7 +577,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
expectedList = []Deltas{
{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
{{Sync, mkFifoObj("foo", 5)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
@@ -385,6 +591,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
}
}
// Now try deleting and recreating the object in the queue, then delete it by a Replace call
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
})
f.Delete(mkFifoObj("bar", 6))
f.Add(mkFifoObj("bar", 100))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList = []Deltas{
{
{Deleted, mkFifoObj("bar", 6)},
{Added, mkFifoObj("bar", 100)},
// Since "bar" has a newer object in the queue than in the state,
// it should get a tombstone key with the latest object from the queue
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
},
{{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// Now try syncing it first to ensure the delete use the latest version
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
})
f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0")
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList = []Deltas{
{
{Sync, mkFifoObj("bar", 100)},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
},
{
{Sync, mkFifoObj("foo", 5)},
{Sync, mkFifoObj("foo", 5)},
},
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// Now try starting without an explicit KeyListerGetter
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("baz", 10))

View File

@@ -198,10 +198,7 @@ type SharedInformer interface {
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
// Please see the comment on TransformFunc for more details.
SetTransform(handler TransformFunc) error
// IsStopped reports whether the informer has already been stopped.
@@ -422,6 +419,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
cfg := &Config{
@@ -585,7 +583,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
return processDeltas(s, s.indexer, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}

View File

@@ -395,9 +395,8 @@ func TestSharedInformerTransformer(t *testing.T) {
name := pod.GetName()
if upper := strings.ToUpper(name); upper != name {
copied := pod.DeepCopyObject().(*v1.Pod)
copied.SetName(upper)
return copied, nil
pod.SetName(upper)
return pod, nil
}
}
return obj, nil

View File

@@ -25,6 +25,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math"
"math/big"
"net"
"os"
@@ -57,8 +58,14 @@ type AltNames struct {
// NewSelfSignedCACert creates a CA certificate
func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, error) {
now := time.Now()
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
tmpl := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
@@ -116,9 +123,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
caTemplate := x509.Certificate{
SerialNumber: big.NewInt(1),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
},
@@ -144,9 +156,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err = cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
template := x509.Certificate{
SerialNumber: big.NewInt(2),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},